Elasticsearch SQL connector with SSL

2022-04-02 Thread Yuheng Zhong
Hi,

I recently started working on a Flink job using the Elasticsearch SQL
connector. Our Elasticsearch cluster requires an SSL certificate for
connections, and there is no option to set a certificate in the current
version of the Elasticsearch SQL connector (see the Elasticsearch page in
the Apache Flink docs).
May I know if there is any way to connect to ES as a sink with SSL?

Thanks,

Ricardo


Re: flink 1.15

2022-04-02 Thread Zhanghao Chen
No, it isn't. MVP is short for Minimum Viable Product, i.e. a version that implements only the core features and is then refined further based on feedback from early users.

Best,
Zhanghao Chen

From: guanyq 
Sent: Saturday, April 2, 2022 14:56
To: user-zh@flink.apache.org 
Subject: flink 1.15

I watched the FFA (Flink Forward Asia) talk on stream-batch unification. Flink 1.15 introduces an MVP version of dynamic table storage for unified stream and batch processing.


Is the MVP version a paid edition?


Why are checkpoints discarded when the job finishes

2022-04-02 Thread Wu Poker
Dear Flinkers:
As CheckpointProperties#CHECKPOINT_RETAINED_ON_CANCELLATION shows,
if a job stops with JobStatus#FINISHED, the CompletedCheckpointStore will
discard all completed checkpoints.
   My question is: why does the CompletedCheckpointStore discard all completed
checkpoints when the job is in FINISHED status? What would happen if the
completed checkpoints were retained?

   In my case (Flink 1.14.4), I use a StreamGraph to execute a bounded
source and want to retain the checkpoints for the next crontab-triggered run.
Is that possible (by changing CheckpointProperties to retain all checkpoints
when the job is FINISHED)?
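
For reference, a minimal sketch (Flink 1.14-era API; interval and job name are
illustrative) of how checkpoint retention is configured today. Note that
RETAIN_ON_CANCELLATION only covers cancellation; there is currently no
equivalent switch for the FINISHED case asked about above.

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds (value illustrative).
        env.enableCheckpointing(60_000L);

        // Keep externalized checkpoints when the job is cancelled;
        // checkpoints are still discarded when the job reaches FINISHED.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Placeholder bounded pipeline; replace with the real bounded source.
        env.fromElements(1, 2, 3).print();

        env.execute("bounded-job-with-retained-checkpoints");
    }
}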

Sincerely.


Re: The flink-kubernetes-operator vs third party flink operators

2022-04-02 Thread Gyula Fóra
Hi!

The main difference at the moment is the programming language and the APIs
used to interact with Flink.

The flink-kubernetes-operator uses Java and interacts with Flink using the
built-in (native) clients.

The other operators have been around since earlier Flink versions. They all
use Golang, and some of them have already been abandoned by their initial
developers. In many cases they also do not support the latest Flink
operational features.

With the flink-kubernetes-operator project we aimed to take inspiration
from the existing operators and create a project that Flink developers can
easily contribute to and that can be maintained together with Flink itself,
while keeping up high quality standards.

We hope that developers of the other operators will start contributing
soon :)

Cheers,
Gyula



On Sat, 2 Apr 2022 at 11:01, Hao t Chang  wrote:

> Hi
>
>
>
> I started looking into Flink recently, more specifically the
> flink-kubernetes-operator, so I only know a little about it. I found at least
> 3 other Flink K8s operators, developed by Lyft, Google, and Spotify.
> Could someone please enlighten me on the differences between these third
> party Flink k8s operators, and why these parties didn't contribute to the
> same repo in the Flink community from the beginning? Thanks.
>
>
>
> Ted
>


The flink-kubernetes-operator vs third party flink operators

2022-04-02 Thread Hao t Chang
Hi

I started looking into Flink recently, more specifically the
flink-kubernetes-operator, so I only know a little about it. I found at least 3
other Flink K8s operators, developed by Lyft, Google, and Spotify. Could
someone please enlighten me on the differences between these third party Flink
k8s operators, and why these parties didn't contribute to the same repo in the
Flink community from the beginning? Thanks.

Ted


Re:Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread wang
Hi,


Got it, it seems this way is not flexible enough, but still thanks so much for 
your great support!  Good wishes!




Regards && Thanks
Hunk








At 2022-04-02 16:34:29, "Qingsheng Ren"  wrote:
>Hi,
>
>If the schema of records is not fixed I’m afraid you have to do it in UDTF. 
>
>Best, 
>
>Qingsheng
>
>> On Apr 2, 2022, at 15:45, wang <24248...@163.com> wrote:
>> 
>> Hi,
>> 
>> Thanks for your quick response! 
>> 
>> And I tried the format "raw"; it seems it only supports a single physical column, 
>> and in our project requirement there are more than one hundred columns in the 
>> sink table. So I need to combine those columns into one string in a single UDF?
>> 
>> Thanks && Regards,
>> Hunk
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> At 2022-04-02 15:17:14, "Qingsheng Ren"  wrote:
>> >Hi,
>> >
>> >You can construct the final json string in your UDTF, and write it to Kafka 
>> >sink table with only one field, which is the entire json string constructed 
>> >in UDTF, and use raw format [1] in the sink table:
>> >
>> >CREATE TABLE TableSink (
>> >`final_json_string` STRING
>> >) WITH (
>> >‘connector’ = ‘kafka’,
>> >‘format’ = ‘raw’,
>> >...
>> >)
>> >
>> >So the entire flow would be like:
>> >
>> >1. Kafka source table reads data
>> >2. UDTF parses the ‘content’ field, and construct the final json (id, 
>> >content without backslash) string you need, maybe using Jackson [2] or 
>> >other json tools
>> >3. Insert the constructed json string as the only field in sink table
>> >
>> >The key problem is that the schema of field “content” is not fixed, so you 
>> >have to complete most logics in UDTF. 
>> >
>> >[1] 
>> >https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
>> >[2] https://github.com/FasterXML/jackson
>> >
>> >Best regards, 
>> >
>> >Qingsheng
>> >
>> >
>> >> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:
>> >> 
>> >> Hi,
>> >> 
>> >> Thanks so much for your support! 
>> >> 
>> >> But sorry to say I'm still confused about it. No matter what the udf 
>> >> looks like, the first thing I need to confirm is the type of 'content' in 
>> >> TableSink. What should its type be? Should I use type Row, like 
>> >> this?
>> >> 
>> >>  CREATE TABLE TableSink (
>> >>   `id` STRING NOT NULL,
>> >>   `content` ROW<name STRING, age BIGINT>
>> >>  )
>> >>  WITH (
>> >>  ...
>> >> );
>> >> 
>> >> This type is only suitable for source input {"schema": "schema_infos", 
>> >> "payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}
>> >> 
>> >> But the json key name and format of 'content' in source is variable, if 
>> >> the source input is 
>> >> {"schema": "schema_infos", "payload": {"id": "1", "content": 
>> >> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
>> >> 
>> >> I should define `content` in TableSink with type `content` ROW<color 
>> >> STRING, BackgroundColor STRING, Height BIGINT>, like this:
>> >> 
>> >>  CREATE TABLE TableSink (
>> >>   `id` STRING NOT NULL,
>> >>   `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
>> >>  )
>> >>  WITH (
>> >>  ...
>> >> );
>> >> 
>> >> And the input json might also contain a json array, like: 
>> >> {"schema": "schema_infos", "payload": {"id": "1", "content": 
>> >> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30",\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}}
>> >> 
>> >> 
>> >> So is there some common type I can use which can handle all input json 
>> >> formats?  
>> >> 
>> >> Thanks so much!!
>> >> 
>> >> 
>> >> Thanks && Regards,
>> >> Hunk
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> 
>> >> At 2022-04-01 17:25:59, "Qingsheng Ren"  wrote:
>> >> >Hi, 
>> >> >
>> >> >I’m afraid you have to use a UDTF to parse the content and construct the 
>> >> >final json string manually. The key problem is that the field “content” 
>> >> >is actually a STRING, although it looks like a json object. Currently 
>> >> >the json format provided by Flink could not handle this kind of field 
>> >> >defined as STRING. Also considering the schema of this “content” field 
>> >> >is not fixed across records, Flink SQL can’t use one DDL to consume / 
>> >> >produce records with changing schema. 
>> >> >
>> >> >Cheers,
>> >> >
>> >> >Qingsheng
>> >> >
>> >> >> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
>> >> >> 
>> >> >> Hi dear engineer,
>> >> >> 
>> >> >> Thanks so much for your precious time reading my email!
>> >> >> Recently I'm working on the Flink sql (with version 1.13) in my 
>> >> >> project and encountered one problem about json format data, hope you 
>> >> >> can take a look, thanks! Below is the description of my issue.
>> >> >> 
>> >> >> I use kafka as source and sink, I define kafka source table like this:
>> >> >> 
>> >> >>  CREATE TABLE TableSource (
>> >> >>   schema STRING,
>> >> >>   payload ROW(
>> >> >>   `id` STRING,
>> 

Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread Qingsheng Ren
Hi,

If the schema of records is not fixed I’m afraid you have to do it in UDTF. 

Best, 

Qingsheng

> On Apr 2, 2022, at 15:45, wang <24248...@163.com> wrote:
> 
> Hi,
> 
> Thanks for your quick response! 
> 
> And I tried the format "raw"; it seems it only supports a single physical column, 
> and in our project requirement there are more than one hundred columns in the 
> sink table. So I need to combine those columns into one string in a single UDF?
> 
> Thanks && Regards,
> Hunk
> 
> 
> 
> 
> 
> 
> 
> At 2022-04-02 15:17:14, "Qingsheng Ren"  wrote:
> >Hi,
> >
> >You can construct the final json string in your UDTF, and write it to Kafka 
> >sink table with only one field, which is the entire json string constructed 
> >in UDTF, and use raw format [1] in the sink table:
> >
> >CREATE TABLE TableSink (
> >`final_json_string` STRING
> >) WITH (
> >‘connector’ = ‘kafka’,
> >‘format’ = ‘raw’,
> >...
> >)
> >
> >So the entire flow would be like:
> >
> >1. Kafka source table reads data
> >2. UDTF parses the ‘content’ field, and construct the final json (id, 
> >content without backslash) string you need, maybe using Jackson [2] or other 
> >json tools
> >3. Insert the constructed json string as the only field in sink table
> >
> >The key problem is that the schema of field “content” is not fixed, so you 
> >have to complete most logics in UDTF. 
> >
> >[1] 
> >https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
> >[2] https://github.com/FasterXML/jackson
> >
> >Best regards, 
> >
> >Qingsheng
> >
> >
> >> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:
> >> 
> >> Hi,
> >> 
> >> Thanks so much for your support! 
> >> 
> >> But sorry to say I'm still confused about it. No matter what the udf looks 
> >> like, the first thing I need to confirm is the type of 'content' in 
> >> TableSink. What should its type be? Should I use type Row, like 
> >> this?
> >> 
> >>  CREATE TABLE TableSink (
> >>   `id` STRING NOT NULL,
> >>   `content` ROW<name STRING, age BIGINT>
> >>  )
> >>  WITH (
> >>  ...
> >> );
> >> 
> >> This type is only suitable for source input {"schema": "schema_infos", 
> >> "payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}
> >> 
> >> But the json key name and format of 'content' in source is variable, if 
> >> the source input is 
> >> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> >> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
> >> 
> >> I should define `content` in TableSink with type `content` ROW<color 
> >> STRING, BackgroundColor STRING, Height BIGINT>, like this:
> >> 
> >>  CREATE TABLE TableSink (
> >>   `id` STRING NOT NULL,
> >>   `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
> >>  )
> >>  WITH (
> >>  ...
> >> );
> >> 
> >> And the input json might also contain a json array, like: 
> >> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> >> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30",\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}}
> >> 
> >> 
> >> So is there some common type I can use which can handle all input json 
> >> formats?  
> >> 
> >> Thanks so much!!
> >> 
> >> 
> >> Thanks && Regards,
> >> Hunk
> >> 
> >> 
> >> 
> >> 
> >> 
> >> 
> >> At 2022-04-01 17:25:59, "Qingsheng Ren"  wrote:
> >> >Hi, 
> >> >
> >> >I’m afraid you have to use a UDTF to parse the content and construct the 
> >> >final json string manually. The key problem is that the field “content” 
> >> >is actually a STRING, although it looks like a json object. Currently the 
> >> >json format provided by Flink could not handle this kind of field defined 
> >> >as STRING. Also considering the schema of this “content” field is not 
> >> >fixed across records, Flink SQL can’t use one DDL to consume / produce 
> >> >records with changing schema. 
> >> >
> >> >Cheers,
> >> >
> >> >Qingsheng
> >> >
> >> >> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
> >> >> 
> >> >> Hi dear engineer,
> >> >> 
> >> >> Thanks so much for your precious time reading my email!
> >> >> Recently I'm working on the Flink sql (with version 1.13) in my project 
> >> >> and encountered one problem about json format data, hope you can take a 
> >> >> look, thanks! Below is the description of my issue.
> >> >> 
> >> >> I use kafka as source and sink, I define kafka source table like this:
> >> >> 
> >> >>  CREATE TABLE TableSource (
> >> >>   schema STRING,
> >> >>   payload ROW(
> >> >>   `id` STRING,
> >> >>   `content` STRING
> >> >>  )
> >> >>  )
> >> >>  WITH (
> >> >>  'connector' = 'kafka',
> >> >>  'topic' = 'topic_source',
> >> >>  'properties.bootstrap.servers' = 'localhost:9092',
> >> >>  'properties.group.id' = 'all_gp',
> >> >>  'scan.startup.mode' = 

Re: Re: flink jdbc source oom

2022-04-02 Thread r pp
In stream processing, whether records are handled one by one or batch by batch,
what matters is continuity. For a custom SQL data set, the best way I can think
of to guarantee continuity is an auto-increment id (which implies only insert
operations are accepted). Sorting and de-duplicating within a single batch may
or may not pay off for the job as a whole, unless every batch is strictly
partitioned (e.g. by date); filtering, however, is clearly beneficial.

Michael Ran  wrote on Fri, Apr 1, 2022 at 11:00:

> A custom SQL data set was proposed back then, but the community rejected the
> approach. Functionally, though, we did implement custom SQL result sets
> ourselves for joins and similar operations; they have some advantages for
> large data sets and for scenarios such as sorting and removing duplicates.
> On 2022-04-01 10:12:55, "Lincoln Lee"  wrote:
> >@Peihui  The community's current jdbc table source implements these interfaces:
> >ScanTableSource,
> >LookupTableSource,
> >SupportsProjectionPushDown,
> >SupportsLimitPushDown
> >
> >The lookup table source is used for key-value lookups against dimension
> >tables, while the scan table source supports projection and limit pushdown.
> >If you need other pushdowns, you can try extending the connector yourself,
> >e.g. implementing filter/aggregate pushdown to satisfy pre-filtering needs.
> >
> >
> >Best,
> >Lincoln Lee
> >
> >
> >r pp  wrote on Thu, Mar 31, 2022 at 18:40:
> >
> >> Hi, your question is not entirely clear to me. When you say the data volume
> >> is large, is that per day or per second, and in what way does the source
> >> fall short?
> >>
>


-- 
Best,
  pp
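
For reference, here is a minimal, hypothetical sketch of the kind of extension
Lincoln Lee describes above: a custom ScanTableSource that accepts filter
pushdown via SupportsFilterPushDown. The class name is made up and the actual
JDBC reading logic is omitted; it only illustrates the interface shape.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

// Hypothetical custom source: the planner calls applyFilters() so the source
// can translate predicates into its own query (e.g. a WHERE clause).
public class FilteringJdbcLikeSource implements ScanTableSource, SupportsFilterPushDown {

    private List<ResolvedExpression> pushedFilters = new ArrayList<>();

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        // Accept all filters here for simplicity; a real connector would keep
        // only those it can translate and return the rest as "remaining".
        this.pushedFilters = new ArrayList<>(filters);
        return Result.of(filters, new ArrayList<>());
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // The actual JDBC scan (using pushedFilters) is omitted in this sketch.
        throw new UnsupportedOperationException("reading logic omitted");
    }

    @Override
    public DynamicTableSource copy() {
        FilteringJdbcLikeSource copy = new FilteringJdbcLikeSource();
        copy.pushedFilters = new ArrayList<>(this.pushedFilters);
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "FilteringJdbcLikeSource";
    }
}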




Re: Helm install flink-kubernetes-operator failed

2022-04-02 Thread spoon_lz




Thanks for everyone's help, by executing:


helm repo add operator-rc3 
https://dist.apache.org/repos/dist/dev/flink/flink-kubernetes-operator-0.1.0-rc3

Now it works correctly
On 04/2/2022 15:31, Gyula Fóra wrote:
As Biao Geng said this will be much easier after the first preview release. 
Which should become available on monday if all works out :)

Until then you can also test our last release candidate which will hopefully 
become the release:

helm repo add operator-rc3 
https://dist.apache.org/repos/dist/dev/flink/flink-kubernetes-operator-0.1.0-rc3
helm install flink-operator operator-rc3/flink-kubernetes-operator

These commands will stop working once the release is finalized and then you can 
move to that :)

Cheers,
Gyula


On Sat, Apr 2, 2022 at 9:26 AM Geng Biao  wrote:


Hi Spoon,

 

The command in current doc (helm install flink-kubernetes-operator 
helm/flink-kubernetes-operator) should be executed under the repo’s root dir 
(e.g. ~/flink-kubernetes-operator/).

The community is working on making this process 
simpler (https://github.com/apache/flink-kubernetes-operator/pull/143). You 
should be able to helm install it directly later.

 

Best,

Biao Geng

 

From: spoon_lz 
Date: Saturday, April 2, 2022, 3:11 PM
To: user 
Subject: Helm install flink-kubernetes-operator failed

 

Hi, I am trying it by following the official documentation of 
flink-kubernetes-operator. According to the description in 'Quick Start', when 
I execute the command:

helm install flink-kubernetes-operator helm/flink-kubernetes-operator

it returns an error:

Error: INSTALLATION FAILED: failed to download "helm/flink-kubernetes-operator"

Executing 'helm repo list' returned:

NAME     URL
bitnami  https://charts.bitnami.com/bitnami
stable   https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts

and the command 'helm repo update' has been executed.

But using the commands 'helm search repo flink' and 'helm search repo 
helm/flink-kubernetes-operator' both returned:

No results found

Is my repository configured incorrectly? Hope to get your help

 

 

Re:Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread wang
Hi,


Thanks for your quick response! 


And I tried the format "raw"; it seems it only supports a single physical column, and 
in our project requirement there are more than one hundred columns in the sink 
table. So I need to combine those columns into one string in a single UDF?


Thanks && Regards,
Hunk

















At 2022-04-02 15:17:14, "Qingsheng Ren"  wrote:
>Hi,
>
>You can construct the final json string in your UDTF, and write it to Kafka 
>sink table with only one field, which is the entire json string constructed in 
>UDTF, and use raw format [1] in the sink table:
>
>CREATE TABLE TableSink (
>`final_json_string` STRING
>) WITH (
>‘connector’ = ‘kafka’,
>‘format’ = ‘raw’,
>...
>)
>
>So the entire flow would be like:
>
>1. Kafka source table reads data
>2. UDTF parses the ‘content’ field, and construct the final json (id, content 
>without backslash) string you need, maybe using Jackson [2] or other json tools
>3. Insert the constructed json string as the only field in sink table
>
>The key problem is that the schema of field “content” is not fixed, so you 
>have to complete most logics in UDTF. 
>
>[1] 
>https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
>[2] https://github.com/FasterXML/jackson
>
>Best regards, 
>
>Qingsheng
>
>
>> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:
>> 
>> Hi,
>> 
>> Thanks so much for your support! 
>> 
>> But sorry to say I'm still confused about it. No matter what the udf looks 
>> like, the first thing I need to confirm is the type of 'content' in TableSink. 
>> What should its type be? Should I use type Row, like this?
>> 
>>  CREATE TABLE TableSink (
>>   `id` STRING NOT NULL,
>>   `content` ROW<name STRING, age BIGINT>
>>  )
>>  WITH (
>>  ...
>> );
>> 
>> This type is only suitable for source input {"schema": "schema_infos", 
>> "payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}
>> 
>> But the json key name and format of 'content' in source is variable, if the 
>> source input is 
>> {"schema": "schema_infos", "payload": {"id": "1", "content": 
>> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
>> 
>> I should define `content` in TableSink with type `content` ROW<color STRING, 
>> BackgroundColor STRING, Height BIGINT>, like this:
>> 
>>  CREATE TABLE TableSink (
>>   `id` STRING NOT NULL,
>>   `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
>>  )
>>  WITH (
>>  ...
>> );
>> 
>> And the input json might also contain a json array, like: 
>> {"schema": "schema_infos", "payload": {"id": "1", "content": 
>> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30",\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}}
>> 
>> 
>> So is there some common type I can use which can handle all input json 
>> formats?  
>> 
>> Thanks so much!!
>> 
>> 
>> Thanks && Regards,
>> Hunk
>> 
>> 
>> 
>> 
>> 
>> 
>> At 2022-04-01 17:25:59, "Qingsheng Ren"  wrote:
>> >Hi, 
>> >
>> >I’m afraid you have to use a UDTF to parse the content and construct the 
>> >final json string manually. The key problem is that the field “content” is 
>> >actually a STRING, although it looks like a json object. Currently the json 
>> >format provided by Flink could not handle this kind of field defined as 
>> >STRING. Also considering the schema of this “content” field is not fixed 
>> >across records, Flink SQL can’t use one DDL to consume / produce records 
>> >with changing schema. 
>> >
>> >Cheers,
>> >
>> >Qingsheng
>> >
>> >> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
>> >> 
>> >> Hi dear engineer,
>> >> 
>> >> Thanks so much for your precious time reading my email!
>> >> Recently I'm working on the Flink sql (with version 1.13) in my project 
>> >> and encountered one problem about json format data, hope you can take a 
>> >> look, thanks! Below is the description of my issue.
>> >> 
>> >> I use kafka as source and sink, I define kafka source table like this:
>> >> 
>> >>  CREATE TABLE TableSource (
>> >>   schema STRING,
>> >>   payload ROW(
>> >>   `id` STRING,
>> >>   `content` STRING
>> >>  )
>> >>  )
>> >>  WITH (
>> >>  'connector' = 'kafka',
>> >>  'topic' = 'topic_source',
>> >>  'properties.bootstrap.servers' = 'localhost:9092',
>> >>  'properties.group.id' = 'all_gp',
>> >>  'scan.startup.mode' = 'group-offsets',
>> >>  'format' = 'json',
>> >>  'json.fail-on-missing-field' = 'false',
>> >>  'json.ignore-parse-errors' = 'true'
>> >>  );
>> >> 
>> >> Define the kafka sink table like this:
>> >> 
>> >>  CREATE TABLE TableSink (
>> >>   `id` STRING NOT NULL,
>> >>   `content` STRING NOT NULL
>> >>  )
>> >>  WITH (
>> >>  'connector' = 'kafka',
>> >>  'topic' = 'topic_sink',
>> >>  

Re: Helm install flink-kubernetes-operator failed

2022-04-02 Thread Gyula Fóra
As Biao Geng said this will be much easier after the first preview release.
Which should become available on monday if all works out :)

Until then you can also test our last release candidate which will
hopefully become the release:

helm repo add operator-rc3
https://dist.apache.org/repos/dist/dev/flink/flink-kubernetes-operator-0.1.0-rc3
helm install flink-operator operator-rc3/flink-kubernetes-operator

These commands will stop working once the release is finalized and then you
can move to that :)

Cheers,
Gyula

On Sat, Apr 2, 2022 at 9:26 AM Geng Biao  wrote:

> Hi Spoon,
>
>
>
> The command in current doc (helm install flink-kubernetes-operator
> helm/flink-kubernetes-operator) should be executed under the repo’s root
> dir (e.g. ~/flink-kubernetes-operator/).
>
> The community is working on making this process simpler (
> https://github.com/apache/flink-kubernetes-operator/pull/143). You should
> be able to helm install it directly later.
>
>
>
> Best,
>
> Biao Geng
>
>
>
> *From:* spoon_lz 
> *Date:* Saturday, April 2, 2022, 3:11 PM
> *To:* user 
> *Subject:* Helm install flink-kubernetes-operator failed
>
>
>
> Hi, I am trying it by following the official documentation of
> flink-kubernetes-operator. According to the description in 'Quick Start',
> when I execute the command:
>
> helm install flink-kubernetes-operator helm/flink-kubernetes-operator
>
> it returns an error:
>
> Error: INSTALLATION FAILED: failed to download
> "helm/flink-kubernetes-operator
>
> Executing 'helm repo list’ returned:
>
> NAME     URL
> bitnami  https://charts.bitnami.com/bitnami
> stable   https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts
>
> and the command 'helm repo update' has been executed.
>
>
>
> But using the commands 'helm search repo flink' and 'helm search repo
> helm/flink-kubernetes-operator' both returned:
>
> No results found
>
>
> Is my repository configured incorrectly? Hope to get your help
>
>
>
>
>


Re: Helm install flink-kubernetes-operator failed

2022-04-02 Thread Geng Biao
Hi Spoon,

The command in the current doc (helm install flink-kubernetes-operator 
helm/flink-kubernetes-operator) should be executed under the repo's root dir 
(e.g. ~/flink-kubernetes-operator/).
The community is working on making this process simpler 
(https://github.com/apache/flink-kubernetes-operator/pull/143). You 
should be able to helm install it directly later.

Best,
Biao Geng

From: spoon_lz 
Date: Saturday, April 2, 2022, 3:11 PM
To: user 
Subject: Helm install flink-kubernetes-operator failed

Hi, I am trying it by following the official documentation of 
flink-kubernetes-operator. According to the description in 'Quick Start', when 
I execute the command:
helm install flink-kubernetes-operator helm/flink-kubernetes-operator
it returns an error:
Error: INSTALLATION FAILED: failed to download "helm/flink-kubernetes-operator"
Executing 'helm repo list' returned:
NAME     URL
bitnami  https://charts.bitnami.com/bitnami
stable   https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts
and the command 'helm repo update' has been executed.

But using the commands 'helm search repo flink' and 'helm search repo 
helm/flink-kubernetes-operator' both returned:
No results found

Is my repository configured incorrectly? Hope to get your help




Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread Qingsheng Ren
Hi,

You can construct the final json string in your UDTF, write it to the Kafka 
sink table with only one field (the entire json string constructed in the 
UDTF), and use the raw format [1] in the sink table:

CREATE TABLE TableSink (
  `final_json_string` STRING
) WITH (
  'connector' = 'kafka',
  'format' = 'raw',
  ...
)

So the entire flow would be like:

1. The Kafka source table reads the data
2. The UDTF parses the 'content' field and constructs the final json string you 
need (id, content without backslashes), maybe using Jackson [2] or other json tools
3. Insert the constructed json string as the only field of the sink table

The key problem is that the schema of the field "content" is not fixed, so you have 
to complete most of the logic in the UDTF. 

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/raw/
[2] https://github.com/FasterXML/jackson
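
For illustration, a minimal sketch of what such a UDTF could look like (the
class name, output field name, and error handling are hypothetical, not taken
from this thread). It parses the escaped "content" string with Jackson and
emits one row containing the final json string, which could then be registered
via CREATE FUNCTION and used in the INSERT INTO ... SELECT:

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Hypothetical UDTF: builds {"id": ..., "content": {...}} with "content" as a
// nested json object instead of an escaped string.
@FunctionHint(output = @DataTypeHint("ROW<final_json STRING>"))
public class BuildJsonFunction extends TableFunction<Row> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public void eval(String id, String content) {
        try {
            ObjectNode result = MAPPER.createObjectNode();
            result.put("id", id);
            // Parsing the escaped string turns it into a real nested object.
            result.set("content", MAPPER.readTree(content));
            collect(Row.of(result.toString()));
        } catch (Exception e) {
            // Malformed records are skipped here; adjust to your error policy.
        }
    }
}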

Best regards, 

Qingsheng


> On Apr 2, 2022, at 14:47, wang <24248...@163.com> wrote:
> 
> Hi,
> 
> Thanks so much for your support! 
> 
> But sorry to say I'm still confused about it. No matter what the udf looks 
> like, the first thing I need to confirm is the type of 'content' in TableSink. 
> What should its type be? Should I use type Row, like this?
> 
>  CREATE TABLE TableSink (
>   `id` STRING NOT NULL,
>   `content` ROW<name STRING, age BIGINT>
>  )
>  WITH (
>  ...
> );
> 
> This type is only suitable for source input {"schema": "schema_infos", 
> "payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}
> 
> But the json key name and format of 'content' in source is variable, if the 
> source input is 
> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
> 
> I should define `content` in TableSink with type `content` ROW<color STRING, 
> BackgroundColor STRING, Height BIGINT>, like this:
> 
>  CREATE TABLE TableSink (
>   `id` STRING NOT NULL,
>   `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
>  )
>  WITH (
>  ...
> );
> 
> And the input json might also contain a json array, like: 
> {"schema": "schema_infos", "payload": {"id": "1", "content": 
> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30",\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}}
> 
> 
> So is there some common type I can use which can handle all input json 
> formats?  
> 
> Thanks so much!!
> 
> 
> Thanks && Regards,
> Hunk
> 
> 
> 
> 
> 
> 
> At 2022-04-01 17:25:59, "Qingsheng Ren"  wrote:
> >Hi, 
> >
> >I’m afraid you have to use a UDTF to parse the content and construct the 
> >final json string manually. The key problem is that the field “content” is 
> >actually a STRING, although it looks like a json object. Currently the json 
> >format provided by Flink could not handle this kind of field defined as 
> >STRING. Also considering the schema of this “content” field is not fixed 
> >across records, Flink SQL can’t use one DDL to consume / produce records 
> >with changing schema. 
> >
> >Cheers,
> >
> >Qingsheng
> >
> >> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
> >> 
> >> Hi dear engineer,
> >> 
> >> Thanks so much for your precious time reading my email!
> >> Recently I'm working on the Flink sql (with version 1.13) in my project 
> >> and encountered one problem about json format data, hope you can take a 
> >> look, thanks! Below is the description of my issue.
> >> 
> >> I use kafka as source and sink, I define kafka source table like this:
> >> 
> >>  CREATE TABLE TableSource (
> >>   schema STRING,
> >>   payload ROW(
> >>   `id` STRING,
> >>   `content` STRING
> >>  )
> >>  )
> >>  WITH (
> >>  'connector' = 'kafka',
> >>  'topic' = 'topic_source',
> >>  'properties.bootstrap.servers' = 'localhost:9092',
> >>  'properties.group.id' = 'all_gp',
> >>  'scan.startup.mode' = 'group-offsets',
> >>  'format' = 'json',
> >>  'json.fail-on-missing-field' = 'false',
> >>  'json.ignore-parse-errors' = 'true'
> >>  );
> >> 
> >> Define the kafka sink table like this:
> >> 
> >>  CREATE TABLE TableSink (
> >>   `id` STRING NOT NULL,
> >>   `content` STRING NOT NULL
> >>  )
> >>  WITH (
> >>  'connector' = 'kafka',
> >>  'topic' = 'topic_sink',
> >>  'properties.bootstrap.servers' = 'localhost:9092',
> >>  'format' = 'json',
> >>  'json.fail-on-missing-field' = 'false',
> >>  'json.ignore-parse-errors' = 'true'
> >> );
> >> 
> >> 
> >> Then insert into TableSink with data from TableSource:
> >> INSERT INTO TableSink SELECT id, content FROM TableSource;
> >> 
> >> Then I use "kafka-console-producer.sh" to produce data below into topic 
> >> "topic_source" (TableSource):
> >> {"schema": "schema_infos", "payload": {"id": "1", "content": 

Helm install flink-kubernetes-operator failed

2022-04-02 Thread spoon_lz


Hi, I am trying it according to the official documentation of 
flink-kubernetes-operator, according to the description in 'Quick Start', when 
I execute the command:
| helm install flink-kubernetes-operator helm/flink-kubernetes-operator |
it returns an error:
| Error: INSTALLATION FAILED: failed to download 
"helm/flink-kubernetes-operator |
Executing 'helm repo list’ returned:

| NAME   URL
bitnami  https://charts.bitnami.com/bitnami
stablehttps://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts |
and the command 'helm repo update' has been executed.


But using the commands 'helm search repo flink' and 'helm search repo 
helm/flink-kubernetes-operator' both returned:
| No results found |

Is my repository configured incorrectly? Hope to get your help





flink 1.15

2022-04-02 Thread guanyq
I watched the FFA (Flink Forward Asia) talk on stream-batch unification. Flink 1.15 introduces an MVP version of dynamic table storage for unified stream and batch processing.


Is the MVP version a paid edition?

Re:Re: Could you please give me a hand about json object in flink sql

2022-04-02 Thread wang
Hi,

Thanks so much for your support! 

But sorry to say I'm still confused about it. No matter what the udf looks 
like, the first thing I need to confirm is the type of 'content' in TableSink. 
What should its type be? Should I use type Row, like this?

 CREATE TABLE TableSink (
  `id` STRING NOT NULL,
  `content` ROW<name STRING, age BIGINT>
 )
 WITH (
 ...
);

This type is only suitable for source input {"schema": "schema_infos", 
"payload": {"id": "1", "content": "{\"name\":\"Jone\",\"age\":20}"}}

But the json key name and format of 'content' in source is variable; if the 
source input is 
{"schema": "schema_infos", "payload": {"id": "1", "content": 
"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}

I should define `content` in TableSink with type `content` ROW<color STRING, 
BackgroundColor STRING, Height BIGINT>, like this:

 CREATE TABLE TableSink (
  `id` STRING NOT NULL,
  `content` ROW<color STRING, BackgroundColor STRING, Height BIGINT>
 )
 WITH (
 ...
);

And the input json might also contain a json array, like: 
{"schema": "schema_infos", "payload": {"id": "1", "content": 
"{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30",\"detail\":[{\"value_type\":1,\"value_name\":\"AAA\"},{\"value_type\":2,\"value_name\":\"BBB\"}]}}

So is there some common type I can use that can handle all input json formats?

Thanks so much!!

Thanks && Regards,
Hunk


At 2022-04-01 17:25:59, "Qingsheng Ren"  wrote:
>Hi, 
>
>I’m afraid you have to use a UDTF to parse the content and construct the final 
>json string manually. The key problem is that the field “content” is actually 
>a STRING, although it looks like a json object. Currently the json format 
>provided by Flink could not handle this kind of field defined as STRING. Also 
>considering the schema of this “content” field is not fixed across records, 
>Flink SQL can’t use one DDL to consume / produce records with changing schema. 
>
>Cheers,
>
>Qingsheng
>
>> On Mar 31, 2022, at 21:42, wang <24248...@163.com> wrote:
>> 
>> Hi dear engineer,
>> 
>> Thanks so much for your precious time reading my email!
>> Recently I'm working on the Flink sql (with version 1.13) in my project and 
>> encountered one problem about json format data, hope you can take a look, 
>> thanks! Below is the description of my issue.
>> 
>> I use kafka as source and sink, I define kafka source table like this:
>> 
>>  CREATE TABLE TableSource (
>>   schema STRING,
>>   payload ROW(
>>   `id` STRING,
>>   `content` STRING
>>  )
>>  )
>>  WITH (
>>  'connector' = 'kafka',
>>  'topic' = 'topic_source',
>>  'properties.bootstrap.servers' = 'localhost:9092',
>>  'properties.group.id' = 'all_gp',
>>  'scan.startup.mode' = 'group-offsets',
>>  'format' = 'json',
>>  'json.fail-on-missing-field' = 'false',
>>  'json.ignore-parse-errors' = 'true'
>>  );
>> 
>> Define the kafka sink table like this:
>> 
>>  CREATE TABLE TableSink (
>>   `id` STRING NOT NULL,
>>   `content` STRING NOT NULL
>>  )
>>  WITH (
>>  'connector' = 'kafka',
>>  'topic' = 'topic_sink',
>>  'properties.bootstrap.servers' = 'localhost:9092',
>>  'format' = 'json',
>>  'json.fail-on-missing-field' = 'false',
>>  'json.ignore-parse-errors' = 'true'
>> );
>> 
>> 
>> Then insert into TableSink with data from TableSource:
>> INSERT INTO TableSink SELECT id, content FROM TableSource;
>> 
>> Then I use "kafka-console-producer.sh" to produce data below into topic 
>> "topic_source" (TableSource):
>> {"schema": "schema_infos", "payload": {"id": "1", "content": 
>> "{\"name\":\"Jone\",\"age\":20}"}}
>> 
>> 
>> Then I listen on topic_sink (TableSink) by kafka-console-consumer.sh, the 
>> output is:
>> {"id":"1","content":"{\"name\":\"Jone\",\"age\":20}"}
>> 
>> But what I want here is {"id":"1","content": {"name":"Jone","age":20}}
>> I want the the value of "content" is json object, not json string.
>> 
>> And what's more, the format of "content" in TableSource is not fixed; it can 
>> be any json-formatted (or json array format) string, such as:
>> {"schema": "schema_infos", "payload": {"id": "1", "content": 
>> "{\"color\":\"Red\",\"BackgroundColor\":\"Yellow\",\"Height\":30"}}
>> 
>> 
>> So my question is, how can I transform a json-format string (like 
>> "{\"name\":\"Jone\",\"age\":20}") from TableSource into a json object 
>> (like {"name":"Jone","age":20}).
>> 
>> 
>> Thanks && Regards,
>> Hunk
>> 
>> 
>> 
>> 
>>