Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2225105786

Spotless seems to be failing.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1675521480

## docs/content/docs/connectors/datastream/sqs.md: ##
@@ -0,0 +1,134 @@
+---
+title: SQS
+weight: 5
+type: docs
+aliases:
+  - /zh/dev/connectors/sqs.html

Review Comment:
I don't think we need `zh` here.
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1675502706

## flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilder.java: ##
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link SqsSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link SqsSink} that writes String
+ * values to a SQS named sqsUrl.
+ *
+ * <pre>{@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * SqsSink<String> sqsSink =
+ *         SqsSink.<String>builder()
+ *                 .setSqsUrl("sqsUrl")
+ *                 .setSqsClientProperties(sinkProperties)
+ *                 .setSerializationSchema(new SimpleStringSchema())
+ *                 .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 10
+ *   <li>{@code maxInFlightRequests} will be 50
+ *   <li>{@code maxBufferedRequests} will be 5000
+ *   <li>{@code maxBatchSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code maxTimeInBufferMs} will be 5000ms
+ *   <li>{@code maxRecordSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code failOnError} will be false
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class SqsSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<InputT, SendMessageBatchRequestEntry, SqsSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 10;
+    private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 5_000;
+    private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 256 * 1000;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 256 * 1000;
+    private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+    private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1;
+
+    private Boolean failOnError;
+    private String sqsUrl;
+    private Properties sqsClientProperties;
+    private SerializationSchema<InputT> serializationSchema;
+
+    SqsSinkBuilder() {}
+
+    /**
+     * Sets the url of the SQS that the sink will connect to. There is no default for this
+     * parameter, therefore, this must be provided at sink creation time otherwise the build will
+     * fail.
+     *
+     * @param sqsUrl the url of the Sqs
+     * @return {@link SqsSinkBuilder} itself
+     */
+    public SqsSinkBuilder<InputT> setSqsUrl(String sqsUrl) {
+        this.sqsUrl = sqsUrl;
+        return this;
+    }
+
+    /**
+     * Allows the user to specify a serialization schema to serialize each record to persist to SQS.
+     *
+     * @param schema serialization schema to use
+     * @return {@link SqsSinkBuilder} itself
+     */
+    public SqsSinkBuilder<InputT> setSerializationSchema(final SerializationSchema<InputT> schema) {
+        serializationSchema = schema;
+        return this;
+    }

Review Comment:
Looks great, thanks @19priyadhingra!
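The Javadoc quoted above describes two behaviours of the builder: unset parameters fall back to documented defaults, and a missing queue URL makes the build fail. A minimal, dependency-free sketch of that pattern is given below. The class `BatchConfigBuilder`, its setters, and the string it returns are hypothetical names chosen for illustration; they are not part of the connector, and the real `SqsSinkBuilder` may implement the fallback differently.

```java
import java.util.Optional;

/** Hypothetical stand-in for a sink builder; not the connector's actual class. */
public class BatchConfigBuilder {

    // Defaults mirroring three of the values listed in the quoted Javadoc.
    private static final int DEFAULT_MAX_BATCH_SIZE = 10;
    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000L;
    private static final boolean DEFAULT_FAIL_ON_ERROR = false;

    // Boxed types so that "not set by the caller" can be represented as null.
    private Integer maxBatchSize;
    private Long maxTimeInBufferMs;
    private Boolean failOnError;
    private String queueUrl;

    public BatchConfigBuilder setQueueUrl(String queueUrl) {
        this.queueUrl = queueUrl;
        return this;
    }

    public BatchConfigBuilder setMaxBatchSize(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
        return this;
    }

    public BatchConfigBuilder setMaxTimeInBufferMs(long maxTimeInBufferMs) {
        this.maxTimeInBufferMs = maxTimeInBufferMs;
        return this;
    }

    public BatchConfigBuilder setFailOnError(boolean failOnError) {
        this.failOnError = failOnError;
        return this;
    }

    /** Resolves defaults and returns a readable summary of the effective configuration. */
    public String build() {
        // The queue URL has no default, so building without it is an error.
        if (queueUrl == null || queueUrl.isEmpty()) {
            throw new IllegalStateException("queueUrl must be set before calling build()");
        }
        int batchSize = Optional.ofNullable(maxBatchSize).orElse(DEFAULT_MAX_BATCH_SIZE);
        long timeInBuffer =
                Optional.ofNullable(maxTimeInBufferMs).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS);
        boolean fail = Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR);
        return "queueUrl=" + queueUrl
                + ", maxBatchSize=" + batchSize
                + ", maxTimeInBufferMs=" + timeInBuffer
                + ", failOnError=" + fail;
    }

    public static void main(String[] args) {
        // Only the mandatory URL is set; every other value comes from the defaults.
        System.out.println(new BatchConfigBuilder().setQueueUrl("sqsUrl").build());
    }
}
```

Keeping the fields boxed and resolving them only in `build()` lets the builder distinguish "explicitly set to the default value" from "never set", which is one common reason to prefer this shape over initialising the fields with the defaults.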
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1675080346

## flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilder.java: ##
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link SqsSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link SqsSink} that writes String
+ * values to a SQS named sqsUrl.
+ *
+ * <pre>{@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * SqsSink<String> sqsSink =
+ *         SqsSink.<String>builder()
+ *                 .setSqsUrl("sqsUrl")
+ *                 .setSqsClientProperties(sinkProperties)
+ *                 .setSerializationSchema(new SimpleStringSchema())
+ *                 .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 10
+ *   <li>{@code maxInFlightRequests} will be 50
+ *   <li>{@code maxBufferedRequests} will be 5000
+ *   <li>{@code maxBatchSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code maxTimeInBufferMs} will be 5000ms
+ *   <li>{@code maxRecordSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code failOnError} will be false
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class SqsSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<InputT, SendMessageBatchRequestEntry, SqsSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 10;
+    private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 5_000;
+    private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 256 * 1000;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 256 * 1000;
+    private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+    private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1;
+
+    private Boolean failOnError;
+    private String sqsUrl;
+    private Properties sqsClientProperties;
+    private SerializationSchema<InputT> serializationSchema;
+
+    SqsSinkBuilder() {}
+
+    /**
+     * Sets the url of the SQS that the sink will connect to. There is no default for this
+     * parameter, therefore, this must be provided at sink creation time otherwise the build will
+     * fail.
+     *
+     * @param sqsUrl the url of the Sqs
+     * @return {@link SqsSinkBuilder} itself
+     */
+    public SqsSinkBuilder<InputT> setSqsUrl(String sqsUrl) {
+        this.sqsUrl = sqsUrl;
+        return this;
+    }
+
+    /**
+     * Allows the user to specify a serialization schema to serialize each record to persist to SQS.
+     *
+     * @param schema serialization schema to use
+     * @return {@link SqsSinkBuilder} itself
+     */
+    public SqsSinkBuilder<InputT> setSerializationSchema(final SerializationSchema<InputT> schema) {
+        serializationSchema = schema;
+        return this;
+    }

Review Comment:
I have tried to update this as suggested; please let me know if it looks good.
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674832829

## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java: ##
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.test;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.sqs.sink.SqsSink;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End to End test for SQS sink API. */
+public class SqsSinkITTest extends TestLogger {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqsSinkITTest.class);
+
+    private static final int NUMBER_OF_ELEMENTS = 50;
+    private StreamExecutionEnvironment env;
+    private SdkHttpClient httpClient;
+    private SqsClient sqsClient;
+    private static final Network network = Network.newNetwork();
+
+    @ClassRule
+    public static LocalstackContainer mockSqsContainer =
+            new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
+                    .withNetwork(network)
+                    .withNetworkAliases("localstack");
+
+    public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+            TestcontainersSettings.builder()
+                    .environmentVariable("AWS_CBOR_DISABLE", "1")
+                    .environmentVariable(
+                            "FLINK_ENV_JAVA_OPTS",
+                            "-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false")
+                    .network(network)
+                    .logger(LOG)
+                    .dependsOn(mockSqsContainer)
+                    .build();
+
+    public static final FlinkContainers FLINK =
+            FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+    @Before
+    public void setup() throws Exception {
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");

Review Comment:
The test case passes without this, so I have removed it.
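The `setup()` method quoted above changes a JVM-wide system property, which stays set for every later test in the same JVM unless it is reset. Should such a property ever be needed again, the sketch below shows one way to scope it to a single block and restore the previous value afterwards. It uses only the JDK; `ScopedSystemProperty` and `runWith` are hypothetical names, and the key `aws.cborEnabled` is taken from the `-Daws.cborEnabled=false` option in the quoted code (that `SdkSystemSetting.CBOR_ENABLED.property()` resolves to the same key is an assumption here).

```java
/** Hypothetical helper, for illustration only; not part of the PR. */
public class ScopedSystemProperty {

    /** Runs the action with the property set, then restores the previous state. */
    public static void runWith(String key, String value, Runnable action) {
        // Remember the old value (null means the property was not set at all).
        String previous = System.getProperty(key);
        System.setProperty(key, value);
        try {
            action.run();
        } finally {
            // Restore the exact prior state, even if the action throws.
            if (previous == null) {
                System.clearProperty(key);
            } else {
                System.setProperty(key, previous);
            }
        }
    }

    public static void main(String[] args) {
        runWith(
                "aws.cborEnabled",
                "false",
                () -> System.out.println("inside: " + System.getProperty("aws.cborEnabled")));
        System.out.println("after: " + System.getProperty("aws.cborEnabled"));
    }
}
```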
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1675069242

## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ##
@@ -0,0 +1,105 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>4.4-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-connector-aws-sqs-e2e-tests</artifactId>

Review Comment:
The command above didn't work for me, but I ran the test case by placing the test file in the same sqs-sink test folder, and it passed there. Please let me know if it still fails at your end.
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674832829

## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java: ##
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.test;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.sqs.sink.SqsSink;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End to End test for SQS sink API. */
+public class SqsSinkITTest extends TestLogger {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqsSinkITTest.class);
+
+    private static final int NUMBER_OF_ELEMENTS = 50;
+    private StreamExecutionEnvironment env;
+    private SdkHttpClient httpClient;
+    private SqsClient sqsClient;
+    private static final Network network = Network.newNetwork();
+
+    @ClassRule
+    public static LocalstackContainer mockSqsContainer =
+            new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
+                    .withNetwork(network)
+                    .withNetworkAliases("localstack");
+
+    public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+            TestcontainersSettings.builder()
+                    .environmentVariable("AWS_CBOR_DISABLE", "1")
+                    .environmentVariable(
+                            "FLINK_ENV_JAVA_OPTS",
+                            "-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false")
+                    .network(network)
+                    .logger(LOG)
+                    .dependsOn(mockSqsContainer)
+                    .build();
+
+    public static final FlinkContainers FLINK =
+            FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+    @Before
+    public void setup() throws Exception {
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");

Review Comment:
I will validate this once I am able to run the test locally.
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674832829

## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java: ##
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.test;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.sqs.sink.SqsSink;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End to End test for SQS sink API. */
+public class SqsSinkITTest extends TestLogger {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqsSinkITTest.class);
+
+    private static final int NUMBER_OF_ELEMENTS = 50;
+    private StreamExecutionEnvironment env;
+    private SdkHttpClient httpClient;
+    private SqsClient sqsClient;
+    private static final Network network = Network.newNetwork();
+
+    @ClassRule
+    public static LocalstackContainer mockSqsContainer =
+            new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
+                    .withNetwork(network)
+                    .withNetworkAliases("localstack");
+
+    public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+            TestcontainersSettings.builder()
+                    .environmentVariable("AWS_CBOR_DISABLE", "1")
+                    .environmentVariable(
+                            "FLINK_ENV_JAVA_OPTS",
+                            "-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false")
+                    .network(network)
+                    .logger(LOG)
+                    .dependsOn(mockSqsContainer)
+                    .build();
+
+    public static final FlinkContainers FLINK =
+            FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+    @Before
+    public void setup() throws Exception {
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");

Review Comment:
This needs to be validated once I am able to run the test locally.
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674832262

## flink-connector-aws/flink-connector-sqs/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension: ##
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension

Review Comment:
This was copied from another sink package and does not appear to be required, so I am deleting it now.
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674703677

## flink-connector-aws/flink-connector-sqs/src/main/resources/log4j2.properties: ##
@@ -0,0 +1,25 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+rootLogger.level = OFF

Review Comment:
Why do we want to do that? I can see that the other sinks also have their own independent log4j configuration.
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674213420

## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ##
@@ -0,0 +1,105 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>4.4-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-connector-aws-sqs-e2e-tests</artifactId>

Review Comment:
This test doesn't seem to work. Please make sure we have all the dependencies.

```
Caused by: java.lang.NoClassDefFoundError: software/amazon/awssdk/services/s3/S3Client
    at org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createS3Client(AWSServicesTestUtils.java:65)
    at org.apache.flink.connector.aws.testutils.LocalstackContainer$ListBucketObjectsWaitStrategy.list(LocalstackContainer.java:79)
    at org.rnorth.ducttape.ratelimits.RateLimiter.getWhenReady(RateLimiter.java:51)
    at org.apache.flink.connector.aws.testutils.LocalstackContainer$ListBucketObjectsWaitStrategy.lambda$waitUntilReady$0(LocalstackContainer.java:72)
    at org.rnorth.ducttape.unreliables.Unreliables.lambda$retryUntilSuccess$0(Unreliables.java:43)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: software.amazon.awssdk.services.s3.S3Client
```

I ran it using the command below:

```
mvn clean verify -Prun-end-to-end-tests -DdistDir=/Users/liangtl/workplace/flink_os/flink-1.19.0/lib/flink-dist-1.19.0.jar -pl flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests -am
```
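The `NoClassDefFoundError` in the trace above means that `software.amazon.awssdk.services.s3.S3Client` was not on the classpath when `AWSServicesTestUtils.createS3Client` ran. One quick way to confirm which classes a given test classpath actually provides is a small probe such as the sketch below. `ClasspathCheck` and `isPresent` are hypothetical names used only for this illustration; the probe uses the JDK's `Class.forName` and does not depend on the connector or on the AWS SDK.

```java
/** Hypothetical diagnostic helper, for illustration only; not part of the PR. */
public class ClasspathCheck {

    /** Returns true if the named class can be located by this class's class loader. */
    public static boolean isPresent(String className) {
        try {
            // initialize=false: only locate and load the class, do not run static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the stack trace; another name can be passed as an argument.
        String name =
                args.length > 0 ? args[0] : "software.amazon.awssdk.services.s3.S3Client";
        System.out.println(name + " present: " + isPresent(name));
    }
}
```

Running the probe with the same classpath as the failing module would show whether the S3 client dependency needs to be added to that module's test scope.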
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
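hlteoh37 commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674205703

## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ##
@@ -0,0 +1,105 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>4.4-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-connector-aws-sqs-e2e-tests</artifactId>
+    <name>Flink : Connectors : AWS : E2E Tests : Amazon SQS</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-sqs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-sqs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy</id>
+                        <phase>pre-integration-test</phase>
+                        <goals>
+                            <goal>copy</goal>
+                        </goals>
+
+
+
+
+

Review Comment:
This should be removed.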
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674182875 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java: ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink.test; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; +import org.apache.flink.connector.aws.testutils.LocalstackContainer; +import org.apache.flink.connector.sqs.sink.SqsSink; +import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils; +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.DockerImageVersions; +import org.apache.flink.util.TestLogger; + +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig; +import static org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** End to End test for SQS sink API. 
*/
+public class SqsSinkITTest extends TestLogger {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqsSinkITTest.class);
+
+    private static final int NUMBER_OF_ELEMENTS = 50;
+    private StreamExecutionEnvironment env;
+    private SdkHttpClient httpClient;
+    private SqsClient sqsClient;
+    private static final Network network = Network.newNetwork();
+
+    @ClassRule
+    public static LocalstackContainer mockSqsContainer =
+            new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
+                    .withNetwork(network)
+                    .withNetworkAliases("localstack");
+
+    public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+            TestcontainersSettings.builder()
+                    .environmentVariable("AWS_CBOR_DISABLE", "1")
+                    .environmentVariable(
+                            "FLINK_ENV_JAVA_OPTS",
+                            "-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false")
+                    .network(network)
+                    .logger(LOG)
+                    .dependsOn(mockSqsContainer)
+                    .build();
+
+    public static final FlinkContainers FLINK =
+            FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+    @Before
+    public void setup() throws Exception {
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");

Review Comment:
Do we need this for SQS? I'm aware we need it for KDS, but just checking.
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674177861 ## flink-connector-aws/flink-connector-sqs/pom.xml: ## @@ -0,0 +1,130 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + +4.0.0 + + +org.apache.flink +flink-connector-aws-parent +4.4-SNAPSHOT + + +flink-connector-sqs +Flink : Connectors : AWS : Amazon SQS +jar + + + +org.apache.flink +flink-streaming-java +${flink.version} +provided + + + +org.apache.flink +flink-connector-aws-base +${project.version} + + + +software.amazon.awssdk +sqs + + + +software.amazon.awssdk +netty-nio-client + + + + +org.apache.flink +flink-test-utils +${flink.version} +test + + +org.apache.flink +flink-connector-test-utils +${flink.version} +test + + + +org.apache.flink +flink-connector-aws-base +${project.version} +test-jar +test + + + +org.apache.flink +flink-connector-base +${flink.version} +test-jar +test + + + +org.testcontainers +testcontainers +test + + + +com.fasterxml.jackson.core +jackson-core + + + +com.fasterxml.jackson.datatype +jackson-datatype-jsr310 + + + Review Comment: nit `ArchUnit` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674086507

## flink-connector-aws/flink-connector-sqs/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension: ##
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension

Review Comment:
What do we use this for?
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674080477 ## flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverterTest.java: ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; + +import java.nio.charset.StandardCharsets; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Covers construction and sanity checking of {@link SqsSinkElementConverter}. 
*/
+class SqsSinkElementConverterTest {
+
+    @Test
+    void elementConverterWillComplainASerializationSchemaIsNotSetIfBuildIsCalledWithoutIt() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(() -> SqsSinkElementConverter.builder().build())
+                .withMessageContaining(
+                        "No SerializationSchema was supplied to the SQS Sink builder.");
+    }
+
+    @Test
+    void elementConverterUsesProvidedSchemaToSerializeRecord() {
+        ElementConverter<String, SendMessageBatchRequestEntry> elementConverter =
+                SqsSinkElementConverter.<String>builder()
+                        .setSerializationSchema(new SimpleStringSchema())
+                        .build();
+        elementConverter.open(null);
+
+        String testString = "{many hands make light work;";
+
+        SendMessageBatchRequestEntry serializedRecord = elementConverter.apply(testString, null);
+        byte[] serializedString = (new SimpleStringSchema()).serialize(testString);
+        assertThat(serializedRecord.messageBody())
+                .isEqualTo(new String(serializedString, StandardCharsets.UTF_8));
+    }
+
+    @Test
+    void elementConverterWillOpenSerializationSchema() {
+        OpenCheckingStringSchema openCheckingStringSchema = new OpenCheckingStringSchema();
+        ElementConverter<String, SendMessageBatchRequestEntry> elementConverter =
+                SqsSinkElementConverter.<String>builder()
+                        .setSerializationSchema(openCheckingStringSchema)
+                        .build();
+        elementConverter.open(null);

Review Comment:
```suggestion
        elementConverter.open(new TestSinkInitContext());
```
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674079374 ## flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverterTest.java: ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.base.sink.writer.ElementConverter; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; + +import java.nio.charset.StandardCharsets; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Covers construction and sanity checking of {@link SqsSinkElementConverter}. 
*/
+class SqsSinkElementConverterTest {
+
+    @Test
+    void elementConverterWillComplainASerializationSchemaIsNotSetIfBuildIsCalledWithoutIt() {
+        Assertions.assertThatExceptionOfType(NullPointerException.class)
+                .isThrownBy(() -> SqsSinkElementConverter.builder().build())
+                .withMessageContaining(
+                        "No SerializationSchema was supplied to the SQS Sink builder.");
+    }
+
+    @Test
+    void elementConverterUsesProvidedSchemaToSerializeRecord() {
+        ElementConverter<String, SendMessageBatchRequestEntry> elementConverter =
+                SqsSinkElementConverter.<String>builder()
+                        .setSerializationSchema(new SimpleStringSchema())
+                        .build();
+        elementConverter.open(null);

Review Comment:
This needs to be changed; the test fails as written.
```suggestion
        elementConverter.open(new TestSinkInitContext());
```
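Editor's sketch: one plausible reason `open(null)` fails is that the converter's `open` dereferences the context it receives (the author's reply further down the thread mentions calling `context.asSerializationSchemaInitializationContext()`). The example below reproduces that failure mode in isolation. `InitContext` and `Converter` here are hypothetical stand-ins, not the Flink interfaces, and the sketch makes no claim about the actual cause in the PR.

```java
public class OpenWithNullContextSketch {

    /** Hypothetical stand-in for a sink init context; not the Flink interface. */
    interface InitContext {
        String schemaContext();
    }

    /** Hypothetical converter whose open() dereferences the context it is given. */
    static final class Converter {
        private boolean opened;

        void open(InitContext context) {
            // A null context throws NullPointerException on this call.
            String schemaContext = context.schemaContext();
            opened = schemaContext != null;
        }

        boolean isOpened() {
            return opened;
        }
    }

    /** Returns true if opening a fresh converter with this context throws NullPointerException. */
    static boolean openThrowsNpe(InitContext context) {
        try {
            new Converter().open(context);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("null context fails: " + openThrowsNpe(null));
        System.out.println("stub context fails: " + openThrowsNpe(() -> "test"));
    }
}
```

Passing a real test context, as the suggestion does with `TestSinkInitContext`, avoids the null dereference in the same way the stub lambda does here.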
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674066852

## flink-connector-aws/flink-connector-sqs/src/main/resources/log4j2.properties: ##
@@ -0,0 +1,25 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+rootLogger.level = OFF

Review Comment:
Can we remove this file?
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674059875 ## flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java: ## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.connector.sqs.sink.client.SdkClientProvider; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier; +import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier; + +/** + * Sink writer created by {@link SqsSink} to write 
to SQS. More details on the operation of this + * sink writer may be found in the doc for {@link SqsSink}. More details on the internals of this + * sink writer may be found in {@link AsyncSinkWriter}. + * + * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS SDK + * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code + * AWS_SECRET_ACCESS_KEY} through environment variables etc. + */ +@Internal +class SqsSinkWriter extends AsyncSinkWriter { + +private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class); + +private final SdkClientProvider clientProvider; + +private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER = +AWSExceptionHandler.withClassifier( +FatalExceptionClassifier.createChain( +getInterruptedExceptionClassifier(), +getInvalidCredentialsExceptionClassifier(), + SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier(), + SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(), + SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(), +getSdkClientMisconfiguredExceptionClassifier())); + +private final Counter numRecordsOutErrorsCounter; + +/* Url of SQS */ +private final String sqsUrl; + +/* The sink writer metric group */ +private final SinkWriterMetricGroup metrics; + +/* Flag to whether fatally fail any time we encounter an exception when persisting records */ +private final boolean failOnError; + +SqsSinkWriter( +ElementConverter elementConverter, +Sink.InitContext context, +int maxBatchSize, +int maxInFlightRequests, +int maxBufferedRequests,
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674055654 ## flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilder.java: ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; + +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; + +import java.util.Optional; +import java.util.Properties; + +import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION; +import static software.amazon.awssdk.http.Protocol.HTTP1_1; + +/** + * Builder to construct {@link SqsSink}. + * + * The following example shows the minimum setup to create a {@link SqsSink} that writes String + * values to a SQS named sqsUrl. 
+ *
+ * <pre>{@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * SqsSink<String> sqsSink =
+ *         SqsSink.<String>builder()
+ *                 .setSqsUrl("sqsUrl")
+ *                 .setSqsClientProperties(sinkProperties)
+ *                 .setSerializationSchema(new SimpleStringSchema())
+ *                 .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 10
+ *   <li>{@code maxInFlightRequests} will be 50
+ *   <li>{@code maxBufferedRequests} will be 5000
+ *   <li>{@code maxBatchSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code maxTimeInBufferMs} will be 5000ms
+ *   <li>{@code maxRecordSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code failOnError} will be false
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class SqsSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<InputT, SendMessageBatchRequestEntry, SqsSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 10;
+    private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 5_000;
+    private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 256 * 1000;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 256 * 1000;
+    private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+    private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1;
+
+    private Boolean failOnError;
+    private String sqsUrl;
+    private Properties sqsClientProperties;
+    private SerializationSchema<InputT> serializationSchema;
+
+    SqsSinkBuilder() {}
+
+    /**
+     * Sets the url of the SQS that the sink will connect to. There is no default for this
+     * parameter, therefore, this must be provided at sink creation time otherwise the build will
+     * fail.
+     *
+     * @param sqsUrl the url of the Sqs
+     * @return {@link SqsSinkBuilder} itself
+     */
+    public SqsSinkBuilder<InputT> setSqsUrl(String sqsUrl) {
+        this.sqsUrl = sqsUrl;
+        return this;
+    }
+
+    /**
+     * Allows the user to specify a serialization schema to serialize each record to persist to SQS.
+     *
+     * @param schema serialization schema to use
+     * @return {@link SqsSinkBuilder} itself
+     */
+    public SqsSinkBuilder<InputT> setSerializationSchema(final SerializationSchema<InputT> schema) {
+        serializationSchema = schema;
+        return this;
+    }

Review Comment:
Should we instead provide a method to set the `ElementConverter` directly? Otherwise users will have to create a `SerializationSchema` that goes `InputT` -> `byte[]` -> `String`, rather than directly `InputT` -> `String`.
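Editor's sketch: the two conversion paths contrasted in the comment above can be illustrated without any Flink types. In the example below, `viaBytes` mimics a schema-based path (`InputT` -> `byte[]` -> `String`) and `direct` mimics a converter that produces the message body in one step. Both method names and the `Function`-based signatures are hypothetical; the real `SerializationSchema` and `ElementConverter` interfaces differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class ConversionPathSketch {

    /** Schema-style path: InputT -> byte[] -> String (UTF-8 decode on the way back). */
    static <T> String viaBytes(T element, Function<T, byte[]> serializer) {
        byte[] bytes = serializer.apply(element);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Direct path: InputT -> String, with no intermediate byte[]. */
    static <T> String direct(T element, Function<T, String> toBody) {
        return toBody.apply(element);
    }

    public static void main(String[] args) {
        Integer order = 42;
        String viaBytesBody =
                viaBytes(order, o -> ("order-" + o).getBytes(StandardCharsets.UTF_8));
        String directBody = direct(order, o -> "order-" + o);
        // Both paths yield the same body; the direct one skips the encode/decode round trip.
        System.out.println(viaBytesBody.equals(directBody));
    }
}
```

The round trip is harmless for UTF-8 text but adds an allocation per record and forces callers to think in bytes when the destination field is a string.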
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1671281074 ## flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java: ## @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; +import org.apache.flink.connector.sqs.sink.client.SdkClientProvider; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchRequestTooLongException; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +/** Covers construction, defaults and sanity checking of {@link SqsSinkWriter}. 
*/
+public class SqsSinkWriterTest {
+
+    @Mock private SqsAsyncClient sqsAsyncClient;
+
+    @Mock private Consumer<List<SendMessageBatchRequestEntry>> requestResult;
+
+    @Mock private SdkClientProvider asyncClientSdkClientProviderOverride;
+

Review Comment:
Thanks for the reference. I have removed the Mockito dependency.
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1669440001 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ## @@ -0,0 +1,111 @@ + + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + +flink-connector-aws-e2e-tests-parent +org.apache.flink +4.3-SNAPSHOT + + +4.0.0 + +flink-connector-aws-sqs-e2e-tests +Flink : Connectors : AWS : E2E Tests : Amazon SQS +jar + + + +org.apache.flink +flink-streaming-java +${flink.version} +test + + + +org.apache.flink +flink-connector-sqs +${project.version} +test + + + +org.apache.flink +flink-connector-aws-base +${project.version} +test +test-jar + + + +org.apache.flink +flink-connector-sqs +${project.version} +test +test-jar + + + + +com.google.guava +guava +test + + + +com.fasterxml.jackson.core +jackson-databind +test + + + +com.fasterxml.jackson.datatype +jackson-datatype-jsr310 +test + + + +com.fasterxml.jackson.datatype +jackson-datatype-jdk8 +test + Review Comment: Not needed. removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1669436100 ## flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverter.java: ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SimpleUserCodeClassLoader; +import org.apache.flink.util.UserCodeClassLoader; + +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; + +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +/** + * An implementation of the {@link ElementConverter} that uses the AWS SQS SDK v2. 
The user only
+ * needs to provide a {@link SerializationSchema} of the {@code InputT} to transform it into a
+ * {@link SendMessageBatchRequestEntry} that may be persisted.
+ */
+@Internal
+public class SqsSinkElementConverter<InputT>
+        implements ElementConverter<InputT, SendMessageBatchRequestEntry> {
+
+    /** A serialization schema to specify how the input element should be serialized. */
+    private final SerializationSchema<InputT> serializationSchema;
+
+    private SqsSinkElementConverter(SerializationSchema<InputT> serializationSchema) {
+        this.serializationSchema = serializationSchema;
+    }
+
+    @Override
+    public SendMessageBatchRequestEntry apply(InputT element, SinkWriter.Context context) {
+        final byte[] messageBody = serializationSchema.serialize(element);
+        return SendMessageBatchRequestEntry.builder()
+                .id(UUID.randomUUID().toString())
+                .messageBody(new String(messageBody, StandardCharsets.UTF_8))
+                .build();
+    }
+
+    @Override
+    public void open(Sink.InitContext context) {
+        try {
+            serializationSchema.open(

Review Comment:
Thanks for the detail. Updated to use `serializationSchema.open(context.asSerializationSchemaInitializationContext());`
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1669426704 ## flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java: ## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.connector.sqs.sink.client.SdkClientProvider; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier; +import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier; + +/** + * Sink writer created by {@link SqsSink} to write 
to SQS. More details on the operation of this + * sink writer may be found in the doc for {@link SqsSink}. More details on the internals of this + * sink writer may be found in {@link AsyncSinkWriter}. + * + * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS SDK + * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code + * AWS_SECRET_ACCESS_KEY} through environment variables etc. + */ +@Internal +class SqsSinkWriter extends AsyncSinkWriter { + +private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class); + +private final SdkClientProvider clientProvider; + +private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER = +AWSExceptionHandler.withClassifier( +FatalExceptionClassifier.createChain( +getInterruptedExceptionClassifier(), +getInvalidCredentialsExceptionClassifier(), + SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier(), + SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(), + SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(), +getSdkClientMisconfiguredExceptionClassifier())); + +private final Counter numRecordsOutErrorsCounter; + +/* Url of SQS */ +private final String sqsUrl; + +/* The sink writer metric group */ +private final SinkWriterMetricGroup metrics; + +/* Flag to whether fatally fail any time we encounter an exception when persisting records */ +private final boolean failOnError; + +SqsSinkWriter( +ElementConverter elementConverter, +Sink.InitContext context, +int maxBatchSize, +int maxInFlightRequests, +int
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1669408631 ## flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsStateSerializer.java: ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer; + +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +/** SQS implementation {@link AsyncSinkWriterStateSerializer}. 
 */
+@Internal
+public class SqsStateSerializer
+        extends AsyncSinkWriterStateSerializer<SendMessageBatchRequestEntry> {
+    @Override
+    protected void serializeRequestToStream(
+            final SendMessageBatchRequestEntry request, final DataOutputStream out)
+            throws IOException {
+        out.write(request.messageBody().getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Override
+    protected SendMessageBatchRequestEntry deserializeRequestFromStream(
+            final long requestSize, final DataInputStream in) throws IOException {
+        final byte[] requestData = new byte[(int) requestSize];
+        in.read(requestData);
+        return SendMessageBatchRequestEntry.builder()
+                .id(UUID.randomUUID().toString())

Review Comment:
Makes sense, updated the code accordingly.
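Editorial note: the serializer quoted in this message writes the message body as UTF-8 bytes and reads it back into a buffer sized from `requestSize`. The following is a minimal, stdlib-only sketch of that round trip, not the PR's code. The class `BodyRoundTrip` and its method names are hypothetical. It uses `DataInputStream.readFully`, because a plain `read(byte[])` is not guaranteed to fill the buffer in one call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for the serializer's body handling; not taken from the PR. */
final class BodyRoundTrip {

    /** Writes the message body as raw UTF-8 bytes, mirroring the quoted serializer. */
    static byte[] serialize(String messageBody) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.write(messageBody.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reads exactly requestSize bytes and decodes them as UTF-8. */
    static String deserialize(byte[] data, long requestSize) {
        byte[] requestData = new byte[(int) requestSize];
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            // readFully keeps reading until the buffer is full and throws
            // EOFException if the stream ends early, instead of a silent short read.
            in.readFully(requestData);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(requestData, StandardCharsets.UTF_8);
    }
}
```

Calling `BodyRoundTrip.deserialize(bytes, bytes.length)` on the output of `BodyRoundTrip.serialize(body)` should return the original body, including non-ASCII characters, since the size is counted in bytes rather than characters.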
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1660998380 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ## @@ -0,0 +1,111 @@ + + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + +flink-connector-aws-e2e-tests-parent +org.apache.flink +4.3-SNAPSHOT Review Comment: This should be `4.4` ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ## @@ -0,0 +1,111 @@ + + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + +flink-connector-aws-e2e-tests-parent +org.apache.flink +4.3-SNAPSHOT Review Comment: This should be `4.4-SNAPSHOT` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632385233 ## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier; +import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier; + +/** + * Sink writer created by {@link SqsSink} to write to SQS. More details on the operation of this + * sink writer may be found in the doc for {@link SqsSink}. More details on the internals of this + * sink writer may be found in {@link AsyncSinkWriter}. + * + * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS SDK + * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code + * AWS_SECRET_ACCESS_KEY} through environment variables etc. + */ +@Internal +class SqsSinkWriter extends AsyncSinkWriter { + +private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class); + +private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) { +return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties); +} + +private static SqsAsyncClient createSqsClient( +Properties sqsClientProperties, SdkAsyncHttpClient httpClient) { +AWSGeneralUtil.validateAwsCredentials(sqsClientProperties); +return AWSClientUtil.createAwsAsyncClient( +sqsClientProperties, +httpClient, +SqsAsyncClient.builder(), +SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT, +SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX); +} + +private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER = +AWSExceptionHandler.withClassifier( +FatalExceptionClassifier.createChain( +getInterruptedExceptionClassifier(), +getInvalidCredentialsExceptionClassifier(), + SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier(), +
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632370742

## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsConfigConstants.java: ##
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Defaults for {@link SqsSinkWriter}. */
+@PublicEvolving
+public class SqsConfigConstants {
+
+    public static final String BASE_SQS_USER_AGENT_PREFIX_FORMAT =

Review Comment:
Sure, updated to use `ConfigOption`.
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2156354857

> It'd also be great to mention what permissions are needed for this connector to work. E.g., is `sqs:SendMessage` sufficient?

Good point! Yes, `sqs:SendMessage` is sufficient. I have updated the documentation to say so.
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632183182 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ## @@ -0,0 +1,122 @@ + + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + +flink-connector-aws-e2e-tests-parent +org.apache.flink +4.3-SNAPSHOT + + +4.0.0 + +flink-connector-aws-sqs-e2e-tests +Flink : Connectors : AWS : E2E Tests : Amazon SQS +jar + + + +org.apache.flink +flink-streaming-java +${flink.version} +test + + + +org.apache.flink +flink-connector-sqs +${project.version} +test + + + +org.apache.flink +flink-connector-aws-base +${project.version} +test +test-jar + + +com.typesafe.netty +netty-reactive-streams-http + + + + + +org.apache.flink +flink-connector-sqs +${project.version} +test +test-jar + + +com.typesafe.netty +netty-reactive-streams-http + + Review Comment: Unnecessary. Removed in next revision -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632182646 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ## @@ -0,0 +1,122 @@ + + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + +flink-connector-aws-e2e-tests-parent +org.apache.flink +4.3-SNAPSHOT + + +4.0.0 + +flink-connector-aws-sqs-e2e-tests +Flink : Connectors : AWS : E2E Tests : Amazon SQS +jar + + + +org.apache.flink +flink-streaming-java +${flink.version} +test + + + +org.apache.flink +flink-connector-sqs +${project.version} +test + + + +org.apache.flink +flink-connector-aws-base +${project.version} +test +test-jar + + +com.typesafe.netty +netty-reactive-streams-http + + Review Comment: Unnecessary. Removed in next revision -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1631774140 ## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkBuilder.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; + +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; + +import java.util.Optional; +import java.util.Properties; + +import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION; +import static software.amazon.awssdk.http.Protocol.HTTP1_1; + +/** + * Builder to construct {@link SqsSink}. + * + * The following example shows the minimum setup to create a {@link SqsSink} that + * writes String values to a SQS named sqsUrl. 
+ *
+ * {@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * SqsSink sqsSink =
+ *         SqsSink.builder()
+ *                 .setElementConverter(elementConverter)
+ *                 .setSqsUrl("sqsUrl")
+ *                 .setSqsClientProperties(sinkProperties)
+ *                 .setSerializationSchema(new SimpleStringSchema())
+ *                 .build();
+ * }
+ *
+ * If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ *
+ *   {@code maxBatchSize} will be 10
+ *   {@code maxInFlightRequests} will be 50
+ *   {@code maxBufferedRequests} will be 5000
+ *   {@code maxBatchSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   {@code maxTimeInBufferMs} will be 5000ms
+ *   {@code maxRecordSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   {@code failOnError} will be false
+ *
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class SqsSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<InputT, SendMessageBatchRequestEntry, SqsSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 10;

Review Comment:
Added validation that enforces a maximum batch size of 10.
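Editorial note: the reply above mentions a validation for the batch size limit, but the revised code is not quoted in this thread. As a rough sketch of what such a check could look like, assuming a hypothetical helper `BatchSizeValidator` (the class, method, and constant names are illustrative, not the PR's), the builder might reject values above the 10-entry limit that SQS applies to a single `SendMessageBatch` request:

```java
/** Hypothetical helper illustrating the check; the PR's actual validation is not shown. */
final class BatchSizeValidator {

    /** SQS accepts at most 10 entries in one SendMessageBatch request. */
    static final int SQS_MAX_BATCH_ENTRIES = 10;

    /** Returns the value unchanged if it is usable, otherwise fails fast. */
    static int validateMaxBatchSize(int maxBatchSize) {
        if (maxBatchSize < 1 || maxBatchSize > SQS_MAX_BATCH_ENTRIES) {
            throw new IllegalArgumentException(
                    "maxBatchSize must be between 1 and "
                            + SQS_MAX_BATCH_ENTRIES
                            + ", but was "
                            + maxBatchSize);
        }
        return maxBatchSize;
    }
}
```

Failing at build time, rather than when the first oversized batch is rejected by the service, surfaces a misconfiguration before the job starts writing.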
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
sap1ens commented on PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2153135962

It'd also be great to mention what permissions are needed for this connector to work. E.g., is `sqs:SendMessage` sufficient?
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1627126604

## docs/content.zh/docs/connectors/datastream/sqs.md: ##
@@ -0,0 +1,134 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /zh/dev/connectors/dynamodb.html
+---
+
+
+# Amazon SQS Sink
+
+The SQS sink writes to [Amazon SQS](https://aws.amazon.com/sqs) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon SQS Developer Guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html)
+to setup a SQS.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< connector_artifact flink-connector-sqs sqs >}}
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");

Review Comment:
`SqsSinkWriter` uses the [AWSClientUtil -> createAwsAsyncClient](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSClientUtil.java#L80) method to create the `SqsAsyncClient`, which internally reads the client region from the `AWS_REGION` property. This flow is consistent with how the other AWS sinks are created, so I followed the same approach. Deriving the region from the SQS URL would require custom code, and I did not see any benefit in doing that.
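Editorial note: for context on the trade-off discussed above, here is a rough sketch of the kind of custom code that deriving the region from the queue URL might involve. It is not part of the PR. The class `SqsUrlRegion` is hypothetical, and the sketch assumes the common queue URL shape `https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>`. URLs that do not match, such as local test endpoints, yield an empty result, so an explicit `AWS_REGION` setting would still be needed as a fallback.

```java
import java.net.URI;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper; assumes hosts of the form sqs.<region>.amazonaws.com. */
final class SqsUrlRegion {

    // Captures the region label between "sqs." and ".amazonaws.com" (optionally ".cn").
    private static final Pattern SQS_HOST =
            Pattern.compile("^sqs\\.([a-z0-9-]+)\\.amazonaws\\.com(\\.cn)?$");

    /** Returns the region encoded in the queue URL host, or empty if the host does not match. */
    static Optional<String> fromQueueUrl(String queueUrl) {
        String host = URI.create(queueUrl).getHost();
        if (host == null) {
            return Optional.empty();
        }
        Matcher matcher = SQS_HOST.matcher(host);
        return matcher.matches() ? Optional.of(matcher.group(1)) : Optional.empty();
    }
}
```

The empty result for non-matching hosts illustrates the point made in the reply: URL parsing alone cannot replace the region property in every deployment, so reusing the shared `AWS_REGION` flow keeps the sink consistent with the other AWS connectors.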
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1627114002 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ## @@ -0,0 +1,122 @@ + + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + +flink-connector-aws-e2e-tests-parent +org.apache.flink +4.3-SNAPSHOT + + +4.0.0 + +flink-connector-aws-sqs-e2e-tests +Flink : Connectors : AWS : E2E Tests : Amazon SQS +jar + + + +org.apache.flink +flink-streaming-java +${flink.version} +test + + + +org.apache.flink +flink-connector-sqs +${project.version} +test + + + +org.apache.flink +flink-connector-aws-base +${project.version} +test +test-jar + + +com.typesafe.netty +netty-reactive-streams-http + + + + + +org.apache.flink +flink-connector-sqs +${project.version} +test +test-jar + + +com.typesafe.netty +netty-reactive-streams-http + + + + + + +com.google.guava +guava + Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1627108609 ## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier; +import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier; + +/** + * Sink writer created by {@link SqsSink} to write to SQS. More details on the operation of this + * sink writer may be found in the doc for {@link SqsSink}. More details on the internals of this + * sink writer may be found in {@link AsyncSinkWriter}. + * + * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS SDK + * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code + * AWS_SECRET_ACCESS_KEY} through environment variables etc. + */ +@Internal +class SqsSinkWriter extends AsyncSinkWriter { + +private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class); + +private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) { +return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties); +} + +private static SqsAsyncClient createSqsClient( +Properties sqsClientProperties, SdkAsyncHttpClient httpClient) { +AWSGeneralUtil.validateAwsCredentials(sqsClientProperties); +return AWSClientUtil.createAwsAsyncClient( +sqsClientProperties, +httpClient, +SqsAsyncClient.builder(), +SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT, +SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX); +} + +private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER = +AWSExceptionHandler.withClassifier( +FatalExceptionClassifier.createChain( +getInterruptedExceptionClassifier(), +getInvalidCredentialsExceptionClassifier(), + SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier(), +
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1627101582

## docs/content.zh/docs/connectors/datastream/sqs.md: ##
@@ -0,0 +1,134 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /zh/dev/connectors/dynamodb.html
+---
+
+
+# Amazon SQS Sink
+
+The SQS sink writes to [Amazon SQS](https://aws.amazon.com/sqs) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon SQS Developer Guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html)
+to setup a SQS.

Review Comment:
Updated
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1627100151

## docs/content.zh/docs/connectors/datastream/sqs.md: ##
@@ -0,0 +1,134 @@
+---
+title: DynamoDB

Review Comment:
oops! corrected
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2149019054

> We seem to be having quite a few `.` in the class folders. Can we change them to `/` instead? e.g. `flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java`

Good catch! Fixed
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1627068765

## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkElementConverter.java: ##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/**
+ * An implementation of the {@link ElementConverter} that uses the AWS SQS SDK v2. The user only
+ * needs to provide a {@link SerializationSchema} of the {@code InputT} to transform it into a
+ * {@link SendMessageBatchRequestEntry} that may be persisted.
+ */
+@Internal
+public class SqsSinkElementConverter<InputT>
+        implements ElementConverter<InputT, SendMessageBatchRequestEntry> {
+
+    /** A serialization schema to specify how the input element should be serialized. */
+    private final SerializationSchema<InputT> serializationSchema;
+
+    private SqsSinkElementConverter(SerializationSchema<InputT> serializationSchema) {
+        this.serializationSchema = serializationSchema;
+    }
+
+    @Override
+    public SendMessageBatchRequestEntry apply(InputT element, SinkWriter.Context context) {
+        final byte[] messageBody = serializationSchema.serialize(element);
+        return SendMessageBatchRequestEntry.builder()
+                .id(UUID.randomUUID().toString())
+                .messageBody(new String(messageBody, StandardCharsets.UTF_8))
+                .build();
+    }
+
+    @Override
+    public void open(Sink.InitContext context) {
+        try {
+            serializationSchema.open(
+                    new SerializationSchema.InitializationContext() {
+                        @Override
+                        public MetricGroup getMetricGroup() {
+                            return new UnregisteredMetricsGroup();
+                        }
+
+                        @Override
+                        public UserCodeClassLoader getUserCodeClassLoader() {
+                            return SimpleUserCodeClassLoader.create(
+                                    SqsSinkElementConverter.class.getClassLoader());
+                        }
+                    });
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
+        }
+    }
+
+    public static <InputT> Builder<InputT> builder() {
+        return new Builder<>();
+    }
+
+    /** A builder for the SqsSinkElementConverter. */
+    public static class Builder<InputT> {
+
+        private SerializationSchema<InputT> serializationSchema;
+
+        public Builder<InputT> setSerializationSchema(
+                SerializationSchema<InputT> serializationSchema) {
+            this.serializationSchema = serializationSchema;
+            return this;
+        }
+
+        public SqsSinkElementConverter<InputT> build() {
+            Preconditions.checkNotNull(
+                    serializationSchema,
+                    "No SerializationSchema was supplied to the " + "SQS Sink builder.");

Review Comment:
Fixed
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1625705602

## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkElementConverter.java: ##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/**
+ * An implementation of the {@link ElementConverter} that uses the AWS SQS SDK v2. The user only
+ * needs to provide a {@link SerializationSchema} of the {@code InputT} to transform it into a
+ * {@link SendMessageBatchRequestEntry} that may be persisted.
+ */
+@Internal
+public class SqsSinkElementConverter<InputT>
+        implements ElementConverter<InputT, SendMessageBatchRequestEntry> {
+
+    /** A serialization schema to specify how the input element should be serialized. */
+    private final SerializationSchema<InputT> serializationSchema;
+
+    private SqsSinkElementConverter(SerializationSchema<InputT> serializationSchema) {
+        this.serializationSchema = serializationSchema;
+    }
+
+    @Override
+    public SendMessageBatchRequestEntry apply(InputT element, SinkWriter.Context context) {
+        final byte[] messageBody = serializationSchema.serialize(element);
+        return SendMessageBatchRequestEntry.builder()
+                .id(UUID.randomUUID().toString())
+                .messageBody(new String(messageBody, StandardCharsets.UTF_8))
+                .build();
+    }
+
+    @Override
+    public void open(Sink.InitContext context) {
+        try {
+            serializationSchema.open(
+                    new SerializationSchema.InitializationContext() {
+                        @Override
+                        public MetricGroup getMetricGroup() {
+                            return new UnregisteredMetricsGroup();
+                        }
+
+                        @Override
+                        public UserCodeClassLoader getUserCodeClassLoader() {
+                            return SimpleUserCodeClassLoader.create(
+                                    SqsSinkElementConverter.class.getClassLoader());
+                        }
+                    });
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
+        }
+    }
+
+    public static <InputT> Builder<InputT> builder() {
+        return new Builder<>();
+    }
+
+    /** A builder for the SqsSinkElementConverter. */
+    public static class Builder<InputT> {
+
+        private SerializationSchema<InputT> serializationSchema;
+
+        public Builder<InputT> setSerializationSchema(
+                SerializationSchema<InputT> serializationSchema) {
+            this.serializationSchema = serializationSchema;
+            return this;
+        }
+
+        public SqsSinkElementConverter<InputT> build() {
+            Preconditions.checkNotNull(
+                    serializationSchema,
+                    "No SerializationSchema was supplied to the " + "SQS Sink builder.");

Review Comment:
nit: `+` not needed here
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2147097522

We seem to be having quite a few `.` in the class folders. Can we change them to `/` instead? e.g. `flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java`
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
hlteoh37 commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1625636303

## docs/content.zh/docs/connectors/datastream/sqs.md: ##
@@ -0,0 +1,134 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /zh/dev/connectors/dynamodb.html

Review Comment:
These are wrong and should be changed to `sqs.html`

## docs/content.zh/docs/connectors/datastream/sqs.md: ##
@@ -0,0 +1,134 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /zh/dev/connectors/dynamodb.html
+---
+
+
+# Amazon SQS Sink
+
+The SQS sink writes to [Amazon SQS](https://aws.amazon.com/sqs) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon SQS Developer Guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html)
+to setup a SQS.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< connector_artifact flink-connector-sqs sqs >}}
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");

Review Comment:
Is there a reason we need to specify this? Can we figure this out via the SQS URL?

## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsConfigConstants.java: ##
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Defaults for {@link SqsSinkWriter}. */
+@PublicEvolving
+public class SqsConfigConstants {
+
+    public static final String BASE_SQS_USER_AGENT_PREFIX_FORMAT =

Review Comment:
Could we use `ConfigOption` instead of strings here? Example: https://github.com/apache/flink-connector-aws/blob/38aafb44d3a8200e4ff41d87e0780338f40da258/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigConstants.java#L41-L46

## docs/content.zh/docs/connectors/datastream/sqs.md: ##
@@ -0,0 +1,134 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /zh/dev/connectors/dynamodb.html
+---
+
+
+# Amazon SQS Sink
+
+The SQS sink writes to [Amazon SQS](https://aws.amazon.com/sqs) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon SQS Developer Guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html)
+to setup a SQS.

Review Comment:
nit: setup an SQS message queue.

## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java: ##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import
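
On the review question of whether the region could be derived from the SQS URL rather than set through `AWS_REGION`: a minimal sketch of what such parsing might look like, assuming the standard public queue URL shape `https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>`. The class and method names (`SqsUrlRegionSketch`, `regionFromQueueUrl`) are hypothetical and not part of this PR; custom, legacy, or local endpoints do not follow this shape, so the sketch returns an empty result for them and a caller would still need an explicit region as a fallback.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper, not part of the connector: extracts the region from a queue URL. */
final class SqsUrlRegionSketch {

    // Assumption: only the standard endpoint form sqs.<region>.amazonaws.com (optionally .cn)
    // is recognized. Anything else is treated as "region unknown".
    private static final Pattern STANDARD_ENDPOINT =
            Pattern.compile("^https?://sqs\\.([a-z0-9-]+)\\.amazonaws\\.com(\\.cn)?/.*$");

    static Optional<String> regionFromQueueUrl(String queueUrl) {
        Matcher matcher = STANDARD_ENDPOINT.matcher(queueUrl);
        return matcher.matches() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Standard URL: the region segment is extracted.
        System.out.println(
                regionFromQueueUrl("https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue"));
        // Local or custom endpoint: no region can be inferred.
        System.out.println(regionFromQueueUrl("http://localhost:4566/000000000000/my-queue"));
    }
}
```

Returning `Optional` keeps the explicit `AWS_REGION` property usable as an override, which matters for endpoints where the URL carries no region.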
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2146436785

> > Would you recommend keeping it normal, String?
>
> Yes! And if someone needs base64 encoding they can encode it in the `SerializationSchema`.
>
> > I have a follow up query on spotless violations. How did you run that. Whenever I am trying to do that, it shows build succeeded with no error and I can see "spotless skipped" in build logs[Attached screenshot]. Is there some setting in code which i need to do to enable spotless working for me?
>
> Hmm, I ran `mvn clean package -DskipTests` in the root of the project and ended up seeing a bunch of spotless violations. Running `mvn spotless:apply` changed a lot of files in this PR.

1. Removed Base64 encoding and fixed spotless issues
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2146436055

> Thanks for the efforts. Could we remove the ticket link from the PR title, it should be automatically linked using "Autolink"
>
> We need to add documentation for the new feature as well

Added Documentation commit in this PR
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2146248914

> Thanks @19priyadhingra for addressing the comments, could we fix the spotless violations as mentioned above?

I finally got spotless working in my local workspace after downgrading from Java 21 to Java 8. Is Java 8 a prerequisite for building this package? If so, should we document that in the README? It would save others time debugging this kind of issue later.
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2142820209

> Thanks @19priyadhingra for addressing the comments, could we fix the spotless violations as mentioned above?

Yes @vahmed-hamdy, I am working on it. As the screenshots above show, all of the spotless checks are being skipped for me: `mvn clean package -DskipTests` succeeds with no spotless warnings. I am not sure which Maven/IntelliJ setting I need to enable in my local build to see these errors, so I am asking others for help. Thanks!
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
vahmed-hamdy commented on PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2142481837

Thanks @19priyadhingra for addressing the comments, could we fix the spotless violations as mentioned above?
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
vahmed-hamdy commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1622573390

## flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java: ##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Unit tests for {@link SqsExceptionClassifiers}. */
+public class SqsExceptionClassifiersTest {
+
+    private final FatalExceptionClassifier classifier =
+            FatalExceptionClassifier.createChain(
+                    SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+                    SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+                    SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier());
+
+    @Test
+    public void shouldClassifyNotAuthorizedAsFatal() {
+        AwsServiceException sqsException =
+                SqsException.builder()
+                        .awsErrorDetails(
+                                AwsErrorDetails.builder().errorCode("NotAuthorized").build())
+                        .build();
+
+        // isFatal returns `true` if an exception is non-fatal
+        assertFalse(classifier.isFatal(sqsException, ex -> {}));

Review Comment:
Ah I see, got it
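
The `assertFalse` that prompted the question follows from the convention stated in the test comment: `isFatal` returns `true` when the exception is non-fatal, and `false` once a classifier in the chain has recognized it as fatal and handed it to the consumer. A minimal sketch of that convention, using a simplified stand-in class (`ClassifierSketch` and `forErrorCode` are hypothetical names and this is not Flink's `FatalExceptionClassifier` itself):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Simplified, hypothetical stand-in for a chained fatal-exception classifier. */
final class ClassifierSketch {
    private final Predicate<Throwable> matchesFatal;
    private final ClassifierSketch next; // null marks the end of the chain

    ClassifierSketch(Predicate<Throwable> matchesFatal, ClassifierSketch next) {
        this.matchesFatal = matchesFatal;
        this.next = next;
    }

    /** Returns false if the error was recognized as fatal (and reported), true otherwise. */
    boolean isFatal(Throwable err, Consumer<Exception> consumer) {
        if (matchesFatal.test(err)) {
            consumer.accept(new RuntimeException("Fatal error encountered", err));
            return false;
        }
        return next == null || next.isFatal(err, consumer);
    }

    /** Assumption for illustration: match on an error code embedded in the message. */
    static ClassifierSketch forErrorCode(String code, ClassifierSketch next) {
        return new ClassifierSketch(
                err -> err.getMessage() != null && err.getMessage().contains(code), next);
    }

    public static void main(String[] args) {
        ClassifierSketch chain =
                forErrorCode("NotAuthorized", forErrorCode("AccessDeniedException", null));
        List<Exception> reported = new ArrayList<>();

        boolean recognized = chain.isFatal(new IllegalStateException("NotAuthorized"), reported::add);
        boolean unrecognized = chain.isFatal(new IllegalStateException("Throttled"), reported::add);

        // prints "false true 1"
        System.out.println(recognized + " " + unrecognized + " " + reported.size());
    }
}
```

Under this convention a test that expects an error to be classified as fatal asserts `false`, which is why the method name reads inverted at the call site.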
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
vahmed-hamdy commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1597506583 ## flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException; +import software.amazon.awssdk.services.sqs.model.SqsException; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** Unit tests for {@link SqsExceptionClassifiers}. 
*/ +public class SqsExceptionClassifiersTest { + +private final FatalExceptionClassifier classifier = +FatalExceptionClassifier.createChain( + SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(), + SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(), + SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier()); + +@Test +public void shouldClassifyNotAuthorizedAsFatal() { +AwsServiceException sqsException = +SqsException.builder() +.awsErrorDetails( + AwsErrorDetails.builder().errorCode("NotAuthorized").build()) +.build(); + +// isFatal returns `true` if an exception is non-fatal +assertFalse(classifier.isFatal(sqsException, ex -> {})); +} + +@Test +public void shouldClassifyAccessDeniedExceptionAsFatal() { +AwsServiceException sqsException = +SqsException.builder() +.awsErrorDetails( +AwsErrorDetails.builder() +.errorCode("AccessDeniedException") +.build()) +.build(); + +// isFatal returns `true` if an exception is non-fatal +assertFalse(classifier.isFatal(sqsException, ex -> {})); Review Comment: why `assertFalse` ? ## flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException; +import software.amazon.awssdk.services.sqs.model.SqsException; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** Unit tests for {@link SqsExceptionClassifiers}. */ +public class SqsExceptionClassifiersTest { + +private final FatalExceptionClassifier classifier = +FatalExceptionClassifier.createChain( + SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(), +
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
vahmed-hamdy commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1597503210 ## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java: ## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier; +import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier; + +/** + * Sink writer created by {@link SqsSink} to write to SQS. More + * details on the operation of this sink writer may be found in the doc for {@link + * SqsSink}. More details on the internals of this sink writer may be found in {@link + * AsyncSinkWriter}. + * + * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS + * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code + * AWS_SECRET_ACCESS_KEY} through environment variables etc. + */ +@Internal +class SqsSinkWriter extends AsyncSinkWriter { + +private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class); + +private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) { +return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties); +} + +private static SqsAsyncClient createSqsClient( +Properties sqsClientProperties, SdkAsyncHttpClient httpClient) { +AWSGeneralUtil.validateAwsCredentials(sqsClientProperties); +return AWSClientUtil.createAwsAsyncClient( +sqsClientProperties, +httpClient, +SqsAsyncClient.builder(), +SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT, +SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX); +} + +private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER = +AWSExceptionHandler.withClassifier( +FatalExceptionClassifier.createChain( +getInterruptedExceptionClassifier(), +getInvalidCredentialsExceptionClassifier(), +SqsExceptionClassifiers +.getResourceNotFoundExceptionClassifier(), +
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
sap1ens commented on PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2138173997 > Would you recommend keeping it normal, String? Yes! And if someone needs base64 encoding they can encode it in the `SerializationSchema`. > I have a follow up query on spotless violations. How did you run that. Whenever I am trying to do that, it shows build succeeded with no error and I can see "spotless skipped" in build logs[Attached screenshot]. Is there some setting in code which i need to do to enable spotless working for me? Hmm, I ran `mvn clean package -DskipTests` in the root of the project and ended up seeing a bunch of spotless violations. Running `mvn spotless:apply` changed a lot of files in this PR.
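A minimal sketch of the suggestion above: keep the payload as a plain String by default and apply Base64 only where a job needs it. `Base64PayloadSketch` and its two methods are hypothetical names used for illustration and are not part of the connector; in a real job the same logic would presumably sit inside the `serialize` method of a `SerializationSchema<String>`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for illustration only; not part of the SQS connector.
final class Base64PayloadSketch {

    /** Serializes the element as plain UTF-8 bytes, with no extra encoding. */
    static byte[] plain(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }

    /** Serializes the element as the Base64 encoding of its UTF-8 bytes. */
    static byte[] base64(String element) {
        return Base64.getEncoder().encode(element.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        String message = "{\"data\":\"1\"}";
        // Plain variant: the message body is the original String.
        System.out.println(new String(plain(message), StandardCharsets.UTF_8));
        // Opt-in variant: the message body is Base64 text.
        System.out.println(new String(base64(message), StandardCharsets.UTF_8));
    }
}
```

With this split the sink itself would not need to know about Base64; a job that wants it picks the second variant in its own serialization step.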
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2138092946 > @19priyadhingra I've had the opportunity to build and try this connector. Want to share some feedback: > > * Currently, it looks like all messages are encoded with base64. It'd be great to be able to not use it, e.g. by setting a custom `ElementConverter`. > * There are lot of `spotless` violations, you probably want to run `mvn spotless:apply`. @sap1ens, thanks a lot for the feedback. 1) We used Base64 encoding in our production environment because it guarantees that no invalid characters are present in the message, but I now understand that others might not need it. Would you recommend keeping it as a plain String? 2) I have a follow-up question about the spotless violations: how did you run it? Whenever I try, the build succeeds with no errors and I see "spotless skipped" in the build logs [attached screenshot]. Is there a setting I need to change to enable spotless for me? ![image](https://github.com/apache/flink-connector-aws/assets/169495197/20262c6a-9492-476c-a5b2-ce7809664e8a) Sorry for the basic question, I am working on this package for the first time :)
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
sap1ens commented on PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2137994741 @19priyadhingra I've had the opportunity to build and try this connector. Want to share some feedback: - Currently, it looks like all messages are encoded with base64. It'd be great to be able to not use it, e.g. by setting a custom `ElementConverter`. - There are a lot of `spotless` violations, you probably want to run `mvn spotless:apply`.
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2136055216 Thanks for the feedback @vahmed-hamdy. I replied to all the feedback except the documentation one, which is still in progress. Is there a package where I am supposed to add the documentation?
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1615603849 ## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java: ## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier; +import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier; + +/** + * Sink writer created by {@link SqsSink} to write to SQS. More + * details on the operation of this sink writer may be found in the doc for {@link + * SqsSink}. More details on the internals of this sink writer may be found in {@link + * AsyncSinkWriter}. + * + * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS + * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code + * AWS_SECRET_ACCESS_KEY} through environment variables etc. + */ +@Internal +class SqsSinkWriter extends AsyncSinkWriter { + +private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class); + +private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) { +return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties); +} + +private static SqsAsyncClient createSqsClient( +Properties sqsClientProperties, SdkAsyncHttpClient httpClient) { +AWSGeneralUtil.validateAwsCredentials(sqsClientProperties); +return AWSClientUtil.createAwsAsyncClient( +sqsClientProperties, +httpClient, +SqsAsyncClient.builder(), +SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT, +SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX); +} + +private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER = +AWSExceptionHandler.withClassifier( +FatalExceptionClassifier.createChain( +getInterruptedExceptionClassifier(), +getInvalidCredentialsExceptionClassifier(), +SqsExceptionClassifiers +.getResourceNotFoundExceptionClassifier(), +
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1615590239 ## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java: ## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier; +import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier; + +/** + * Sink writer created by {@link SqsSink} to write to SQS. More + * details on the operation of this sink writer may be found in the doc for {@link + * SqsSink}. More details on the internals of this sink writer may be found in {@link + * AsyncSinkWriter}. + * + * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS + * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code + * AWS_SECRET_ACCESS_KEY} through environment variables etc. + */ +@Internal +class SqsSinkWriter extends AsyncSinkWriter { + +private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class); + +private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) { +return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties); +} + +private static SqsAsyncClient createSqsClient( +Properties sqsClientProperties, SdkAsyncHttpClient httpClient) { +AWSGeneralUtil.validateAwsCredentials(sqsClientProperties); +return AWSClientUtil.createAwsAsyncClient( +sqsClientProperties, +httpClient, +SqsAsyncClient.builder(), +SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT, +SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX); +} + +private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER = +AWSExceptionHandler.withClassifier( +FatalExceptionClassifier.createChain( +getInterruptedExceptionClassifier(), +getInvalidCredentialsExceptionClassifier(), +SqsExceptionClassifiers +.getResourceNotFoundExceptionClassifier(), +
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1612742339 ## flink-connector-aws/pom.xml: ## @@ -18,8 +18,8 @@ specific language governing permissions and limitations under the License. --> http://maven.apache.org/POM/4.0.0; -xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; Review Comment: reverted
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610987543 ## flink-connector-aws-base/pom.xml: ## @@ -76,6 +76,12 @@ under the License. test + Review Comment: removed
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610987837 ## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java: ## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier; +import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier; + +/** + * Sink writer created by {@link SqsSink} to write to SQS. More + * details on the operation of this sink writer may be found in the doc for {@link + * SqsSink}. More details on the internals of this sink writer may be found in {@link + * AsyncSinkWriter}. + * + * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS + * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code + * AWS_SECRET_ACCESS_KEY} through environment variables etc. + */ +@Internal +class SqsSinkWriter extends AsyncSinkWriter { + +private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class); + +private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) { +return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties); +} + +private static SqsAsyncClient createSqsClient( +Properties sqsClientProperties, SdkAsyncHttpClient httpClient) { +AWSGeneralUtil.validateAwsCredentials(sqsClientProperties); +return AWSClientUtil.createAwsAsyncClient( +sqsClientProperties, +httpClient, +SqsAsyncClient.builder(), +SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT, +SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX); +} + +private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER = +AWSExceptionHandler.withClassifier( +FatalExceptionClassifier.createChain( +getInterruptedExceptionClassifier(), +getInvalidCredentialsExceptionClassifier(), +SqsExceptionClassifiers +.getResourceNotFoundExceptionClassifier(), +
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610983272 ## flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/testutils/SqsTestUtils.java: ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink.testutils; + +import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.utils.ImmutableMap; + +import java.util.ArrayList; +import java.util.List; + +/** + * A set of static methods that can be used to call common AWS services on the Localstack container. 
+ */ +public class SqsTestUtils { + +private static final ObjectMapper MAPPER = createObjectMapper(); + +public static SqsClient createSqsClient(String endpoint, SdkHttpClient httpClient) { +return AWSServicesTestUtils.createAwsSyncClient(endpoint, httpClient, SqsClient.builder()); +} + +public static DataStream getSampleDataGenerator( +StreamExecutionEnvironment env, int endValue) { +return env.fromSequence(1, endValue) +.map(Object::toString) +.returns(String.class) +.map(data -> MAPPER.writeValueAsString(ImmutableMap.of("data", data))); +} + +public static List getSampleData(int endValue) throws JsonProcessingException { +List expectedElements = new ArrayList<>(); +for (int i = 1; i <= endValue; i++) { +expectedElements.add( +MAPPER.writeValueAsString(ImmutableMap.of("data", String.valueOf(i)))); +} +return expectedElements; +} + +private static ObjectMapper createObjectMapper() { +ObjectMapper objectMapper = new ObjectMapper(); +registerModules(objectMapper); +return objectMapper; +} + +private static void registerModules(ObjectMapper mapper) { +mapper.registerModule(new JavaTimeModule()) Review Comment: Removed
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610974483 ## flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java: ## @@ -114,6 +116,14 @@ public static void createBucket(S3Client s3Client, String bucketName) { } } +public static void createSqs(String sqsName, SqsClient sqsClient) { Review Comment: Moved to SqsTestUtils
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610935419 ## flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/testutils/SqsTestUtils.java: ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink.testutils; + +import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.utils.ImmutableMap; + +import java.util.ArrayList; +import java.util.List; + +/** + * A set of static methods that can be used to call common AWS services on the Localstack container. 
+ */ +public class SqsTestUtils { + +private static final ObjectMapper MAPPER = createObjectMapper(); + +public static SqsClient createSqsClient(String endpoint, SdkHttpClient httpClient) { +return AWSServicesTestUtils.createAwsSyncClient(endpoint, httpClient, SqsClient.builder()); +} + +public static DataStream getSampleDataGenerator( +StreamExecutionEnvironment env, int endValue) { +return env.fromSequence(1, endValue) +.map(Object::toString) +.returns(String.class) +.map(data -> MAPPER.writeValueAsString(ImmutableMap.of("data", data))); +} + +public static List getSampleData(int endValue) throws JsonProcessingException { Review Comment: Removed
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610935001 ## flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException; +import software.amazon.awssdk.services.sqs.model.SqsException; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** Unit tests for {@link SqsExceptionClassifiers}. */ +public class SqsExceptionClassifiersTest { Review Comment: add new test case - shouldClassifySocketTimeoutExceptionAsNonFatal
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610929881 ## flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException; +import software.amazon.awssdk.services.sqs.model.SqsException; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** Unit tests for {@link SqsExceptionClassifiers}. 
*/ +public class SqsExceptionClassifiersTest { + +private final FatalExceptionClassifier classifier = +FatalExceptionClassifier.createChain( + SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(), + SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(), + SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier()); + +@Test +public void shouldClassifyNotAuthorizedAsFatal() { +AwsServiceException sqsException = +SqsException.builder() +.awsErrorDetails( + AwsErrorDetails.builder().errorCode("NotAuthorized").build()) +.build(); + +// isFatal returns `true` if an exception is non-fatal +assertFalse(classifier.isFatal(sqsException, ex -> {})); Review Comment: We use `assertFalse` here because of how `isFatal` is implemented: per the comment above, "isFatal returns true if an exception is non-fatal".
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610929610 ## flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java: ## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; + +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.awscore.exception.AwsErrorDetails; +import software.amazon.awssdk.awscore.exception.AwsServiceException; +import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException; +import software.amazon.awssdk.services.sqs.model.SqsException; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** Unit tests for {@link SqsExceptionClassifiers}. 
*/ +public class SqsExceptionClassifiersTest { + +private final FatalExceptionClassifier classifier = +FatalExceptionClassifier.createChain( + SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(), + SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(), + SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier()); + +@Test +public void shouldClassifyNotAuthorizedAsFatal() { +AwsServiceException sqsException = +SqsException.builder() +.awsErrorDetails( + AwsErrorDetails.builder().errorCode("NotAuthorized").build()) +.build(); + +// isFatal returns `true` if an exception is non-fatal Review Comment: I agree that it is confusing, and I am not sure why isFatal is implemented this way. I also referred to [this](https://github.com/apache/flink-connector-aws/blob/c688a8545ac1001c8450e8c9c5fe8bbafa13aeba/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionHandler.java#L27), which says "isFatal returns `true` if an exception is non-fatal". These test cases are modeled on the existing Firehose [test cases](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/AWSFirehoseExceptionClassifiersTest.java).
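The return convention discussed in this comment is easy to misread, so here is a minimal, self-contained sketch of a classifier chain that follows the quoted rule ("isFatal returns `true` if an exception is non-fatal"). `ClassifierChainSketch` and everything in it are hypothetical stand-ins written for illustration; this is not the Flink `FatalExceptionClassifier` class, and it only assumes the convention stated in the quoted comment.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical stand-in that mimics the return convention quoted above; not the Flink class. */
public class ClassifierChainSketch {
    private final Predicate<Throwable> matchesFatal;
    private final ClassifierChainSketch next; // null marks the end of the chain

    public ClassifierChainSketch(Predicate<Throwable> matchesFatal, ClassifierChainSketch next) {
        this.matchesFatal = matchesFatal;
        this.next = next;
    }

    /**
     * Returns false when some classifier in the chain recognises the error as fatal
     * (the error is first handed to the consumer), and true when nothing matched.
     */
    public boolean isFatal(Throwable err, Consumer<Exception> consumer) {
        if (matchesFatal.test(err)) {
            consumer.accept(new RuntimeException("Encountered fatal error", err));
            return false;
        }
        return next == null || next.isFatal(err, consumer);
    }

    public static void main(String[] args) {
        ClassifierChainSketch chain =
                new ClassifierChainSketch(
                        e -> e instanceof IllegalStateException,
                        new ClassifierChainSketch(e -> e instanceof SecurityException, null));

        // A recognised (fatal) error yields false, which is why a test would use assertFalse.
        System.out.println(chain.isFatal(new SecurityException("denied"), ex -> {}));
        // An unrecognised error falls through the chain and yields true.
        System.out.println(chain.isFatal(new IllegalArgumentException("other"), ex -> {}));
    }
}
```

Under this convention the return value reads more naturally as "was not classified as fatal", which matches the `assertFalse` in a test named `shouldClassifyNotAuthorizedAsFatal`.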
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610928629 ## flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsSinkWriterTest.java: ## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Properties; +import java.util.concurrent.CompletionException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +/** Covers construction, defaults and sanity checking of {@link SqsSinkWriter}. */ +public class SqsSinkWriterTest { + +private SqsSinkWriter sinkWriter; + +private static final ElementConverter ELEMENT_CONVERTER_PLACEHOLDER = +SqsSinkElementConverter.builder() +.setSerializationSchema(new SimpleStringSchema()) +.build(); + +@BeforeEach +void setup() throws IOException { +TestSinkInitContext sinkInitContext = new TestSinkInitContext(); +Properties sinkProperties = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint"); +SqsSink sink = +new SqsSink<>( +ELEMENT_CONVERTER_PLACEHOLDER, +50, +16, +1, +4 * 1024 * 1024L, +5000L, +1000 * 1024L, +true, +"sqsUrl", +sinkProperties); +sinkWriter = (SqsSinkWriter) sink.createWriter(sinkInitContext); +} + +@Test +void getSizeInBytesReturnsSizeOfBlobBeforeBase64Encoding() { +String testString = "{many hands make light work;"; +SendMessageBatchRequestEntry record = SendMessageBatchRequestEntry.builder().messageBody(testString).build(); +assertThat(sinkWriter.getSizeInBytes(record)) +
.isEqualTo(testString.getBytes(StandardCharsets.US_ASCII).length); Review Comment: oops, fixed
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610910711 ## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsStateSerializer.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer; + +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +/** SQS implementation {@link AsyncSinkWriterStateSerializer}. 
*/ +@Internal +public class SqsStateSerializer extends AsyncSinkWriterStateSerializer { +@Override +protected void serializeRequestToStream(final SendMessageBatchRequestEntry request, final DataOutputStream out) +throws IOException +{ +out.write(request.messageBody().getBytes(StandardCharsets.UTF_8)); +} + +@Override +protected SendMessageBatchRequestEntry deserializeRequestFromStream(final long requestSize, final DataInputStream in) +throws IOException +{ +final byte[] requestData = new byte[(int) requestSize]; +in.read(requestData); +return SendMessageBatchRequestEntry.builder().messageBody(new String(requestData, StandardCharsets.UTF_8)).build(); Review Comment: yes, test case updated
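For readers following the serializer hunk above, this is a stdlib-only sketch of the same round trip: the message body is written as UTF-8 bytes and read back from a known byte count. `BodyRoundTripSketch` and its method names are hypothetical and are not part of the PR. The sketch also swaps in `DataInputStream.readFully` where the quoted diff calls `read`, because `readFully` either fills the whole array or throws; that substitution is an assumption of this example, not something the PR does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of a UTF-8 body round trip; not the connector's serializer. */
public class BodyRoundTripSketch {

    /** Writes the body as raw UTF-8 bytes, mirroring the write side of the quoted hunk. */
    public static byte[] write(String body) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reads exactly requestSize bytes and decodes them as UTF-8. */
    public static String read(byte[] data, long requestSize) {
        byte[] requestData = new byte[(int) requestSize];
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readFully(requestData); // assumption: swapped in for read(), see lead-in
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(requestData, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String body = "many hands make light work";
        byte[] bytes = write(body);
        System.out.println(read(bytes, bytes.length).equals(body));
    }
}
```

The size passed to `read` is a byte count, not a character count, so the two differ for any body containing non-ASCII characters.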
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610896956 ## flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsSinkITCase.java: ## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; +import org.apache.flink.connector.aws.testutils.LocalstackContainer; +import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.DockerImageVersions; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig; +import static org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Integration test suite for the {@code SqsSink} using a localstack container. + */ +@Testcontainers +@ExtendWith(MiniClusterExtension.class) +class SqsSinkITCase { Review Comment: Created "flink-connector-aws-sqs-e2e-tests" under "flink-connector-aws-e2e-tests" and move this test there, but I am really struggling with how to run that test there. 
Running LocalStack did not help, and I tried to follow the README given [here](https://github.com/apache/flink-connector-aws/tree/main/flink-connector-aws-e2e-tests). I downloaded flink-dist-1.19.0.jar, placed it under the "flink-connector-aws-e2e-tests" folder, and ran "mvn clean verify -Prun-end-to-end-tests -DdistDir=flink-dist-1.19.0.jar", but it fails with an error (screenshot attached: https://github.com/apache/flink-connector-aws/assets/169495197/406cae7a-5eae-421f-8e11-cae88051f78f). Any documentation on how to run these tests easily would be very helpful. Thanks!
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1609245158 ## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java: ## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier; +import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier; + +/** + * Sink writer created by {@link SqsSink} to write to SQS. More + * details on the operation of this sink writer may be found in the doc for {@link + * SqsSink}. More details on the internals of this sink writer may be found in {@link + * AsyncSinkWriter}. + * + * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS + * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code + * AWS_SECRET_ACCESS_KEY} through environment variables etc. + */ +@Internal +class SqsSinkWriter extends AsyncSinkWriter { + +private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class); + +private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) { +return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties); +} + +private static SqsAsyncClient createSqsClient( +Properties sqsClientProperties, SdkAsyncHttpClient httpClient) { +AWSGeneralUtil.validateAwsCredentials(sqsClientProperties); +return AWSClientUtil.createAwsAsyncClient( +sqsClientProperties, +httpClient, +SqsAsyncClient.builder(), +SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT, +SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX); +} + +private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER = +AWSExceptionHandler.withClassifier( +FatalExceptionClassifier.createChain( +getInterruptedExceptionClassifier(), +getInvalidCredentialsExceptionClassifier(), +SqsExceptionClassifiers +.getResourceNotFoundExceptionClassifier(), +
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1609243592 ## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkBuilder.java: ## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder; + +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; + +import java.util.Optional; +import java.util.Properties; + +import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION; +import static software.amazon.awssdk.http.Protocol.HTTP1_1; + +/** + * Builder to construct {@link SqsSink}. + * + * The following example shows the minimum setup to create a {@link SqsSink} that + * writes String values to a SQS named sqsUrl. 
+ * + * {@code + * Properties sinkProperties = new Properties(); + * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); + * + * SqsSink sqsSink = + * SqsSink.builder() + * .setElementConverter(elementConverter) + * .setSqsUrl("sqsUrl") + * .setSqsClientProperties(sinkProperties) + * .setSerializationSchema(new SimpleStringSchema()) + * .build(); + * } + * + * If the following parameters are not set in this builder, the following defaults will be used: + * + * + * {@code maxBatchSize} will be 10 + * {@code maxInFlightRequests} will be 50 + * {@code maxBufferedRequests} will be 5000 + * {@code maxBatchSizeInBytes} will be 256 KB i.e. {@code 256 * 1000} + * {@code maxTimeInBufferMs} will be 5000ms + * {@code maxRecordSizeInBytes} will be 256 KB i.e. {@code 256 * 1000} + * {@code failOnError} will be false + * + * + * @param type of elements that should be persisted in the destination + */ +@PublicEvolving +public class SqsSinkBuilder +extends AsyncSinkBaseBuilder> { + +private static final int DEFAULT_MAX_BATCH_SIZE = 10; Review Comment: Per the SendMessageBatch [documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html#:~:text=Because%20the%20batch%20request%20can,xFFFD%20%7C%20%23x1%20to%20%23x10), a single request can carry at most 10 messages.
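To make the 10-message limit concrete, here is a small sketch that splits a list of pending entries into groups of at most 10, the cap the comment above cites for a single SendMessageBatch request. `BatchSplitSketch`, `split`, and `MAX_ENTRIES_PER_REQUEST` are hypothetical names for illustration only. In the PR itself the limit is expressed through the builder's `DEFAULT_MAX_BATCH_SIZE = 10`, not through code like this.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper illustrating the per-request entry cap; not part of the connector. */
public class BatchSplitSketch {

    /** Per-request cap cited in the review comment above. */
    public static final int MAX_ENTRIES_PER_REQUEST = 10;

    /** Splits entries into consecutive batches of at most maxPerBatch elements each. */
    public static <T> List<List<T>> split(List<T> entries, int maxPerBatch) {
        if (maxPerBatch <= 0) {
            throw new IllegalArgumentException("maxPerBatch must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < entries.size(); start += maxPerBatch) {
            int end = Math.min(start + maxPerBatch, entries.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(entries.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> bodies = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            bodies.add("message-" + i);
        }
        for (List<String> batch : split(bodies, MAX_ENTRIES_PER_REQUEST)) {
            System.out.println(batch.size());
        }
    }
}
```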
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1609241620 ## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java: ## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; + +import java.io.UnsupportedEncodingException; Review Comment: Removed
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1608980647 ## flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsSinkITCase.java: ## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils; +import org.apache.flink.connector.aws.testutils.LocalstackContainer; +import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.DockerImageVersions; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; + +import java.util.ArrayList; +import java.util.Base64; +import java.util.List; + +import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig; +import static org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Integration test suite for the {@code SqsSink} using a localstack container. + */ +@Testcontainers +@ExtendWith(MiniClusterExtension.class) +class SqsSinkITCase { Review Comment: Yes, I can move this, but all the other sinks also keep the same tests in their own packages, so I followed the same convention.
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1608974641 ## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java: ## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; + +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier; +import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier; + +/** + * Sink writer created by {@link SqsSink} to write to SQS. More + * details on the operation of this sink writer may be found in the doc for {@link + * SqsSink}. More details on the internals of this sink writer may be found in {@link + * AsyncSinkWriter}. + * + * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS + * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code + * AWS_SECRET_ACCESS_KEY} through environment variables etc. + */ +@Internal +class SqsSinkWriter extends AsyncSinkWriter { + +private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class); + +private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) { +return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties); +} + +private static SqsAsyncClient createSqsClient( +Properties sqsClientProperties, SdkAsyncHttpClient httpClient) { +AWSGeneralUtil.validateAwsCredentials(sqsClientProperties); +return AWSClientUtil.createAwsAsyncClient( +sqsClientProperties, +httpClient, +SqsAsyncClient.builder(), +SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT, +SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX); +} + +private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER = +AWSExceptionHandler.withClassifier( +FatalExceptionClassifier.createChain( +getInterruptedExceptionClassifier(), +getInvalidCredentialsExceptionClassifier(), +SqsExceptionClassifiers +.getResourceNotFoundExceptionClassifier(), +
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1608973555

## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java: ##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More
+ * details on the operation of this sink writer may be found in the doc for {@link
+ * SqsSink}. More details on the internals of this sink writer may be found in {@link
+ * AsyncSinkWriter}.
+ *
+ * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter extends AsyncSinkWriter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class);
+
+    private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) {
+        return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties);
+    }
+
+    private static SqsAsyncClient createSqsClient(
+            Properties sqsClientProperties, SdkAsyncHttpClient httpClient) {
+        AWSGeneralUtil.validateAwsCredentials(sqsClientProperties);
+        return AWSClientUtil.createAwsAsyncClient(
+                sqsClientProperties,
+                httpClient,
+                SqsAsyncClient.builder(),
+                SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT,
+                SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX);
+    }
+
+    private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+            AWSExceptionHandler.withClassifier(
+                    FatalExceptionClassifier.createChain(
+                            getInterruptedExceptionClassifier(),
+                            getInvalidCredentialsExceptionClassifier(),
+                            SqsExceptionClassifiers
+                                    .getResourceNotFoundExceptionClassifier(),
+
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on PR #141:
URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2123460881

> Thanks for the efforts. Could we remove the ticket link from the PR title? It should be automatically linked using "Autolink".
>
> We need to add documentation for the new feature as well.

1) Updated.
2) Sure, I will add documentation for this.