Re: Bug report for reading Hive table as streaming source.

2024-04-15 Thread Xiaolong Wang
Reported. JIRA link: https://issues.apache.org/jira/browse/FLINK-35118

On Mon, Apr 15, 2024 at 12:03 PM Xiaolong Wang 
wrote:

> Sure
>
> On Mon, Apr 1, 2024 at 9:28 AM yuxia  wrote:
>
>> Thanks for reporting. Could you please help create a JIRA ticket for it?
>>
>> Best regards,
>> Yuxia
>>
>> ----- Original Message -----
>> From: "Xiaolong Wang" 
>> To: "dev" 
>> Sent: Thursday, March 28, 2024, 5:11:20 PM
>> Subject: Re: Bug report for reading Hive table as streaming source.
>>
>> I think it is worth mentioning in the documentation for Hive reads that it
>> cannot read a table that has more than 32,767 partitions.
>>
>> On Thu, Mar 28, 2024 at 5:10 PM Xiaolong Wang <
>> xiaolong.w...@smartnews.com>
>> wrote:
>>
>> > Found out the reason:
>> >
>> > It turned out that Flink uses Hive's IMetaStoreClient to fetch
>> > partitions through the following method:
>> >
>> >   List<String> listPartitionNames(String db_name, String tbl_name,
>> >       short max_parts) throws MetaException, TException;
>> >
>> > where max_parts is the maximum number of partitions it can fetch
>> > from the Hive metastore.
>> > So the largest number of partitions it can fetch is Short.MAX_VALUE,
>> > which is 32,767.
>> >
>> > But the table has far more partitions than that maximum, so the
>> > list-partitions call cannot fetch all of them, and hence the source
>> > cannot consume the most recent partitions.
>> >
>> > On Tue, Mar 26, 2024 at 5:00 PM Xiaolong Wang <
>> xiaolong.w...@smartnews.com>
>> > wrote:
>> >
>> >> Hi,
>> >>
>> >> I found a weird bug when reading a Hive table as a streaming source.
>> >>
>> >> In summary, if the first partition column is not time-related, then the
>> >> Hive table cannot be read as a streaming source.
>> >>
>> >> e.g.
>> >>
>> >> I have a Hive table with the following definition:
>> >>
>> >> ```
>> >> CREATE TABLE article (
>> >> id BIGINT,
>> >> edition STRING,
>> >> dt STRING,
>> >> hh STRING
>> >> )
>> >> PARTITIONED BY (edition, dt, hh)
>> >> USING orc;
>> >> ```
>> >> Then I try to query it as a streaming source:
>> >>
>> >> ```
>> >> INSERT INTO kafka_sink
>> >> SELECT id
>> >> FROM article /*+ OPTIONS('streaming-source.enable' = 'true',
>> >> 'streaming-source.partition-order' = 'partition-name',
>> >> 'streaming-source.consume-start-offset' =
>> >> 'edition=en_US/dt=2024-03-26/hh=00') */
>> >> ```
>> >>
>> >> And I see no output in the `kafka_sink`.
>> >>
>> >> Then I defined an external table pointing to the same path but with no
>> >> `edition` partition:
>> >>
>> >> ```
>> >> CREATE TABLE en_article (
>> >> id BIGINT,
>> >> edition STRING,
>> >> dt STRING,
>> >> hh STRING
>> >> )
>> >> PARTITIONED BY (edition, dt, hh)
>> >> LOCATION 's3://xxx/article/edition=en_US'
>> >> USING orc;
>> >> ```
>> >>
>> >> And inserted with the following statement:
>> >>
>> >> ```
>> >> INSERT INTO kafka_sink
>> >> SELECT id
>> >> FROM en_article /*+ OPTIONS('streaming-source.enable' = 'true',
>> >> 'streaming-source.partition-order' = 'partition-name',
>> >> 'streaming-source.consume-start-offset' = 'dt=2024-03-26/hh=00') */
>> >> ```
>> >>
>> >> The data is written to the sink.
>> >>
>> >>
>> >
>>
>


Re: Bug report for reading Hive table as streaming source.

2024-04-14 Thread Xiaolong Wang
Sure

On Mon, Apr 1, 2024 at 9:28 AM yuxia  wrote:

> Thanks for reporting. Could you please help create a JIRA ticket for it?
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "Xiaolong Wang" 
> To: "dev" 
> Sent: Thursday, March 28, 2024, 5:11:20 PM
> Subject: Re: Bug report for reading Hive table as streaming source.
>
> I think it is worth mentioning in the documentation for Hive reads that it
> cannot read a table that has more than 32,767 partitions.
>
> On Thu, Mar 28, 2024 at 5:10 PM Xiaolong Wang  >
> wrote:
>
> > Found out the reason:
> >
> > It turned out that Flink uses Hive's IMetaStoreClient to fetch
> > partitions through the following method:
> >
> >   List<String> listPartitionNames(String db_name, String tbl_name,
> >       short max_parts) throws MetaException, TException;
> >
> > where max_parts is the maximum number of partitions it can fetch
> > from the Hive metastore.
> > So the largest number of partitions it can fetch is Short.MAX_VALUE,
> > which is 32,767.
> >
> > But the table has far more partitions than that maximum, so the
> > list-partitions call cannot fetch all of them, and hence the source
> > cannot consume the most recent partitions.
> >
> > On Tue, Mar 26, 2024 at 5:00 PM Xiaolong Wang <
> xiaolong.w...@smartnews.com>
> > wrote:
> >
> >> Hi,
> >>
> >> I found a weird bug when reading a Hive table as a streaming source.
> >>
> >> In summary, if the first partition column is not time-related, then the
> >> Hive table cannot be read as a streaming source.
> >>
> >> e.g.
> >>
> >> I have a Hive table with the following definition:
> >>
> >> ```
> >> CREATE TABLE article (
> >> id BIGINT,
> >> edition STRING,
> >> dt STRING,
> >> hh STRING
> >> )
> >> PARTITIONED BY (edition, dt, hh)
> >> USING orc;
> >> ```
> >> Then I try to query it as a streaming source:
> >>
> >> ```
> >> INSERT INTO kafka_sink
> >> SELECT id
> >> FROM article /*+ OPTIONS('streaming-source.enable' = 'true',
> >> 'streaming-source.partition-order' = 'partition-name',
> >> 'streaming-source.consume-start-offset' =
> >> 'edition=en_US/dt=2024-03-26/hh=00') */
> >> ```
> >>
> >> And I see no output in the `kafka_sink`.
> >>
> >> Then I defined an external table pointing to the same path but with no
> >> `edition` partition:
> >>
> >> ```
> >> CREATE TABLE en_article (
> >> id BIGINT,
> >> edition STRING,
> >> dt STRING,
> >> hh STRING
> >> )
> >> PARTITIONED BY (edition, dt, hh)
> >> LOCATION 's3://xxx/article/edition=en_US'
> >> USING orc;
> >> ```
> >>
> >> And inserted with the following statement:
> >>
> >> ```
> >> INSERT INTO kafka_sink
> >> SELECT id
> >> FROM en_article /*+ OPTIONS('streaming-source.enable' = 'true',
> >> 'streaming-source.partition-order' = 'partition-name',
> >> 'streaming-source.consume-start-offset' = 'dt=2024-03-26/hh=00') */
> >> ```
> >>
> >> The data is written to the sink.
> >>
> >>
> >
>


Re: Bug report for reading Hive table as streaming source.

2024-03-28 Thread Xiaolong Wang
I think it is worth mentioning in the documentation for Hive reads that it
cannot read a table that has more than 32,767 partitions.

On Thu, Mar 28, 2024 at 5:10 PM Xiaolong Wang 
wrote:

> Found out the reason:
>
> It turned out that Flink uses Hive's IMetaStoreClient to fetch
> partitions through the following method:
>
>   List<String> listPartitionNames(String db_name, String tbl_name,
>       short max_parts) throws MetaException, TException;
>
> where max_parts is the maximum number of partitions it can fetch
> from the Hive metastore.
> So the largest number of partitions it can fetch is Short.MAX_VALUE,
> which is 32,767.
>
> But the table has far more partitions than that maximum, so the
> list-partitions call cannot fetch all of them, and hence the source
> cannot consume the most recent partitions.
>
> On Tue, Mar 26, 2024 at 5:00 PM Xiaolong Wang 
> wrote:
>
>> Hi,
>>
>> I found a weird bug when reading a Hive table as a streaming source.
>>
>> In summary, if the first partition column is not time-related, then the
>> Hive table cannot be read as a streaming source.
>>
>> e.g.
>>
>> I have a Hive table with the following definition:
>>
>> ```
>> CREATE TABLE article (
>> id BIGINT,
>> edition STRING,
>> dt STRING,
>> hh STRING
>> )
>> PARTITIONED BY (edition, dt, hh)
>> USING orc;
>> ```
>> Then I try to query it as a streaming source:
>>
>> ```
>> INSERT INTO kafka_sink
>> SELECT id
>> FROM article /*+ OPTIONS('streaming-source.enable' = 'true',
>> 'streaming-source.partition-order' = 'partition-name',
>> 'streaming-source.consume-start-offset' =
>> 'edition=en_US/dt=2024-03-26/hh=00') */
>> ```
>>
>> And I see no output in the `kafka_sink`.
>>
>> Then I defined an external table pointing to the same path but with no
>> `edition` partition:
>>
>> ```
>> CREATE TABLE en_article (
>> id BIGINT,
>> edition STRING,
>> dt STRING,
>> hh STRING
>> )
>> PARTITIONED BY (edition, dt, hh)
>> LOCATION 's3://xxx/article/edition=en_US'
>> USING orc;
>> ```
>>
>> And inserted with the following statement:
>>
>> ```
>> INSERT INTO kafka_sink
>> SELECT id
>> FROM en_article /*+ OPTIONS('streaming-source.enable' = 'true',
>> 'streaming-source.partition-order' = 'partition-name',
>> 'streaming-source.consume-start-offset' = 'dt=2024-03-26/hh=00') */
>> ```
>>
>> The data is written to the sink.
>>
>>
>


Re: Bug report for reading Hive table as streaming source.

2024-03-28 Thread Xiaolong Wang
Found out the reason:

It turned out that Flink uses Hive's IMetaStoreClient to fetch
partitions through the following method:

  List<String> listPartitionNames(String db_name, String tbl_name,
      short max_parts) throws MetaException, TException;

where max_parts is the maximum number of partitions it can fetch
from the Hive metastore.
So the largest number of partitions it can fetch is Short.MAX_VALUE,
which is 32,767.

But the table has far more partitions than that maximum, so the
list-partitions call cannot fetch all of them, and hence the source
cannot consume the most recent partitions.
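
For illustration, here is a rough sketch of the call in question. The `(short) -1`
"no limit" convention is an assumption based on common Hive metastore usage, not
something taken from Flink's code, so please verify it against the Hive version in use:

```
import java.util.List;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.thrift.TException;

public class PartitionNameFetchSketch {

    // Illustrates why the short-typed max_parts argument caps the result at 32,767.
    static List<String> fetchPartitionNames(IMetaStoreClient client, String db, String table)
            throws MetaException, TException {
        // A short can hold at most Short.MAX_VALUE (32,767), so this call can never
        // return more partition names than that, however many partitions exist.
        List<String> capped = client.listPartitionNames(db, table, Short.MAX_VALUE);

        // Assumed workaround: many metastore list APIs treat a negative max as "no limit".
        List<String> all = client.listPartitionNames(db, table, (short) -1);

        return all.size() >= capped.size() ? all : capped;
    }
}
```

If that convention holds for the metastore in use, passing -1 (or paging the names)
would avoid the 32,767 cap.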

On Tue, Mar 26, 2024 at 5:00 PM Xiaolong Wang 
wrote:

> Hi,
>
> I found a weird bug when reading a Hive table as a streaming source.
>
> In summary, if the first partition column is not time-related, then the
> Hive table cannot be read as a streaming source.
>
> e.g.
>
> I have a Hive table with the following definition:
>
> ```
> CREATE TABLE article (
> id BIGINT,
> edition STRING,
> dt STRING,
> hh STRING
> )
> PARTITIONED BY (edition, dt, hh)
> USING orc;
> ```
> Then I try to query it as a streaming source:
>
> ```
> INSERT INTO kafka_sink
> SELECT id
> FROM article /*+ OPTIONS('streaming-source.enable' = 'true',
> 'streaming-source.partition-order' = 'partition-name',
> 'streaming-source.consume-start-offset' =
> 'edition=en_US/dt=2024-03-26/hh=00') */
> ```
>
> And I see no output in the `kafka_sink`.
>
> Then I defined an external table pointing to the same path but with no
> `edition` partition:
>
> ```
> CREATE TABLE en_article (
> id BIGINT,
> edition STRING,
> dt STRING,
> hh STRING
> )
> PARTITIONED BY (edition, dt, hh)
> LOCATION 's3://xxx/article/edition=en_US'
> USING orc;
> ```
>
> And inserted with the following statement:
>
> ```
> INSERT INTO kafka_sink
> SELECT id
> FROM en_article /*+ OPTIONS('streaming-source.enable' = 'true',
> 'streaming-source.partition-order' = 'partition-name',
> 'streaming-source.consume-start-offset' = 'dt=2024-03-26/hh=00') */
> ```
>
> The data is written to the sink.
>
>


Bug report for reading Hive table as streaming source.

2024-03-26 Thread Xiaolong Wang
Hi,

I found a weird bug when reading a Hive table as a streaming source.

In summary, if the first partition column is not time-related, then the Hive
table cannot be read as a streaming source.

e.g.

I have a Hive table with the following definition:

```
CREATE TABLE article (
id BIGINT,
edition STRING,
dt STRING,
hh STRING
)
PARTITIONED BY (edition, dt, hh)
USING orc;
```
Then I try to query it as a streaming source:

```
INSERT INTO kafka_sink
SELECT id
FROM article /*+ OPTIONS('streaming-source.enable' = 'true',
'streaming-source.partition-order' = 'partition-name',
'streaming-source.consume-start-offset' =
'edition=en_US/dt=2024-03-26/hh=00') */
```

And I see no output in the `kafka_sink`.

Then I defined an external table pointing to the same path but with no
`edition` partition:

```
CREATE TABLE en_article (
id BIGINT,
edition STRING,
dt STRING,
hh STRING
)
PARTITIONED BY (edition, dt, hh)
LOCATION 's3://xxx/article/edition=en_US'
USING orc;
```

And inserted with the following statement:

```
INSERT INTO kafka_sink
SELECT id
FROM en_article /*+ OPTIONS('streaming-source.enable' = 'true',
'streaming-source.partition-order' = 'partition-name',
'streaming-source.consume-start-offset' = 'dt=2024-03-26/hh=00') */
```

The data is written to the sink.


Re: Flink kinesis connector v4.2.0-1.18 has issue when consuming from EFO consumer

2024-02-29 Thread Xiaolong Wang
Thanks, this looks similar. I'll work around it.
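
For anyone hitting the same thing, a rough sketch of pinning the credentials
provider explicitly on the DataStream API. The property keys and values follow the
connector's AWSConfigConstants/ConsumerConfigConstants naming as I recall them, and
the region, stream and consumer names are placeholders, so double-check everything
against the flink-connector-kinesis version actually deployed:

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;

public class EfoConsumerSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties config = new Properties();
        config.setProperty("aws.region", "ap-northeast-1"); // placeholder region
        // Pin the provider explicitly instead of AUTO, so the shaded
        // DefaultCredentialsProvider chain is never consulted.
        config.setProperty("aws.credentials.provider", "WEB_IDENTITY_TOKEN");
        // Keep enhanced fan-out, which is where the failing SubscribeToShard calls come from.
        config.setProperty("flink.stream.recordpublisher", "EFO");
        config.setProperty("flink.stream.efo.consumername", "my-efo-consumer"); // placeholder

        DataStream<String> records = env.addSource(
                new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), config));
        records.print();

        env.execute("efo-consumer-sketch");
    }
}
```

The key point is only that the provider is named explicitly rather than left as
AUTO; which provider is right depends on how the job's AWS credentials are supplied.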

On Thu, Feb 29, 2024 at 5:15 PM Aleksandr Pilipenko 
wrote:

> Based on the stacktrace, this looks like an issue described here:
> https://issues.apache.org/jira/browse/FLINK-32964
> Is your configuration similar to the one described in the ticket? If so,
> you can work around this issue by explicitly specifying the credentials
> provider for the connector, thereby avoiding the use of
> DefaultCredentialsProvider (AUTO).
>
> Caused by: java.lang.IllegalStateException: Connection pool shut down
> ...
> at
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.
> *WebIdentityTokenFileCredentialsProvider*
> .resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:
> 143) ~[?:?]
> ...
> at
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.
> *DefaultCredentialsProvider*
> .resolveCredentials(DefaultCredentialsProvider.java:
> 128) ~[?:?]
>
> On Thu, 29 Feb 2024 at 02:24, Xiaolong Wang
>  wrote:
>
> > Sorry, I attached the wrong file. Let me paste the error log:
> >
> > java.lang.RuntimeException: Maximum retries exceeded for
> SubscribeToShard.
> > Failed 10 times.
> > at
> >
> >
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:
> > 211) ~[?:?]
> > at
> >
> >
> org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:
> > 130) ~[?:?]
> > at
> >
> >
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:
> > 114) ~[?:?]
> > at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> > ~[?:?
> > ]
> > at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> ~[?:?]
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> > ~[?:?]
> > at java.lang.Thread.run(Unknown Source) ~[?:?]
> > Caused by: java.lang.IllegalStateException: Connection pool shut down
> > at
> >
> >
> org.apache.flink.kinesis.shaded.org.apache.http.util.Asserts.check(Asserts.java:
> > 34) ~[?:?]
> > at
> >
> >
> org.apache.flink.kinesis.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:
> > 269) ~[?:?]
> > at
> >
> >
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory
> > $
> >
> >
> DelegatingHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:
> > 75) ~[?:?]
> > at
> >
> >
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory
> > $
> >
> >
> InstrumentedHttpClientConnectionManager.requestConnection(ClientConnectionManagerFactory.java:
> > 57) ~[?:?]
> > at
> >
> >
> org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:
> > 176) ~[?:?]
> > at
> >
> >
> org.apache.flink.kinesis.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:
> > 186) ~[?:?]
> > at
> >
> >
> org.apache.flink.kinesis.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:
> > 185) ~[?:?]
> > at
> >
> >
> org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:
> > 83) ~[?:?]
> > at
> >
> >
> org.apache.flink.kinesis.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:
> > 56) ~[?:?]
> > at
> >
> >
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient.execute(ApacheSdkHttpClient.java:
> > 72) ~[?:?]
> > at
> >
> >
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.execute(ApacheHttpClient.java:
> > 254) ~[?:?]
> > at
> >
> >
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient.access
> > $500(ApacheHttpClient.java:104) ~[?:?]
> > at
> >
> >
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.apache.ApacheHttpClient
> > $1.call(ApacheHttpClient.java:231) ~[?:?]
> > at
> >
> >

Re: Flink kinesis connector v4.2.0-1.18 has issue when consuming from EFO consumer

2024-02-28 Thread Xiaolong Wang
) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:
208) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:
135) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:
105) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.internal.StsWebIdentityCredentialsProviderFactory
$
StsWebIdentityCredentialsProvider.resolveCredentials(StsWebIdentityCredentialsProviderFactory.java:
109) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider.resolveCredentials(WebIdentityTokenFileCredentialsProvider.java:
143) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:
90) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:
45) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:
128) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.util.MetricUtils.measureDuration(MetricUtils.java:
54) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.internal.authcontext.AwsCredentialsAuthorizationStrategy.resolveCredentials(AwsCredentialsAuthorizationStrategy.java:
100) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.internal.authcontext.AwsCredentialsAuthorizationStrategy.addCredentialsToExecutionAttributes(AwsCredentialsAuthorizationStrategy.java:
77) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.invokeInterceptorsAndCreateExecutionContext(AwsExecutionContextBuilder.java:
125) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler.invokeInterceptorsAndCreateExecutionContext(AwsAsyncClientHandler.java:
65) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.
lambda$execute$3(BaseAsyncClientHandler.java:118) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.measureApiCallSuccess(BaseAsyncClientHandler.java:
291) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.handler.BaseAsyncClientHandler.execute(BaseAsyncClientHandler.java:
91) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler.execute(AwsAsyncClientHandler.java:
59) ~[?:?]
at
org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.subscribeToShard(DefaultKinesisAsyncClient.java:
2730) ~[?:?]
at
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyAsyncV2.subscribeToShard(KinesisProxyAsyncV2.java:
66) ~[?:?]
at
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.openSubscriptionToShard(FanOutShardSubscriber.java:
250) ~[?:?]
at
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:
196) ~[?:?]
at
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:
176) ~[?:?]
... 7 more

On Wed, Feb 28, 2024 at 6:21 PM Aleksandr Pilipenko 
wrote:

> Hi,
>
> Could you please provide more information on the error you are observing?
> The attached file does not contain anything related to Kinesis or any errors.
>
> Best,
> Aleksandr
>
> On Wed, 28 Feb 2024 at 02:28, Xiaolong Wang
>  wrote:
>
> > Hi,
> >
> > I used flink-connector-kinesis (4.0.2-1.18) to consume from Kinesis.
> > The job starts but fails within an hour. A detailed error log
> > is attached.
> >
> > When I changed the flink-connector-kinesis version to `1.15.2`,
> > everything worked fine.
> >
> > Any idea how to fix it?
> >
> >
>


Flink kinesis connector v4.2.0-1.18 has issue when consuming from EFO consumer

2024-02-27 Thread Xiaolong Wang
Hi,

I used flink-connector-kinesis (4.0.2-1.18) to consume from Kinesis.
The job starts but fails within an hour. A detailed error log
is attached.

When I changed the flink-connector-kinesis version to `1.15.2`,
everything worked fine.

Any idea how to fix it?
create table kafka_event_v1 (
  `timestamp` bigint,
  serverTimestamp bigint,

  name string,
  data string,
  app string,
  `user` string,
  context string,
  browser row<
url string,
referrer string,
userAgent string,
`language` string,
title string,
viewportWidth int,
viewportHeight int,
contentWidth int,
contentHeight int,
cookies map,
name string,
version string,
device row<
  model string,
  type string,
  vendor string
>,
engine row<
  name string,
  version string
>,
os row<
  name string,
  version string
>
  >,
  abtests map,
  apikey string,
  lifecycleId string,
  sessionId string,
  instanceId string,
  requestId string,
  eventId string,
  `trigger` string,

  virtualId string,
  accountId string,
  ip string,

  serverTimestampLtz as to_timestamp(from_unixtime(serverTimestamp / 1000)),
  watermark for serverTimestampLtz as serverTimestampLtz - interval '5' second
) with (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'shared.kafka.smartnews.internal:9093',
  'topic' = 'shared-cluster-sn-pixel-event-v1-dev',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',

  'properties.group.id' = 'event_v2',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'SNTOKEN',
  'properties.sasl.login.class' = 
'com.smartnews.dp.kafka.security.sn.auth.SnTokenLogin',
  'properties.sasl.login.callback.handler.class' = 
'com.smartnews.dp.kafka.security.sn.auth.SnTokenCallbackHandler',
  'properties.sasl.client.callback.handler.class' = 
'com.smartnews.dp.kafka.security.sn.sasl.SnTokenSaslClientCallbackHandler',
  'properties.sasl.jaas.config' = 
'com.smartnews.dp.kafka.security.sn.auth.SnTokenLoginModule required 
username="sn-pixel" password="aQXJcNUsCuIZpICHO9bQ" env="prd";',
  'properties.ssl.truststore.type' = 'PEM'
);

create catalog iceberg_dev with (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://dev-hive-metastore.smartnews.internal:9083',
  'warehouse'='s3a://smartnews-dmp/warehouse/development'
);

insert into iceberg_dev.pixel.event_v2 /*+ options(
  'partition.time-extractor.timestamp-pattern'='$dt 00:00:00',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'auto-compaction'='true'
) */
select
  `timestamp`,
  serverTimestamp,
  data,
  app,
  `user`,
  context,
  browser,
  abtests,
  lifecycleId,
  sessionId,
  instanceId,
  requestId,
  eventId,
  `trigger`,

  virtualId,
  accountId,
  ip,

  date_format(serverTimestampLtz, 'yyyy-MM-dd') dt,
  apikey,
  name
from
  default_catalog.default_database.kafka_event_v1
;





Re: [Discuss] CRD for flink sql gateway in the flink k8s operator

2023-09-14 Thread Xiaolong Wang
Hi, Dongwoo,

Since the Flink SQL gateway runs on top of a Flink session cluster, I think
it'd be easier to add more fields to the `FlinkSessionJob` CRD.

e.g.

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: sql-gateway
spec:
  sqlGateway:
endpoint: "hiveserver2"
mode: "streaming"
hiveConf:
  configMap:
name: hive-config
items:
  - key: hive-site.xml
path: hive-site.xml


On Fri, Sep 15, 2023 at 12:56 PM Dongwoo Kim  wrote:

> Hi all,
>
> *@Gyula*
> Thanks for the consideration, Gyula. My initial idea for the CR was roughly
> like the one below.
> I focused on simplifying the setup in a k8s environment, but I agree with
> your opinion that the SQL gateway does not need custom operator logic, and
> most of the requirements can be met by existing k8s resources.
> So maybe a Helm chart that bundles all the needed resources would be enough.
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkSqlGateway
> metadata:
>   name: flink-sql-gateway-example
>   namespace: default
> spec:
>   clusterName: flink-session-cluster-example
>   exposeServiceType: LoadBalancer
>   flinkSqlGatewayConfiguration:
>     sql-gateway.endpoint.type: "hiveserver2"
>     sql-gateway.endpoint.hiveserver2.catalog.name: "hive"
>   hiveConf:
>     configMap:
>       name: hive-config
>       items:
>         - key: hive-site.xml
>           path: hive-site.xml
>
>
> *@xiaolong, @Shammon*
> Hi xiaolong and Shammon.
> Thanks for taking the time to share.
> I'd also like to add my experience with setting up the Flink SQL gateway on
> k8s.
> Without building a new Docker image, I added a separate container to the
> existing JobManager pod and started the SQL gateway with the
> "sql-gateway.sh start-foreground" command.
> I haven't explored deploying the SQL gateway as an independent Deployment
> yet, but that's something I'm considering after pointing the JM address to
> the desired session cluster.
>
> Thanks all
>
> Best
> Dongwoo
>
> > On Fri, Sep 15, 2023 at 11:55 AM, Xiaolong Wang
> > wrote:
>
> > Hi, Shammon,
> >
> > Yes, I want to create a Flink SQL gateway in a JobManager.
> >
> > Currently, the above script is mostly a workaround; it allows me to
> > start a Flink session JobManager with a SQL gateway running on top.
> >
> > I agree that it'd be more elegant to create a new job type and write a
> > script, which would be much easier for users (since they would not need
> > to build a separate Flink image any more).
> >
> >
> >
> > On Fri, Sep 15, 2023 at 10:29 AM Shammon FY  wrote:
> >
> > > Hi,
> > >
> > > Currently `sql-gateway` can be started with the `sql-gateway.sh` script
> > > on an existing node; it is more like a simple "standalone" node. I think
> > > it's valuable if we can do more work to start it in k8s.
> > >
> > > For xiaolong:
> > > Do you want to start a sql-gateway instance in the JobManager pod? I
> > > think maybe we need a script like `kubernetes-sql-gateway.sh` to start
> > > `sql-gateway` pods with a Flink image. What do you think?
> > >
> > > Best,
> > > Shammon FY
> > >
> > >
> > > On Fri, Sep 15, 2023 at 10:02 AM Xiaolong Wang
> > >  wrote:
> > >
> > > > Hi, I've experimented with this feature on K8s recently; here is some
> > > > of my trial:
> > > >
> > > >
> > > > 1. Create a new kubernetes-jobmanager.sh script with the following
> > > content
> > > >
> > > > #!/usr/bin/env bash
> > > > $FLINK_HOME/bin/sql-gateway.sh start
> > > > $FLINK_HOME/bin/kubernetes-jobmanager1.sh kubernetes-session
> > > >
> > > > 2. Build your own Flink Docker image, something like this:
> > > > FROM flink:1.17.1-scala_2.12-java11
> > > >
> > > > RUN mv $FLINK_HOME/bin/kubernetes-jobmanager.sh $FLINK_HOME/bin/
> > > > kubernetes-jobmanager1.sh
> > > > COPY ./kubernetes-jobmanager.sh
> > $FLINK_HOME/bin/kubernetes-jobmanager.sh
> > > >
> > > > RUN chmod +x $FLINK_HOME/bin/*.sh
> > > > USER flink
> > > >
> > > > 3. Create a Flink session job with the operator using the above
> image.
> > > >
> > > > On Thu, Sep 14, 2023 at 9:49 PM Gyula Fóra 
> > wrote:
> > > >
> > > > > Hi!
> > > > >
> > > > > I don't completely

Re: [Discuss] CRD for flink sql gateway in the flink k8s operator

2023-09-14 Thread Xiaolong Wang
Hi, I've experimented with this feature on K8s recently; here is some of my trial:


1. Create a new kubernetes-jobmanager.sh script with the following content

#!/usr/bin/env bash
$FLINK_HOME/bin/sql-gateway.sh start
$FLINK_HOME/bin/kubernetes-jobmanager1.sh kubernetes-session

2. Build your own Flink Docker image, something like this:
FROM flink:1.17.1-scala_2.12-java11

RUN mv $FLINK_HOME/bin/kubernetes-jobmanager.sh $FLINK_HOME/bin/
kubernetes-jobmanager1.sh
COPY ./kubernetes-jobmanager.sh $FLINK_HOME/bin/kubernetes-jobmanager.sh

RUN chmod +x $FLINK_HOME/bin/*.sh
USER flink

3. Create a Flink session job with the operator using the above image.

On Thu, Sep 14, 2023 at 9:49 PM Gyula Fóra  wrote:

> Hi!
>
> I don't completely understand what the content of such a CRD would be. Could
> you give a minimal example of how the Flink SQL Gateway CR YAML would look?
>
> Adding a CRD would mean you need to add some operator/controller logic as
> well. Why not simply use a Deployment / StatefulSet in Kubernetes?
>
> Or a Helm chart if you want to make it more user friendly?
>
> Cheers,
> Gyula
>
> On Thu, Sep 14, 2023 at 12:57 PM Dongwoo Kim 
> wrote:
>
> > Hi all,
> >
> > I've been working on setting up a Flink SQL gateway in a k8s environment
> > and it got me thinking — what if we had a CRD for this?
> >
> > So I have a couple of quick questions below.
> > 1. Is there ongoing work to create a CRD for the Flink SQL Gateway?
> > 2. If not, would the community be open to considering a CRD for this?
> >
> > I've noticed a growing demand in Flink's Slack channel for a simplified
> > setup of the Flink SQL gateway.
> > Implementing a CRD could make deployments easier and offer better
> > integration with k8s.
> >
> > If this idea is accepted, I'm open to drafting a FLIP for further
> > discussion.
> >
> > Thanks for your time and looking forward to your thoughts!
> >
> > Best regards,
> > Dongwoo
> >
>