Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-09 Thread via GitHub


flinkbot commented on PR #24919:
URL: https://github.com/apache/flink/pull/24919#issuecomment-2157312543

   
   ## CI report:
   
   * 14cf0fc58dfb9553ffe703622b6223a54549aacd UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-09 Thread via GitHub


ammar-master opened a new pull request, #24919:
URL: https://github.com/apache/flink/pull/24919

   
   
   
   ## What is the purpose of the change
   
   Adds configuration options for the keystore and truststore type used by the internal and REST SSL setup.
   
   
   ## Brief change log
   
   * SSL configuration option for internal and REST SSL keystore/truststore type (see the sketch below)
   * Use new option when initializing SSLHandlerFactory
   * Extend CustomSSLEngineProvider used by Pekko
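
   A rough sketch of how the new options could look; the names and defaults here are illustrative assumptions modelled on the existing `security.ssl.*` keys, not necessarily the merged ones:

   ```java
   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;

   /** Hypothetical option definitions; the merged option names may differ. */
   public class SslStoreTypeOptions {

       public static final ConfigOption<String> SSL_KEYSTORE_TYPE =
               ConfigOptions.key("security.ssl.keystore-type")
                       .stringType()
                       .noDefaultValue() // fall back to KeyStore.getDefaultType() when unset
                       .withDescription("The type of the keystore, e.g. JKS, PKCS12 or BKS.");

       public static final ConfigOption<String> SSL_TRUSTSTORE_TYPE =
               ConfigOptions.key("security.ssl.truststore-type")
                       .stringType()
                       .noDefaultValue()
                       .withDescription("The type of the truststore.");
   }
   ```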
   
   
   ## Verifying this change
   
 - Unit tests to ensure the new configuration options are picked up
 - Manually verified the change by running a cluster using a BKS keystore 
and truststore, a type that is not supported by the JDK by default.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs






Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-09 Thread via GitHub


master-ammar closed pull request #24918: [FLINK-35371][security] Add 
configuration for SSL keystore and truststore type
URL: https://github.com/apache/flink/pull/24918





Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-09 Thread via GitHub


flinkbot commented on PR #24918:
URL: https://github.com/apache/flink/pull/24918#issuecomment-2157270012

   
   ## CI report:
   
   * db7977869fe099d7cf1cf428bec048e608dbdd60 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-35371) Allow the keystore and truststore type to be configured for SSL

2024-06-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35371:
---
Labels: SSL pull-request-available  (was: SSL)

> Allow the keystore and truststore type to be configured for SSL
> 
>
> Key: FLINK-35371
> URL: https://issues.apache.org/jira/browse/FLINK-35371
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Network
>Affects Versions: 1.19.0
>Reporter: Ammar Master
>Assignee: Ammar Master
>Priority: Minor
>  Labels: SSL, pull-request-available
>
> Flink always creates a keystore and truststore using the [default 
> type|https://github.com/apache/flink/blob/b87ead743dca161cdae8a1fef761954d206b81fb/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java#L236]
>  defined in the JDK, which in most cases is JKS.
> {code}
> KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
> {code}
> We should add configuration options to set the type explicitly, both to 
> support other custom formats and to match the options already provided by 
> other applications such as 
> [Spark|https://spark.apache.org/docs/latest/security.html#:~:text=the%20key%20store.-,%24%7Bns%7D.keyStoreType,-JKS]
>  and 
> [Kafka|https://kafka.apache.org/documentation/#:~:text=per%2Dbroker-,ssl.keystore.type,-The%20file%20format]
> . The default would continue to be specified by the JDK.
>  
> The SSLContext for the REST API can read the configuration option directly, 
> but we need to add extra logic to the 
> [CustomSSLEngineProvider|https://github.com/apache/flink/blob/master/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java]
>  used by Pekko.
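
For illustration, the getInstance call above could then become something like this (the option name here is a hypothetical placeholder, not the final key):
{code:java}
// Fall back to the JDK default when no explicit type is configured.
String storeType = config.getOptional(SSL_TRUSTSTORE_TYPE) // hypothetical ConfigOption<String>
        .orElse(KeyStore.getDefaultType());
KeyStore trustStore = KeyStore.getInstance(storeType);
{code}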



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]

2024-06-09 Thread via GitHub


master-ammar opened a new pull request, #24918:
URL: https://github.com/apache/flink/pull/24918

   
   
   
   
   ## What is the purpose of the change
   
   Adds configuration options for the keystore and truststore type used by the internal and REST SSL setup.
   
   
   ## Brief change log
   
   * SSL configuration option for internal and REST SSL keystore/truststore type
   * Use new option when initializing SSLHandlerFactory
   * Extend CustomSSLEngineProvider used by Pekko
   
   
   ## Verifying this change
   
 - Unit tests to ensure the new configuration options are picked up
 - Manually verified the change by running a cluster using a BKS keystore 
and truststore, a type that is not supported by the JDK by default.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs






Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-06-09 Thread via GitHub


superdiaodiao commented on PR #24773:
URL: https://github.com/apache/flink/pull/24773#issuecomment-2157065404

   > Thanks for the contribution @superdiaodiao 
   > thanks for the review @HuangXingBo , @davidradl 
   > 
   > it looks OK to me, I will test it a bit more and, if that succeeds, will 
merge it
   
   Thank you~





Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]

2024-06-09 Thread via GitHub


vahmed-hamdy commented on PR #28:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/28#issuecomment-2156823709

   @jeyhunkarimov thanks for the feedback, I added a negative test as well.
   @snuyanzin thanks for the review, all comments are addressed as well.





Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]

2024-06-09 Thread via GitHub


snuyanzin commented on PR #24773:
URL: https://github.com/apache/flink/pull/24773#issuecomment-2156817722

   Thanks for the contribution @superdiaodiao 
   thanks for the review @HuangXingBo , @davidradl 
   
   it looks OK to me, I will test it a bit more and, if that succeeds, will 
merge it





[jira] [Commented] (FLINK-35520) master can't compile as license check failed

2024-06-09 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853540#comment-17853540
 ] 

Sergey Nuyanzin commented on FLINK-35520:
-

That seems to be trickier than I thought...
I thought there was an issue only with licensing; however, now it looks like 
the license issue is fixed, yet the Python tests still did not succeed with this PR...

> master can't compile as license check failed
> 
>
> Key: FLINK-35520
> URL: https://issues.apache.org/jira/browse/FLINK-35520
> Project: Flink
>  Issue Type: Bug
>  Components: Build System / CI
>Affects Versions: 1.20.0
>Reporter: Weijie Guo
>Priority: Critical
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60024=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb=45808





Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]

2024-06-09 Thread via GitHub


vahmed-hamdy commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632401145


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/EmulatorCredentialsProvider.java:
##
@@ -20,12 +20,14 @@
 import com.google.api.gax.core.CredentialsProvider;
 import com.google.auth.Credentials;
 
+import java.io.Serializable;
+
 /**
  * A CredentialsProvider that simply provides the right credentials that are 
to be used for
  * connecting to an emulator. NOTE: The Google provided NoCredentials and 
NoCredentialsProvider do
  * not behave as expected. See 
https://github.com/googleapis/gax-java/issues/1148
  */
-public final class EmulatorCredentialsProvider implements CredentialsProvider {
+public final class EmulatorCredentialsProvider implements CredentialsProvider, 
Serializable {

Review Comment:
   Unlike PubSubSinkV1, we use a CredentialsProvider as the argument instead of 
Credentials, so the provider class itself is shipped with the job and needs to 
be serializable. Previously, Credentials were the argument, so the provider did 
not need to be serialized.
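
   In other words, a minimal sketch reusing the classes from this PR's test code:

   ```java
   // Flink serializes the sink, including this provider, when shipping the job
   // graph to the cluster, so the CredentialsProvider implementation must also
   // implement Serializable.
   GcpPublisherConfig gcpPublisherConfig =
           GcpPublisherConfig.builder()
                   .setCredentialsProvider(EmulatorCredentialsProvider.create())
                   .build();
   ```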






Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]

2024-06-09 Thread via GitHub


vahmed-hamdy commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632400893


##
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java:
##
@@ -0,0 +1,114 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.testcontainers.containers.PubSubEmulatorContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Integration tests for {@link PubSubSinkV2} using {@link 
PubSubEmulatorContainer}. */
+@ExtendWith(MiniClusterExtension.class)
+@Execution(ExecutionMode.CONCURRENT)
+@Testcontainers
+class PubSubSinkV2ITTests extends TestLogger {

Review Comment:
   Yes, my bad. Removed now.






Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]

2024-06-09 Thread via GitHub


vahmed-hamdy commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632400806


##
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java:
##
@@ -0,0 +1,114 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.testcontainers.containers.PubSubEmulatorContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Integration tests for {@link PubSubSinkV2} using {@link 
PubSubEmulatorContainer}. */
+@ExtendWith(MiniClusterExtension.class)
+@Execution(ExecutionMode.CONCURRENT)
+@Testcontainers
+class PubSubSinkV2ITTests extends TestLogger {
+
+private static final String PROJECT_ID = "test-project";
+
+private static final String TOPIC_ID = "test-topic";
+
+private static final String SUBSCRIPTION_ID = "test-subscription";
+
+private StreamExecutionEnvironment env;
+
+@Container
+private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER =
+new PubSubEmulatorContainer(
+
DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR));
+
+private PubsubHelper pubSubHelper;
+
+@BeforeEach
+void setUp() throws IOException {
+env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);
+pubSubHelper = new 
PubsubHelper(PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint());
+
+pubSubHelper.createTopic(PROJECT_ID, TOPIC_ID);
+pubSubHelper.createSubscription(PROJECT_ID, SUBSCRIPTION_ID, 
PROJECT_ID, TOPIC_ID);
+}
+
+@AfterEach
+void tearDown() throws IOException {
+pubSubHelper.deleteSubscription(PROJECT_ID, SUBSCRIPTION_ID);
+pubSubHelper.deleteTopic(PROJECT_ID, TOPIC_ID);
+pubSubHelper.close();
+}
+
+@Test
+void pubSubSinkV2DeliversRecords() throws Exception {
+String[] elements = new String[] {"test1", "test2", "test3"};
+DataStreamSource<String> stream =
+env.fromSource(
+new DataGeneratorSource<>(
+new FromElementsGeneratorFunction<>(
+BasicTypeInfo.STRING_TYPE_INFO, 
elements),
+elements.length,
+TypeInformation.of(String.class)),
+WatermarkStrategy.noWatermarks(),
+"DataGeneratorSource");
+
+GcpPublisherConfig gcpPublisherConfig =
+GcpPublisherConfig.builder()
+
.setCredentialsProvider(EmulatorCredentialsProvider.create())
+.setTransportChannelProvider(
+new TestChannelProvider(
+
PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint()))
+.build();
+
+PubSubSinkV2<String> sink =
+PubSubSinkV2.builder()
+.setProjectId(PROJECT_ID)
+.setTopicId(TOPIC_ID)
+.setSerializationSchema(new SimpleStringSchema())
+.setGcpPublisherConfig(gcpPublisherConfig)
+.setFailOnError(true)
+.build();
+stream.sinkTo(sink);
+int maxNumberOfMessages = 100;

Review 

Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]

2024-06-09 Thread via GitHub


snuyanzin commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632391517


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/EmulatorCredentialsProvider.java:
##
@@ -20,12 +20,14 @@
 import com.google.api.gax.core.CredentialsProvider;
 import com.google.auth.Credentials;
 
+import java.io.Serializable;
+
 /**
  * A CredentialsProvider that simply provides the right credentials that are 
to be used for
  * connecting to an emulator. NOTE: The Google provided NoCredentials and 
NoCredentialsProvider do
  * not behave as expected. See 
https://github.com/googleapis/gax-java/issues/1148
  */
-public final class EmulatorCredentialsProvider implements CredentialsProvider {
+public final class EmulatorCredentialsProvider implements CredentialsProvider, 
Serializable {

Review Comment:
   I wonder why we need it.






Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]

2024-06-09 Thread via GitHub


snuyanzin commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632391131


##
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java:
##
@@ -0,0 +1,114 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.testcontainers.containers.PubSubEmulatorContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Integration tests for {@link PubSubSinkV2} using {@link 
PubSubEmulatorContainer}. */
+@ExtendWith(MiniClusterExtension.class)
+@Execution(ExecutionMode.CONCURRENT)
+@Testcontainers
+class PubSubSinkV2ITTests extends TestLogger {
+
+private static final String PROJECT_ID = "test-project";
+
+private static final String TOPIC_ID = "test-topic";
+
+private static final String SUBSCRIPTION_ID = "test-subscription";
+
+private StreamExecutionEnvironment env;
+
+@Container
+private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER =
+new PubSubEmulatorContainer(
+
DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR));
+
+private PubsubHelper pubSubHelper;
+
+@BeforeEach
+void setUp() throws IOException {
+env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);
+pubSubHelper = new 
PubsubHelper(PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint());
+
+pubSubHelper.createTopic(PROJECT_ID, TOPIC_ID);
+pubSubHelper.createSubscription(PROJECT_ID, SUBSCRIPTION_ID, 
PROJECT_ID, TOPIC_ID);
+}
+
+@AfterEach
+void tearDown() throws IOException {
+pubSubHelper.deleteSubscription(PROJECT_ID, SUBSCRIPTION_ID);
+pubSubHelper.deleteTopic(PROJECT_ID, TOPIC_ID);
+pubSubHelper.close();
+}
+
+@Test
+void pubSubSinkV2DeliversRecords() throws Exception {
+String[] elements = new String[] {"test1", "test2", "test3"};
+DataStreamSource<String> stream =
+env.fromSource(
+new DataGeneratorSource<>(
+new FromElementsGeneratorFunction<>(
+BasicTypeInfo.STRING_TYPE_INFO, 
elements),
+elements.length,
+TypeInformation.of(String.class)),
+WatermarkStrategy.noWatermarks(),
+"DataGeneratorSource");
+
+GcpPublisherConfig gcpPublisherConfig =
+GcpPublisherConfig.builder()
+
.setCredentialsProvider(EmulatorCredentialsProvider.create())
+.setTransportChannelProvider(
+new TestChannelProvider(
+
PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint()))
+.build();
+
+PubSubSinkV2<String> sink =
+PubSubSinkV2.builder()
+.setProjectId(PROJECT_ID)
+.setTopicId(TOPIC_ID)
+.setSerializationSchema(new SimpleStringSchema())
+.setGcpPublisherConfig(gcpPublisherConfig)
+.setFailOnError(true)
+.build();
+stream.sinkTo(sink);
+int maxNumberOfMessages = 100;

Review 

Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]

2024-06-09 Thread via GitHub


snuyanzin commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632390729


##
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java:
##
@@ -0,0 +1,114 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.testcontainers.containers.PubSubEmulatorContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Integration tests for {@link PubSubSinkV2} using {@link 
PubSubEmulatorContainer}. */
+@ExtendWith(MiniClusterExtension.class)
+@Execution(ExecutionMode.CONCURRENT)
+@Testcontainers
+class PubSubSinkV2ITTests extends TestLogger {
+
+private static final String PROJECT_ID = "test-project";
+
+private static final String TOPIC_ID = "test-topic";
+
+private static final String SUBSCRIPTION_ID = "test-subscription";
+
+private StreamExecutionEnvironment env;
+
+@Container
+private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER =
+new PubSubEmulatorContainer(
+
DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR));
+
+private PubsubHelper pubSubHelper;
+
+@BeforeEach
+void setUp() throws IOException {
+env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);
+pubSubHelper = new 
PubsubHelper(PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint());
+
+pubSubHelper.createTopic(PROJECT_ID, TOPIC_ID);
+pubSubHelper.createSubscription(PROJECT_ID, SUBSCRIPTION_ID, 
PROJECT_ID, TOPIC_ID);
+}
+
+@AfterEach
+void tearDown() throws IOException {
+pubSubHelper.deleteSubscription(PROJECT_ID, SUBSCRIPTION_ID);
+pubSubHelper.deleteTopic(PROJECT_ID, TOPIC_ID);
+pubSubHelper.close();
+}
+
+@Test
+void pubSubSinkV2DeliversRecords() throws Exception {
+String[] elements = new String[] {"test1", "test2", "test3"};
+DataStreamSource<String> stream =
+env.fromSource(
+new DataGeneratorSource<>(
+new FromElementsGeneratorFunction<>(
+BasicTypeInfo.STRING_TYPE_INFO, 
elements),
+elements.length,
+TypeInformation.of(String.class)),
+WatermarkStrategy.noWatermarks(),
+"DataGeneratorSource");
+
+GcpPublisherConfig gcpPublisherConfig =
+GcpPublisherConfig.builder()
+
.setCredentialsProvider(EmulatorCredentialsProvider.create())
+.setTransportChannelProvider(
+new TestChannelProvider(
+
PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint()))
+.build();
+
+PubSubSinkV2<String> sink =
+PubSubSinkV2.builder()
+.setProjectId(PROJECT_ID)
+.setTopicId(TOPIC_ID)
+.setSerializationSchema(new SimpleStringSchema())
+.setGcpPublisherConfig(gcpPublisherConfig)
+.setFailOnError(true)
+.build();
+stream.sinkTo(sink);
+int maxNumberOfMessages = 100;
+

Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]

2024-06-09 Thread via GitHub


snuyanzin commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632390518


##
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java:
##
@@ -0,0 +1,115 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.testcontainers.containers.PubSubEmulatorContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Integration tests for {@link PubSubSinkV2} using {@link 
PubSubEmulatorContainer}. */
+@ExtendWith(MiniClusterExtension.class)
+@Execution(ExecutionMode.CONCURRENT)
+@Testcontainers
+class PubSubSinkV2ITTests extends TestLogger {
+
+private static final String PROJECT_ID = "test-project";
+
+private static final String TOPIC_ID = "test-topic";
+
+private static final String SUBSCRIPTION_ID = "test-subscription";
+
+private StreamExecutionEnvironment env;
+
+@Container
+private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER =
+new PubSubEmulatorContainer(
+
DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR));
+
+private PubsubHelper pubSubHelper;
+
+@BeforeEach
+void setUp() throws IOException {
+env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);

Review Comment:
   I would suggest either adding a comment explaining why it is 1 or using the defaults.
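
   For instance (the wording is just an illustration):

   ```java
   // Parallelism 1 keeps the element order deterministic for the assertions below.
   env.setParallelism(1);
   ```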






Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]

2024-06-09 Thread via GitHub


snuyanzin commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632390394


##
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java:
##
@@ -0,0 +1,114 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.testcontainers.containers.PubSubEmulatorContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Integration tests for {@link PubSubSinkV2} using {@link 
PubSubEmulatorContainer}. */
+@ExtendWith(MiniClusterExtension.class)
+@Execution(ExecutionMode.CONCURRENT)
+@Testcontainers
+class PubSubSinkV2ITTests extends TestLogger {

Review Comment:
   `TestLogger` is a JUnit 4 rule-based class; I don't think we need to use it here.
   Or did I miss something?






Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-09 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632385233


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java:
##
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
+import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
+import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
+import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
+
+/**
+ * Sink writer created by {@link SqsSink} to write to SQS. More details on the 
operation of this
+ * sink writer may be found in the doc for {@link SqsSink}. More details on 
the internals of this
+ * sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * The {@link SqsAsyncClient} used here may be configured in the standard 
way for the AWS SDK
+ * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} 
and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class SqsSinkWriter<InputT> extends AsyncSinkWriter<InputT, SendMessageBatchRequestEntry> {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SqsSinkWriter.class);
+
+private static SdkAsyncHttpClient createHttpClient(Properties 
sqsClientProperties) {
+return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties);
+}
+
+private static SqsAsyncClient createSqsClient(
+Properties sqsClientProperties, SdkAsyncHttpClient httpClient) {
+AWSGeneralUtil.validateAwsCredentials(sqsClientProperties);
+return AWSClientUtil.createAwsAsyncClient(
+sqsClientProperties,
+httpClient,
+SqsAsyncClient.builder(),
+SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT,
+SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX);
+}
+
+private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
+AWSExceptionHandler.withClassifier(
+FatalExceptionClassifier.createChain(
+getInterruptedExceptionClassifier(),
+getInvalidCredentialsExceptionClassifier(),
+
SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier(),
+

Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-09 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632370742


##
flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsConfigConstants.java:
##
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.sqs.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Defaults for {@link SqsSinkWriter}. */
+@PublicEvolving
+public class SqsConfigConstants {
+
+public static final String BASE_SQS_USER_AGENT_PREFIX_FORMAT =

Review Comment:
   Sure, updated with `ConfigOption`.
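
   For reference, a minimal sketch of the `ConfigOption` form; the key name and default value are assumptions, not the final code:

   ```java
   public static final ConfigOption<String> BASE_SQS_USER_AGENT_PREFIX_FORMAT =
           ConfigOptions.key("sqs.client.user-agent-prefix")
                   .stringType()
                   .defaultValue("Apache Flink SQS Connector")
                   .withDescription("User-agent prefix used by the SQS client.");
   ```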






Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]

2024-06-09 Thread via GitHub


jeyhunkarimov commented on PR #24725:
URL: https://github.com/apache/flink/pull/24725#issuecomment-2156736006

   Hi @reswqa, thanks a lot for the review. Could you please do another pass 
when you have time? Thanks!





Re: [PR] [Flink-35473][table] Improve Table/SQL Configuration for Flink 2.0 [flink]

2024-06-09 Thread via GitHub


LadyForest commented on code in PR #24889:
URL: https://github.com/apache/flink/pull/24889#discussion_r1632297334


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/LookupJoinHintOptions.java:
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds {@link org.apache.flink.configuration.ConfigOption}s used 
by lookup join hint.
+ */
+@PublicEvolving
+public class LookupJoinHintOptions {
+
+public static final ConfigOption<String> LOOKUP_TABLE =
+key("lookup-table")

Review Comment:
   It makes sense to me 






[jira] [Comment Edited] (FLINK-34108) Add URL_ENCODE and URL_DECODE function

2024-06-09 Thread chesterxu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853489#comment-17853489
 ] 

chesterxu edited comment on FLINK-34108 at 6/9/24 12:34 PM:


[~martijnvisser]  Please take a CR on the latest code, thanks!


was (Author: JIRAUSER302535):
[~martijnvisser]  Please take a CR, thanks!

> Add URL_ENCODE and URL_DECODE function
> --
>
> Key: FLINK-34108
> URL: https://issues.apache.org/jira/browse/FLINK-34108
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Martijn Visser
>Assignee: chesterxu
>Priority: Major
>  Labels: pull-request-available
>
> Add URL_ENCODE and URL_DECODE function
> URL_ENCODE(str) - Translates a string into 
> 'application/x-www-form-urlencoded' format using a specific encoding scheme. 
> URL_DECODE(str) - Decodes a string in 'application/x-www-form-urlencoded' 
> format using a specific encoding scheme. 
> Related ticket from Calcite: CALCITE-5825
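
For context, the intended behavior mirrors {{java.net.URLEncoder}} / {{java.net.URLDecoder}} with UTF-8 (a sketch of the expected semantics; the exact behavior is defined by the PR and CALCITE-5825):
{code:java}
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

String encoded = URLEncoder.encode("http://flink.apache.org/q?x=1", StandardCharsets.UTF_8);
// -> "http%3A%2F%2Fflink.apache.org%2Fq%3Fx%3D1"
String decoded = URLDecoder.decode(encoded, StandardCharsets.UTF_8);
// -> "http://flink.apache.org/q?x=1"
{code}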





[jira] [Commented] (FLINK-34108) Add URL_ENCODE and URL_DECODE function

2024-06-09 Thread chesterxu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853489#comment-17853489
 ] 

chesterxu commented on FLINK-34108:
---

[~martijnvisser]  Please take a CR, thanks!

> Add URL_ENCODE and URL_DECODE function
> --
>
> Key: FLINK-34108
> URL: https://issues.apache.org/jira/browse/FLINK-34108
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Martijn Visser
>Assignee: chesterxu
>Priority: Major
>  Labels: pull-request-available
>
> Add URL_ENCODE and URL_DECODE function
> URL_ENCODE(str) - Translates a string into 
> 'application/x-www-form-urlencoded' format using a specific encoding scheme. 
> URL_DECODE(str) - Decodes a string in 'application/x-www-form-urlencoded' 
> format using a specific encoding scheme. 
> Related ticket from Calcite: CALCITE-5825





[jira] [Updated] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern

2024-06-09 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-35560:

Description: 
h3. Summary

Hello, I'd like to suggest query validator support in the Flink SQL gateway via 
the SPI pattern.

As SQL gateway operators, we need query validation so that only safe queries 
are executed and unsafe queries are dropped. 
To address this need, I propose adding a {{QueryValidator}} interface to the 
Flink SQL gateway API package. 
This interface will allow users to implement their own query validation logic, 
providing benefits to Flink SQL gateway operators.
h3. Interface

Below is a draft of the interface.
It takes an Operation and checks whether the query is valid.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/** Interface for implementing a validator that checks the safety of executing queries. */
@Public
public interface QueryValidator {
    boolean validateQuery(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{scan.startup.timestamp-millis}}, and rejects the query when the 
value is too far in the past, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

    private static final long ONE_DAY = 24 * 60 * 60 * 1000L;

    @Override
    public boolean validateQuery(Operation op) {
        if (op instanceof CreateTableOperation) {
            CreateTableOperation createTableOp = (CreateTableOperation) op;
            String connector =
                    createTableOp.getCatalogTable().getOptions().get("connector");
            if ("kafka".equals(connector)) {
                String startupTimestamp =
                        createTableOp.getCatalogTable()
                                .getOptions()
                                .get("scan.startup.timestamp-millis");
                // Reject queries that start reading more than one day in the past.
                if (startupTimestamp != null
                        && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
                    return false;
                }
            }
        }
        return true;
    }
}
{code}
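
For the SPI wiring itself, discovery could follow the standard {{java.util.ServiceLoader}} mechanism; this is a sketch under that assumption, since the draft does not pin down how the gateway loads implementations:
{code:java}
import java.util.ServiceLoader;

// Implementations would be listed in
// META-INF/services/org.apache.flink.table.gateway.api.validator.QueryValidator
ServiceLoader<QueryValidator> validators = ServiceLoader.load(QueryValidator.class);
for (QueryValidator validator : validators) {
    if (!validator.validateQuery(operation)) {
        // How a rejected query is surfaced is up to the gateway; throwing is one option.
        throw new IllegalArgumentException(
                "Query rejected by " + validator.getClass().getSimpleName());
    }
}
{code}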


I'd be happy to implement this feature if we can reach an agreement. 
Thanks.



> Add query validator support to flink sql gateway via spi pattern
> 

[jira] [Updated] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern

2024-06-09 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-35560:

Description: 
h3. Summary

Hello I'd like to suggest query validator support in flink sql gateway via spi 
pattern.

As an sql gateway operator, there is need for query validation to only execute 
safe queries and drop unsafe queries. 
To address this need, I propose adding a {{QueryValidator}} interface in flink 
sql gateway api package. 
This interface will allow users to implement their own query validation logic, 
providing benefits to flink sql gateway operators.
h3. Interface

Below is a draft for the interface.
It takes Operation and check whether the query is valid or not.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/**
 * Interface for implementing a validator that checks the safety of executing 
queries.
 */
@Public
public interface QueryValidator {     
boolean validateQuery(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value 
is too small, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.QueryValidator;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

private static final long ONE_DAY = 24 * 60 * 60 * 1000L; 

@Override
public boolean validateQuery(Operation op) {
if (op instanceof CreateTableOperation) {
CreateTableOperation createTableOp = (CreateTableOperation) op;
String connector = 
createTableOp.getCatalogTable().getOptions().get("connector");
if ("kafka".equals(connector)) {
String startupTimestamp = 
createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
if (startupTimestamp != null && 
Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
return false;
}
}
}
}
return true;
}{code}

I'd be happy to implement this feature, if we can reach on agreement. 
Thanks
h4.

  was:
h3. Summary

Hello I'd like to suggest query validator support in flink sql gateway via spi 
pattern.

As an sql gateway operator, there is need for query validation to only execute 
safe queries and drop unsafe queries. 
To address this need, I propose adding a {{QueryValidator}} interface in flink 
sql gateway api package. 
This interface will allow users to implement their own query validation logic, 
providing benefits to flink sql gateway operators.
h3. Interface

Below is a draft for the interface.
It takes Operation and check whether the query is valid or not.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/**
 * Interface for implementing a validator that checks the safety of executing 
queries.
 */
@Public
public interface QueryValidator {     
boolean validateQuery(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value 
is too small, which can cause high disk I/O load.
{code:sql}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.gateway.api.validator.QueryValidator;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

private static final long ONE_DAY = 24 * 60 * 60 * 1000L; 

@Override
public boolean validateQuery(Operation op) {
if (op instanceof CreateTableOperation) {
CreateTableOperation createTableOp = (CreateTableOperation) op;
String connector = 
createTableOp.getCatalogTable().getOptions().get("connector");
if ("kafka".equals(connector)) {
String startupTimestamp = 
createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
if (startupTimestamp != null && 
Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
return false;
}
}
}
}
return true;
}{code}

I'd be happy to implement this feature, if we can reach on agreement. 
Thanks
h4.


> Add query validator support to flink sql gateway via spi pattern
> 

[jira] [Updated] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern

2024-06-09 Thread dongwoo.kim (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dongwoo.kim updated FLINK-35560:

Description: 
h3. Summary

Hello, I'd like to suggest query validator support in the Flink SQL gateway via 
the SPI pattern.

As an SQL gateway operator, there is a need for query validation so that only 
safe queries are executed and unsafe ones are dropped. 
To address this need, I propose adding a {{QueryValidator}} interface to the 
Flink SQL gateway API package. 
This interface will allow users to implement their own query validation logic, 
benefiting Flink SQL gateway operators.
h3. Interface

Below is a draft of the interface.
It takes an Operation and checks whether the query is valid.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/** Interface for implementing a validator that checks the safety of executing queries. */
@Public
public interface QueryValidator {

    boolean validateQuery(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{scan.startup.timestamp-millis}}, and rejects the query when the 
value lies too far in the past, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

    private static final long ONE_DAY = 24 * 60 * 60 * 1000L;

    @Override
    public boolean validateQuery(Operation op) {
        if (op instanceof CreateTableOperation) {
            CreateTableOperation createTableOp = (CreateTableOperation) op;
            String connector =
                    createTableOp.getCatalogTable().getOptions().get("connector");
            if ("kafka".equals(connector)) {
                String startupTimestamp =
                        createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
                if (startupTimestamp != null
                        && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
                    return false;
                }
            }
        }
        return true;
    }
}
{code}
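
To make the threshold concrete, here is a small self-contained example of the cutoff arithmetic the validator applies (hypothetical timestamps, independent of the proposal text):
{code:java}
public class CutoffExample {

    public static void main(String[] args) {
        long oneDay = 24 * 60 * 60 * 1000L; // 86_400_000 ms, same constant as ONE_DAY
        long now = System.currentTimeMillis();
        long cutoff = now - oneDay;

        // A start timestamp two days in the past falls below the cutoff...
        long startupTimestamp = now - 2 * oneDay;
        // ...so the validator's check would reject the query.
        System.out.println(startupTimestamp < cutoff); // prints: true
    }
}
{code}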

I'd be happy to implement this feature if we can reach an agreement. 
Thanks

  was:
h3. Summary

Hello, I'd like to suggest query validator support in the Flink SQL gateway via 
the SPI pattern.

As an SQL gateway operator, there is a need for query validation so that only 
safe queries are executed and unsafe ones are dropped. 
To address this need, I propose adding a {{QueryValidator}} interface to the 
Flink SQL gateway API package. 
This interface will allow users to implement their own query validation logic, 
benefiting Flink SQL gateway operators.
h3. Interface

Below is a draft of the interface.
It takes an Operation and checks whether the query is valid.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/** Interface for implementing a validator that checks the safety of executing queries. */
@Public
public interface QueryValidator {

    boolean validateQuery(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{scan.startup.timestamp-millis}}, and rejects the query when the 
value lies too far in the past, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

    private static final long ONE_DAY = 24 * 60 * 60 * 1000L;

    @Override
    public boolean validateQuery(Operation op) {
        if (op instanceof CreateTableOperation) {
            CreateTableOperation createTableOp = (CreateTableOperation) op;
            String connector =
                    createTableOp.getCatalogTable().getOptions().get("connector");
            if ("kafka".equals(connector)) {
                String startupTimestamp =
                        createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
                // Reject Kafka scans whose start timestamp lies more than one day in the past.
                if (startupTimestamp != null
                        && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
                    return false;
                }
            }
        }
        return true;
    }
}
{code}

I'd be happy to implement this feature if we can reach an agreement. 
Thanks


> Add query validator support to flink sql gateway via spi pattern
> 

[jira] [Created] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern

2024-06-09 Thread dongwoo.kim (Jira)
dongwoo.kim created FLINK-35560:
---

 Summary: Add query validator support to flink sql gateway via spi 
pattern
 Key: FLINK-35560
 URL: https://issues.apache.org/jira/browse/FLINK-35560
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Gateway
Reporter: dongwoo.kim


h3. Summary

Hello, I'd like to suggest query validator support in the Flink SQL gateway via 
the SPI pattern.

As an SQL gateway operator, there is a need for query validation so that only 
safe queries are executed and unsafe ones are dropped. 
To address this need, I propose adding a {{QueryValidator}} interface to the 
Flink SQL gateway API package. 
This interface will allow users to implement their own query validation logic, 
benefiting Flink SQL gateway operators.
h3. Interface

Below is a draft of the interface.
It takes an Operation and checks whether the query is valid.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.annotation.Public;
import org.apache.flink.table.operations.Operation;

/** Interface for implementing a validator that checks the safety of executing queries. */
@Public
public interface QueryValidator {

    boolean validateQuery(Operation op);
}
{code}
h3. Example implementation

Below is an example implementation that inspects Kafka table options, 
specifically {{scan.startup.timestamp-millis}}, and rejects the query when the 
value lies too far in the past, which can cause high disk I/O load.
{code:java}
package org.apache.flink.table.gateway.api.validator;

import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

public class KafkaTimestampValidator implements QueryValidator {

    private static final long ONE_DAY = 24 * 60 * 60 * 1000L;

    @Override
    public boolean validateQuery(Operation op) {
        if (op instanceof CreateTableOperation) {
            CreateTableOperation createTableOp = (CreateTableOperation) op;
            String connector =
                    createTableOp.getCatalogTable().getOptions().get("connector");
            if ("kafka".equals(connector)) {
                String startupTimestamp =
                        createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis");
                // Reject Kafka scans whose start timestamp lies more than one day in the past.
                if (startupTimestamp != null
                        && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) {
                    return false;
                }
            }
        }
        return true;
    }
}
{code}

I'd be happy to implement this feature if we can reach an agreement. 
Thanks



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]

2024-06-09 Thread via GitHub


vahmed-hamdy commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632243987


##
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/PubsubHelper.java:
##
@@ -36,27 +40,50 @@
 import com.google.pubsub.v1.ReceivedMessage;
 import com.google.pubsub.v1.Topic;
 import com.google.pubsub.v1.TopicName;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /** A helper class to make managing the testing topics a bit easier. */
 public class PubsubHelper {
 
 private static final Logger LOG = 
LoggerFactory.getLogger(PubsubHelper.class);
 
+private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(5);
+
+private ManagedChannel channel;
+
 private TransportChannelProvider channelProvider;
 
 private TopicAdminClient topicClient;
+
 private SubscriptionAdminClient subscriptionAdminClient;
 
+public PubsubHelper(String endpoint) {
+channel = 
ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
+channelProvider =
+
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));

Review Comment:
   I am not following: we are creating the channel and the channelProvider here, 
so why do we need to check?
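
   For reference, a minimal sketch of how a helper that builds its own channel might release it with the `SHUTDOWN_TIMEOUT` from the diff; this is an illustration of one possible `close()` inside `PubsubHelper` (which has `channel` and `SHUTDOWN_TIMEOUT` fields), not the PR's actual close logic:

   ```java
   // Hedged sketch: graceful shutdown of a channel the helper created itself.
   public void close() throws InterruptedException {
       if (channel != null && !channel.isShutdown()) {
           channel.shutdown(); // stop accepting new calls
           if (!channel.awaitTermination(SHUTDOWN_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
               channel.shutdownNow(); // force-close if the timeout elapses
           }
       }
   }
   ```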



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]

2024-06-09 Thread via GitHub


vahmed-hamdy commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632243692


##
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/PubsubHelper.java:
##
@@ -36,27 +40,50 @@
 import com.google.pubsub.v1.ReceivedMessage;
 import com.google.pubsub.v1.Topic;
 import com.google.pubsub.v1.TopicName;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /** A helper class to make managing the testing topics a bit easier. */
 public class PubsubHelper {
 
 private static final Logger LOG = 
LoggerFactory.getLogger(PubsubHelper.class);
 
+private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(5);
+
+private ManagedChannel channel;

Review Comment:
   We have two different ways of creating the channel: one sets the channel 
directly, and the other (used in the old tests) uses a channel provider. Making 
this field final would force us to create a channel in all constructors.
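
   A minimal sketch of the two construction paths being described (hypothetical shapes, not the PR's exact code), which shows why the `channel` field cannot be `final`:

   ```java
   import com.google.api.gax.grpc.GrpcTransportChannel;
   import com.google.api.gax.rpc.FixedTransportChannelProvider;
   import com.google.api.gax.rpc.TransportChannelProvider;
   import io.grpc.ManagedChannel;
   import io.grpc.ManagedChannelBuilder;

   public class PubsubHelper {
       // Only the endpoint-based path materializes a channel, so this cannot be final.
       private ManagedChannel channel;
       private TransportChannelProvider channelProvider;

       // New path: build our own channel from an emulator endpoint.
       public PubsubHelper(String endpoint) {
           channel = ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
           channelProvider =
                   FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
       }

       // Old-test path: accept an externally managed provider; no channel is created here.
       public PubsubHelper(TransportChannelProvider channelProvider) {
           this.channelProvider = channelProvider;
       }
   }
   ```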



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]

2024-06-09 Thread via GitHub


vahmed-hamdy commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632243045


##
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java:
##
@@ -0,0 +1,115 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.testcontainers.containers.PubSubEmulatorContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Integration tests for {@link PubSubSinkV2} using {@link 
PubSubEmulatorContainer}. */
+@ExtendWith(MiniClusterExtension.class)
+@Execution(ExecutionMode.CONCURRENT)
+@Testcontainers
+class PubSubSinkV2ITTests extends TestLogger {
+
+private static final String PROJECT_ID = "test-project";
+
+private static final String TOPIC_ID = "test-topic";
+
+private static final String SUBSCRIPTION_ID = "test-subscription";
+
+private StreamExecutionEnvironment env;
+
+@Container
+private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER =
+new PubSubEmulatorContainer(
+
DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR));
+
+private PubsubHelper pubSubHelper;
+
+@BeforeEach
+void setUp() throws IOException {
+env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);

Review Comment:
   We are not testing anything that needs higher parallelism in this scenario; 
is there a reason to use anything different?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]

2024-06-09 Thread via GitHub


vahmed-hamdy commented on code in PR #28:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632242928


##
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java:
##
@@ -0,0 +1,115 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.TestLogger;
+
+import com.google.pubsub.v1.ReceivedMessage;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.testcontainers.containers.PubSubEmulatorContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Integration tests for {@link PubSubSinkV2} using {@link 
PubSubEmulatorContainer}. */
+@ExtendWith(MiniClusterExtension.class)
+@Execution(ExecutionMode.CONCURRENT)
+@Testcontainers
+class PubSubSinkV2ITTests extends TestLogger {
+
+private static final String PROJECT_ID = "test-project";
+
+private static final String TOPIC_ID = "test-topic";
+
+private static final String SUBSCRIPTION_ID = "test-subscription";
+
+private StreamExecutionEnvironment env;
+
+@Container
+private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER =
+new PubSubEmulatorContainer(
+
DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR));
+
+private PubsubHelper pubSubHelper;
+
+@BeforeEach
+void setUp() throws IOException {
+env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);
+pubSubHelper = new 
PubsubHelper(PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint());
+
+pubSubHelper.createTopic(PROJECT_ID, TOPIC_ID);
+pubSubHelper.createSubscription(PROJECT_ID, SUBSCRIPTION_ID, 
PROJECT_ID, TOPIC_ID);
+}
+
+@AfterEach
+void tearDown() throws IOException {
+pubSubHelper.deleteSubscription(PROJECT_ID, SUBSCRIPTION_ID);
+pubSubHelper.deleteTopic(PROJECT_ID, TOPIC_ID);
+pubSubHelper.close();
+}
+
+@Test
+void pubSubSinkV2DeliversRecords() throws Exception {

Review Comment:
   No exception is expected here; any exception thrown in this test should fail 
the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-06-09 Thread via GitHub


nicusX commented on PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#issuecomment-2156438875

   Addressed the latest comments and ported the AMP signer to AWS SDK v2 (!)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35138) Release flink-connector-kafka v3.2.0 for Flink 1.19

2024-06-09 Thread Tamir Sagi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853470#comment-17853470
 ] 

Tamir Sagi commented on FLINK-35138:


Thank you Danny! highly appreciated it.

> Release flink-connector-kafka v3.2.0 for Flink 1.19
> ---
>
> Key: FLINK-35138
> URL: https://issues.apache.org/jira/browse/FLINK-35138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.2.0
>
>
> https://github.com/apache/flink-connector-kafka



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35138) Release flink-connector-kafka v3.2.0 for Flink 1.19

2024-06-09 Thread Tamir Sagi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853470#comment-17853470
 ] 

Tamir Sagi edited comment on FLINK-35138 at 6/9/24 7:45 AM:


Thank you Danny! highly appreciated.


was (Author: JIRAUSER283777):
Thank you Danny! highly appreciated it.

> Release flink-connector-kafka v3.2.0 for Flink 1.19
> ---
>
> Key: FLINK-35138
> URL: https://issues.apache.org/jira/browse/FLINK-35138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.2.0
>
>
> https://github.com/apache/flink-connector-kafka



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35138) Release flink-connector-kafka v3.2.0 for Flink 1.19

2024-06-09 Thread Tamir Sagi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853471#comment-17853471
 ] 

Tamir Sagi commented on FLINK-35138:


Thank you Danny! highly appreciated.

> Release flink-connector-kafka v3.2.0 for Flink 1.19
> ---
>
> Key: FLINK-35138
> URL: https://issues.apache.org/jira/browse/FLINK-35138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.2.0
>
>
> https://github.com/apache/flink-connector-kafka



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-35138) Release flink-connector-kafka v3.2.0 for Flink 1.19

2024-06-09 Thread Tamir Sagi (Jira)


[ https://issues.apache.org/jira/browse/FLINK-35138 ]


Tamir Sagi deleted comment on FLINK-35138:


was (Author: JIRAUSER283777):
Thank you Danny! highly appreciated.

> Release flink-connector-kafka v3.2.0 for Flink 1.19
> ---
>
> Key: FLINK-35138
> URL: https://issues.apache.org/jira/browse/FLINK-35138
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Kafka
>Reporter: Danny Cranmer
>Assignee: Danny Cranmer
>Priority: Major
>  Labels: pull-request-available
> Fix For: kafka-3.2.0
>
>
> https://github.com/apache/flink-connector-kafka



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-06-09 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1632187848


##
README.md:
##
@@ -8,6 +8,14 @@ Apache Flink is an open source stream processing framework 
with powerful stream-
 
 Learn more about Flink at 
[https://flink.apache.org/](https://flink.apache.org/)
 
+## Modules
+
+This repository contains the following modules
+
+* [Prometheus Connector](./prometheus-connector): Flink Prometheus Connector 
implementation; supports optional request signer
+* [Sample application](./example-datastream-job): Sample application showing 
the usage of the connector with DataStream API. It also demonstrates how to 
configure the request signer. 

Review Comment:
   I don't think an integration test is useful for a user, and the lack of 
proper working examples is a common obstacle to adoption for users new to 
Flink.
   This module is not supposed to be released in any form. Can't we just keep 
it as source, and possibly reference it from the documentation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-06-09 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1632194796


##
amp-request-signer/README.md:
##
@@ -0,0 +1,31 @@
+## Request Signer for Amazon Managed Prometheus (AMP)
+
+Request signer implementation for Amazon Managed Prometheus (AMP)
+
+The signer retrieves AWS credentials using 
`com.amazonaws.auth.DefaultAWSCredentialsProviderChain` and automatically
+supports session credentials.
+
+The Flink application requires `RemoteWrite` permissions to the AMP workspace 
(e.g. the `AmazonPrometheusRemoteWriteAccess`
+policy).
+
+### Sample usage
+
+To enable request signing for Amazon Managed Prometheus, and instance of 
`AmazonManagedPrometheusWriteRequestSigner`
+must

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-06-09 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1632194645


##
README.md:
##
@@ -8,6 +8,14 @@ Apache Flink is an open source stream processing framework 
with powerful stream-
 
 Learn more about Flink at 
[https://flink.apache.org/](https://flink.apache.org/)
 
+## Modules
+
+This repository contains the following modules
+
+* [Prometheus Connector](./prometheus-connector): Flink Prometheus Connector 
implementation; supports optional request signer
+* [Sample application](./example-datastream-job): Sample application showing 
the usage of the connector with DataStream API. It also demonstrates how to 
configure the request signer. 
+* [Amazon Managed Prometheus Request Signer](./amp-request-signer): 
Implementation of request signer for Amazon Managed Prometheus (AMP)

Review Comment:
   It was actually requested earlier in the PR to make the module name fully 
qualified by prepending `flink-connector-prometheus-`.
   I am renaming the module to `flink-connector-prometheus-request-signer-amp` 
to make it fully qualified while retaining the logical order, general to specific.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-06-09 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1632188645


##
README.md:
##
@@ -8,6 +8,14 @@ Apache Flink is an open source stream processing framework 
with powerful stream-
 
 Learn more about Flink at 
[https://flink.apache.org/](https://flink.apache.org/)
 
+## Modules
+
+This repository contains the following modules
+
+* [Prometheus Connector](./prometheus-connector): Flink Prometheus Connector 
implementation; supports optional request signer

Review Comment:
   Renamed subdirectories to match artifact names



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]

2024-06-09 Thread via GitHub


nicusX commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1632187848


##
README.md:
##
@@ -8,6 +8,14 @@ Apache Flink is an open source stream processing framework 
with powerful stream-
 
 Learn more about Flink at 
[https://flink.apache.org/](https://flink.apache.org/)
 
+## Modules
+
+This repository contains the following modules
+
+* [Prometheus Connector](./prometheus-connector): Flink Prometheus Connector 
implementation; supports optional request signer
+* [Sample application](./example-datastream-job): Sample application showing 
the usage of the connector with DataStream API. It also demonstrates how to 
configure the request signer. 

Review Comment:
   I don't think an integration test is useful for a user, and the lack of 
proper working examples is a common obstacle to adoption for users new to Flink.
   This module is not supposed to be released in any form. Can't we just keep 
it as source, and possibly reference it from the documentation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-09 Thread via GitHub


19priyadhingra commented on PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2156354857

   > It'd also be great to mention what permissions are needed for this 
connector to work. E.g., is `sqs:SendMessage` sufficient?
   
   Good point! Yes, `sqs:SendMessage` is sufficient; I have updated the 
documentation accordingly.
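
   For readers wondering what that permission covers, the write path boils down to SendMessage-style calls; a minimal AWS SDK v2 sketch (hypothetical queue URL, not the connector's internal code):

   ```java
   import software.amazon.awssdk.services.sqs.SqsClient;
   import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

   public class SqsSendExample {
       public static void main(String[] args) {
           // The only SQS action invoked is SendMessage, so an IAM policy granting
           // sqs:SendMessage on the target queue is enough for this call.
           try (SqsClient sqs = SqsClient.create()) {
               sqs.sendMessage(
                       SendMessageRequest.builder()
                               .queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue") // hypothetical
                               .messageBody("hello from the SQS sink")
                               .build());
           }
       }
   }
   ```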


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35453][core] StreamReader Charset fix with UTF8 in core files [flink]

2024-06-09 Thread via GitHub


xuzifu666 commented on PR #24842:
URL: https://github.com/apache/flink/pull/24842#issuecomment-2156347384

   > Hi @xuzifu666 thanks for driving this PR. Could you please update the 
intro section of this PR and add tests for your changes? Thanks
   
   @jeyhunkarimov Hi, this change is not well suited to a dedicated test; maybe 
we can refer to similar changes in Hive or Spark? Here are some PRs on the same 
topic: Hive (https://github.com/apache/hive/pull/5243) and Spark 
(https://github.com/apache/spark/pull/46509). Looking forward to your next 
review, thanks.
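
   For context, the pattern under discussion replaces platform-default-charset readers with an explicit UTF-8 charset; an illustrative before/after sketch (not the PR's exact code):

   ```java
   import java.io.BufferedReader;
   import java.io.FileInputStream;
   import java.io.IOException;
   import java.io.InputStreamReader;
   import java.nio.charset.StandardCharsets;

   public class Utf8ReaderExample {
       public static String readFirstLine(String path) throws IOException {
           // Before: new InputStreamReader(new FileInputStream(path)) picks up the
           // platform default charset, which varies across environments.
           // After: the charset is pinned to UTF-8 explicitly.
           try (BufferedReader reader =
                   new BufferedReader(
                           new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8))) {
               return reader.readLine();
           }
       }
   }
   ```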


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-09 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632183182


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,122 @@
+
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>4.3-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-connector-aws-sqs-e2e-tests</artifactId>
+    <name>Flink : Connectors : AWS : E2E Tests : Amazon SQS</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-sqs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.typesafe.netty</groupId>
+                    <artifactId>netty-reactive-streams-http</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-sqs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.typesafe.netty</groupId>
+                    <artifactId>netty-reactive-streams-http</artifactId>
+                </exclusion>
+            </exclusions>

Review Comment:
   Unnecessary. Removed in next revision



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]

2024-06-09 Thread via GitHub


19priyadhingra commented on code in PR #141:
URL: 
https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632182646


##
flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml:
##
@@ -0,0 +1,122 @@
+
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>flink-connector-aws-e2e-tests-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>4.3-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-connector-aws-sqs-e2e-tests</artifactId>
+    <name>Flink : Connectors : AWS : E2E Tests : Amazon SQS</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-sqs</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.typesafe.netty</groupId>
+                    <artifactId>netty-reactive-streams-http</artifactId>
+                </exclusion>
+            </exclusions>

Review Comment:
   Unnecessary. Removed in next revision



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org