Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
flinkbot commented on PR #24919:
URL: https://github.com/apache/flink/pull/24919#issuecomment-2157312543

## CI report:

* 14cf0fc58dfb9553ffe703622b6223a54549aacd UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
ammar-master opened a new pull request, #24919:
URL: https://github.com/apache/flink/pull/24919

## What is the purpose of the change

Additional configuration options for internal and REST SSL setup.

## Brief change log

* SSL configuration option for internal and REST SSL keystore/truststore type
* Use new option when initializing SSLHandlerFactory
* Extend CustomSSLEngineProvider used by Pekko

## Verifying this change

- Unit tests to ensure new configuration options can be picked up
- Manually verified the change by running a cluster using a BKS keystore and truststore that is not available by default.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
master-ammar closed pull request #24918: [FLINK-35371][security] Add configuration for SSL keystore and truststore type
URL: https://github.com/apache/flink/pull/24918
Re: [PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
flinkbot commented on PR #24918:
URL: https://github.com/apache/flink/pull/24918#issuecomment-2157270012

## CI report:

* db7977869fe099d7cf1cf428bec048e608dbdd60 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-35371) Allow the keystore and truststore type to configured for SSL
[ https://issues.apache.org/jira/browse/FLINK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-35371:
---
    Labels: SSL pull-request-available (was: SSL)

> Allow the keystore and truststore type to configured for SSL
>
> Key: FLINK-35371
> URL: https://issues.apache.org/jira/browse/FLINK-35371
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Network
> Affects Versions: 1.19.0
> Reporter: Ammar Master
> Assignee: Ammar Master
> Priority: Minor
> Labels: SSL, pull-request-available
>
> Flink always creates a keystore and truststore using the [default type|https://github.com/apache/flink/blob/b87ead743dca161cdae8a1fef761954d206b81fb/flink-runtime/src/main/java/org/apache/flink/runtime/net/SSLUtils.java#L236] defined in the JDK, which in most cases is JKS.
> {code}
> KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
> {code}
> We should add configuration options to set the type explicitly, both to support other custom formats and to match the options already provided by other applications such as [Spark|https://spark.apache.org/docs/latest/security.html#:~:text=the%20key%20store.-,%24%7Bns%7D.keyStoreType,-JKS] and [Kafka|https://kafka.apache.org/documentation/#:~:text=per%2Dbroker-,ssl.keystore.type,-The%20file%20format]. The default would continue to be the one specified by the JDK.
>
> The SSLContext for the REST API can read the configuration option directly, and we need to add extra logic to the [CustomSSLEngineProvider|https://github.com/apache/flink/blob/master/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/CustomSSLEngineProvider.java] for Pekko.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
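For readers following the issue, here is a minimal sketch of the idea it describes: use an explicitly configured keystore type when one is given, and otherwise fall back to `KeyStore.getDefaultType()`. The class name, the method names, and the `configuredType` parameter are hypothetical and are not taken from the Flink change; only the `java.security.KeyStore` calls are real JDK API. Note that the JDK default comes from the `keystore.type` security property, so it may be PKCS12 rather than JKS on newer JDKs.

```java
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.KeyStoreException;

public class KeyStoreTypeExample {

    /**
     * Returns the configured keystore type, or the JDK default when none is set.
     * "configuredType" stands in for a value read from configuration (hypothetical).
     */
    public static String resolveType(String configuredType) {
        if (configuredType == null || configuredType.trim().isEmpty()) {
            return KeyStore.getDefaultType();
        }
        return configuredType.trim();
    }

    /** Reports whether some installed security provider supports the given type. */
    public static boolean isSupportedType(String type) {
        try {
            KeyStore.getInstance(type);
            return true;
        } catch (KeyStoreException e) {
            return false;
        }
    }

    /** Creates an empty, initialized KeyStore of the resolved type. */
    public static KeyStore createEmptyStore(String configuredType)
            throws GeneralSecurityException, IOException {
        KeyStore store = KeyStore.getInstance(resolveType(configuredType));
        // A null stream initializes an empty store; a real caller would pass
        // an InputStream for the keystore file and its password instead.
        store.load(null, null);
        return store;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("default type:  " + createEmptyStore(null).getType());
        System.out.println("explicit type: " + createEmptyStore("PKCS12").getType());
        System.out.println("NO_SUCH_TYPE supported? " + isSupportedType("NO_SUCH_TYPE"));
    }
}
```

A type such as BKS only resolves if a provider that implements it (for example Bouncy Castle) is installed, which is presumably why the PR text describes it as not available by default.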
[PR] [FLINK-35371][security] Add configuration for SSL keystore and truststore type [flink]
master-ammar opened a new pull request, #24918:
URL: https://github.com/apache/flink/pull/24918

## What is the purpose of the change

Additional configuration options for internal and REST SSL setup.

## Brief change log

* SSL configuration option for internal and REST SSL keystore/truststore type
* Use new option when initializing SSLHandlerFactory
* Extend CustomSSLEngineProvider used by Pekko

## Verifying this change

- Unit tests to ensure new configuration options can be picked up
- Manually verified the change by running a cluster using a BKS keystore and truststore that is not available by default.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs
Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]
superdiaodiao commented on PR #24773:
URL: https://github.com/apache/flink/pull/24773#issuecomment-2157065404

> Thanks for the contribution @superdiaodiao
> thanks for the review @HuangXingBo , @davidradl
>
> it looks ok to me, I will test it a bit more and, if that succeeds, will merge it

Thank you~
Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on PR #28:
URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/28#issuecomment-2156823709

@jeyhunkarimov thanks for the feedback, I added a negative test as well
@snuyanzin Thanks for the review, All comments addressed as well
Re: [PR] [FLINK-34108][table] Add built-in URL_ENCODE and URL_DECODE function. [flink]
snuyanzin commented on PR #24773:
URL: https://github.com/apache/flink/pull/24773#issuecomment-2156817722

Thanks for the contribution @superdiaodiao
thanks for the review @HuangXingBo , @davidradl

it looks ok to me, I will test it a bit more and, if that succeeds, will merge it
[jira] [Commented] (FLINK-35520) master can't compile as license check failed
[ https://issues.apache.org/jira/browse/FLINK-35520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853540#comment-17853540 ]

Sergey Nuyanzin commented on FLINK-35520:
---

That seems to be trickier than I thought...
I thought there was an issue only with licensing; now it looks like the license issue is fixed, however the Python tests succeeded with this PR...

> master can't compile as license check failed
>
> Key: FLINK-35520
> URL: https://issues.apache.org/jira/browse/FLINK-35520
> Project: Flink
> Issue Type: Bug
> Components: Build System / CI
> Affects Versions: 1.20.0
> Reporter: Weijie Guo
> Priority: Critical
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=60024=logs=52b61abe-a3cc-5bde-cc35-1bbe89bb7df5=54421a62-0c80-5aad-3319-094ff69180bb=45808
Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #28:
URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632401145

## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/EmulatorCredentialsProvider.java: ##
@@ -20,12 +20,14 @@
 import com.google.api.gax.core.CredentialsProvider;
 import com.google.auth.Credentials;

+import java.io.Serializable;
+
 /**
  * A CredentialsProvider that simply provides the right credentials that are to be used for
  * connecting to an emulator. NOTE: The Google provided NoCredentials and NoCredentialsProvider do
  * not behave as expected. See https://github.com/googleapis/gax-java/issues/1148
  */
-public final class EmulatorCredentialsProvider implements CredentialsProvider {
+public final class EmulatorCredentialsProvider implements CredentialsProvider, Serializable {

Review Comment:
Unlike PubSubSinkV1, we use CredentialsProvider as the argument instead of Credentials, so the provider object itself is passed to the job and needs to be serialized. Previously Credentials were the argument, so the provider did not need to be serializable.
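The point in the review comment above can be illustrated with plain Java serialization. This is only a sketch under stated assumptions: `TokenProvider`, `PlainProvider`, `SerializableProvider`, and `SinkConfig` are hypothetical stand-ins and are not the gax `CredentialsProvider` API or the connector's `GcpPublisherConfig`. It shows that an object holding a provider field can only be serialized if the provider class itself implements `Serializable`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializableProviderExample {

    /** Hypothetical stand-in for a credentials provider interface. */
    public interface TokenProvider {
        String getToken();
    }

    /** Provider that does NOT implement Serializable. */
    public static final class PlainProvider implements TokenProvider {
        public String getToken() { return "emulator"; }
    }

    /** Same provider, additionally marked Serializable. */
    public static final class SerializableProvider implements TokenProvider, Serializable {
        private static final long serialVersionUID = 1L;
        public String getToken() { return "emulator"; }
    }

    /** Hypothetical config object that holds the provider and is shipped with the job. */
    public static final class SinkConfig implements Serializable {
        private static final long serialVersionUID = 1L;
        final TokenProvider provider;
        public SinkConfig(TokenProvider provider) { this.provider = provider; }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    /** Returns false when the object graph contains a non-serializable member. */
    public static boolean canSerialize(Object o) {
        try {
            serialize(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Serializes and deserializes the config, then reads the token from the copy. */
    public static String roundTripToken(SinkConfig config) {
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(serialize(config)))) {
            return ((SinkConfig) in.readObject()).provider.getToken();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new SinkConfig(new PlainProvider())));
        System.out.println(canSerialize(new SinkConfig(new SerializableProvider())));
        System.out.println(roundTripToken(new SinkConfig(new SerializableProvider())));
    }
}
```

When only the resulting `Credentials` value is handed to the sink, the provider class never enters the serialized object graph, which matches the explanation given for the V1 sink.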
Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #28: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632400893 ## flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java: ## @@ -0,0 +1,114 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; +import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider; +import org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLogger; + +import com.google.pubsub.v1.ReceivedMessage; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.testcontainers.containers.PubSubEmulatorContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import 
java.io.IOException; +import java.util.List; + +/** Integration tests for {@link PubSubSinkV2} using {@link PubSubEmulatorContainer}. */ +@ExtendWith(MiniClusterExtension.class) +@Execution(ExecutionMode.CONCURRENT) +@Testcontainers +class PubSubSinkV2ITTests extends TestLogger { Review Comment: Yes, My bad. Removed now
Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #28: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632400806 ## flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java: ## @@ -0,0 +1,114 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; +import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider; +import org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLogger; + +import com.google.pubsub.v1.ReceivedMessage; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.testcontainers.containers.PubSubEmulatorContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import 
java.io.IOException; +import java.util.List; + +/** Integration tests for {@link PubSubSinkV2} using {@link PubSubEmulatorContainer}. */ +@ExtendWith(MiniClusterExtension.class) +@Execution(ExecutionMode.CONCURRENT) +@Testcontainers +class PubSubSinkV2ITTests extends TestLogger { + +private static final String PROJECT_ID = "test-project"; + +private static final String TOPIC_ID = "test-topic"; + +private static final String SUBSCRIPTION_ID = "test-subscription"; + +private StreamExecutionEnvironment env; + +@Container +private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER = +new PubSubEmulatorContainer( + DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR)); + +private PubsubHelper pubSubHelper; + +@BeforeEach +void setUp() throws IOException { +env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setParallelism(1); +pubSubHelper = new PubsubHelper(PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint()); + +pubSubHelper.createTopic(PROJECT_ID, TOPIC_ID); +pubSubHelper.createSubscription(PROJECT_ID, SUBSCRIPTION_ID, PROJECT_ID, TOPIC_ID); +} + +@AfterEach +void tearDown() throws IOException { +pubSubHelper.deleteSubscription(PROJECT_ID, SUBSCRIPTION_ID); +pubSubHelper.deleteTopic(PROJECT_ID, TOPIC_ID); +pubSubHelper.close(); +} + +@Test +void pubSubSinkV2DeliversRecords() throws Exception { +String[] elements = new String[] {"test1", "test2", "test3"}; +DataStreamSource stream = +env.fromSource( +new DataGeneratorSource<>( +new FromElementsGeneratorFunction<>( +BasicTypeInfo.STRING_TYPE_INFO, elements), +elements.length, +TypeInformation.of(String.class)), +WatermarkStrategy.noWatermarks(), +"DataGeneratorSource"); + +GcpPublisherConfig gcpPublisherConfig = +GcpPublisherConfig.builder() + .setCredentialsProvider(EmulatorCredentialsProvider.create()) +.setTransportChannelProvider( +new TestChannelProvider( + PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint())) +.build(); + +PubSubSinkV2 sink = +PubSubSinkV2.builder() 
+.setProjectId(PROJECT_ID) +.setTopicId(TOPIC_ID) +.setSerializationSchema(new SimpleStringSchema()) +.setGcpPublisherConfig(gcpPublisherConfig) +.setFailOnError(true) +.build(); +stream.sinkTo(sink); +int maxNumberOfMessages = 100; Review
Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #28:
URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632391517

## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/EmulatorCredentialsProvider.java: ##
@@ -20,12 +20,14 @@
 import com.google.api.gax.core.CredentialsProvider;
 import com.google.auth.Credentials;

+import java.io.Serializable;
+
 /**
  * A CredentialsProvider that simply provides the right credentials that are to be used for
  * connecting to an emulator. NOTE: The Google provided NoCredentials and NoCredentialsProvider do
  * not behave as expected. See https://github.com/googleapis/gax-java/issues/1148
  */
-public final class EmulatorCredentialsProvider implements CredentialsProvider {
+public final class EmulatorCredentialsProvider implements CredentialsProvider, Serializable {

Review Comment:
I wonder why we need it
Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #28: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632391131 ## flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java: ## @@ -0,0 +1,114 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; +import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider; +import org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLogger; + +import com.google.pubsub.v1.ReceivedMessage; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.testcontainers.containers.PubSubEmulatorContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import 
java.io.IOException; +import java.util.List; + +/** Integration tests for {@link PubSubSinkV2} using {@link PubSubEmulatorContainer}. */ +@ExtendWith(MiniClusterExtension.class) +@Execution(ExecutionMode.CONCURRENT) +@Testcontainers +class PubSubSinkV2ITTests extends TestLogger { + +private static final String PROJECT_ID = "test-project"; + +private static final String TOPIC_ID = "test-topic"; + +private static final String SUBSCRIPTION_ID = "test-subscription"; + +private StreamExecutionEnvironment env; + +@Container +private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER = +new PubSubEmulatorContainer( + DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR)); + +private PubsubHelper pubSubHelper; + +@BeforeEach +void setUp() throws IOException { +env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setParallelism(1); +pubSubHelper = new PubsubHelper(PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint()); + +pubSubHelper.createTopic(PROJECT_ID, TOPIC_ID); +pubSubHelper.createSubscription(PROJECT_ID, SUBSCRIPTION_ID, PROJECT_ID, TOPIC_ID); +} + +@AfterEach +void tearDown() throws IOException { +pubSubHelper.deleteSubscription(PROJECT_ID, SUBSCRIPTION_ID); +pubSubHelper.deleteTopic(PROJECT_ID, TOPIC_ID); +pubSubHelper.close(); +} + +@Test +void pubSubSinkV2DeliversRecords() throws Exception { +String[] elements = new String[] {"test1", "test2", "test3"}; +DataStreamSource stream = +env.fromSource( +new DataGeneratorSource<>( +new FromElementsGeneratorFunction<>( +BasicTypeInfo.STRING_TYPE_INFO, elements), +elements.length, +TypeInformation.of(String.class)), +WatermarkStrategy.noWatermarks(), +"DataGeneratorSource"); + +GcpPublisherConfig gcpPublisherConfig = +GcpPublisherConfig.builder() + .setCredentialsProvider(EmulatorCredentialsProvider.create()) +.setTransportChannelProvider( +new TestChannelProvider( + PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint())) +.build(); + +PubSubSinkV2 sink = +PubSubSinkV2.builder() 
+.setProjectId(PROJECT_ID) +.setTopicId(TOPIC_ID) +.setSerializationSchema(new SimpleStringSchema()) +.setGcpPublisherConfig(gcpPublisherConfig) +.setFailOnError(true) +.build(); +stream.sinkTo(sink); +int maxNumberOfMessages = 100; Review
Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #28: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632390729 ## flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java: ## @@ -0,0 +1,114 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; +import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider; +import org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLogger; + +import com.google.pubsub.v1.ReceivedMessage; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.testcontainers.containers.PubSubEmulatorContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import 
java.io.IOException; +import java.util.List; + +/** Integration tests for {@link PubSubSinkV2} using {@link PubSubEmulatorContainer}. */ +@ExtendWith(MiniClusterExtension.class) +@Execution(ExecutionMode.CONCURRENT) +@Testcontainers +class PubSubSinkV2ITTests extends TestLogger { + +private static final String PROJECT_ID = "test-project"; + +private static final String TOPIC_ID = "test-topic"; + +private static final String SUBSCRIPTION_ID = "test-subscription"; + +private StreamExecutionEnvironment env; + +@Container +private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER = +new PubSubEmulatorContainer( + DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR)); + +private PubsubHelper pubSubHelper; + +@BeforeEach +void setUp() throws IOException { +env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setParallelism(1); +pubSubHelper = new PubsubHelper(PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint()); + +pubSubHelper.createTopic(PROJECT_ID, TOPIC_ID); +pubSubHelper.createSubscription(PROJECT_ID, SUBSCRIPTION_ID, PROJECT_ID, TOPIC_ID); +} + +@AfterEach +void tearDown() throws IOException { +pubSubHelper.deleteSubscription(PROJECT_ID, SUBSCRIPTION_ID); +pubSubHelper.deleteTopic(PROJECT_ID, TOPIC_ID); +pubSubHelper.close(); +} + +@Test +void pubSubSinkV2DeliversRecords() throws Exception { +String[] elements = new String[] {"test1", "test2", "test3"}; +DataStreamSource stream = +env.fromSource( +new DataGeneratorSource<>( +new FromElementsGeneratorFunction<>( +BasicTypeInfo.STRING_TYPE_INFO, elements), +elements.length, +TypeInformation.of(String.class)), +WatermarkStrategy.noWatermarks(), +"DataGeneratorSource"); + +GcpPublisherConfig gcpPublisherConfig = +GcpPublisherConfig.builder() + .setCredentialsProvider(EmulatorCredentialsProvider.create()) +.setTransportChannelProvider( +new TestChannelProvider( + PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint())) +.build(); + +PubSubSinkV2 sink = +PubSubSinkV2.builder() 
+.setProjectId(PROJECT_ID) +.setTopicId(TOPIC_ID) +.setSerializationSchema(new SimpleStringSchema()) +.setGcpPublisherConfig(gcpPublisherConfig) +.setFailOnError(true) +.build(); +stream.sinkTo(sink); +int maxNumberOfMessages = 100; +
Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #28: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632390518 ## flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java: ## @@ -0,0 +1,115 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; +import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider; +import org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLogger; + +import com.google.pubsub.v1.ReceivedMessage; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.testcontainers.containers.PubSubEmulatorContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import 
java.io.IOException; +import java.util.List; + +/** Integration tests for {@link PubSubSinkV2} using {@link PubSubEmulatorContainer}. */ +@ExtendWith(MiniClusterExtension.class) +@Execution(ExecutionMode.CONCURRENT) +@Testcontainers +class PubSubSinkV2ITTests extends TestLogger { + +private static final String PROJECT_ID = "test-project"; + +private static final String TOPIC_ID = "test-topic"; + +private static final String SUBSCRIPTION_ID = "test-subscription"; + +private StreamExecutionEnvironment env; + +@Container +private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER = +new PubSubEmulatorContainer( + DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR)); + +private PubsubHelper pubSubHelper; + +@BeforeEach +void setUp() throws IOException { +env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setParallelism(1); Review Comment: I would suggest either adding a comment explaining why it is 1 or using the defaults
Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #28: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632390394 ## flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java: ## @@ -0,0 +1,114 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; +import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider; +import org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLogger; + +import com.google.pubsub.v1.ReceivedMessage; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.testcontainers.containers.PubSubEmulatorContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import 
java.io.IOException; +import java.util.List; + +/** Integration tests for {@link PubSubSinkV2} using {@link PubSubEmulatorContainer}. */ +@ExtendWith(MiniClusterExtension.class) +@Execution(ExecutionMode.CONCURRENT) +@Testcontainers +class PubSubSinkV2ITTests extends TestLogger { Review Comment: `TestLogger` is a JUnit 4 rule-based class, so I think we don't need to use it here. Or did I miss something? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632385233 ## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsSinkWriter.java: ## @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier; +import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter; +import org.apache.flink.connector.base.sink.writer.BufferedRequestState; +import org.apache.flink.connector.base.sink.writer.ElementConverter; +import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.async.SdkAsyncHttpClient; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier; +import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier; +import static 
org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier; + +/** + * Sink writer created by {@link SqsSink} to write to SQS. More details on the operation of this + * sink writer may be found in the doc for {@link SqsSink}. More details on the internals of this + * sink writer may be found in {@link AsyncSinkWriter}. + * + * The {@link SqsAsyncClient} used here may be configured in the standard way for the AWS SDK + * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code + * AWS_SECRET_ACCESS_KEY} through environment variables etc. + */ +@Internal +class SqsSinkWriter extends AsyncSinkWriter { + +private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class); + +private static SdkAsyncHttpClient createHttpClient(Properties sqsClientProperties) { +return AWSGeneralUtil.createAsyncHttpClient(sqsClientProperties); +} + +private static SqsAsyncClient createSqsClient( +Properties sqsClientProperties, SdkAsyncHttpClient httpClient) { +AWSGeneralUtil.validateAwsCredentials(sqsClientProperties); +return AWSClientUtil.createAwsAsyncClient( +sqsClientProperties, +httpClient, +SqsAsyncClient.builder(), +SqsConfigConstants.BASE_SQS_USER_AGENT_PREFIX_FORMAT, +SqsConfigConstants.SQS_CLIENT_USER_AGENT_PREFIX); +} + +private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER = +AWSExceptionHandler.withClassifier( +FatalExceptionClassifier.createChain( +getInterruptedExceptionClassifier(), +getInvalidCredentialsExceptionClassifier(), + SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier(), +
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632370742 ## flink-connector-aws/flink-connector-sqs/src/main/java/org.apache.flink.connector.sqs/sink/SqsConfigConstants.java: ## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.sqs.sink; + +import org.apache.flink.annotation.PublicEvolving; + +/** Defaults for {@link SqsSinkWriter}. */ +@PublicEvolving +public class SqsConfigConstants { + +public static final String BASE_SQS_USER_AGENT_PREFIX_FORMAT = Review Comment: Sure, updated to use `ConfigOption`.
Re: [PR] [FLINK-34977][API] Introduce State Access on DataStream API V2 [flink]
jeyhunkarimov commented on PR #24725: URL: https://github.com/apache/flink/pull/24725#issuecomment-2156736006 Hi @reswqa, thanks a lot for the review. Could you please do another pass when you have time? Thanks!
Re: [PR] [Flink-35473][table] Improve Table/SQL Configuration for Flink 2.0 [flink]
LadyForest commented on code in PR #24889: URL: https://github.com/apache/flink/pull/24889#discussion_r1632297334 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/LookupJoinHintOptions.java: ## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.config; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; + +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet; + +import java.time.Duration; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * This class holds {@link org.apache.flink.configuration.ConfigOption}s used by lookup join hint. + */ +@PublicEvolving +public class LookupJoinHintOptions { + +public static final ConfigOption LOOKUP_TABLE = +key("lookup-table") Review Comment: It makes sense to me
[jira] [Comment Edited] (FLINK-34108) Add URL_ENCODE and URL_DECODE function
[ https://issues.apache.org/jira/browse/FLINK-34108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853489#comment-17853489 ] chesterxu edited comment on FLINK-34108 at 6/9/24 12:34 PM: [~martijnvisser] Please take a CR on the latest code, thanks! was (Author: JIRAUSER302535): [~martijnvisser] Please take a CR, thanks! > Add URL_ENCODE and URL_DECODE function > -- > > Key: FLINK-34108 > URL: https://issues.apache.org/jira/browse/FLINK-34108 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Martijn Visser >Assignee: chesterxu >Priority: Major > Labels: pull-request-available > > Add URL_ENCODE and URL_DECODE function > URL_ENCODE(str) - Translates a string into > 'application/x-www-form-urlencoded' format using a specific encoding scheme. > URL_DECODE(str) - Decodes a string in 'application/x-www-form-urlencoded' > format using a specific encoding scheme. > Related ticket from Calcite: CALCITE-5825 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34108) Add URL_ENCODE and URL_DECODE function
[ https://issues.apache.org/jira/browse/FLINK-34108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853489#comment-17853489 ] chesterxu commented on FLINK-34108: --- [~martijnvisser] Please take a CR, thanks! > Add URL_ENCODE and URL_DECODE function > -- > > Key: FLINK-34108 > URL: https://issues.apache.org/jira/browse/FLINK-34108 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Martijn Visser >Assignee: chesterxu >Priority: Major > Labels: pull-request-available > > Add URL_ENCODE and URL_DECODE function > URL_ENCODE(str) - Translates a string into > 'application/x-www-form-urlencoded' format using a specific encoding scheme. > URL_DECODE(str) - Decodes a string in 'application/x-www-form-urlencoded' > format using a specific encoding scheme. > Related ticket from Calcite: CALCITE-5825 -- This message was sent by Atlassian Jira (v8.20.10#820010)
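For orientation, the `application/x-www-form-urlencoded` behaviour described in the ticket can be sketched with the JDK's own `java.net.URLEncoder` and `java.net.URLDecoder`. This is only an illustration, assuming UTF-8 is the encoding scheme; the names `UrlCodecSketch`, `urlEncode` and `urlDecode` are hypothetical and this is not the Flink implementation.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

/** Hypothetical helper, not part of Flink: form-encodes and decodes strings as UTF-8. */
final class UrlCodecSketch {

    private UrlCodecSketch() {}

    /** Translates a string into application/x-www-form-urlencoded format (UTF-8 assumed). */
    static String urlEncode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // Every JVM is required to support UTF-8, so this is not expected to happen.
            throw new IllegalStateException(e);
        }
    }

    /** Decodes an application/x-www-form-urlencoded string (UTF-8 assumed). */
    static String urlDecode(String value) {
        try {
            return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With this sketch, `UrlCodecSketch.urlEncode("a b&c=d")` yields `a+b%26c%3Dd`, and `urlDecode` reverses it. How the SQL functions should treat `NULL` or malformed input is not covered here.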
[jira] [Updated] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern
[ https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-35560: Description: h3. Summary Hello I'd like to suggest query validator support in flink sql gateway via spi pattern. As an sql gateway operator, there is need for query validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{QueryValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; /** * Interface for implementing a validator that checks the safety of executing queries. */ @Public public interface QueryValidator { boolean validateQuery(Operation op); } {code} h3. Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. 
{code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.QueryValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTimestampValidator implements QueryValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override public boolean validateQuery(Operation op) { if (op instanceof CreateTableOperation) { CreateTableOperation createTableOp = (CreateTableOperation) op; String connector = createTableOp.getCatalogTable().getOptions().get("connector"); if ("kafka".equals(connector)) { String startupTimestamp = createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis"); if (startupTimestamp != null && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) { return false; } } } return true; } }{code} I'd be happy to implement this feature, if we can reach on agreement. Thanks. was: h3. Summary Hello I'd like to suggest query validator support in flink sql gateway via spi pattern. As an sql gateway operator, there is need for query validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{QueryValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; /** * Interface for implementing a validator that checks the safety of executing queries. */ @Public public interface QueryValidator { boolean validateQuery(Operation op); } {code} h3. 
Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.QueryValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTimestampValidator implements QueryValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override public boolean validateQuery(Operation op) { if (op instanceof CreateTableOperation) { CreateTableOperation createTableOp = (CreateTableOperation) op; String connector = createTableOp.getCatalogTable().getOptions().get("connector"); if ("kafka".equals(connector)) { String startupTimestamp = createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis"); if (startupTimestamp != null && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) { return false; } } } return true; } }{code} I'd be happy to implement this feature, if we can reach on agreement. Thanks h4. > Add query validator support to flink sql gateway via spi pattern >
[jira] [Updated] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern
[ https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-35560: Description: h3. Summary Hello I'd like to suggest query validator support in flink sql gateway via spi pattern. As an sql gateway operator, there is need for query validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{QueryValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; /** * Interface for implementing a validator that checks the safety of executing queries. */ @Public public interface QueryValidator { boolean validateQuery(Operation op); } {code} h3. Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. 
{code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.QueryValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTimestampValidator implements QueryValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override public boolean validateQuery(Operation op) { if (op instanceof CreateTableOperation) { CreateTableOperation createTableOp = (CreateTableOperation) op; String connector = createTableOp.getCatalogTable().getOptions().get("connector"); if ("kafka".equals(connector)) { String startupTimestamp = createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis"); if (startupTimestamp != null && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) { return false; } } } } return true; }{code} I'd be happy to implement this feature, if we can reach on agreement. Thanks h4. was: h3. Summary Hello I'd like to suggest query validator support in flink sql gateway via spi pattern. As an sql gateway operator, there is need for query validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{QueryValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; /** * Interface for implementing a validator that checks the safety of executing queries. */ @Public public interface QueryValidator { boolean validateQuery(Operation op); } {code} h3. 
Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. {code:sql} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.QueryValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTimestampValidator implements QueryValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override public boolean validateQuery(Operation op) { if (op instanceof CreateTableOperation) { CreateTableOperation createTableOp = (CreateTableOperation) op; String connector = createTableOp.getCatalogTable().getOptions().get("connector"); if ("kafka".equals(connector)) { String startupTimestamp = createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis"); if (startupTimestamp != null && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) { return false; } } } } return true; }{code} I'd be happy to implement this feature, if we can reach on agreement. Thanks h4. > Add query validator support to flink sql gateway via spi pattern >
[jira] [Updated] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern
[ https://issues.apache.org/jira/browse/FLINK-35560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dongwoo.kim updated FLINK-35560: Description: h3. Summary Hello I'd like to suggest query validator support in flink sql gateway via spi pattern. As an sql gateway operator, there is need for query validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{QueryValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; /** * Interface for implementing a validator that checks the safety of executing queries. */ @Public public interface QueryValidator { boolean validateQuery(Operation op); } {code} h3. Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. 
{code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.QueryValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTimestampValidator implements QueryValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override public boolean validateQuery(Operation op) { if (op instanceof CreateTableOperation) { CreateTableOperation createTableOp = (CreateTableOperation) op; String connector = createTableOp.getCatalogTable().getOptions().get("connector"); if ("kafka".equals(connector)) { String startupTimestamp = createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis"); if (startupTimestamp != null && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) { return false; } } } return true; } }{code} I'd be happy to implement this feature, if we can reach on agreement. Thanks h4. was: h3. Summary Hello I'd like to suggest query validator support in flink sql gateway via spi pattern. As an sql gateway operator, there is need for query validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{QueryValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; /** * Interface for implementing a validator that checks the safety of executing queries. */ @Public public interface QueryValidator { boolean validateQuery(Operation op); } {code} h3. 
Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.QueryValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTimestampValidator implements QueryValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override public boolean validateQuery(Operation op) { if (op instanceof CreateTableOperation) { CreateTableOperation createTableOp = (CreateTableOperation) op; String connector = createTableOp.getCatalogTable().getOptions().get("connector"); if ("kafka".equals(connector)) { String startupTimestamp = createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis"); if (startupTimestamp != null && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) { return false; } } } } return true; }{code} I'd be happy to implement this feature, if we can reach on agreement. Thanks h4. > Add query validator support to flink sql gateway via spi pattern >
[jira] [Created] (FLINK-35560) Add query validator support to flink sql gateway via spi pattern
dongwoo.kim created FLINK-35560: --- Summary: Add query validator support to flink sql gateway via spi pattern Key: FLINK-35560 URL: https://issues.apache.org/jira/browse/FLINK-35560 Project: Flink Issue Type: Improvement Components: Table SQL / Gateway Reporter: dongwoo.kim h3. Summary Hello I'd like to suggest query validator support in flink sql gateway via spi pattern. As an sql gateway operator, there is need for query validation to only execute safe queries and drop unsafe queries. To address this need, I propose adding a {{QueryValidator}} interface in flink sql gateway api package. This interface will allow users to implement their own query validation logic, providing benefits to flink sql gateway operators. h3. Interface Below is a draft for the interface. It takes Operation and check whether the query is valid or not. {code:java} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.annotation.Public; import org.apache.flink.table.operations.Operation; /** * Interface for implementing a validator that checks the safety of executing queries. */ @Public public interface QueryValidator { boolean validateQuery(Operation op); } {code} h3. Example implementation Below is an example implementation that inspects Kafka table options, specifically {{{}scan.startup.timestamp-millis{}}}, and reject when the value is too small, which can cause high disk I/O load. 
{code:sql} package org.apache.flink.table.gateway.api.validator; import org.apache.flink.table.gateway.api.validator.QueryValidator; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateTableOperation; public class KafkaTimestampValidator implements QueryValidator { private static final long ONE_DAY = 24 * 60 * 60 * 1000L; @Override public boolean validateQuery(Operation op) { if (op instanceof CreateTableOperation) { CreateTableOperation createTableOp = (CreateTableOperation) op; String connector = createTableOp.getCatalogTable().getOptions().get("connector"); if ("kafka".equals(connector)) { String startupTimestamp = createTableOp.getCatalogTable().getOptions().get("scan.startup.timestamp-millis"); if (startupTimestamp != null && Long.parseLong(startupTimestamp) < System.currentTimeMillis() - ONE_DAY) { return false; } } } } return true; }{code} I'd be happy to implement this feature, if we can reach on agreement. Thanks h4. -- This message was sent by Atlassian Jira (v8.20.10#820010)
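The validation rule proposed in this issue can be tried out without a Flink dependency using the self-contained sketch below. It is an illustration only, under several assumptions: `OptionsValidator` and `KafkaTimestampOptionsValidator` are hypothetical stand-ins for the proposed `QueryValidator`, the table options are passed as a plain `Map` instead of being read from a `CreateTableOperation`, the current time is a parameter so the check is deterministic, and rejecting an unparsable timestamp is a choice made here rather than something the proposal specifies.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the proposed QueryValidator; works on raw table options. */
interface OptionsValidator {
    /** Returns true if a table with these options may be created at time nowMillis. */
    boolean validate(Map<String, String> tableOptions, long nowMillis);
}

/** Rejects Kafka tables whose startup timestamp lies more than one day in the past. */
final class KafkaTimestampOptionsValidator implements OptionsValidator {

    private static final long ONE_DAY_MILLIS = 24 * 60 * 60 * 1000L;

    @Override
    public boolean validate(Map<String, String> tableOptions, long nowMillis) {
        // Only Kafka tables are inspected; everything else passes.
        if (!"kafka".equals(tableOptions.get("connector"))) {
            return true;
        }
        String startupTimestamp = tableOptions.get("scan.startup.timestamp-millis");
        if (startupTimestamp == null) {
            return true;
        }
        try {
            // Same comparison as in the proposal: too-old timestamps are rejected.
            return Long.parseLong(startupTimestamp.trim()) >= nowMillis - ONE_DAY_MILLIS;
        } catch (NumberFormatException e) {
            // Assumption of this sketch: a malformed value is treated as unsafe.
            return false;
        }
    }
}
```

A gateway operator would call something like `validator.validate(options, System.currentTimeMillis())` before executing the statement. How validators would be discovered via SPI is left open here, as it is in the issue.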
Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #28: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632243987 ## flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/PubsubHelper.java: ## @@ -36,27 +40,50 @@ import com.google.pubsub.v1.ReceivedMessage; import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Duration; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** A helper class to make managing the testing topics a bit easier. */ public class PubsubHelper { private static final Logger LOG = LoggerFactory.getLogger(PubsubHelper.class); +private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(5); + +private ManagedChannel channel; + private TransportChannelProvider channelProvider; private TopicAdminClient topicClient; + private SubscriptionAdminClient subscriptionAdminClient; +public PubsubHelper(String endpoint) { +channel = ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build(); +channelProvider = + FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); Review Comment: I am not following: we are creating `channel` and `channelProvider` here, so why do we need to check?
Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #28: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632243692 ## flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/PubsubHelper.java: ## @@ -36,27 +40,50 @@ import com.google.pubsub.v1.ReceivedMessage; import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Duration; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** A helper class to make managing the testing topics a bit easier. */ public class PubsubHelper { private static final Logger LOG = LoggerFactory.getLogger(PubsubHelper.class); +private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(5); + +private ManagedChannel channel; Review Comment: We have two different ways of creating the channel: one sets the channel directly, and the other (used in the old tests) uses a channel provider. Making this field final would force us to create a channel in all constructors.
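As a side note on the trade-off discussed in this comment, the sketch below shows one possible middle ground; it is not what the PR does. A field can stay `final` without every constructor creating a channel if the provider-only path assigns `null` explicitly. `Channel`, `ChannelProvider` and `PubsubHelperSketch` are hypothetical stand-ins for `ManagedChannel`, `TransportChannelProvider` and `PubsubHelper`, reduced to what the example needs.

```java
import java.util.Objects;

/** Hypothetical stand-in for io.grpc.ManagedChannel. */
interface Channel {
    void shutdown();
}

/** Hypothetical stand-in for TransportChannelProvider. */
interface ChannelProvider {
    Channel get();
}

/** Sketch of a helper with two construction paths and final fields. */
final class PubsubHelperSketch {

    // Null when the caller supplied a provider; the helper then owns no channel.
    private final Channel ownedChannel;
    private final ChannelProvider channelProvider;

    /** Path 1: the helper is given a channel directly and is responsible for closing it. */
    PubsubHelperSketch(Channel channel) {
        this.ownedChannel = Objects.requireNonNull(channel, "channel");
        this.channelProvider = () -> channel;
    }

    /** Path 2: the caller supplies a provider; no channel is created here. */
    PubsubHelperSketch(ChannelProvider channelProvider) {
        this.ownedChannel = null;
        this.channelProvider = Objects.requireNonNull(channelProvider, "channelProvider");
    }

    ChannelProvider channelProvider() {
        return channelProvider;
    }

    /** Shuts down the channel only if this helper owns one; returns whether it did. */
    boolean close() {
        if (ownedChannel == null) {
            return false;
        }
        ownedChannel.shutdown();
        return true;
    }
}
```

The cost of this variant is a null check in `close()`, which is the same kind of check the non-final field needs anyway.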
Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #28: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632243045 ## flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java: ## @@ -0,0 +1,115 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; +import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider; +import org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLogger; + +import com.google.pubsub.v1.ReceivedMessage; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.testcontainers.containers.PubSubEmulatorContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import 
java.io.IOException; +import java.util.List; + +/** Integration tests for {@link PubSubSinkV2} using {@link PubSubEmulatorContainer}. */ +@ExtendWith(MiniClusterExtension.class) +@Execution(ExecutionMode.CONCURRENT) +@Testcontainers +class PubSubSinkV2ITTests extends TestLogger { + +private static final String PROJECT_ID = "test-project"; + +private static final String TOPIC_ID = "test-topic"; + +private static final String SUBSCRIPTION_ID = "test-subscription"; + +private StreamExecutionEnvironment env; + +@Container +private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER = +new PubSubEmulatorContainer( + DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR)); + +private PubsubHelper pubSubHelper; + +@BeforeEach +void setUp() throws IOException { +env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setParallelism(1); Review Comment: We are not testing anything that needs higher parallelism in this scenario, is there a reason to use anything different? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35548] Add E2E tests for PubSubSinkV2 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #28: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/28#discussion_r1632242928 ## flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java: ## @@ -0,0 +1,115 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; +import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider; +import org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.TestLogger; + +import com.google.pubsub.v1.ReceivedMessage; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.testcontainers.containers.PubSubEmulatorContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import 
java.io.IOException; +import java.util.List; + +/** Integration tests for {@link PubSubSinkV2} using {@link PubSubEmulatorContainer}. */ +@ExtendWith(MiniClusterExtension.class) +@Execution(ExecutionMode.CONCURRENT) +@Testcontainers +class PubSubSinkV2ITTests extends TestLogger { + +private static final String PROJECT_ID = "test-project"; + +private static final String TOPIC_ID = "test-topic"; + +private static final String SUBSCRIPTION_ID = "test-subscription"; + +private StreamExecutionEnvironment env; + +@Container +private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER = +new PubSubEmulatorContainer( + DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR)); + +private PubsubHelper pubSubHelper; + +@BeforeEach +void setUp() throws IOException { +env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setParallelism(1); +pubSubHelper = new PubsubHelper(PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint()); + +pubSubHelper.createTopic(PROJECT_ID, TOPIC_ID); +pubSubHelper.createSubscription(PROJECT_ID, SUBSCRIPTION_ID, PROJECT_ID, TOPIC_ID); +} + +@AfterEach +void tearDown() throws IOException { +pubSubHelper.deleteSubscription(PROJECT_ID, SUBSCRIPTION_ID); +pubSubHelper.deleteTopic(PROJECT_ID, TOPIC_ID); +pubSubHelper.close(); +} + +@Test +void pubSubSinkV2DeliversRecords() throws Exception { Review Comment: No exception is expected here; any exception thrown in this test should fail the test.
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#issuecomment-2156438875 Addressed the latest comments and ported AMP signer to AWS SDKv2 (!)
[jira] [Commented] (FLINK-35138) Release flink-connector-kafka v3.2.0 for Flink 1.19
[ https://issues.apache.org/jira/browse/FLINK-35138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853470#comment-17853470 ] Tamir Sagi commented on FLINK-35138: Thank you Danny! highly appreciated it. > Release flink-connector-kafka v3.2.0 for Flink 1.19 > --- > > Key: FLINK-35138 > URL: https://issues.apache.org/jira/browse/FLINK-35138 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Major > Labels: pull-request-available > Fix For: kafka-3.2.0 > > > https://github.com/apache/flink-connector-kafka -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35138) Release flink-connector-kafka v3.2.0 for Flink 1.19
[ https://issues.apache.org/jira/browse/FLINK-35138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853470#comment-17853470 ] Tamir Sagi edited comment on FLINK-35138 at 6/9/24 7:45 AM: Thank you Danny! highly appreciated. was (Author: JIRAUSER283777): Thank you Danny! highly appreciated it. > Release flink-connector-kafka v3.2.0 for Flink 1.19 > --- > > Key: FLINK-35138 > URL: https://issues.apache.org/jira/browse/FLINK-35138 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Major > Labels: pull-request-available > Fix For: kafka-3.2.0 > > > https://github.com/apache/flink-connector-kafka
[jira] [Commented] (FLINK-35138) Release flink-connector-kafka v3.2.0 for Flink 1.19
[ https://issues.apache.org/jira/browse/FLINK-35138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853471#comment-17853471 ] Tamir Sagi commented on FLINK-35138: Thank you Danny! highly appreciated. > Release flink-connector-kafka v3.2.0 for Flink 1.19 > --- > > Key: FLINK-35138 > URL: https://issues.apache.org/jira/browse/FLINK-35138 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Major > Labels: pull-request-available > Fix For: kafka-3.2.0 > > > https://github.com/apache/flink-connector-kafka
[jira] (FLINK-35138) Release flink-connector-kafka v3.2.0 for Flink 1.19
[ https://issues.apache.org/jira/browse/FLINK-35138 ] Tamir Sagi deleted comment on FLINK-35138: was (Author: JIRAUSER283777): Thank you Danny! highly appreciated. > Release flink-connector-kafka v3.2.0 for Flink 1.19 > --- > > Key: FLINK-35138 > URL: https://issues.apache.org/jira/browse/FLINK-35138 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Reporter: Danny Cranmer >Assignee: Danny Cranmer >Priority: Major > Labels: pull-request-available > Fix For: kafka-3.2.0 > > > https://github.com/apache/flink-connector-kafka
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1632187848 ## README.md: ## @@ -8,6 +8,14 @@ Apache Flink is an open source stream processing framework with powerful stream- Learn more about Flink at [https://flink.apache.org/](https://flink.apache.org/) +## Modules + +This repository contains the following modules + +* [Prometheus Connector](./prometheus-connector): Flink Prometheus Connector implementation; supports optional request signer +* [Sample application](./example-datastream-job): Sample application showing the usage of the connector with DataStream API. It also demonstrates how to configure the request signer. Review Comment: I don't think an integration test is useful for a user. And the lack of proper working examples is a common obstacle to the adoption for users new to Flink. This module is not supposed to be released in any form. Can't we just keep it as source, and possibly reference from the documentation?
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1632194796 ## amp-request-signer/README.md: ## @@ -0,0 +1,31 @@ +## Request Signer for Amazon Managed Prometheus (AMP) + +Request signer implementation for Amazon Managed Prometheus (AMP) + +The signer retrieves AWS credentials using `com.amazonaws.auth.DefaultAWSCredentialsProviderChain` and automatically +supports session credentials. + +The Flink application requires `RemoteWrite` permissions to the AMP workspace (e.g. `AmazonPromethusRemoteWriteAccess` +policy). + +### Sample usage + +To enable request signing for Amazon Managed Prometheus, and instance of `AmazonManagedPrometheusWriteRequestSigner` +must Review Comment: Fixed
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1632194645 ## README.md: ## @@ -8,6 +8,14 @@ Apache Flink is an open source stream processing framework with powerful stream- Learn more about Flink at [https://flink.apache.org/](https://flink.apache.org/) +## Modules + +This repository contains the following modules + +* [Prometheus Connector](./prometheus-connector): Flink Prometheus Connector implementation; supports optional request signer +* [Sample application](./example-datastream-job): Sample application showing the usage of the connector with DataStream API. It also demonstrates how to configure the request signer. +* [Amazon Managed Prometheus Request Signer](./amp-request-signer): Implementation of request signer for Amazon Managed Prometheus (AMP) Review Comment: It was actually requested earlier in the PR to make the module name fully qualified by prepending `flink-connector-prometheus-`. I am renaming the module to `flink-connector-prometheus-request-signer-amp` to make it fully qualified while retaining the logical order, from general to specific.
Re: [PR] [FLINK-33138] DataStream API implementation [flink-connector-prometheus]
nicusX commented on code in PR #1: URL: https://github.com/apache/flink-connector-prometheus/pull/1#discussion_r1632188645 ## README.md: ## @@ -8,6 +8,14 @@ Apache Flink is an open source stream processing framework with powerful stream- Learn more about Flink at [https://flink.apache.org/](https://flink.apache.org/) +## Modules + +This repository contains the following modules + +* [Prometheus Connector](./prometheus-connector): Flink Prometheus Connector implementation; supports optional request signer Review Comment: Renamed subdirectories to match artifact names
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#issuecomment-2156354857 > It'd also be great to mention what permissions are needed for this connector to work. E.g., is `sqs:SendMessage` sufficient? Good point! Yes, `sqs:SendMessage` is sufficient; I have updated the documentation accordingly.
Re: [PR] [FLINK-35453][core] StreamReader Charset fix with UTF8 in core files [flink]
xuzifu666 commented on PR #24842: URL: https://github.com/apache/flink/pull/24842#issuecomment-2156347384 > Hi @xuzifu666 thanks for driving this PR. Could you please update the intro section of this PR and add tests for your changes? Thanks @jeyhunkarimov Hi, this change is not well suited to adding a test. Maybe we can refer to the equivalent changes in Hive or Spark? Here are some PRs for reference: Hive (https://github.com/apache/hive/pull/5243), Spark (https://github.com/apache/spark/pull/46509). Looking forward to your next review, thanks.
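For context, the kind of change named in the PR title (giving a stream reader an explicit UTF-8 charset instead of relying on the JVM default) can be sketched as below. This is only a minimal illustration under stated assumptions, not code from the PR: the class `ExplicitCharsetExample` and the method `readFirstLineUtf8` are hypothetical names, and only standard JDK classes are used.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration; not taken from the Flink code base.
public class ExplicitCharsetExample {

    /** Reads the first line of the stream, always decoding the bytes as UTF-8. */
    public static String readFirstLineUtf8(InputStream in) {
        // new InputStreamReader(in) without a charset would use the JVM default,
        // which can differ between machines; passing UTF_8 makes decoding deterministic.
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            return reader.readLine();
        } catch (IOException e) {
            // Wrapped so that callers do not need to handle a checked exception.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Unicode escapes keep this source file independent of the compiler's encoding.
        byte[] utf8Bytes = "h\u00e9llo w\u00f6rld".getBytes(StandardCharsets.UTF_8);
        String line = readFirstLineUtf8(new ByteArrayInputStream(utf8Bytes));
        System.out.println("h\u00e9llo w\u00f6rld".equals(line));
    }
}
```

A round trip like the one in `main` (encode as UTF-8, decode with the explicit charset, compare) is also one possible shape for a small unit test of such a change, although whether that fits the files touched by the PR is not something this thread establishes.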
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632183182 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ## @@ -0,0 +1,122 @@ + + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + +<parent> +<artifactId>flink-connector-aws-e2e-tests-parent</artifactId> +<groupId>org.apache.flink</groupId> +<version>4.3-SNAPSHOT</version> +</parent> + +<modelVersion>4.0.0</modelVersion> + +<artifactId>flink-connector-aws-sqs-e2e-tests</artifactId> +<name>Flink : Connectors : AWS : E2E Tests : Amazon SQS</name> +<packaging>jar</packaging> + +<dependencies> +<dependency> +<groupId>org.apache.flink</groupId> +<artifactId>flink-streaming-java</artifactId> +<version>${flink.version}</version> +<scope>test</scope> +</dependency> + +<dependency> +<groupId>org.apache.flink</groupId> +<artifactId>flink-connector-sqs</artifactId> +<version>${project.version}</version> +<scope>test</scope> +</dependency> + +<dependency> +<groupId>org.apache.flink</groupId> +<artifactId>flink-connector-aws-base</artifactId> +<version>${project.version}</version> +<scope>test</scope> +<type>test-jar</type> +<exclusions> +<exclusion> +<groupId>com.typesafe.netty</groupId> +<artifactId>netty-reactive-streams-http</artifactId> +</exclusion> +</exclusions> +</dependency> + +<dependency> +<groupId>org.apache.flink</groupId> +<artifactId>flink-connector-sqs</artifactId> +<version>${project.version}</version> +<scope>test</scope> +<type>test-jar</type> +<exclusions> +<exclusion> +<groupId>com.typesafe.netty</groupId> +<artifactId>netty-reactive-streams-http</artifactId> +</exclusion> +</exclusions> Review Comment: Unnecessary. Removed in next revision
Re: [PR] [FLINK-35305]Amazon SQS Sink Connector [flink-connector-aws]
19priyadhingra commented on code in PR #141: URL: https://github.com/apache/flink-connector-aws/pull/141#discussion_r1632182646 ## flink-connector-aws-e2e-tests/flink-connector-aws-sqs-e2e-tests/pom.xml: ## @@ -0,0 +1,122 @@ + + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + +<parent> +<artifactId>flink-connector-aws-e2e-tests-parent</artifactId> +<groupId>org.apache.flink</groupId> +<version>4.3-SNAPSHOT</version> +</parent> + +<modelVersion>4.0.0</modelVersion> + +<artifactId>flink-connector-aws-sqs-e2e-tests</artifactId> +<name>Flink : Connectors : AWS : E2E Tests : Amazon SQS</name> +<packaging>jar</packaging> + +<dependencies> +<dependency> +<groupId>org.apache.flink</groupId> +<artifactId>flink-streaming-java</artifactId> +<version>${flink.version}</version> +<scope>test</scope> +</dependency> + +<dependency> +<groupId>org.apache.flink</groupId> +<artifactId>flink-connector-sqs</artifactId> +<version>${project.version}</version> +<scope>test</scope> +</dependency> + +<dependency> +<groupId>org.apache.flink</groupId> +<artifactId>flink-connector-aws-base</artifactId> +<version>${project.version}</version> +<scope>test</scope> +<type>test-jar</type> +<exclusions> +<exclusion> +<groupId>com.typesafe.netty</groupId> +<artifactId>netty-reactive-streams-http</artifactId> +</exclusion> +</exclusions> Review Comment: Unnecessary. Removed in next revision