Re: Bug report for reading Hive table as streaming source.
Reported. JIRA link: https://issues.apache.org/jira/browse/FLINK-35118

On Mon, Apr 15, 2024 at 12:03 PM Xiaolong Wang wrote:
> Sure
>
> On Mon, Apr 1, 2024 at 9:28 AM yuxia wrote:
>> Thanks for reporting. Could you please help create a jira about it?
>>
>> Best regards,
>> Yuxia
>>
>> On Thu, Mar 28, 2024 at 5:11 PM, Xiaolong Wang wrote: [...]
Re: Bug report for reading Hive table as streaming source.
Sure

On Mon, Apr 1, 2024 at 9:28 AM yuxia wrote:
> Thanks for reporting. Could you please help create a jira about it?
>
> Best regards,
> Yuxia
>
> On Thu, Mar 28, 2024 at 5:11 PM, Xiaolong Wang wrote: [...]
Re: Bug report for reading Hive table as streaming source.
I think it is worth mentioning in the documentation of the Hive read that it cannot read a table that has more than 32,767 partitions.

On Thu, Mar 28, 2024 at 5:10 PM Xiaolong Wang wrote:
> Found out the reason: [...]
Re: Bug report for reading Hive table as streaming source.
Found out the reason:

It turned out that Flink uses Hive's IMetaStoreClient to fetch partitions via the following method:

```
List<String> listPartitionNames(String db_name, String tbl_name, short max_parts) throws MetaException, TException;
```

where max_parts is the maximum number of partitions the call may fetch from the Hive metastore. Since max_parts is a short, the most it can ever fetch is Short.MAX_VALUE, i.e. 32,767.

But the table has far more partitions than that, so the list-partitions operation cannot fetch them all, and hence the source cannot consume the most recent partition.

On Tue, Mar 26, 2024 at 5:00 PM Xiaolong Wang wrote:
> Hi,
>
> I found a weird bug when reading a Hive table as a streaming source. [...]
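To make the cap concrete, here is a minimal sketch against the Hive metastore API, assuming a reachable metastore; the database and table names are placeholders. The metastore conventionally treats a negative max_parts as "no limit":

```java
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;

public class PartitionListingSketch {
    public static void main(String[] args) throws Exception {
        IMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());

        // What Flink effectively does: max_parts is a short, so at most
        // Short.MAX_VALUE (32,767) partition names can come back.
        List<String> capped = client.listPartitionNames("db", "article", Short.MAX_VALUE);

        // A negative max_parts is conventionally treated as "no limit",
        // so this returns every partition name of the table.
        List<String> all = client.listPartitionNames("db", "article", (short) -1);

        System.out.printf("capped=%d, all=%d%n", capped.size(), all.size());
        client.close();
    }
}
```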
Bug report for reading Hive table as streaming source.
Hi,

I found a weird bug when reading a Hive table as a streaming source.

In summary, if the first partition column is not time related, the Hive table cannot be read as a streaming source.

e.g.

I have a Hive table with the following definition:

```
CREATE TABLE article (
    id BIGINT,
    edition STRING,
    dt STRING,
    hh STRING
)
PARTITIONED BY (edition, dt, hh)
USING orc;
```

Then I try to query it as a streaming source:

```
INSERT INTO kafka_sink
SELECT id
FROM article /*+ OPTIONS('streaming-source.enable' = 'true',
    'streaming-source.partition-order' = 'partition-name',
    'streaming-source.consume-start-offset' = 'edition=en_US/dt=2024-03-26/hh=00') */
```

And I see no output in the `kafka_sink`.

Then I defined an external table pointing to the same path but without the `edition` partition:

```
CREATE TABLE en_article (
    id BIGINT,
    edition STRING,
    dt STRING,
    hh STRING
)
PARTITIONED BY (dt, hh)
LOCATION 's3://xxx/article/edition=en_US'
USING orc;
```

And inserted with the following statement:

```
INSERT INTO kafka_sink
SELECT id
FROM en_article /*+ OPTIONS('streaming-source.enable' = 'true',
    'streaming-source.partition-order' = 'partition-name',
    'streaming-source.consume-start-offset' = 'dt=2024-03-26/hh=00') */
```

The data is sunk as expected.
Re: Flink kinesis connector v4.2.0-1.18 has an issue when consuming with an EFO consumer
Thanks, this looks similar. I'll work around it (see the configuration sketch after the quoted log below).

On Thu, Feb 29, 2024 at 5:15 PM Aleksandr Pilipenko wrote:
> Based on the stacktrace, this looks like an issue described here:
> https://issues.apache.org/jira/browse/FLINK-32964
> Is your configuration similar to the one described in the ticket? If so,
> you can work around this issue by explicitly specifying the credentials
> provider for the connector, thereby avoiding DefaultCredentialsProvider (AUTO).
>
> Caused by: java.lang.IllegalStateException: Connection pool shut down
> ...
> at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.*WebIdentityTokenFileCredentialsProvider*.resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:143) ~[?:?]
> ...
> at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.*DefaultCredentialsProvider*.resolveCredentials(DefaultCredentialsProvider.java:128) ~[?:?]
>
> On Thu, 29 Feb 2024 at 02:24, Xiaolong Wang wrote:
>> Sorry, I just attached a wrong file. Let me paste the error log:
>>
>> java.lang.RuntimeException: Maximum retries exceeded for SubscribeToShard. Failed 10 times.
>>   at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:211) ~[?:?]
>>   at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:130) ~[?:?]
>>   at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114) ~[?:?]
>>   at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
>>   at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
>>   at java.lang.Thread.run(Unknown Source) ~[?:?]
>> Caused by: java.lang.IllegalStateException: Connection pool shut down
>>   at org.apache.flink.kinesis.shaded.org.apache.http.util.Asserts.check(Asserts.java:34) ~[?:?]
>>   at org.apache.flink.kinesis.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:269) ~[?:?]
>>   at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:75) ~[?:?]
>>   at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory$InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:57) ~[?:?]
>>   at org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) ~[?:?]
>>   at org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[?:?]
>>   at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[?:?]
>>   at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[?:?]
>>   at org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[?:?]
>>   at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:72) ~[?:?]
>>   at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:254) ~[?:?]
>>   at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.access$500(ApacheHttpClient.java:104) ~[?:?]
>>   at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient$1.call(ApacheHttpClient.java:231) ~[?:?]
>>   ...
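Below is a minimal sketch of the workaround Aleksandr describes, i.e. pinning a concrete credentials provider instead of letting the connector fall back to AUTO. It assumes a DataStream job that builds the FlinkKinesisConsumer itself; the region, stream name, and EFO consumer name are placeholders:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class EfoSourceSketch {
    public static FlinkKinesisConsumer<String> buildSource() {
        Properties config = new Properties();
        config.setProperty(AWSConfigConstants.AWS_REGION, "ap-northeast-1");

        // Name a concrete provider (here the web-identity one used with IRSA on
        // EKS) so the SDK's DefaultCredentialsProvider chain is never exercised.
        config.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");

        // EFO record publisher settings.
        config.setProperty(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
                ConsumerConfigConstants.RecordPublisherType.EFO.name());
        config.setProperty(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-efo-consumer");

        return new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), config);
    }
}
```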
Re: Flink kinesis connector v4.2.0-1.18 has an issue when consuming with an EFO consumer
...
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:208) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:135) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:105) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.internal.StsWebIdentityCredentialsProviderFactory$StsWebIdentityCredentialsProvider.resolveCredentials(StsWebIdentityCredentialsProviderFactory.java:109) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider.resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:143) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:90) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:45) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:128) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDuration(MetricUtils.java:54) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.internal.authcontext.AwsCredentialsAuthorizationStrategy.resolveCredentials(AwsCredentialsAuthorizationStrategy.java:100) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.internal.authcontext.AwsCredentialsAuthorizationStrategy.addCredentialsToExecutionAttributes(AwsCredentialsAuthorizationStrategy.java:77) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.invokeInterceptorsAndCreateExecutionContext(AwsExecutionContextBuilder.java:125) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler.invokeInterceptorsAndCreateExecutionContext(AwsAsyncClientHandler.java:65) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.lambda$execute$3(BaseAsyncClientHandler.java:118) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.measureApiCallSuccess(BaseAsyncClientHandler.java:291) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.execute(BaseAsyncClientHandler.java:91) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler.execute(AwsAsyncClientHandler.java:59) ~[?:?]
at org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.subscribeToShard(DefaultKinesisAsyncClient.java:2730) ~[?:?]
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyAsyncV2.subscribeToShard(KinesisProxyAsyncV2.java:66) ~[?:?]
at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.openSubscriptionToShard(FanOutShardSubscriber.java:250) ~[?:?]
at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:196) ~[?:?]
at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:176) ~[?:?]
... 7 more

On Wed, Feb 28, 2024 at 6:21 PM Aleksandr Pilipenko wrote:
> Hi,
>
> Could you please provide more information on the error you are observing?
> Attached file does not have anything related to Kinesis or any errors.
>
> Best,
> Aleksandr
>
> On Wed, 28 Feb 2024 at 02:28, Xiaolong Wang wrote:
>> Hi,
>>
>> I used the flink-connector-kinesis (4.0.2-1.18) to consume from Kinesis. [...]
Flink kinesis connector v4.2.0-1.18 has an issue when consuming with an EFO consumer
Hi,

I used the flink-connector-kinesis (4.0.2-1.18) to consume from Kinesis. The job can start but will fail within 1 hour. A detailed error log is attached.

When I changed the version of the flink-connector-kinesis to `1.15.2`, everything settled.

Any idea how to fix it?

create table kafka_event_v1 (
    `timestamp` bigint,
    serverTimestamp bigint,
    name string,
    data string,
    app string,
    `user` string,
    context string,
    browser row<
        url string,
        referrer string,
        userAgent string,
        `language` string,
        title string,
        viewportWidth int,
        viewportHeight int,
        contentWidth int,
        contentHeight int,
        cookies map<string, string>,
        name string,
        version string,
        device row<model string, type string, vendor string>,
        engine row<name string, version string>,
        os row<name string, version string>
    >,
    abtests map<string, string>,
    apikey string,
    lifecycleId string,
    sessionId string,
    instanceId string,
    requestId string,
    eventId string,
    `trigger` string,
    virtualId string,
    accountId string,
    ip string,
    serverTimestampLtz as to_timestamp(from_unixtime(serverTimestamp / 1000)),
    watermark for serverTimestampLtz as serverTimestampLtz - interval '5' second
) with (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'shared.kafka.smartnews.internal:9093',
    'topic' = 'shared-cluster-sn-pixel-event-v1-dev',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'properties.group.id' = 'event_v2',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'SNTOKEN',
    'properties.sasl.login.class' = 'com.smartnews.dp.kafka.security.sn.auth.SnTokenLogin',
    'properties.sasl.login.callback.handler.class' = 'com.smartnews.dp.kafka.security.sn.auth.SnTokenCallbackHandler',
    'properties.sasl.client.callback.handler.class' = 'com.smartnews.dp.kafka.security.sn.sasl.SnTokenSaslClientCallbackHandler',
    'properties.sasl.jaas.config' = 'com.smartnews.dp.kafka.security.sn.auth.SnTokenLoginModule required username="sn-pixel" password="aQXJcNUsCuIZpICHO9bQ" env="prd";',
    'properties.ssl.truststore.type' = 'PEM'
);

create catalog iceberg_dev with (
    'type' = 'iceberg',
    'catalog-type' = 'hive',
    'uri' = 'thrift://dev-hive-metastore.smartnews.internal:9083',
    'warehouse' = 's3a://smartnews-dmp/warehouse/development'
);

insert into iceberg_dev.pixel.event_v2 /*+ options(
    'partition.time-extractor.timestamp-pattern' = '$dt 00:00:00',
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    'auto-compaction' = 'true'
) */
select
    `timestamp`,
    serverTimestamp,
    data,
    app,
    `user`,
    context,
    browser,
    abtests,
    lifecycleId,
    sessionId,
    instanceId,
    requestId,
    eventId,
    `trigger`,
    virtualId,
    accountId,
    ip,
    date_format(serverTimestampLtz, 'yyyy-MM-dd') dt,
    apikey,
    name
from default_catalog.default_database.kafka_event_v1;
Re: [Discuss] CRD for flink sql gateway in the flink k8s operator
Hi Dongwoo,

Since the Flink SQL gateway should run upon a Flink session cluster, I think it'd be easier to add more fields to the CRD of `FlinkSessionJob`. e.g.

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: sql-gateway
spec:
  sqlGateway:
    endpoint: "hiveserver2"
    mode: "streaming"
    hiveConf:
      configMap:
        name: hive-config
        items:
          - key: hive-site.xml
            path: hive-site.xml

On Fri, Sep 15, 2023 at 12:56 PM Dongwoo Kim wrote:
> Hi all,
>
> @Gyula
> Thanks for the consideration, Gyula. My initial idea for the CR was roughly
> like the one below. I focused on simplifying the setup in a k8s environment,
> but I agree with your opinion that for the SQL gateway we don't need custom
> operator logic, and most of the requirements can be met by existing k8s
> resources. So maybe a Helm chart that bundles all the needed resources
> should be enough.
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkSqlGateway
> metadata:
>   name: flink-sql-gateway-example
>   namespace: default
> spec:
>   clusterName: flink-session-cluster-example
>   exposeServiceType: LoadBalancer
>   flinkSqlGatewayConfiguration:
>     sql-gateway.endpoint.type: "hiveserver2"
>     sql-gateway.endpoint.hiveserver2.catalog.name: "hive"
>   hiveConf:
>     configMap:
>       name: hive-config
>       items:
>         - key: hive-site.xml
>           path: hive-site.xml
>
> @xiaolong, @Shammon
> Hi Xiaolong and Shammon. Thanks for taking the time to share.
> I'd also like to add my experience with setting up the Flink SQL gateway on k8s.
> Without building a new Docker image, I've added a separate container to the
> existing JobManager pod and started the SQL gateway with the
> "sql-gateway.sh start-foreground" command.
> I haven't explored deploying the SQL gateway as an independent deployment
> yet, but that's something I'm considering after modifying the JM's address
> to the desired session cluster.
>
> Thanks all
>
> Best
> Dongwoo
>
> On Fri, Sep 15, 2023 at 11:55 AM, Xiaolong Wang wrote:
>> Hi, Shammon,
>>
>> Yes, I want to create a Flink SQL gateway in a JobManager.
>>
>> Currently, the above script is generally a workaround and allows me to
>> start a Flink session JobManager with a SQL gateway running on top.
>>
>> I agree that it'd be more elegant to create a new job type and write a
>> script, which would be much easier for users (since they would no longer
>> need to build a separate Flink image).
>>
>> On Fri, Sep 15, 2023 at 10:29 AM Shammon FY wrote:
>>> Hi,
>>>
>>> Currently `sql-gateway` can be started with the script `sql-gateway.sh`
>>> on an existing node; it is more like a simple "standalone" node. I think
>>> it's valuable if we can do more work to start it in k8s.
>>>
>>> For Xiaolong:
>>> Do you want to start a sql-gateway instance in the jobmanager pod? I
>>> think maybe we need a script like `kubernetes-sql-gateway.sh` to start
>>> `sql-gateway` pods with a flink image, what do you think?
>>>
>>> Best,
>>> Shammon FY
>>>
>>> On Fri, Sep 15, 2023 at 10:02 AM Xiaolong Wang wrote:
>>>> Hi, I've experimented with this feature on K8s recently [...]
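Since the thread converges on plain Kubernetes resources rather than a new CRD, here is a minimal, hypothetical sketch of the Deployment route Gyula suggests. It assumes an existing session cluster and a stock Flink image; the image tag, service name, and addresses are placeholders, and the gateway's REST endpoint is assumed to default to port 8083:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-sql-gateway
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-sql-gateway
  template:
    metadata:
      labels:
        app: flink-sql-gateway
    spec:
      containers:
        - name: sql-gateway
          image: flink:1.17.1-scala_2.12-java11
          # Run the gateway in the foreground so the container stays alive,
          # binding its REST endpoint on all interfaces and pointing job
          # submission at the REST service of an existing session cluster.
          command: ["/opt/flink/bin/sql-gateway.sh"]
          args:
            - start-foreground
            - -Dsql-gateway.endpoint.rest.address=0.0.0.0
            - -Drest.address=flink-session-cluster-example-rest
          ports:
            - containerPort: 8083  # default sql-gateway.endpoint.rest.port
```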
Re: [Discuss] CRD for flink sql gateway in the flink k8s operator
Hi, I've experimented with this feature on K8s recently; here is some of my trial:

1. Create a new kubernetes-jobmanager.sh script with the following content:

#!/usr/bin/env bash
$FLINK_HOME/bin/sql-gateway.sh start
$FLINK_HOME/bin/kubernetes-jobmanager1.sh kubernetes-session

2. Build your own Flink docker image, something like this:

FROM flink:1.17.1-scala_2.12-java11

RUN mv $FLINK_HOME/bin/kubernetes-jobmanager.sh $FLINK_HOME/bin/kubernetes-jobmanager1.sh
COPY ./kubernetes-jobmanager.sh $FLINK_HOME/bin/kubernetes-jobmanager.sh

RUN chmod +x $FLINK_HOME/bin/*.sh
USER flink

3. Create a Flink session job with the operator using the above image.

On Thu, Sep 14, 2023 at 9:49 PM Gyula Fóra wrote:
> Hi!
>
> I don't completely understand what would be the content of such a CRD;
> could you give a minimal example of how the Flink SQL Gateway CR yaml
> would look?
>
> Adding a CRD would mean you need to add some operator/controller logic as
> well. Why not simply use a Deployment / StatefulSet in Kubernetes?
>
> Or a Helm chart if you want to make it more user friendly?
>
> Cheers,
> Gyula
>
> On Thu, Sep 14, 2023 at 12:57 PM Dongwoo Kim wrote:
>> Hi all,
>>
>> I've been working on setting up a Flink SQL gateway in a k8s environment
>> and it got me thinking — what if we had a CRD for this?
>>
>> So I have two quick questions:
>> 1. Is there ongoing work to create a CRD for the Flink SQL Gateway?
>> 2. If not, would the community be open to considering a CRD for this?
>>
>> I've noticed a growing demand for a simplified setup of the Flink SQL
>> gateway in Flink's Slack channel. Implementing a CRD could make
>> deployments easier and offer better integration with k8s.
>>
>> If this idea is accepted, I'm open to drafting a FLIP for further
>> discussion.
>>
>> Thanks for your time, and looking forward to your thoughts!
>>
>> Best regards,
>> Dongwoo