Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-12 Thread via GitHub


hlteoh37 commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2225105786

   The Spotless check seems to be failing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-12 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1675521480


##
docs/content/docs/connectors/datastream/sqs.md:
##
@@ -0,0 +1,134 @@
+---
+title: SQS
+weight: 5
+type: docs
+aliases:
+   - /zh/dev/connectors/sqs.html

Review Comment:
   I don't think we need `zh` here






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-12 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1675502706


##
flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilder.java:
##
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link SqsSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link SqsSink} that writes String
+ * values to a SQS named sqsUrl.
+ *
+ * <pre>{@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * SqsSink sqsSink =
+ *         SqsSink.builder()
+ *                 .setSqsUrl("sqsUrl")
+ *                 .setSqsClientProperties(sinkProperties)
+ *                 .setSerializationSchema(new SimpleStringSchema())
+ *                 .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 10
+ *   <li>{@code maxInFlightRequests} will be 50
+ *   <li>{@code maxBufferedRequests} will be 5000
+ *   <li>{@code maxBatchSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code maxTimeInBufferMs} will be 5000ms
+ *   <li>{@code maxRecordSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code failOnError} will be false
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class SqsSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<InputT, SendMessageBatchRequestEntry, SqsSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 10;
+    private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 5_000;
+    private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 256 * 1000;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 256 * 1000;
+    private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+    private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1;
+
+    private Boolean failOnError;
+    private String sqsUrl;
+    private Properties sqsClientProperties;
+    private SerializationSchema<InputT> serializationSchema;
+
+    SqsSinkBuilder() {}
+
+    /**
+     * Sets the url of the SQS that the sink will connect to. There is no default for this
+     * parameter, therefore, this must be provided at sink creation time otherwise the build will
+     * fail.
+     *
+     * @param sqsUrl the url of the Sqs
+     * @return {@link SqsSinkBuilder} itself
+     */
+    public SqsSinkBuilder<InputT> setSqsUrl(String sqsUrl) {
+        this.sqsUrl = sqsUrl;
+        return this;
+    }
+
+    /**
+     * Allows the user to specify a serialization schema to serialize each record to persist to SQS.
+     *
+     * @param schema serialization schema to use
+     * @return {@link SqsSinkBuilder} itself
+     */
+    public SqsSinkBuilder<InputT> setSerializationSchema(final SerializationSchema<InputT> schema) {
+        serializationSchema = schema;
+        return this;
+    }

Review Comment:
   Looks great - thanks @19priyadhingra !
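
As a rough illustration of the defaults listed in the Javadoc above: the quoted hunk does not show how the builder resolves unset parameters, so the sketch below only assumes a common pattern (boxed fields left `null` until set, resolved with `Optional.ofNullable(...).orElse(...)`). `DefaultsSketchBuilder` and its method names are hypothetical and are not part of the connector.

```java
import java.util.Optional;

/** Hypothetical, simplified stand-in for a sink builder; NOT the actual SqsSinkBuilder. */
final class DefaultsSketchBuilder {
    // Default values copied from the constants in the quoted diff.
    private static final int DEFAULT_MAX_BATCH_SIZE = 10;
    private static final boolean DEFAULT_FAIL_ON_ERROR = false;

    // Boxed types so that "not set" can be represented as null.
    private Integer maxBatchSize;
    private Boolean failOnError;

    DefaultsSketchBuilder setMaxBatchSize(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
        return this;
    }

    DefaultsSketchBuilder setFailOnError(boolean failOnError) {
        this.failOnError = failOnError;
        return this;
    }

    // Falls back to the default only when the caller never set a value.
    int resolvedMaxBatchSize() {
        return Optional.ofNullable(maxBatchSize).orElse(DEFAULT_MAX_BATCH_SIZE);
    }

    boolean resolvedFailOnError() {
        return Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR);
    }

    public static void main(String[] args) {
        DefaultsSketchBuilder unset = new DefaultsSketchBuilder();
        DefaultsSketchBuilder custom = new DefaultsSketchBuilder().setMaxBatchSize(5);
        System.out.println("unset maxBatchSize:  " + unset.resolvedMaxBatchSize());
        System.out.println("custom maxBatchSize: " + custom.resolvedMaxBatchSize());
    }
}
```

Keeping the fields boxed lets the builder distinguish "explicitly set to the default" from "never set", which matters if the defaults change in a later release.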






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1675080346


##
flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilder.java:
##
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link SqsSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link SqsSink} that writes String
+ * values to a SQS named sqsUrl.
+ *
+ * <pre>{@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * SqsSink sqsSink =
+ *         SqsSink.builder()
+ *                 .setSqsUrl("sqsUrl")
+ *                 .setSqsClientProperties(sinkProperties)
+ *                 .setSerializationSchema(new SimpleStringSchema())
+ *                 .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 10
+ *   <li>{@code maxInFlightRequests} will be 50
+ *   <li>{@code maxBufferedRequests} will be 5000
+ *   <li>{@code maxBatchSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code maxTimeInBufferMs} will be 5000ms
+ *   <li>{@code maxRecordSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code failOnError} will be false
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class SqsSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<InputT, SendMessageBatchRequestEntry, SqsSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 10;
+    private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 5_000;
+    private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 256 * 1000;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 256 * 1000;
+    private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+    private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1;
+
+    private Boolean failOnError;
+    private String sqsUrl;
+    private Properties sqsClientProperties;
+    private SerializationSchema<InputT> serializationSchema;
+
+    SqsSinkBuilder() {}
+
+    /**
+     * Sets the url of the SQS that the sink will connect to. There is no default for this
+     * parameter, therefore, this must be provided at sink creation time otherwise the build will
+     * fail.
+     *
+     * @param sqsUrl the url of the Sqs
+     * @return {@link SqsSinkBuilder} itself
+     */
+    public SqsSinkBuilder<InputT> setSqsUrl(String sqsUrl) {
+        this.sqsUrl = sqsUrl;
+        return this;
+    }
+
+    /**
+     * Allows the user to specify a serialization schema to serialize each record to persist to SQS.
+     *
+     * @param schema serialization schema to use
+     * @return {@link SqsSinkBuilder} itself
+     */
+    public SqsSinkBuilder<InputT> setSerializationSchema(final SerializationSchema<InputT> schema) {
+        serializationSchema = schema;
+        return this;
+    }

Review Comment:
   I have updated this as suggested; please let me know if it looks good.






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674832829


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java:
##
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.test;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.sqs.sink.SqsSink;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End to End test for SQS sink API. */
+public class SqsSinkITTest extends TestLogger {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqsSinkITTest.class);
+
+    private static final int NUMBER_OF_ELEMENTS = 50;
+    private StreamExecutionEnvironment env;
+    private SdkHttpClient httpClient;
+    private SqsClient sqsClient;
+    private static final Network network = Network.newNetwork();
+
+    @ClassRule
+    public static LocalstackContainer mockSqsContainer =
+            new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
+                    .withNetwork(network)
+                    .withNetworkAliases("localstack");
+
+    public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+            TestcontainersSettings.builder()
+                    .environmentVariable("AWS_CBOR_DISABLE", "1")
+                    .environmentVariable(
+                            "FLINK_ENV_JAVA_OPTS",
+                            "-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false")
+                    .network(network)
+                    .logger(LOG)
+                    .dependsOn(mockSqsContainer)
+                    .build();
+
+    public static final FlinkContainers FLINK =
+            FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+    @Before
+    public void setup() throws Exception {
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");

Review Comment:
   The test case passed without this line, so I have removed it.
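
For context on the `System.setProperty` call quoted above: a JVM-wide property set in `@Before` stays set for every later test in the same JVM unless it is restored. Should such a flag ever be needed again, one option is to scope it, as in the minimal sketch below. `ScopedSystemProperty` and `runWith` are hypothetical names; the key `aws.cborEnabled` is taken from the `-Daws.cborEnabled=false` option in the quoted diff.

```java
/** Hypothetical helper; not part of Flink or the AWS SDK. */
final class ScopedSystemProperty {

    /** Runs the action with key=value set, then restores the previous state of the key. */
    static void runWith(String key, String value, Runnable action) {
        String previous = System.getProperty(key);
        System.setProperty(key, value);
        try {
            action.run();
        } finally {
            if (previous == null) {
                // The key was not set before, so remove it rather than leaving a value behind.
                System.clearProperty(key);
            } else {
                System.setProperty(key, previous);
            }
        }
    }

    public static void main(String[] args) {
        runWith(
                "aws.cborEnabled",
                "false",
                () -> System.out.println("inside: " + System.getProperty("aws.cborEnabled")));
        System.out.println("after:  " + System.getProperty("aws.cborEnabled"));
    }
}
```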






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1675069242


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,105 @@
+
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>4.4-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-connector-aws-sqs-e2e-tests</artifactId>

Review Comment:
   The above command didn't work for me, but I ran the test by placing the test
   file in the same sqs-sink test folder, and it passed there. Please let me know
   if it still fails on your end.






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674832829


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java:
##
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.test;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.sqs.sink.SqsSink;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End to End test for SQS sink API. */
+public class SqsSinkITTest extends TestLogger {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqsSinkITTest.class);
+
+    private static final int NUMBER_OF_ELEMENTS = 50;
+    private StreamExecutionEnvironment env;
+    private SdkHttpClient httpClient;
+    private SqsClient sqsClient;
+    private static final Network network = Network.newNetwork();
+
+    @ClassRule
+    public static LocalstackContainer mockSqsContainer =
+            new LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
+                    .withNetwork(network)
+                    .withNetworkAliases("localstack");
+
+    public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+            TestcontainersSettings.builder()
+                    .environmentVariable("AWS_CBOR_DISABLE", "1")
+                    .environmentVariable(
+                            "FLINK_ENV_JAVA_OPTS",
+                            "-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking -Daws.cborEnabled=false")
+                    .network(network)
+                    .logger(LOG)
+                    .dependsOn(mockSqsContainer)
+                    .build();
+
+    public static final FlinkContainers FLINK =
+            FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+    @Before
+    public void setup() throws Exception {
+        System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");

Review Comment:
   I will validate this once I am able to run the test locally.






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674832262


##
flink-connector-aws/flink-connector-sqs/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension:
##
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension

Review Comment:
   This was copied from another sink package and doesn't look like it is
   required; deleting it now.






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674703677


##
flink-connector-aws/flink-connector-sqs/src/main/resources/log4j2.properties:
##
@@ -0,0 +1,25 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+rootLogger.level = OFF

Review Comment:
   Why do we want to do that? I can see that other sinks also have their own
   independent log4j configuration.






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674213420


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,105 @@
+
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>4.4-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-connector-aws-sqs-e2e-tests</artifactId>

Review Comment:
   This test doesn't seem to work. Please make sure we have all the 
dependencies.
   
   
   ```
   Caused by: java.lang.NoClassDefFoundError: software/amazon/awssdk/services/s3/S3Client
       at org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createS3Client(AWSServicesTestUtils.java:65)
       at org.apache.flink.connector.aws.testutils.LocalstackContainer$ListBucketObjectsWaitStrategy.list(LocalstackContainer.java:79)
       at org.rnorth.ducttape.ratelimits.RateLimiter.getWhenReady(RateLimiter.java:51)
       at org.apache.flink.connector.aws.testutils.LocalstackContainer$ListBucketObjectsWaitStrategy.lambda$waitUntilReady$0(LocalstackContainer.java:72)
       at org.rnorth.ducttape.unreliables.Unreliables.lambda$retryUntilSuccess$0(Unreliables.java:43)
       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
       at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: java.lang.ClassNotFoundException: software.amazon.awssdk.services.s3.S3Client
   ```
   
   I ran using the below command
   
   ```
   mvn clean verify -Prun-end-to-end-tests -DdistDir=/Users/liangtl/workplace/flink_os/flink-1.19.0/lib/flink-dist-1.19.0.jar -pl flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests -am
   ```
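
The trace above shows `LocalstackContainer`'s wait strategy calling `AWSServicesTestUtils.createS3Client`, so the S3 client class has to be resolvable on the test classpath of the e2e module even though the test itself only targets SQS. A quick way to confirm whether a class is visible is sketched below. The class name is taken from the stack trace; `ClasspathCheck` and `isOnClasspath` are hypothetical names used only for illustration.

```java
/** Hypothetical diagnostic helper; not part of the connector or its test utilities. */
final class ClasspathCheck {

    /** Returns true if the named class can be located by this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // NoClassDefFoundError is a LinkageError, so missing transitive classes land here too.
            return false;
        }
    }

    public static void main(String[] args) {
        String s3Client = "software.amazon.awssdk.services.s3.S3Client";
        System.out.println(s3Client + " present: " + isOnClasspath(s3Client));
    }
}
```

Running this with the module's test classpath should report whether the S3 SDK dependency is missing from the e2e module or only from a particular Maven invocation.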






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674205703


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,105 @@
+
+
+
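+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>4.4-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-connector-aws-sqs-e2e-tests</artifactId>
+    <name>Flink : Connectors : AWS : E2E Tests : Amazon SQS</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-sqs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-sqs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>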
+
+
+
+
+
+org.apache.maven.plugins
+maven-dependency-plugin
+
+
+copy
+pre-integration-test
+
+copy
+
+
+
+
+
+

Review Comment:
   This should be removed.






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674182875


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/src/test/java/org/apache/flink/connector/sqs/sink/test/SqsSinkITTest.java:
##
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.test;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.sqs.sink.SqsSink;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.connector.testframe.container.FlinkContainers;
+import org.apache.flink.connector.testframe.container.TestcontainersSettings;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.TestLogger;
+
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static 
org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** End to End test for SQS sink API. */
+public class SqsSinkITTest extends TestLogger {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SqsSinkITTest.class);
+
+private static final int NUMBER_OF_ELEMENTS = 50;
+private StreamExecutionEnvironment env;
+private SdkHttpClient httpClient;
+private SqsClient sqsClient;
+private static final Network network = Network.newNetwork();
+
+@ClassRule
+public static LocalstackContainer mockSqsContainer =
+new 
LocalstackContainer(DockerImageName.parse(DockerImageVersions.LOCALSTACK))
+.withNetwork(network)
+.withNetworkAliases("localstack");
+
+public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
+TestcontainersSettings.builder()
+.environmentVariable("AWS_CBOR_DISABLE", "1")
+.environmentVariable(
+"FLINK_ENV_JAVA_OPTS",
+
"-Dorg.apache.flink.sqs.shaded.com.amazonaws.sdk.disableCertChecking 
-Daws.cborEnabled=false")
+.network(network)
+.logger(LOG)
+.dependsOn(mockSqsContainer)
+.build();
+
+public static final FlinkContainers FLINK =
+
FlinkContainers.builder().withTestcontainersSettings(TESTCONTAINERS_SETTINGS).build();
+
+@Before
+public void setup() throws Exception {
+System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");

Review Comment:
   Do we need this for SQS? I know we need it for KDS (Kinesis Data Streams), but just checking.






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674177861


##
flink-connector-aws/flink-connector-sqs/pom.xml:
##
@@ -0,0 +1,130 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+4.0.0
+
+
+org.apache.flink
+flink-connector-aws-parent
+4.4-SNAPSHOT
+
+
+flink-connector-sqs
+Flink : Connectors : AWS : Amazon SQS
+jar
+
+
+
+org.apache.flink
+flink-streaming-java
+${flink.version}
+provided
+
+
+
+org.apache.flink
+flink-connector-aws-base
+${project.version}
+
+
+
+software.amazon.awssdk
+sqs
+
+
+
+software.amazon.awssdk
+netty-nio-client
+
+
+
+
+org.apache.flink
+flink-test-utils
+${flink.version}
+test
+
+
+org.apache.flink
+flink-connector-test-utils
+${flink.version}
+test
+
+
+
+org.apache.flink
+flink-connector-aws-base
+${project.version}
+test-jar
+test
+
+
+
+org.apache.flink
+flink-connector-base
+${flink.version}
+test-jar
+test
+
+
+
+org.testcontainers
+testcontainers
+test
+
+
+
+com.fasterxml.jackson.core
+jackson-core
+
+
+
+com.fasterxml.jackson.datatype
+jackson-datatype-jsr310
+
+
+

Review Comment:
   nit `ArchUnit`






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674086507


##
flink-connector-aws/flink-connector-sqs/src/test/resources/META-INF/services/org.junit.jupiter.api.extension.Extension:
##
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.util.TestLoggerExtension

Review Comment:
   What do we use this for?






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674080477


##
flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverterTest.java:
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Covers construction and sanity checking of {@link 
SqsSinkElementConverter}. */
+class SqsSinkElementConverterTest {
+
+@Test
+void 
elementConverterWillComplainASerializationSchemaIsNotSetIfBuildIsCalledWithoutIt()
 {
+Assertions.assertThatExceptionOfType(NullPointerException.class)
+.isThrownBy(() -> 
SqsSinkElementConverter.builder().build())
+.withMessageContaining(
+"No SerializationSchema was supplied to the SQS Sink 
builder.");
+}
+
+@Test
+void elementConverterUsesProvidedSchemaToSerializeRecord() {
+ElementConverter<String, SendMessageBatchRequestEntry> elementConverter =
+SqsSinkElementConverter.builder()
+.setSerializationSchema(new SimpleStringSchema())
+.build();
+elementConverter.open(null);
+
+String testString = "{many hands make light work;";
+
+SendMessageBatchRequestEntry serializedRecord = 
elementConverter.apply(testString, null);
+byte[] serializedString = (new 
SimpleStringSchema()).serialize(testString);
+assertThat(serializedRecord.messageBody())
+.isEqualTo(new String(serializedString, 
StandardCharsets.UTF_8));
+}
+
+@Test
+void elementConverterWillOpenSerializationSchema() {
+OpenCheckingStringSchema openCheckingStringSchema = new 
OpenCheckingStringSchema();
+ElementConverter<String, SendMessageBatchRequestEntry> elementConverter =
+SqsSinkElementConverter.builder()
+.setSerializationSchema(openCheckingStringSchema)
+.build();
+elementConverter.open(null);

Review Comment:
   ```suggestion
   elementConverter.open(new TestSinkInitContext());
   ```






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674079374


##
flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverterTest.java:
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Covers construction and sanity checking of {@link 
SqsSinkElementConverter}. */
+class SqsSinkElementConverterTest {
+
+@Test
+void 
elementConverterWillComplainASerializationSchemaIsNotSetIfBuildIsCalledWithoutIt()
 {
+Assertions.assertThatExceptionOfType(NullPointerException.class)
+.isThrownBy(() -> 
SqsSinkElementConverter.builder().build())
+.withMessageContaining(
+"No SerializationSchema was supplied to the SQS Sink 
builder.");
+}
+
+@Test
+void elementConverterUsesProvidedSchemaToSerializeRecord() {
+ElementConverter<String, SendMessageBatchRequestEntry> elementConverter =
+SqsSinkElementConverter.builder()
+.setSerializationSchema(new SimpleStringSchema())
+.build();
+elementConverter.open(null);

Review Comment:
   This needs to be changed; the test fails as written.
   ```suggestion
   elementConverter.open(new TestSinkInitContext());
   ```
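
One plausible reason for the failure, stated here as an assumption rather than something confirmed in this thread, is that `open` reads from the init context, so passing `null` triggers a `NullPointerException`. The sketch below uses hypothetical stand-in types (`InitContext`, `Converter`, `FakeInitContext`) rather than the real Flink classes.

```java
import java.util.Objects;

/** Hypothetical stand-ins for illustration; these are not the Flink types. */
final class OpenWithContextSketch {

    /** Stand-in for a sink init context that exposes a subtask id. */
    interface InitContext {
        int subtaskId();
    }

    /** Stand-in converter whose open() reads from the context, as assumed above. */
    static final class Converter {
        private int subtaskId = -1;

        void open(InitContext context) {
            // Dereferences the context, so open(null) throws NullPointerException.
            this.subtaskId = Objects.requireNonNull(context, "context").subtaskId();
        }

        int subtaskId() {
            return subtaskId;
        }
    }

    /** Minimal fake context, playing the role TestSinkInitContext plays in the suggestion. */
    static final class FakeInitContext implements InitContext {
        @Override
        public int subtaskId() {
            return 0;
        }
    }

    /** Returns true if opening the converter with the given context succeeds. */
    static boolean opensCleanly(InitContext context) {
        try {
            new Converter().open(context);
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("open(null) succeeds: " + opensCleanly(null));
        System.out.println("open(fake) succeeds: " + opensCleanly(new FakeInitContext()));
    }
}
```

Under that assumption, supplying a real test context, as the suggestion does, keeps the test focused on serialization rather than on null handling.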






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674066852


##
flink-connector-aws/flink-connector-sqs/src/main/resources/log4j2.properties:
##
@@ -0,0 +1,25 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+rootLogger.level = OFF

Review Comment:
   Can we remove this file?






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674059875


##
flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.connector.sqs.sink.client.SdkClientProvider;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More details on the 
operation of this
+ * sink writer may be found in the doc for {@link SqsSink}. More details on 
the internals of this
+ * sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * The {@link SqsAsyncClient} used here may be configured in the standard 
way for the AWS SDK
+ * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} 
and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter<InputT> extends AsyncSinkWriter<InputT, SendMessageBatchRequestEntry> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SqsSinkWriter.class);
+
+private final SdkClientProvider clientProvider;
+
+private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+AWSExceptionHandler.withClassifier(
+FatalExceptionClassifier.createChain(
+getInterruptedExceptionClassifier(),
+getInvalidCredentialsExceptionClassifier(),
+
SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier(),
+
SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+
SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+getSdkClientMisconfiguredExceptionClassifier()));
+
+private final Counter numRecordsOutErrorsCounter;
+
+/* Url of SQS */
+private final String sqsUrl;
+
+/* The sink writer metric group */
+private final SinkWriterMetricGroup metrics;
+
+/* Flag to whether fatally fail any time we encounter an exception when 
persisting records */
+private final boolean failOnError;
+
+SqsSinkWriter(
+ElementConverter<InputT, SendMessageBatchRequestEntry> elementConverter,
+Sink.InitContext context,
+int maxBatchSize,
+int maxInFlightRequests,
+int maxBufferedRequests,

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-11 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1674055654


##
flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkBuilder.java:
##
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link SqsSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link SqsSink} that writes String
+ * values to a SQS named sqsUrl.
+ *
+ * <pre>{@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * SqsSink<String> sqsSink =
+ * SqsSink.<String>builder()
+ * .setSqsUrl("sqsUrl")
+ * .setSqsClientProperties(sinkProperties)
+ * .setSerializationSchema(new SimpleStringSchema())
+ * .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 10
+ *   <li>{@code maxInFlightRequests} will be 50
+ *   <li>{@code maxBufferedRequests} will be 5000
+ *   <li>{@code maxBatchSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code maxTimeInBufferMs} will be 5000ms
+ *   <li>{@code maxRecordSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code failOnError} will be false
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class SqsSinkBuilder<InputT>
+extends AsyncSinkBaseBuilder<InputT, SendMessageBatchRequestEntry, SqsSinkBuilder<InputT>> {
+
+private static final int DEFAULT_MAX_BATCH_SIZE = 10;
+private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 5_000;
+private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 256 * 1000;
+private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 256 * 1000;
+private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+private static final Protocol DEFAULT_HTTP_PROTOCOL = HTTP1_1;
+
+private Boolean failOnError;
+private String sqsUrl;
+private Properties sqsClientProperties;
+private SerializationSchema<InputT> serializationSchema;
+
+SqsSinkBuilder() {}
+
+/**
+ * Sets the url of the SQS that the sink will connect to. There is no 
default for this
+ * parameter, therefore, this must be provided at sink creation time 
otherwise the build will
+ * fail.
+ *
+ * @param sqsUrl the url of the Sqs
+ * @return {@link SqsSinkBuilder} itself
+ */
+public SqsSinkBuilder<InputT> setSqsUrl(String sqsUrl) {
+this.sqsUrl = sqsUrl;
+return this;
+}
+
+/**
+ * Allows the user to specify a serialization schema to serialize each 
record to persist to SQS.
+ *
+ * @param schema serialization schema to use
+ * @return {@link SqsSinkBuilder} itself
+ */
+public SqsSinkBuilder<InputT> setSerializationSchema(final SerializationSchema<InputT> schema) {
+serializationSchema = schema;
+return this;
+}

Review Comment:
   Should we instead provide a method to set the `ElementConverter` directly?
   Otherwise users have to write a `SerializationSchema`, so each record goes
   `InputT` -> `byte[]` -> `String` rather than directly `InputT` -> `String`.
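
The difference between the two paths can be sketched without any Flink or AWS types. The interfaces below (`ByteSchema`, `DirectConverter`) and the `Order` type are hypothetical stand-ins, and the sketch assumes the message body ends up as a UTF-8 `String`.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-ins for illustration; these are not the Flink or AWS SDK types. */
final class ConverterPathsSketch {

    /** Stand-in for a serialization schema: InputT -> byte[]. */
    interface ByteSchema<T> {
        byte[] serialize(T element);
    }

    /** Stand-in for a converter supplied directly by the user: InputT -> String. */
    interface DirectConverter<T> {
        String toMessageBody(T element);
    }

    /** Example input type. */
    static final class Order {
        final String id;
        final int quantity;

        Order(String id, int quantity) {
            this.id = id;
            this.quantity = quantity;
        }
    }

    /** Schema path: the body is encoded to bytes, then decoded back into a String. */
    static <T> String viaSchema(ByteSchema<T> schema, T element) {
        byte[] bytes = schema.serialize(element);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Direct path: the body is produced as a String in one step. */
    static <T> String viaConverter(DirectConverter<T> converter, T element) {
        return converter.toMessageBody(element);
    }

    /** Shared formatting used by both paths in this sketch. */
    static String format(Order order) {
        return order.id + ":" + order.quantity;
    }

    public static void main(String[] args) {
        Order order = new Order("o-1", 3);
        String a = viaSchema(o -> format(o).getBytes(StandardCharsets.UTF_8), order);
        String b = viaConverter(ConverterPathsSketch::format, order);
        System.out.println("same body: " + a.equals(b));
    }
}
```

Both paths yield the same body; the schema path just pays for an extra encode and decode per record.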




Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-09 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1671281074


##
flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java:
##
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import org.apache.flink.connector.sqs.sink.client.SdkClientProvider;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchRequestTooLongException;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Consumer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+/** Covers construction, defaults and sanity checking of {@link 
SqsSinkWriter}. */
+public class SqsSinkWriterTest {
+
+@Mock private SqsAsyncClient sqsAsyncClient;
+
+@Mock private Consumer<List<SendMessageBatchRequestEntry>> requestResult;
+
+@Mock private SdkClientProvider<SqsAsyncClient> asyncClientSdkClientProviderOverride;
+

Review Comment:
   Thanks for the reference. I've removed the Mockito dependency.
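
For readers following along, a hand-written test double can stand in for a Mockito mock of the result callback. This is a generic sketch, not the code from the PR; `RecordingConsumer` is a hypothetical name, and the element type is plain `String` rather than `SendMessageBatchRequestEntry`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical test double that records every batch handed to it. */
final class RecordingConsumer<T> implements Consumer<List<T>> {

    private final List<List<T>> calls = new ArrayList<>();

    @Override
    public void accept(List<T> batch) {
        // Copy defensively so later mutation of the caller's list is not observed.
        calls.add(new ArrayList<>(batch));
    }

    /** Number of times accept() was invoked. */
    int callCount() {
        return calls.size();
    }

    /** The batch passed on the most recent call, or an empty list if never called. */
    List<T> lastBatch() {
        return calls.isEmpty() ? Collections.<T>emptyList() : calls.get(calls.size() - 1);
    }

    public static void main(String[] args) {
        RecordingConsumer<String> requestResult = new RecordingConsumer<>();
        requestResult.accept(Arrays.asList("retry-me"));
        System.out.println(
                requestResult.callCount() + " call(s), last batch: " + requestResult.lastBatch());
    }
}
```

A check such as `callCount() == 0` then takes the place of `verifyNoInteractions`.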






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-08 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1669440001


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,111 @@
+
+
+
+http://maven.apache.org/POM/4.0.0;
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+
+
+flink-connector-aws-e2e-tests-parent
+org.apache.flink
+4.3-SNAPSHOT
+
+
+4.0.0
+
+flink-connector-aws-sqs-e2e-tests
+Flink : Connectors : AWS : E2E Tests : Amazon SQS
+jar
+
+
+
+org.apache.flink
+flink-streaming-java
+${flink.version}
+test
+
+
+
+org.apache.flink
+flink-connector-sqs
+${project.version}
+test
+
+
+
+org.apache.flink
+flink-connector-aws-base
+${project.version}
+test
+test-jar
+
+
+
+org.apache.flink
+flink-connector-sqs
+${project.version}
+test
+test-jar
+
+
+
+
+com.google.guava
+guava
+test
+
+
+
+com.fasterxml.jackson.core
+jackson-databind
+test
+
+
+
+com.fasterxml.jackson.datatype
+jackson-datatype-jsr310
+test
+
+
+
+com.fasterxml.jackson.datatype
+jackson-datatype-jdk8
+test
+

Review Comment:
   Not needed; removed.






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-08 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1669436100


##
flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkElementConverter.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/**
+ * An implementation of the {@link ElementConverter} that uses the AWS SQS SDK v2. The user only
+ * needs to provide a {@link SerializationSchema} of the {@code InputT} to transform it into a
+ * {@link SendMessageBatchRequestEntry} that may be persisted.
+ */
+@Internal
+public class SqsSinkElementConverter<InputT>
+        implements ElementConverter<InputT, SendMessageBatchRequestEntry> {
+
+    /** A serialization schema to specify how the input element should be serialized. */
+    private final SerializationSchema<InputT> serializationSchema;
+
+    private SqsSinkElementConverter(SerializationSchema<InputT> serializationSchema) {
+        this.serializationSchema = serializationSchema;
+    }
+
+    @Override
+    public SendMessageBatchRequestEntry apply(InputT element, SinkWriter.Context context) {
+        final byte[] messageBody = serializationSchema.serialize(element);
+        return SendMessageBatchRequestEntry.builder()
+                .id(UUID.randomUUID().toString())
+                .messageBody(new String(messageBody, StandardCharsets.UTF_8))
+                .build();
+    }
+
+    @Override
+    public void open(Sink.InitContext context) {
+        try {
+            serializationSchema.open(

Review Comment:
   Thanks for the detail. Updated to use
   `serializationSchema.open(context.asSerializationSchemaInitializationContext());`






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-08 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1669426704


##
flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.connector.sqs.sink.client.SdkClientProvider;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More details on the operation of this
+ * sink writer may be found in the doc for {@link SqsSink}. More details on the internals of this
+ * sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * <p>The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS SDK
+ * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter<InputT> extends AsyncSinkWriter<InputT, SendMessageBatchRequestEntry> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class);
+
+    private final SdkClientProvider clientProvider;
+
+    private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+            AWSExceptionHandler.withClassifier(
+                    FatalExceptionClassifier.createChain(
+                            getInterruptedExceptionClassifier(),
+                            getInvalidCredentialsExceptionClassifier(),
+                            SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier(),
+                            SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+                            SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+                            getSdkClientMisconfiguredExceptionClassifier()));
+
+    private final Counter numRecordsOutErrorsCounter;
+
+    /* Url of SQS */
+    private final String sqsUrl;
+
+    /* The sink writer metric group */
+    private final SinkWriterMetricGroup metrics;
+
+    /* Flag to whether fatally fail any time we encounter an exception when persisting records */
+    private final boolean failOnError;
+
+    SqsSinkWriter(
+            ElementConverter<InputT, SendMessageBatchRequestEntry> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-08 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1669408631


##
flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsStateSerializer.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/** SQS implementation {@link AsyncSinkWriterStateSerializer}. */
+@Internal
+public class SqsStateSerializer
+        extends AsyncSinkWriterStateSerializer<SendMessageBatchRequestEntry> {
+    @Override
+    protected void serializeRequestToStream(
+            final SendMessageBatchRequestEntry request, final DataOutputStream out)
+            throws IOException {
+        out.write(request.messageBody().getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Override
+    protected SendMessageBatchRequestEntry deserializeRequestFromStream(
+            final long requestSize, final DataInputStream in) throws IOException {
+        final byte[] requestData = new byte[(int) requestSize];
+        in.read(requestData);
+        return SendMessageBatchRequestEntry.builder()
+                .id(UUID.randomUUID().toString())

Review Comment:
   Makes sense; updated the code accordingly.
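
For readers following the serializer hunk above, here is a minimal standalone sketch of the same write/read round trip for a message body, using only the JDK. The class and method names are hypothetical and are not part of the connector. It uses `DataInputStream.readFully`, which keeps reading until the array is filled, whereas a single `read(byte[])` call is allowed to return fewer bytes than requested.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch; not the connector's SqsStateSerializer. */
final class MessageBodyRoundTripSketch {

    /** Writes the message body as raw UTF-8 bytes, as the hunk above does. */
    static byte[] writeBody(String messageBody) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.write(messageBody.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Reads exactly requestSize bytes and decodes them as UTF-8. */
    static String readBody(long requestSize, byte[] serialized) {
        byte[] requestData = new byte[(int) requestSize];
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            // readFully blocks until the whole array is filled or throws EOFException.
            in.readFully(requestData);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(requestData, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = writeBody("hello sqs");
        System.out.println(readBody(bytes.length, bytes));
    }
}
```

The size passed to `readBody` is the byte length of the serialized body, not the character count, which matters for non-ASCII payloads.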






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-07-02 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1660998380


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,111 @@
+
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>4.3-SNAPSHOT</version>

Review Comment:
   This should be `4.4`



##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,111 @@
+
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>4.3-SNAPSHOT</version>

Review Comment:
   This should be `4.4-SNAPSHOT`






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-09 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632385233


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More details on the operation of this
+ * sink writer may be found in the doc for {@link SqsSink}. More details on the internals of this
+ * sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * <p>The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS SDK
+ * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter<InputT> extends AsyncSinkWriter<InputT, SendMessageBatchRequestEntry> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class);
+
+    private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) {
+        return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties);
+    }
+
+    private static SqsAsyncClient createSqsClient(
+            Properties sqsClientProperties, SdkAsyncHttpClient httpClient) {
+        AWSGeneralUtil.validateAwsCredentials(sqsClientProperties);
+        return AWSClientUtil.createAwsAsyncClient(
+                sqsClientProperties,
+                httpClient,
+                SqsAsyncClient.builder(),
+                SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT,
+                SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX);
+    }
+
+    private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+            AWSExceptionHandler.withClassifier(
+                    FatalExceptionClassifier.createChain(
+                            getInterruptedExceptionClassifier(),
+                            getInvalidCredentialsExceptionClassifier(),
+                            SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier(),
+

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-09 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632370742


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsConfigConstants.java:
##
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Defaults for {@link SqsSinkWriter}. */
+@PublicEvolving
+public class SqsConfigConstants {
+
+public static final String BASE_SQS_USER_AGENT_PREFIX_FORMAT =

Review Comment:
   Sure. Updated to use `ConfigOption`.






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-09 Thread via GitHub


19priyadhingra commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2156354857

   > It'd also be great to mention what permissions are needed for this 
connector to work. E.g., is `sqs:SendMessage` sufficient?
   
   Good point! Yes, `sqs:SendMessage` is sufficient. I have updated the
   documentation accordingly.





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-09 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632183182


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,122 @@
+
+
+
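+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>4.3-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-connector-aws-sqs-e2e-tests</artifactId>
+    <name>Flink : Connectors : AWS : E2E Tests : Amazon SQS</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-sqs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.typesafe.netty</groupId>
+                    <artifactId>netty-reactive-streams-http</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-sqs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.typesafe.netty</groupId>
+                    <artifactId>netty-reactive-streams-http</artifactId>
+                </exclusion>
+            </exclusions>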

Review Comment:
   Unnecessary. Removed in next revision






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-09 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632182646


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,122 @@
+
+
+
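+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>4.3-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-connector-aws-sqs-e2e-tests</artifactId>
+    <name>Flink : Connectors : AWS : E2E Tests : Amazon SQS</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-sqs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.typesafe.netty</groupId>
+                    <artifactId>netty-reactive-streams-http</artifactId>
+                </exclusion>
+            </exclusions>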

Review Comment:
   Unnecessary. Removed in next revision






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-07 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1631774140


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkBuilder.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link SqsSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link SqsSink} that
+ * writes String values to a SQS named sqsUrl.
+ *
+ * <pre>{@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * SqsSink sqsSink =
+ *         SqsSink.builder()
+ *                 .setElementConverter(elementConverter)
+ *                 .setSqsUrl("sqsUrl")
+ *                 .setSqsClientProperties(sinkProperties)
+ *                 .setSerializationSchema(new SimpleStringSchema())
+ *                 .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 10
+ *   <li>{@code maxInFlightRequests} will be 50
+ *   <li>{@code maxBufferedRequests} will be 5000
+ *   <li>{@code maxBatchSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code maxTimeInBufferMs} will be 5000ms
+ *   <li>{@code maxRecordSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   <li>{@code failOnError} will be false
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class SqsSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<InputT, SendMessageBatchRequestEntry, SqsSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 10;
Review Comment:
   Added validation that the max batch size does not exceed 10.
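
As an illustration of what such a check could look like, here is a minimal standalone sketch. It relies on the SQS limit of 10 entries per `SendMessageBatch` request. The class name, method name, and error message are hypothetical and do not reflect the validation actually added in the PR.

```java
/** Hypothetical sketch; not the connector's SqsSinkBuilder. */
final class SqsBatchSizeCheck {

    /** SQS accepts at most 10 entries in a single SendMessageBatch request. */
    static final int SQS_MAX_BATCH_SIZE = 10;

    /**
     * Returns the effective batch size: the default when nothing was configured,
     * otherwise the configured value after range validation.
     */
    static int resolveMaxBatchSize(Integer configured) {
        int value = (configured == null) ? SQS_MAX_BATCH_SIZE : configured;
        if (value < 1 || value > SQS_MAX_BATCH_SIZE) {
            throw new IllegalArgumentException(
                    "maxBatchSize must be between 1 and " + SQS_MAX_BATCH_SIZE + ", got " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(resolveMaxBatchSize(null));
        System.out.println(resolveMaxBatchSize(5));
        try {
            resolveMaxBatchSize(11);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Rejecting an oversized value when the sink is built surfaces the misconfiguration early, instead of as a service-side rejection of every batch at runtime.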






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-07 Thread via GitHub


simulified commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2155686885

   sup





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-06 Thread via GitHub


sap1ens commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2153135962

   It'd also be great to mention what permissions are needed for this connector 
to work. E.g., is `sqs:SendMessage` sufficient? 





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-05 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1627126604


##
docs/content.zh/docs/connectors/datastream/sqs.md:
##
@@ -0,0 +1,134 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /zh/dev/connectors/dynamodb.html
+---
+
+
+# Amazon SQS Sink
+
+The SQS sink writes to [Amazon SQS](https://aws.amazon.com/sqs) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon SQS Developer Guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html)
+to setup a SQS.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< connector_artifact flink-connector-sqs sqs >}}
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");

Review Comment:
   `SqsSinkWriter` uses [AWSClientUtil -> 
createAwsAsyncClient](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSClientUtil.java#L80)
 to create the `SqsAsyncClient`, which internally reads the client region from 
the `AWS_REGION` property. This flow is consistent with how the other AWS sinks 
create their clients, so I followed the same approach. Deriving the region from 
the SQS URL would require custom code, and I didn't see any benefit in doing 
that.
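
To make the trade-off concrete, here is a minimal sketch of the kind of custom code that deriving the region from a queue URL would involve. It assumes the standard `https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>` endpoint form. The class and method names are hypothetical and are not part of the connector.

```java
import java.net.URI;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch; the connector reads the region from AWS_REGION instead. */
final class SqsUrlRegionSketch {

    // Matches hosts such as sqs.eu-west-1.amazonaws.com (assumed standard endpoint form).
    private static final Pattern STANDARD_HOST =
            Pattern.compile("^sqs\\.([a-z0-9-]+)\\.amazonaws\\.com(\\.cn)?$");

    /** Returns the region embedded in the queue URL host, or empty if the host is not standard. */
    static Optional<String> regionFromQueueUrl(String queueUrl) {
        String host = URI.create(queueUrl).getHost();
        if (host == null) {
            return Optional.empty();
        }
        Matcher matcher = STANDARD_HOST.matcher(host);
        return matcher.matches() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(
                regionFromQueueUrl("https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue"));
        System.out.println(regionFromQueueUrl("http://localhost:4566/000000000000/my-queue"));
    }
}
```

Custom endpoints, such as a local emulator or a proxy, carry no region in the host, so a sink relying on this parsing would still need an explicit fallback. That is consistent with keeping `AWS_REGION` as the single source of the region.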






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-05 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1627114002


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,122 @@
+
+
+
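+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>4.3-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-connector-aws-sqs-e2e-tests</artifactId>
+    <name>Flink : Connectors : AWS : E2E Tests : Amazon SQS</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-sqs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.typesafe.netty</groupId>
+                    <artifactId>netty-reactive-streams-http</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-sqs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.typesafe.netty</groupId>
+                    <artifactId>netty-reactive-streams-http</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>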
+

Review Comment:
   done






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-05 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1627108609


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More details on the operation of this
+ * sink writer may be found in the doc for {@link SqsSink}. More details on the internals of this
+ * sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS SDK
+ * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter<InputT> extends AsyncSinkWriter<InputT, SendMessageBatchRequestEntry> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class);
+
+    private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) {
+        return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties);
+    }
+
+    private static SqsAsyncClient createSqsClient(
+            Properties sqsClientProperties, SdkAsyncHttpClient httpClient) {
+        AWSGeneralUtil.validateAwsCredentials(sqsClientProperties);
+        return AWSClientUtil.createAwsAsyncClient(
+                sqsClientProperties,
+                httpClient,
+                SqsAsyncClient.builder(),
+                SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT,
+                SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX);
+    }
+
+    private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+            AWSExceptionHandler.withClassifier(
+                    FatalExceptionClassifier.createChain(
+                            getInterruptedExceptionClassifier(),
+                            getInvalidCredentialsExceptionClassifier(),
+                            SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier(),
+

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-05 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1627101582


##
docs/content.zh/docs/connectors/datastream/sqs.md:
##
@@ -0,0 +1,134 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /zh/dev/connectors/dynamodb.html
+---
+
+
+# Amazon SQS Sink
+
+The SQS sink writes to [Amazon SQS](https://aws.amazon.com/sqs) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon SQS Developer Guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html)
+to setup a SQS.

Review Comment:
   Updated






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-05 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1627100151


##
docs/content.zh/docs/connectors/datastream/sqs.md:
##
@@ -0,0 +1,134 @@
+---
+title: DynamoDB

Review Comment:
   oops! corrected






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-05 Thread via GitHub


19priyadhingra commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2149019054

   > We seem to have quite a few `.` in the class folders. Can we change 
them to `/` instead? e.g. 
`flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java`
   
   Good catch! Fixed





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-05 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1627068765


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkElementConverter.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/**
+ * An implementation of the {@link ElementConverter} that uses the AWS SQS SDK v2. The user only
+ * needs to provide a {@link SerializationSchema} of the {@code InputT} to transform it into a
+ * {@link SendMessageBatchRequestEntry} that may be persisted.
+ */
+@Internal
+public class SqsSinkElementConverter<InputT>
+        implements ElementConverter<InputT, SendMessageBatchRequestEntry> {
+
+    /** A serialization schema to specify how the input element should be serialized. */
+    private final SerializationSchema<InputT> serializationSchema;
+
+    private SqsSinkElementConverter(SerializationSchema<InputT> serializationSchema) {
+        this.serializationSchema = serializationSchema;
+    }
+
+    @Override
+    public SendMessageBatchRequestEntry apply(InputT element, SinkWriter.Context context) {
+        final byte[] messageBody = serializationSchema.serialize(element);
+        return SendMessageBatchRequestEntry.builder()
+                .id(UUID.randomUUID().toString())
+                .messageBody(new String(messageBody, StandardCharsets.UTF_8))
+                .build();
+    }
+
+    @Override
+    public void open(Sink.InitContext context) {
+        try {
+            serializationSchema.open(
+                    new SerializationSchema.InitializationContext() {
+                        @Override
+                        public MetricGroup getMetricGroup() {
+                            return new UnregisteredMetricsGroup();
+                        }
+
+                        @Override
+                        public UserCodeClassLoader getUserCodeClassLoader() {
+                            return SimpleUserCodeClassLoader.create(
+                                    SqsSinkElementConverter.class.getClassLoader());
+                        }
+                    });
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
+        }
+    }
+
+    public static <InputT> Builder<InputT> builder() {
+        return new Builder<>();
+    }
+
+    /** A builder for the SqsSinkElementConverter. */
+    public static class Builder<InputT> {
+
+        private SerializationSchema<InputT> serializationSchema;
+
+        public Builder<InputT> setSerializationSchema(
+                SerializationSchema<InputT> serializationSchema) {
+            this.serializationSchema = serializationSchema;
+            return this;
+        }
+
+        public SqsSinkElementConverter<InputT> build() {
+            Preconditions.checkNotNull(
+                    serializationSchema,
+                    "No SerializationSchema was supplied to the " + "SQS Sink builder.");

Review Comment:
   Fixed






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-04 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1625705602


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkElementConverter.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SimpleUserCodeClassLoader;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/**
+ * An implementation of the {@link ElementConverter} that uses the AWS SQS SDK v2. The user only
+ * needs to provide a {@link SerializationSchema} of the {@code InputT} to transform it into a
+ * {@link SendMessageBatchRequestEntry} that may be persisted.
+ */
+@Internal
+public class SqsSinkElementConverter<InputT>
+        implements ElementConverter<InputT, SendMessageBatchRequestEntry> {
+
+    /** A serialization schema to specify how the input element should be serialized. */
+    private final SerializationSchema<InputT> serializationSchema;
+
+    private SqsSinkElementConverter(SerializationSchema<InputT> serializationSchema) {
+        this.serializationSchema = serializationSchema;
+    }
+
+    @Override
+    public SendMessageBatchRequestEntry apply(InputT element, SinkWriter.Context context) {
+        final byte[] messageBody = serializationSchema.serialize(element);
+        return SendMessageBatchRequestEntry.builder()
+                .id(UUID.randomUUID().toString())
+                .messageBody(new String(messageBody, StandardCharsets.UTF_8))
+                .build();
+    }
+
+    @Override
+    public void open(Sink.InitContext context) {
+        try {
+            serializationSchema.open(
+                    new SerializationSchema.InitializationContext() {
+                        @Override
+                        public MetricGroup getMetricGroup() {
+                            return new UnregisteredMetricsGroup();
+                        }
+
+                        @Override
+                        public UserCodeClassLoader getUserCodeClassLoader() {
+                            return SimpleUserCodeClassLoader.create(
+                                    SqsSinkElementConverter.class.getClassLoader());
+                        }
+                    });
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to initialize serialization schema.", e);
+        }
+    }
+
+    public static <InputT> Builder<InputT> builder() {
+        return new Builder<>();
+    }
+
+    /** A builder for the SqsSinkElementConverter. */
+    public static class Builder<InputT> {
+
+        private SerializationSchema<InputT> serializationSchema;
+
+        public Builder<InputT> setSerializationSchema(
+                SerializationSchema<InputT> serializationSchema) {
+            this.serializationSchema = serializationSchema;
+            return this;
+        }
+
+        public SqsSinkElementConverter<InputT> build() {
+            Preconditions.checkNotNull(
+                    serializationSchema,
+                    "No SerializationSchema was supplied to the " + "SQS Sink builder.");

Review Comment:
   nit: `+` not needed here
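
   For illustration, a minimal sketch of the nit: the two adjacent literals
can be merged into one without changing the message. The class and method
names below are hypothetical, and `Objects.requireNonNull` stands in for
Flink's `Preconditions.checkNotNull` so the snippet compiles on its own.

```java
import java.util.Objects;

// Hypothetical demo class; not part of the PR.
final class MergedLiteralDemo {

    // The form quoted in the diff: two literals joined with '+'.
    static final String SPLIT =
            "No SerializationSchema was supplied to the " + "SQS Sink builder.";

    // The same message as a single literal, which is what the nit asks for.
    static final String MERGED =
            "No SerializationSchema was supplied to the SQS Sink builder.";

    // Stand-in for the null check done in build().
    static <T> T requireSchema(T schema) {
        return Objects.requireNonNull(schema, MERGED);
    }

    public static void main(String[] args) {
        // Both forms produce the same text.
        System.out.println(SPLIT.equals(MERGED));
    }
}
```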






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-04 Thread via GitHub


hlteoh37 commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2147097522

   We seem to have quite a few `.` in the class folders. Can we change 
them to `/` instead?
   e.g. 
`flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java`





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-04 Thread via GitHub


hlteoh37 commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1625636303


##
docs/content.zh/docs/connectors/datastream/sqs.md:
##
@@ -0,0 +1,134 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /zh/dev/connectors/dynamodb.html

Review Comment:
   These are wrong and should be changed to `sqs.html`



##
docs/content.zh/docs/connectors/datastream/sqs.md:
##
@@ -0,0 +1,134 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /zh/dev/connectors/dynamodb.html
+---
+
+
+# Amazon SQS Sink
+
+The SQS sink writes to [Amazon SQS](https://aws.amazon.com/sqs) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon SQS Developer Guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html)
+to setup a SQS.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< connector_artifact flink-connector-sqs sqs >}}
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");

Review Comment:
   Is there a reason we need to specify this? Can we figure this out via the 
SQS URL?
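
   One way the region could be derived from the queue URL is sketched below.
This is only an illustration under stated assumptions: it handles the
`sqs.<region>.amazonaws.com` host form (optionally with a `.cn` suffix) and
returns empty for anything else, such as custom, VPC or local test endpoints,
where the region would still have to be supplied explicitly. The class and
method names are hypothetical and not part of the PR.

```java
import java.net.URI;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; not part of the connector.
final class SqsRegionFromUrl {

    // Assumed host shape: "sqs.<region>.amazonaws.com" or "sqs.<region>.amazonaws.com.cn".
    private static final Pattern SQS_HOST =
            Pattern.compile("^sqs\\.([a-z0-9-]+)\\.amazonaws\\.com(\\.cn)?$");

    private SqsRegionFromUrl() {}

    /** Returns the region embedded in the queue URL host, or empty if the host has another shape. */
    public static Optional<String> regionOf(String queueUrl) {
        String host = URI.create(queueUrl).getHost();
        if (host == null) {
            return Optional.empty();
        }
        Matcher matcher = SQS_HOST.matcher(host);
        return matcher.matches() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // prints "Optional[eu-west-1]"
        System.out.println(regionOf("https://sqs.eu-west-1.amazonaws.com/123456789012/my-queue"));
        // prints "Optional.empty": a local endpoint carries no region
        System.out.println(regionOf("http://localhost:4566/000000000000/my-queue"));
    }
}
```

   A fallback to the explicitly configured `AWS_REGION` would still be needed
whenever the result is empty.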



##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsConfigConstants.java:
##
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Defaults for {@link SqsSinkWriter}. */
+@PublicEvolving
+public class SqsConfigConstants {
+
+public static final String BASE_SQS_USER_AGENT_PREFIX_FORMAT =

Review Comment:
   Could we use `ConfigOption` instead of strings here? Example: 
https://github.com/apache/flink-connector-aws/blob/38aafb44d3a8200e4ff41d87e0780338f40da258/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/KinesisStreamsSourceConfigConstants.java#L41-L46



##
docs/content.zh/docs/connectors/datastream/sqs.md:
##
@@ -0,0 +1,134 @@
+---
+title: DynamoDB
+weight: 5
+type: docs
+aliases:
+- /zh/dev/connectors/dynamodb.html
+---
+
+
+# Amazon SQS Sink
+
+The SQS sink writes to [Amazon SQS](https://aws.amazon.com/sqs) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon SQS Developer Guide](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html)
+to setup a SQS.

Review Comment:
   nit: this should read "to set up an SQS message queue."



##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import 

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-03 Thread via GitHub


19priyadhingra commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2146436785

   > > Would you recommend keeping it normal, String?
   > 
   > Yes! And if someone needs base64 encoding they can encode it in the 
`SerializationSchema`.
   > 
   > > I have a follow-up question on the spotless violations. How did you run 
the check? Whenever I try, the build succeeds with no errors and I 
can see "spotless skipped" in the build logs [attached screenshot]. Is there 
some setting I need to enable to make spotless work for me?
   > 
   > Hmm, I ran `mvn clean package -DskipTests` in the root of the project and 
ended up seeing a bunch of spotless violations. Running `mvn spotless:apply` 
changed a lot of files in this PR.
   
   1. Removed Base64 encoding and fixed spotless issues
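
   As a sketch of the suggestion quoted above, a user who does need Base64
could apply it inside their own schema rather than in the sink. The interface
below is a hypothetical, reduced stand-in for Flink's `SerializationSchema`
(only the `serialize` method), so that the snippet compiles without Flink on
the classpath; the class names are likewise illustrative.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical stand-in for Flink's SerializationSchema<T>, reduced to one method.
interface SimpleSerializationSchema<T> {
    byte[] serialize(T element);
}

// Encodes each element as Base64 before it reaches the sink.
final class Base64StringSchema implements SimpleSerializationSchema<String> {
    @Override
    public byte[] serialize(String element) {
        byte[] raw = element.getBytes(StandardCharsets.UTF_8);
        return Base64.getEncoder().encode(raw);
    }
}

final class Base64SchemaDemo {
    public static void main(String[] args) {
        byte[] body = new Base64StringSchema().serialize("hello");
        // The element converter in this PR builds the message body from the bytes as UTF-8.
        System.out.println(new String(body, StandardCharsets.UTF_8)); // prints "aGVsbG8="
    }
}
```

   With this split the sink itself sends plain strings, and encoding stays an
opt-in choice of the job author.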





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-03 Thread via GitHub


19priyadhingra commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2146436055

   > Thanks for the efforts. Could we remove the ticket link from the PR title? 
It should be linked automatically using "Autolink".
   > 
   > We need to add documentation for the new feature as well
   
   Added a documentation commit to this PR.





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-03 Thread via GitHub


19priyadhingra commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2146248914

   
   
   
   
   > Thanks @19priyadhingra for addressing the comments, could we fix the 
spotless violations as mentioned above?
   
   I was finally able to make "spotless" work in my local workspace after 
downgrading from Java 21 to Java 8. Is Java 8 a prerequisite for building 
this package? If so, should we document that in the README? 
That would save others time debugging the same issue later.





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-31 Thread via GitHub


19priyadhingra commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2142820209

   > Thanks @19priyadhingra for addressing the comments, could we fix the 
spotless violations as mentioned above?
   
   Yes @vahmed-hamdy, I am working on it. As the screenshots above show, 
all of the spotless checks are being skipped for me: 
`mvn clean package -DskipTests` succeeds with no spotless warnings. 
I am not sure which Maven/IntelliJ setting I need to enable in my local build 
to see these errors, so I am asking others for help. 
   
   Thanks!





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-31 Thread via GitHub


vahmed-hamdy commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2142481837

   Thanks @19priyadhingra for addressing the comments, could we fix the 
spotless violations as mentioned above?





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-31 Thread via GitHub


vahmed-hamdy commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1622573390


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Unit tests for {@link SqsExceptionClassifiers}. */
+public class SqsExceptionClassifiersTest {
+
+    private final FatalExceptionClassifier classifier =
+            FatalExceptionClassifier.createChain(
+                    SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+                    SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+                    SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier());
+
+    @Test
+    public void shouldClassifyNotAuthorizedAsFatal() {
+        AwsServiceException sqsException =
+                SqsException.builder()
+                        .awsErrorDetails(
+                                AwsErrorDetails.builder().errorCode("NotAuthorized").build())
+                        .build();
+
+        // isFatal returns `true` if an exception is non-fatal
+        assertFalse(classifier.isFatal(sqsException, ex -> {}));

Review Comment:
   Ah I see, got it






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-31 Thread via GitHub


vahmed-hamdy commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1597506583


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Unit tests for {@link SqsExceptionClassifiers}. */
+public class SqsExceptionClassifiersTest {
+
+    private final FatalExceptionClassifier classifier =
+            FatalExceptionClassifier.createChain(
+                    SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+                    SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+                    SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier());
+
+    @Test
+    public void shouldClassifyNotAuthorizedAsFatal() {
+        AwsServiceException sqsException =
+                SqsException.builder()
+                        .awsErrorDetails(
+                                AwsErrorDetails.builder().errorCode("NotAuthorized").build())
+                        .build();
+
+        // isFatal returns `true` if an exception is non-fatal
+        assertFalse(classifier.isFatal(sqsException, ex -> {}));
+    }
+
+    @Test
+    public void shouldClassifyAccessDeniedExceptionAsFatal() {
+        AwsServiceException sqsException =
+                SqsException.builder()
+                        .awsErrorDetails(
+                                AwsErrorDetails.builder()
+                                        .errorCode("AccessDeniedException")
+                                        .build())
+                        .build();
+
+        // isFatal returns `true` if an exception is non-fatal
+        assertFalse(classifier.isFatal(sqsException, ex -> {}));

Review Comment:
   why `assertFalse` ?
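
   The inverted return value described by the test comment ("isFatal returns
`true` if an exception is non-fatal") can be illustrated with a small
stand-alone sketch. `SketchClassifier` is a hypothetical class written for
this note, not Flink's `FatalExceptionClassifier`; it only mirrors the
behaviour the quoted comment describes: a matching rule hands the error to
the consumer and returns `false`, while an unmatched error returns `true`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in that mimics the semantics described in the test comment.
final class SketchClassifier {
    private final Predicate<Throwable> matchesFatal;
    private final SketchClassifier next; // next rule in the chain, or null

    SketchClassifier(Predicate<Throwable> matchesFatal, SketchClassifier next) {
        this.matchesFatal = matchesFatal;
        this.next = next;
    }

    /** Returns false when a rule matched (error passed to the consumer), true when none matched. */
    boolean isFatal(Throwable err, Consumer<Exception> onFatal) {
        if (matchesFatal.test(err)) {
            onFatal.accept(new RuntimeException("fatal error encountered", err));
            return false;
        }
        return next == null || next.isFatal(err, onFatal);
    }
}

final class SketchClassifierDemo {
    public static void main(String[] args) {
        SketchClassifier classifier =
                new SketchClassifier(e -> e instanceof IllegalStateException, null);
        List<Exception> reported = new ArrayList<>();

        // Matches the fatal rule: reported to the consumer, method returns false.
        System.out.println(classifier.isFatal(new IllegalStateException("boom"), reported::add));
        // Matches no rule: nothing reported, method returns true.
        System.out.println(classifier.isFatal(new IllegalArgumentException("retry"), reported::add));
        System.out.println(reported.size());
    }
}
```

   Under these semantics a test that expects an exception to be classified as
fatal asserts `false`, which is why `assertFalse` appears in the hunk above.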



##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Unit tests for {@link SqsExceptionClassifiers}. */
+public class SqsExceptionClassifiersTest {
+
+private final FatalExceptionClassifier classifier =
+FatalExceptionClassifier.createChain(
+SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-31 Thread via GitHub


vahmed-hamdy commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1597503210


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More
+ * details on the operation of this sink writer may be found in the doc for {@link
+ * SqsSink}. More details on the internals of this sink writer may be found in {@link
+ * AsyncSinkWriter}.
+ *
+ * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter extends AsyncSinkWriter {
+
+private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class);
+
+private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) {
+return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties);
+}
+
+private static SqsAsyncClient createSqsClient(
+Properties sqsClientProperties, SdkAsyncHttpClient httpClient) {
+AWSGeneralUtil.validateAwsCredentials(sqsClientProperties);
+return AWSClientUtil.createAwsAsyncClient(
+sqsClientProperties,
+httpClient,
+SqsAsyncClient.builder(),
+SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT,
+SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX);
+}
+
+private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+AWSExceptionHandler.withClassifier(
+FatalExceptionClassifier.createChain(
+getInterruptedExceptionClassifier(),
+getInvalidCredentialsExceptionClassifier(),
+SqsExceptionClassifiers
+.getResourceNotFoundExceptionClassifier(),
+

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-29 Thread via GitHub


sap1ens commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2138173997

   > Would you recommend keeping it normal, String?
   
   Yes! And if someone needs base64 encoding they can encode it in the 
`SerializationSchema`.
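
A rough sketch of what "encode it in the `SerializationSchema`" could look like. To keep it self-contained, `Schema<T>` below is a hypothetical stand-in for Flink's `SerializationSchema<T>` (reduced to its `byte[] serialize(T)` method), and `base64(...)` is an illustrative helper, not something from this PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class Base64SchemaSketch {

    /** Hypothetical stand-in for Flink's SerializationSchema<T>. */
    interface Schema<T> {
        byte[] serialize(T element);
    }

    /** Plain UTF-8 serialization, i.e. the proposed default behaviour. */
    static final Schema<String> PLAIN = s -> s.getBytes(StandardCharsets.UTF_8);

    /** Wraps any schema and base64-encodes its output (opt-in encoding). */
    static <T> Schema<T> base64(Schema<T> inner) {
        return element -> Base64.getEncoder().encode(inner.serialize(element));
    }

    public static void main(String[] args) {
        String message = "{\"data\":\"1\"}";
        // Default: the message body is sent as-is.
        System.out.println(new String(PLAIN.serialize(message), StandardCharsets.UTF_8));
        // Opt-in: the same message, base64-encoded by the wrapping schema.
        System.out.println(new String(base64(PLAIN).serialize(message), StandardCharsets.UTF_8));
    }
}
```

With this shape the sink itself stays encoding-agnostic, and users who relied on base64 keep it by wrapping their schema.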
   
   > I have a follow-up query on spotless violations. How did you run that? 
Whenever I try to do that, it shows build succeeded with no error and I 
can see "spotless skipped" in the build logs [attached screenshot]. Is there some 
setting in code which I need to change to get spotless working for me?
   
   Hmm, I ran `mvn clean package -DskipTests` in the root of the project and 
ended up seeing a bunch of spotless violations. Running `mvn spotless:apply` 
changed a lot of files in this PR. 





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-29 Thread via GitHub


19priyadhingra commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2138092946

   > @19priyadhingra 👋 I've had the opportunity to build and try this 
connector. Want to share some feedback:
   > 
   > * Currently, it looks like all messages are encoded with base64. It'd be 
great to be able to not use it, e.g. by setting a custom `ElementConverter`.
   > * There are a lot of `spotless` violations, you probably want to run `mvn 
spotless:apply`.
   
   @sap1ens, thanks a lot for the feedback. 
   1) Base64 encoding was the one we were using in our production environment, 
and the intention behind it was that base64 encoding guarantees that no invalid 
characters are present in the message, but now I understand that others might 
not need it. Would you recommend keeping it normal, String?
   
   2) I have a follow-up query on spotless violations. How did you run that? 
Whenever I try to do that, it shows build succeeded with no error and I 
can see "spotless skipped" in the build logs [attached screenshot]. Is there some 
setting in code which I need to change to get spotless working for me? 
   
![image](https://github.com/apache/flink-connector-aws/assets/169495197/20262c6a-9492-476c-a5b2-ce7809664e8a)
   Sorry for the basic question, I am working on this package for the first 
time :)
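
To make the "no invalid characters" point concrete, here is a small sketch, not code from this PR. It assumes the allowed code point ranges listed in the SQS `SendMessage` documentation (`#x9`, `#xA`, `#xD`, `#x20` to `#xD7FF`, `#xE000` to `#xFFFD`, `#x10000` to `#x10FFFF`); please verify those against the current AWS docs. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class SqsCharCheckSketch {

    /** Code point ranges assumed from the SQS SendMessage documentation. */
    static boolean isAllowed(int cp) {
        return cp == 0x9 || cp == 0xA || cp == 0xD
                || (cp >= 0x20 && cp <= 0xD7FF)
                || (cp >= 0xE000 && cp <= 0xFFFD)
                || (cp >= 0x10000 && cp <= 0x10FFFF);
    }

    /** True if every code point of the body falls in an allowed range. */
    static boolean isValidBody(String body) {
        return body.codePoints().allMatch(SqsCharCheckSketch::isAllowed);
    }

    /** Base64 output uses only A-Z, a-z, 0-9, '+', '/' and '=', all within the allowed ranges. */
    static String toBase64(String body) {
        return Base64.getEncoder().encodeToString(body.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        String raw = "a" + (char) 1 + "b"; // contains the control character U+0001
        System.out.println(isValidBody(raw));           // false
        System.out.println(isValidBody(toBase64(raw))); // true
    }
}
```

A plain JSON string passes the same check unchanged, which is why leaving the default as a normal String and letting users opt in to base64 loses nothing for typical payloads.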





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-29 Thread via GitHub


sap1ens commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2137994741

   @19priyadhingra 👋 I've had the opportunity to build and try this connector. 
Want to share some feedback:
   
   - Currently, it looks like all messages are encoded with base64. It'd be 
great to be able to not use it, e.g. by setting a custom `ElementConverter`. 
   - There are a lot of `spotless` violations, you probably want to run `mvn 
spotless:apply`.





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-28 Thread via GitHub


19priyadhingra commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2136055216

   Thanks for the feedback @vahmed-hamdy. I replied to all the feedback except 
the documentation one, which is still in progress. Is there a package where I am 
supposed to add the documentation?





Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-27 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1615603849


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More
+ * details on the operation of this sink writer may be found in the doc for {@link
+ * SqsSink}. More details on the internals of this sink writer may be found in {@link
+ * AsyncSinkWriter}.
+ *
+ * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter extends AsyncSinkWriter {
+
+private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class);
+
+private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) {
+return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties);
+}
+
+private static SqsAsyncClient createSqsClient(
+Properties sqsClientProperties, SdkAsyncHttpClient httpClient) {
+AWSGeneralUtil.validateAwsCredentials(sqsClientProperties);
+return AWSClientUtil.createAwsAsyncClient(
+sqsClientProperties,
+httpClient,
+SqsAsyncClient.builder(),
+SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT,
+SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX);
+}
+
+private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+AWSExceptionHandler.withClassifier(
+FatalExceptionClassifier.createChain(
+getInterruptedExceptionClassifier(),
+getInvalidCredentialsExceptionClassifier(),
+SqsExceptionClassifiers
+.getResourceNotFoundExceptionClassifier(),
+

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-27 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1615590239


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More
+ * details on the operation of this sink writer may be found in the doc for {@link
+ * SqsSink}. More details on the internals of this sink writer may be found in {@link
+ * AsyncSinkWriter}.
+ *
+ * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter extends AsyncSinkWriter {
+
+private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class);
+
+private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) {
+return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties);
+}
+
+private static SqsAsyncClient createSqsClient(
+Properties sqsClientProperties, SdkAsyncHttpClient httpClient) {
+AWSGeneralUtil.validateAwsCredentials(sqsClientProperties);
+return AWSClientUtil.createAwsAsyncClient(
+sqsClientProperties,
+httpClient,
+SqsAsyncClient.builder(),
+SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT,
+SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX);
+}
+
+private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+AWSExceptionHandler.withClassifier(
+FatalExceptionClassifier.createChain(
+getInterruptedExceptionClassifier(),
+getInvalidCredentialsExceptionClassifier(),
+SqsExceptionClassifiers
+.getResourceNotFoundExceptionClassifier(),
+

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-23 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1612742339


##
flink-connector-aws/pom.xml:
##
@@ -18,8 +18,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

Review Comment:
   reverted






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610987543


##
flink-connector-aws-base/pom.xml:
##
@@ -76,6 +76,12 @@ under the License.
 test
 
 
+

Review Comment:
   removed






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610987837


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More
+ * details on the operation of this sink writer may be found in the doc for {@link
+ * SqsSink}. More details on the internals of this sink writer may be found in {@link
+ * AsyncSinkWriter}.
+ *
+ * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter extends AsyncSinkWriter {
+
+private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class);
+
+private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) {
+return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties);
+}
+
+private static SqsAsyncClient createSqsClient(
+Properties sqsClientProperties, SdkAsyncHttpClient httpClient) {
+AWSGeneralUtil.validateAwsCredentials(sqsClientProperties);
+return AWSClientUtil.createAwsAsyncClient(
+sqsClientProperties,
+httpClient,
+SqsAsyncClient.builder(),
+SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT,
+SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX);
+}
+
+private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+AWSExceptionHandler.withClassifier(
+FatalExceptionClassifier.createChain(
+getInterruptedExceptionClassifier(),
+getInvalidCredentialsExceptionClassifier(),
+SqsExceptionClassifiers
+.getResourceNotFoundExceptionClassifier(),
+

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610983272


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/testutils/SqsTestUtils.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.testutils;
+
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A set of static methods that can be used to call common AWS services on the Localstack container.
+ */
+public class SqsTestUtils {
+
+private static final ObjectMapper MAPPER = createObjectMapper();
+
+public static SqsClient createSqsClient(String endpoint, SdkHttpClient httpClient) {
+return AWSServicesTestUtils.createAwsSyncClient(endpoint, httpClient, SqsClient.builder());
+}
+
+public static DataStream<String> getSampleDataGenerator(
+StreamExecutionEnvironment env, int endValue) {
+return env.fromSequence(1, endValue)
+.map(Object::toString)
+.returns(String.class)
+.map(data -> MAPPER.writeValueAsString(ImmutableMap.of("data", data)));
+}
+
+public static List<String> getSampleData(int endValue) throws JsonProcessingException {
+List<String> expectedElements = new ArrayList<>();
+for (int i = 1; i <= endValue; i++) {
+expectedElements.add(
+MAPPER.writeValueAsString(ImmutableMap.of("data", String.valueOf(i))));
+}
+return expectedElements;
+}
+
+private static ObjectMapper createObjectMapper() {
+ObjectMapper objectMapper = new ObjectMapper();
+registerModules(objectMapper);
+return objectMapper;
+}
+
+private static void registerModules(ObjectMapper mapper) {
+mapper.registerModule(new JavaTimeModule())

Review Comment:
   Removed






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610974483


##
flink-connector-aws-base/src/test/java/org/apache/flink/connector/aws/testutils/AWSServicesTestUtils.java:
##
@@ -114,6 +116,14 @@ public static void createBucket(S3Client s3Client, String bucketName) {
 }
 }
 
+public static void createSqs(String sqsName, SqsClient sqsClient) {

Review Comment:
   Moved to SqsTestUtils






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610935419


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/testutils/SqsTestUtils.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink.testutils;
+
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.utils.ImmutableMap;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A set of static methods that can be used to call common AWS services on the 
Localstack container.
+ */
+public class SqsTestUtils {
+
+private static final ObjectMapper MAPPER = createObjectMapper();
+
+public static SqsClient createSqsClient(String endpoint, SdkHttpClient 
httpClient) {
+return AWSServicesTestUtils.createAwsSyncClient(endpoint, httpClient, 
SqsClient.builder());
+}
+
+public static DataStream getSampleDataGenerator(
+StreamExecutionEnvironment env, int endValue) {
+return env.fromSequence(1, endValue)
+.map(Object::toString)
+.returns(String.class)
+.map(data -> MAPPER.writeValueAsString(ImmutableMap.of("data", 
data)));
+}
+
+public static List getSampleData(int endValue) throws 
JsonProcessingException {

Review Comment:
   Removed






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610935001


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Unit tests for {@link SqsExceptionClassifiers}. */
+public class SqsExceptionClassifiersTest {

Review Comment:
   Added a new test case: `shouldClassifySocketTimeoutExceptionAsNonFatal`.






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610929881


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Unit tests for {@link SqsExceptionClassifiers}. */
+public class SqsExceptionClassifiersTest {
+
+private final FatalExceptionClassifier classifier =
+FatalExceptionClassifier.createChain(
+
SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+
SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+
SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier());
+
+@Test
+public void shouldClassifyNotAuthorizedAsFatal() {
+AwsServiceException sqsException =
+SqsException.builder()
+.awsErrorDetails(
+
AwsErrorDetails.builder().errorCode("NotAuthorized").build())
+.build();
+
+// isFatal returns `true` if an exception is non-fatal
+assertFalse(classifier.isFatal(sqsException, ex -> {}));

Review Comment:
   We use `assertFalse` here because of how `isFatal` is implemented: per its 
comment, "isFatal returns `true` if an exception is non-fatal".






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610929610


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsExceptionClassifiersTest.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.services.sqs.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.sqs.model.SqsException;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Unit tests for {@link SqsExceptionClassifiers}. */
+public class SqsExceptionClassifiersTest {
+
+private final FatalExceptionClassifier classifier =
+FatalExceptionClassifier.createChain(
+
SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
+
SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
+
SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier());
+
+@Test
+public void shouldClassifyNotAuthorizedAsFatal() {
+AwsServiceException sqsException =
+SqsException.builder()
+.awsErrorDetails(
+
AwsErrorDetails.builder().errorCode("NotAuthorized").build())
+.build();
+
+// isFatal returns `true` if an exception is non-fatal

Review Comment:
   I agree that it is confusing, and I am not sure why `isFatal` is 
implemented this way. I referred to 
[AWSExceptionHandler](https://github.com/apache/flink-connector-aws/blob/c688a8545ac1001c8450e8c9c5fe8bbafa13aeba/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/sink/throwable/AWSExceptionHandler.java#L27),
 whose comment also says "isFatal returns `true` if an exception is non-fatal".
   
   I also based these test cases on the existing Firehose [test 
cases](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/AWSFirehoseExceptionClassifiersTest.java).
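   
   To make the inverted convention concrete, here is a standalone sketch that 
mimics the behaviour described in the quoted comment. `InvertedClassifierSketch`, 
`notAuthorizedClassifier()` and the use of `SecurityException` are hypothetical 
stand-ins chosen for illustration; this is not the Flink 
`FatalExceptionClassifier` or any AWS SDK type.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical stand-in for a chained classifier; not the Flink class itself. */
class InvertedClassifierSketch {
    private final Predicate<Throwable> matcher;
    private final Function<Throwable, Exception> mapper;
    private final InvertedClassifierSketch next; // null marks the end of the chain

    InvertedClassifierSketch(
            Predicate<Throwable> matcher,
            Function<Throwable, Exception> mapper,
            InvertedClassifierSketch next) {
        this.matcher = matcher;
        this.mapper = mapper;
        this.next = next;
    }

    /**
     * Follows the convention from the quoted comment: returns false when a
     * classifier in the chain matched (the error is fatal and is handed to the
     * consumer), and true when nothing matched (the error is non-fatal).
     */
    boolean isFatal(Throwable err, Consumer<Exception> onFatal) {
        if (matcher.test(err)) {
            onFatal.accept(mapper.apply(err));
            return false;
        }
        return next == null || next.isFatal(err, onFatal);
    }

    /** Assumed example classifier: treats SecurityException as fatal. */
    static InvertedClassifierSketch notAuthorizedClassifier() {
        return new InvertedClassifierSketch(
                err -> err instanceof SecurityException,
                err -> new RuntimeException("Fatal: not authorized", err),
                null);
    }
}
```

   With this convention, a test for a fatal error asserts that `isFatal` 
returns `false`, which is why the test above uses `assertFalse`.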






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610928629


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsSinkWriterTest.java:
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.concurrent.CompletionException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** Covers construction, defaults and sanity checking of {@link 
SqsSinkWriter}. */
+public class SqsSinkWriterTest {
+
+private SqsSinkWriter sinkWriter;
+
+private static final ElementConverter ELEMENT_CONVERTER_PLACEHOLDER =
+SqsSinkElementConverter.builder()
+.setSerializationSchema(new SimpleStringSchema())
+.build();
+
+@BeforeEach
+void setup() throws IOException {
+TestSinkInitContext sinkInitContext = new TestSinkInitContext();
+Properties sinkProperties = 
AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
+SqsSink sink =
+new SqsSink<>(
+ELEMENT_CONVERTER_PLACEHOLDER,
+50,
+16,
+1,
+4 * 1024 * 1024L,
+5000L,
+1000 * 1024L,
+true,
+"sqsUrl",
+sinkProperties);
+sinkWriter = (SqsSinkWriter) 
sink.createWriter(sinkInitContext);
+}
+
+@Test
+void getSizeInBytesReturnsSizeOfBlobBeforeBase64Encoding() {
+String testString = "{many hands make light work;";
+SendMessageBatchRequestEntry record = 
SendMessageBatchRequestEntry.builder().messageBody(testString).build();
+assertThat(sinkWriter.getSizeInBytes(record))
+
.isEqualTo(testString.getBytes(StandardCharsets.US_ASCII).length);

Review Comment:
   oops, fixed
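   
   The thread does not say what exactly was fixed, so the following is only an 
illustration of why the charset used in a size assertion matters. It is a 
standalone sketch; `MessageSizeSketch` and its methods are hypothetical and not 
part of the connector.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper comparing byte lengths under two charsets. */
class MessageSizeSketch {
    /** Size of a message body in bytes when encoded as UTF-8. */
    static int utf8Size(String body) {
        return body.getBytes(StandardCharsets.UTF_8).length;
    }

    /** Size under US-ASCII, where each unmappable character becomes a single '?'. */
    static int asciiSize(String body) {
        return body.getBytes(StandardCharsets.US_ASCII).length;
    }
}
```

   For a pure ASCII body such as the test string above, both methods agree. 
They diverge as soon as the body contains a character outside ASCII, so an 
assertion written against US-ASCII would only hold for ASCII-only payloads.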






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610910711


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsStateSerializer.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriterStateSerializer;
+
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/** SQS implementation {@link AsyncSinkWriterStateSerializer}. */
+@Internal
+public class SqsStateSerializer extends 
AsyncSinkWriterStateSerializer {
+@Override
+protected void serializeRequestToStream(final SendMessageBatchRequestEntry 
request, final DataOutputStream out)
+throws IOException
+{
+out.write(request.messageBody().getBytes(StandardCharsets.UTF_8));
+}
+
+@Override
+protected SendMessageBatchRequestEntry deserializeRequestFromStream(final 
long requestSize, final DataInputStream in)
+throws IOException
+{
+final byte[] requestData = new byte[(int) requestSize];
+in.read(requestData);
+return SendMessageBatchRequestEntry.builder().messageBody(new 
String(requestData, StandardCharsets.UTF_8)).build();

Review Comment:
   yes, test case updated
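   
   For reference, the round trip that such a test exercises can be sketched 
with plain strings in place of `SendMessageBatchRequestEntry`. 
`BodyRoundTripSketch` is a hypothetical name, and the sketch deliberately uses 
`DataInputStream.readFully` instead of the `read` call in the quoted diff, 
since `read` is allowed to return before the array is full. That substitution 
is my assumption, not something settled in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical round trip of a message body through data streams. */
class BodyRoundTripSketch {
    /** Writes the body as raw UTF-8 bytes, as the quoted serializer does. */
    static byte[] serialize(String messageBody) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.write(messageBody.getBytes(StandardCharsets.UTF_8));
        }
        return buffer.toByteArray();
    }

    /** Reads exactly requestSize bytes and decodes them as UTF-8. */
    static String deserialize(byte[] data, long requestSize) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            byte[] requestData = new byte[(int) requestSize];
            // readFully fills the whole array or throws EOFException
            in.readFully(requestData);
            return new String(requestData, StandardCharsets.UTF_8);
        }
    }

    /** Convenience wrapper that serializes and immediately deserializes a body. */
    static String roundTrip(String messageBody) {
        try {
            byte[] bytes = serialize(messageBody);
            return deserialize(bytes, bytes.length);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

   Note that the request size passed to `deserialize` is a byte count, so it 
must match the UTF-8 encoded length rather than the number of characters.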






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-22 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1610896956


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsSinkITCase.java:
##
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static 
org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Integration test suite for the {@code SqsSink} using a localstack container.
+ */
+@Testcontainers
+@ExtendWith(MiniClusterExtension.class)
+class SqsSinkITCase {

Review Comment:
   I created "flink-connector-aws-sqs-e2e-tests" under 
"flink-connector-aws-e2e-tests" and moved this test there, but I am struggling 
to run it. Running LocalStack did not help, so I tried to follow the README 
[here](https://github.com/apache/flink-connector-aws/tree/main/flink-connector-aws-e2e-tests).
 I downloaded flink-dist-1.19.0.jar, placed it in the 
"flink-connector-aws-e2e-tests" folder, and ran "mvn clean verify 
-Prun-end-to-end-tests -DdistDir=flink-dist-1.19.0.jar", but the build fails 
with the error shown in the attached screenshot. Any documentation on how to 
run these tests easily would be very helpful. Thanks!
   
   Screenshot: https://github.com/apache/flink-connector-aws/assets/169495197/406cae7a-5eae-421f-8e11-cae88051f78f






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-21 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1609245158


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More
+ * details on the operation of this sink writer may be found in the doc for 
{@link
+ * SqsSink}. More details on the internals of this sink writer may be found in 
{@link
+ * AsyncSinkWriter}.
+ *
+ * The {@link SqsAsyncClient} used here may be configured in the standard 
way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code 
AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter extends AsyncSinkWriter {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SqsSinkWriter.class);
+
+private static SdkAsyncHttpClient createHttpClient(Properties 
sqsClientProperties) {
+return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties);
+}
+
+private static SqsAsyncClient createSqsClient(
+Properties sqsClientProperties, SdkAsyncHttpClient httpClient) {
+AWSGeneralUtil.validateAwsCredentials(sqsClientProperties);
+return AWSClientUtil.createAwsAsyncClient(
+sqsClientProperties,
+httpClient,
+SqsAsyncClient.builder(),
+SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT,
+SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX);
+}
+
+private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+AWSExceptionHandler.withClassifier(
+FatalExceptionClassifier.createChain(
+getInterruptedExceptionClassifier(),
+getInvalidCredentialsExceptionClassifier(),
+SqsExceptionClassifiers
+.getResourceNotFoundExceptionClassifier(),
+

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-21 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1609243592


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkBuilder.java:
##
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+
+import software.amazon.awssdk.http.Protocol;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import static 
org.apache.flink.connector.aws.config.AWSConfigConstants.HTTP_PROTOCOL_VERSION;
+import static software.amazon.awssdk.http.Protocol.HTTP1_1;
+
+/**
+ * Builder to construct {@link SqsSink}.
+ *
+ * The following example shows the minimum setup to create a {@link 
SqsSink} that
+ * writes String values to a SQS named sqsUrl.
+ *
+ * {@code
+ * Properties sinkProperties = new Properties();
+ * sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+ *
+ * SqsSink sqsSink =
+ * SqsSink.builder()
+ * .setElementConverter(elementConverter)
+ * .setSqsUrl("sqsUrl")
+ * .setSqsClientProperties(sinkProperties)
+ * .setSerializationSchema(new SimpleStringSchema())
+ * .build();
+ * }
+ *
+ * If the following parameters are not set in this builder, the following 
defaults will be used:
+ *
+ * 
+ *   {@code maxBatchSize} will be 10
+ *   {@code maxInFlightRequests} will be 50
+ *   {@code maxBufferedRequests} will be 5000
+ *   {@code maxBatchSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   {@code maxTimeInBufferMs} will be 5000ms
+ *   {@code maxRecordSizeInBytes} will be 256 KB i.e. {@code 256 * 1000}
+ *   {@code failOnError} will be false
+ * 
+ *
+ * @param  type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class SqsSinkBuilder
+extends AsyncSinkBaseBuilder> {
+
+private static final int DEFAULT_MAX_BATCH_SIZE = 10;

Review Comment:
   As per the SendMessageBatch 
[documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html#:~:text=Because%20the%20batch%20request%20can,xFFFD%20%7C%20%23x1%20to%20%23x10),
 a single request can contain at most 10 messages.
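   
   As a small illustration of what that limit implies, the sketch below splits 
a list of buffered messages into groups of at most 10. `BatchSplitSketch` is a 
hypothetical helper written only for this comment; in the sink itself I expect 
the batching to be driven by the `maxBatchSize` setting of the async sink base 
rather than by code like this.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that groups messages into batches of at most 10. */
class BatchSplitSketch {
    /** Per-request entry limit taken from the SendMessageBatch documentation. */
    static final int MAX_BATCH_SIZE = 10;

    /** Splits the input into consecutive batches, each holding at most MAX_BATCH_SIZE items. */
    static <T> List<List<T>> split(List<T> messages) {
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < messages.size(); start += MAX_BATCH_SIZE) {
            int end = Math.min(start + MAX_BATCH_SIZE, messages.size());
            // copy the view so each batch is independent of the source list
            batches.add(new ArrayList<>(messages.subList(start, end)));
        }
        return batches;
    }
}
```

   For example, 25 buffered messages would be sent as three requests holding 
10, 10 and 5 entries.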






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-21 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1609241620


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.io.UnsupportedEncodingException;

Review Comment:
   Removed






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-21 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1608980647


##
flink-connector-aws/flink-connector-sqs/src/test/java/org.apache.flink/connector.sqs/sink/SqsSinkITCase.java:
##
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.LocalstackContainer;
+import org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.DockerImageVersions;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.core.SdkSystemSetting;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.flink.connector.aws.testutils.AWSServicesTestUtils.createConfig;
+import static org.apache.flink.connector.sqs.sink.testutils.SqsTestUtils.createSqsClient;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Integration test suite for the {@code SqsSink} using a localstack container.
+ */
+@Testcontainers
+@ExtendWith(MiniClusterExtension.class)
+class SqsSinkITCase {

Review Comment:
   Yes, I can move this, but I found that all the other sinks also keep the same
   tests in their own package, so I followed the same convention.






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-21 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1608974641


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More
+ * details on the operation of this sink writer may be found in the doc for {@link
+ * SqsSink}. More details on the internals of this sink writer may be found in {@link
+ * AsyncSinkWriter}.
+ *
+ * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter extends AsyncSinkWriter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class);
+
+    private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) {
+        return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties);
+    }
+
+    private static SqsAsyncClient createSqsClient(
+            Properties sqsClientProperties, SdkAsyncHttpClient httpClient) {
+        AWSGeneralUtil.validateAwsCredentials(sqsClientProperties);
+        return AWSClientUtil.createAwsAsyncClient(
+                sqsClientProperties,
+                httpClient,
+                SqsAsyncClient.builder(),
+                SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT,
+                SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX);
+    }
+
+    private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+            AWSExceptionHandler.withClassifier(
+                    FatalExceptionClassifier.createChain(
+                            getInterruptedExceptionClassifier(),
+                            getInvalidCredentialsExceptionClassifier(),
+                            SqsExceptionClassifiers
+                                    .getResourceNotFoundExceptionClassifier(),
+

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-21 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1608973555


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More
+ * details on the operation of this sink writer may be found in the doc for {@link
+ * SqsSink}. More details on the internals of this sink writer may be found in {@link
+ * AsyncSinkWriter}.
+ *
+ * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter extends AsyncSinkWriter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class);
+
+    private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) {
+        return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties);
+    }
+
+    private static SqsAsyncClient createSqsClient(
+            Properties sqsClientProperties, SdkAsyncHttpClient httpClient) {
+        AWSGeneralUtil.validateAwsCredentials(sqsClientProperties);
+        return AWSClientUtil.createAwsAsyncClient(
+                sqsClientProperties,
+                httpClient,
+                SqsAsyncClient.builder(),
+                SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT,
+                SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX);
+    }
+
+    private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+            AWSExceptionHandler.withClassifier(
+                    FatalExceptionClassifier.createChain(
+                            getInterruptedExceptionClassifier(),
+                            getInvalidCredentialsExceptionClassifier(),
+                            SqsExceptionClassifiers
+                                    .getResourceNotFoundExceptionClassifier(),
+

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-05-21 Thread via GitHub


19priyadhingra commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2123460881

   > Thanks for the efforts. Could we remove the ticket link from the PR title, it should be automatically linked using "Autolink"
   > 
   > We need to add documentation for the new feature as well
   
   1) Updated.
   2) Sure, I will add documentation for this.

