Re: Issue with flink 1.16 and hive dialect

2023-07-16 Thread yuxia
Hi, Ram. 
Thanks for reaching out. 
1: 
About the Hive dialect issue, maybe you're using JDK 11? 
There's a known issue, FLINK-27450 [1]. The main reason is that Hive doesn't 
fully support JDK 11. More specifically to your case, it has been tracked in 
HIVE-21584 [2]. 
Flink has upgraded its Hive 2.x dependency to 2.3.9 to include this patch. But 
unfortunately, IIRC, this patch is still not available in Hive 3.x. 

2: 
About the create-table issue, thanks for reporting it. I tried it and it 
turns out that it's a bug. I have created FLINK-32596 [3] to track it. 
It only happens with the combination of Flink dialect & partitioned table & Hive catalog. 
In most cases we recommend users use the Hive dialect to create Hive tables, 
so we missed the test covering creating a partitioned table with the Flink dialect 
in a Hive catalog. That's why this bug has been hidden for a while. 
For your case, as a workaround, I think you can try to create the table in 
Hive itself with the following SQL: 
CREATE TABLE testsource (
  `geo_altitude` FLOAT
)
PARTITIONED BY (`date` STRING)
TBLPROPERTIES (
  'sink.partition-commit.delay' = '1 s',
  'sink.partition-commit.policy.kind' = 'metastore,success-file');
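
If it helps, here is a minimal sketch (mine, not verified against your setup) of then 
writing into that table from Flink with the default dialect; "myhive" and "source_table" 
are hypothetical names standing in for your registered Hive catalog and your source: 

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WriteToHiveTable {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Assumption: a Hive catalog named "myhive" has already been registered.
        tEnv.executeSql("USE CATALOG myhive");
        // With the default dialect the partition column is written via dynamic
        // partitioning: it is simply the last column of the SELECT.
        tEnv.executeSql(
                "INSERT INTO testsource SELECT geo_altitude, `date` FROM source_table");
    }
}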


[1] https://issues.apache.org/jira/browse/FLINK-27450 
[2] https://issues.apache.org/jira/browse/HIVE-21584 
[3] https://issues.apache.org/jira/browse/FLINK-32596 


Best regards, 
Yuxia 


发件人: "ramkrishna vasudevan"  
收件人: "User" , "dev"  
发送时间: 星期五, 2023年 7 月 14日 下午 8:46:20 
主题: Issue with flink 1.16 and hive dialect 

Hi All, 
I am not sure if this was already discussed in this forum. 
In our setup with Flink 1.16.0 we have ensured that the setup has all the 
necessary things for the Hive catalog to work. 

The Flink dialect works fine functionally (with some issues, I will come to that 
later). 

But when I follow the steps here in 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/hive-compatibility/hive-dialect/queries/overview/#examples 
I am getting an exception once I set the dialect to Hive: 
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
[flink-sql-client-1.16.0-0.0-SNAPSHOT.jar:1.16.0-0.0-SNAPSHOT] 
Caused by: java.lang.ClassCastException: class 
jdk.internal.loader.ClassLoaders$AppClassLoader cannot be cast to class 
java.net.URLClassLoader (jdk.internal.loader.ClassLoaders$AppClassLoader and 
java.net.URLClassLoader are in module java.base of loader 'bootstrap') 
at org.apache.hadoop.hive.ql.session.SessionState.(SessionState.java:413) 
~[flink-sql-connector-hive-3.1.2_2.12-1.16.0-0.0-SNAPSHOT.jar:1.16.0-0.0-SNAPSHOT]
 
at org.apache.hadoop.hive.ql.session.SessionState.(SessionState.java:389) 
~[flink-sql-connector-hive-3.1.2_2.12-1.16.0-0.0-SNAPSHOT.jar:1.16.0-0.0-SNAPSHOT]
 
at 
org.apache.flink.table.planner.delegation.hive.HiveSessionState.(HiveSessionState.java:80)
 
~[flink-sql-connector-hive-3.1.2_2.12-1.16.0-0.0-SNAPSHOT.jar:1.16.0-0.0-SNAPSHOT]
 
at 
org.apache.flink.table.planner.delegation.hive.HiveSessionState.startSessionState(HiveSessionState.java:128)
 
~[flink-sql-connector-hive-3.1.2_2.12-1.16.0-0.0-SNAPSHOT.jar:1.16.0-0.0-SNAPSHOT]
 
at 
org.apache.flink.table.planner.delegation.hive.HiveParser.parse(HiveParser.java:210)
 
~[flink-sql-connector-hive-3.1.2_2.12-1.16.0-0.0-SNAPSHOT.jar:1.16.0-0.0-SNAPSHOT]
 
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172)
 ~[flink-sql-client-1.16.0-0.0-SNAPSHOT.jar:1.16.0-0.0-SNAPSHOT] 

I have ensured the dialect-related steps are completely followed, including 
https://issues.apache.org/jira/browse/FLINK-25128 

In the flink catalog - if we create a table 
> CREATE TABLE testsource( 
> 
> `date` STRING, 
> `geo_altitude` FLOAT 
> ) 
> PARTITIONED by ( `date`) 
> 
> WITH ( 
> 
> 'connector' = 'hive', 
> 'sink.partition-commit.delay'='1 s', 
> 'sink.partition-commit.policy.kind'='metastore,success-file' 
> ); 

The partition always gets created on the last set of columns and not on the 
columns that we specify. Is this a known bug? 

Regards 
Ram 



Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

2023-07-03 Thread yuxia
Congratulations! 

Best regards, 
Yuxia 


发件人: "Pushpa Ramakrishnan"  
收件人: "Xintong Song"  
抄送: "dev" , "User"  
发送时间: 星期一, 2023年 7 月 03日 下午 8:36:30 
主题: Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award 

Congratulations 🥳 



On 03-Jul-2023, at 3:30 PM, Xintong Song  wrote: 






Dear Community, 

I'm pleased to share this good news with everyone. As some of you may have 
already heard, Apache Flink has won the 2023 SIGMOD Systems Award [1]. 

"Apache Flink greatly expanded the use of stream data-processing." -- SIGMOD 
Awards Committee 

SIGMOD is one of the most influential data management research conferences in 
the world. The Systems Award is awarded to an individual or set of individuals 
to recognize the development of a software or hardware system whose technical 
contributions have had significant impact on the theory or practice of 
large-scale data management systems. Winning the award indicates high 
recognition from academia of Flink's technological advancement and industry 
influence. 

As an open-source project, Flink wouldn't have come this far without the wide, 
active and supportive community behind it. Kudos to all of us who helped make 
this happen, including the over 1,400 contributors and many others who 
contributed in ways beyond code. 



Best, 

Xintong (on behalf of the Flink PMC) 




[1] [ https://sigmod.org/2023-sigmod-systems-award/ | 
https://sigmod.org/2023-sigmod-systems-award/ ] 






Re: [Slack] Request to upload new invitation link

2023-06-28 Thread yuxia
Hi, Stephen. 
Welcome to join Flink Slack channel. Here's my invitation link: 
https://join.slack.com/t/apache-flink/shared_invite/zt-1y7kmx7te-zUg1yfLdGu3Th9En_p4n~g
 

Best regards, 
Yuxia 


发件人: "Stephen Chu"  
收件人: "User"  
抄送: "Satyam Shanker" , "Vaibhav Gosain" 
, "Steve Jiang"  
发送时间: 星期四, 2023年 6 月 29日 上午 12:49:21 
主题: [Slack] Request to upload new invitation link 

Hi there, 
I'd love to join the Flink Slack channel, but it seems the link is outdated: [ 
https://join.slack.com/t/apache-flink/shared_invite/zt-1thin01ch-tYuj6Zwu8qf0QsivHY0anw
 | 
https://join.slack.com/t/apache-flink/shared_invite/zt-1thin01ch-tYuj6Zwu8qf0QsivHY0anw
 ] 

Would someone be able to update or send me a new invite link? 

Thanks, 
Stephen 



Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect

2023-05-31 Thread yuxia
Hi, Jingsong. It's hard to provide an option, given that we also want to decouple 
Hive from the Flink planner. 
If we keep this fallback behavior, HiveParser will still depend on the `ParserImpl` 
provided by flink-table-planner.
But to minimize the impact on users and be more user-friendly, I'll remind users in 
the error message that they may use SET table.sql-dialect = default to switch to 
Flink's default dialect when HiveParser fails to parse the SQL.
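
For reference, a minimal sketch (my own, not part of the proposal itself) of how users 
can switch dialects explicitly once the fallback is gone, either via SET 
'table.sql-dialect' = 'default'; in SQL or programmatically through the Table API; the 
catalog name below is made up:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;

public class SwitchDialect {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // Flink-specific statements: use the default dialect.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql("CREATE CATALOG my_catalog WITH ('type' = 'generic_in_memory')");
        // Hive syntax: switch to the Hive dialect before running Hive statements.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    }
}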

Best regards,
Yuxia


----- Original Message -----
From: "Jingsong Li" 
To: "Rui Li" 
Cc: "dev" , "yuxia" , 
"User" 
Sent: Tuesday, May 30, 2023, 3:21:56 PM
Subject: Re: [DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect

+1, the fallback looks weird now, it is outdated.

But, it is good to provide an option. I don't know if there are some
users who depend on this fallback.

Best,
Jingsong

On Tue, May 30, 2023 at 1:47 PM Rui Li  wrote:
>
> +1, the fallback was just intended as a temporary workaround to run 
> catalog/module related statements with hive dialect.
>
> On Mon, May 29, 2023 at 3:59 PM Benchao Li  wrote:
>>
>> Big +1 on this, thanks yuxia for driving this!
>>
>> yuxia  于2023年5月29日周一 14:55写道:
>>
>> > Hi, community.
>> >
>> > I want to start the discussion about Hive dialect shouldn't fall back to
>> > Flink's default dialect.
>> >
>> > Currently, when the HiveParser fail to parse the sql in Hive dialect,
>> > it'll fall back to Flink's default parser[1] to handle flink-specific
>> > statements like "CREATE CATALOG xx with (xx);".
>> >
>> > As I‘m involving with Hive dialect and have some communication with
>> > community users who use Hive dialectrecently,  I'm thinking throw exception
>> > directly instead of falling back to Flink's default dialect when fail to
>> > parse the sql in Hive dialect
>> >
>> > Here're some reasons:
>> >
>> > First of all, it'll hide some error with Hive dialect. For example, we
>> > found we can't use Hive dialect any more with Flink sql client in release
>> > validation phase[2], finally we find a modification in Flink sql client
>> > cause it, but our test case can't find it earlier for although HiveParser
>> > faill to parse it but then it'll fall back to default parser and pass test
>> > case successfully.
>> >
>> > Second, conceptually, Hive dialect should be do nothing with Flink's
>> > default dialect. They are two totally different dialect. If we do need a
>> > dialect mixing Hive dialect and default dialect , may be we need to propose
>> > a new hybrid dialect and announce the hybrid behavior to users.
>> > Also, It made some users confused for the fallback behavior. The fact
>> > comes from I had been ask by community users. Throw an excpetioin directly
>> > when fail to parse the sql statement in Hive dialect will be more 
>> > intuitive.
>> >
>> > Last but not least, it's import to decouple Hive with Flink planner[3]
>> > before we can externalize Hive connector[4]. If we still fall back to Flink
>> > default dialct, then we will need depend on `ParserImpl` in Flink planner,
>> > which will block us removing the provided dependency of Hive dialect as
>> > well as externalizing Hive connector.
>> >
>> > Although we hadn't announced the fall back behavior ever, but some users
>> > may implicitly depend on this behavior in theirs sql jobs. So, I hereby
>> > open the dicussion about abandoning the fall back behavior to make Hive
>> > dialect clear and isoloted.
>> > Please remember it won't break the Hive synatax but the syntax specified
>> > to Flink may fail after then. But for the failed sql, you can use `SET
>> > table.sql-dialect=default;` to switch to Flink dialect.
>> > If there's some flink-specific statements we found should be included in
>> > Hive dialect to be easy to use, I think we can still add them as specific
>> > cases to Hive dialect.
>> >
>> > Look forwards to your feedback. I'd love to listen the feedback from
>> > community to take the next steps.
>> >
>> > [1]:
>> > https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java#L348
>> > [2]:https://issues.apache.org/jira/browse/FLINK-26681
>> > [3]:https://issues.apache.org/jira/browse/FLINK-31413
>> > [4]:https://issues.apache.org/jira/browse/FLINK-30064
>> >
>> >
>> >
>> > Best regards,
>> > Yuxia
>> >
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>
>
>
> --
> Best regards!
> Rui Li


[DISCUSS] Hive dialect shouldn't fall back to Flink's default dialect

2023-05-28 Thread yuxia
Hi, community . 

I want to start the discussion about Hive dialect shouldn't fall back to 
Flink's default dialect. 

Currently, when the HiveParser fails to parse the SQL in Hive dialect, it'll 
fall back to Flink's default parser[1] to handle Flink-specific statements like 
"CREATE CATALOG xx with (xx);". 

As I'm involved with the Hive dialect and have had some communication with community 
users who use the Hive dialect recently, I'm thinking of throwing an exception directly 
instead of falling back to Flink's default dialect when the SQL can't be parsed 
in Hive dialect. 

Here're some reasons: 

First of all, it'll hide some errors with the Hive dialect. For example, we found we 
couldn't use the Hive dialect any more with the Flink SQL client in the release 
validation phase[2]; we finally found that a modification in the Flink SQL client 
caused it, but our test case couldn't catch it earlier, because although HiveParser 
failed to parse it, it fell back to the default parser and the test case passed. 

Second, conceptually, the Hive dialect should have nothing to do with Flink's default 
dialect. They are two totally different dialects. If we do need a dialect mixing 
the Hive dialect and the default dialect, maybe we need to propose a new hybrid 
dialect and announce the hybrid behavior to users. 
Also, the fallback behavior has confused some users; I have been asked about it 
by community users. Throwing an exception directly when the SQL statement cannot be 
parsed in Hive dialect will be more intuitive. 

Last but not least, it's important to decouple Hive from the Flink planner[3] before 
we can externalize the Hive connector[4]. If we still fall back to the Flink default 
dialect, then we will need to depend on `ParserImpl` in the Flink planner, which will 
block us from removing the provided dependency of the Hive dialect as well as 
externalizing the Hive connector. 

Although we have never announced the fallback behavior, some users may 
implicitly depend on it in their SQL jobs. So, I hereby open the 
discussion about abandoning the fallback behavior to make the Hive dialect clear 
and isolated. 
Please note it won't break Hive syntax, but syntax specific to Flink may fail 
afterwards. For the failed SQL, you can use `SET 
table.sql-dialect=default;` to switch to the Flink dialect. 
If there are some Flink-specific statements we find should be included in the Hive 
dialect for ease of use, I think we can still add them as special cases to the 
Hive dialect. 

Looking forward to your feedback. I'd love to hear the feedback from the community 
to decide the next steps. 

[1]:https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java#L348
 
[2]:https://issues.apache.org/jira/browse/FLINK-26681 
[3]:https://issues.apache.org/jira/browse/FLINK-31413 
[4]:https://issues.apache.org/jira/browse/FLINK-30064 



Best regards, 
Yuxia 


Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-27 Thread yuxia
congratulations! 

Best regards, 
Yuxia 


发件人: "Andrew Otto"  
收件人: "Matthias Pohl"  
抄送: "Jing Ge" , "Leonard Xu" , "Yu Li" 
, "dev" , "User" 
, "user-zh"  
发送时间: 星期一, 2023年 3 月 27日 下午 8:57:50 
主题: Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache 
Paimon(incubating) 

Exciting! 

If this ends up working well, Wikimedia Foundation would love to try it out! 

On Mon, Mar 27, 2023 at 8:39 AM Matthias Pohl via user < [ 
mailto:user@flink.apache.org | user@flink.apache.org ] > wrote: 



Congratulations and good luck with pushing the project forward. 

On Mon, Mar 27, 2023 at 2:35 PM Jing Ge via user < [ 
mailto:user@flink.apache.org | user@flink.apache.org ] > wrote: 


Congrats! 
Best regards, 
Jing 

On Mon, Mar 27, 2023 at 2:32 PM Leonard Xu < [ mailto:xbjt...@gmail.com | 
xbjt...@gmail.com ] > wrote: 


Congratulations! 

Best, 
Leonard 



On Mar 27, 2023, at 5:23 PM, Yu Li < [ mailto:car...@gmail.com | 
car...@gmail.com ] > wrote: 

Dear Flinkers, 




As you may have noticed, we are pleased to announce that Flink Table Store has 
joined the Apache Incubator as a separate project called Apache 
Paimon(incubating) [1] [2] [3]. The new project still aims at building a 
streaming data lake platform for high-speed data ingestion, change data 
tracking and efficient real-time analytics, with the vision of supporting a 
larger ecosystem and establishing a vibrant and neutral open source community. 




We would like to thank everyone for their great support and efforts for the 
Flink Table Store project, and warmly welcome everyone to join the development 
and activities of the new project. Apache Flink will continue to be one of the 
first-class citizens supported by Paimon, and we believe that the Flink and 
Paimon communities will maintain close cooperation. 




Dear Flinkers, 


As you may have noticed, we are pleased to announce that Flink Table Store has 
officially joined the Apache Incubator as an independent project [1] [2] [3]. The new 
project is named Apache Paimon (incubating) and remains committed to building a 
next-generation streaming lakehouse platform for high-speed data ingestion, streaming 
data subscription and efficient real-time analytics. In addition, the new project will 
support a richer ecosystem and build a vibrant and neutral open source community. 


Here we would like to thank everyone for the great support of and investment in the 
Flink Table Store project, and we warmly welcome everyone to join the development and 
community activities of the new project. Apache Flink will continue to be one of the 
main compute engines supported by Paimon, and we believe the Flink and Paimon 
communities will maintain close cooperation. 




Best Regards, 
Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC) 

Regards, 
Yu Li (on behalf of the Apache Flink PMC and Apache Paimon PPMC) 




[1] [ https://paimon.apache.org/ | https://paimon.apache.org/ ] 
[2] [ https://github.com/apache/incubator-paimon | 
https://github.com/apache/incubator-paimon ] 
[3] [ https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal | 
https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal ] 













Re: Are the Table API Connectors production ready?

2023-03-13 Thread yuxia
The plan shows the filter has been pushed down. But remember, although it is pushed 
down, the filesystem table won't accept the filter. So it will still effectively scan 
all the files. 

Best regards, 
Yuxia 


发件人: "Maryam Moafimadani"  
收件人: "Hang Ruan"  
抄送: "yuxia" , "ravi suryavanshi" 
, "Yaroslav Tkachenko" , 
"Shammon FY" , "User"  
发送时间: 星期一, 2023年 3 月 13日 下午 10:07:57 
主题: Re: Are the Table API Connectors production ready? 

Hi All, 
It's exciting to see file filtering in the plan for development. I am curious 
whether the following query on a filesystem connector would actually push down 
the filter on metadata `file.path`? 

Select score, `file.path` from MyUserTable WHERE `file.path` LIKE '%prefix_%' 

== Optimized Execution Plan == 
Calc(select=[score, file.path], where=[LIKE(file.path, '%2022070611284%')]) 
+- TableSourceScan(table=[[default_catalog, default_database, MyUserTable, 
filter=[LIKE(file.path, _UTF-16LE'%2022070611284%')]]], fields=[score, 
file.path]) 

Thanks, 
Maryam 

On Mon, Mar 13, 2023 at 8:55 AM Hang Ruan < [ mailto:ruanhang1...@gmail.com | 
ruanhang1...@gmail.com ] > wrote: 



Hi, yuxia, 
I would like to help to complete this task. 

Best, 
Hang 

yuxia < [ mailto:luoyu...@alumni.sjtu.edu.cn | luoyu...@alumni.sjtu.edu.cn ] > 
于2023年3月13日周一 09:32写道: 


Yeah, you're right. We don't provide filtering files with patterns. And 
actually we had already a jira[1] for it. 
I was intended to do this in the past, but don't have much time. Anyone who are 
insterested can take it over. We're 
happy to help review. 

[1] [ https://issues.apache.org/jira/browse/FLINK-17398 | 
https://issues.apache.org/jira/browse/FLINK-17398 ] 

Best regards, 
Yuxia 


发件人: "User" < [ mailto:user@flink.apache.org | user@flink.apache.org ] > 
收件人: "Yaroslav Tkachenko" < [ mailto:yaros...@goldsky.com | 
yaros...@goldsky.com ] >, "Shammon FY" < [ mailto:zjur...@gmail.com | 
zjur...@gmail.com ] > 
抄送: "User" < [ mailto:user@flink.apache.org | user@flink.apache.org ] > 
发送时间: 星期一, 2023年 3 月 13日 上午 12:36:46 
主题: Re: Are the Table API Connectors production ready? 

Thanks a lot, Yaroslav and Shammon. 
I want to use the Filesystem Connector. I tried it works well till it is 
running. If the job is restarted. It processes all the files again. 

Could not find the move or delete option after collecting the files. Also, I 
could not find the filtering using patterns. 

Pattern matching is required as different files exist in the same folder. 

Regards, 
Ravi 
On Friday, 10 March, 2023 at 05:47:27 am IST, Shammon FY < [ 
mailto:zjur...@gmail.com | zjur...@gmail.com ] > wrote: 


Hi Ravi 

Agree with Yaroslav and if you find any problems in use, you can create an 
issue in jira [ 
https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK | 
https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK ] . I have 
used kafka/jdbc/hive in production too, they work well. 

Best, 
Shammon 

On Fri, Mar 10, 2023 at 1:42 AM Yaroslav Tkachenko < [ 
mailto:yaros...@goldsky.com | yaros...@goldsky.com ] > wrote: 


Hi Ravi, 

All of them should be production ready. I've personally used half of them in 
production. 

Do you have any specific concerns? 

On Thu, Mar 9, 2023 at 9:39 AM [ http://ravi_suryavanshi.yahoo.com/ | 
ravi_suryavanshi.yahoo.com ] via user < [ mailto:user@flink.apache.org | 
user@flink.apache.org ] > wrote: 


Hi, 
Can anyone help me here? 

Thanks and regards, 
Ravi 

On Monday, 27 February, 2023 at 09:33:18 am IST, [ 
http://ravi_suryavanshi.yahoo.com/ | ravi_suryavanshi.yahoo.com ] via user < [ 
mailto:user@flink.apache.org | user@flink.apache.org ] > wrote: 


Hi Team, 


In Flink 1.16.0, we would like to use some of the Table API Connectors for 
production. Kindly let me know if the below connectors are production ready or 
only for testing purposes. 

Name | Version | Source | Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/filesystem/ | Filesystem ] | | Bounded and Unbounded Scan, Lookup | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/elasticsearch/ | Elasticsearch ] | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/opensearch/ | Opensearch ] | 1.x & 2.x | Not supported | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kafka/ | Apache Kafka ] | 0.10+ | Unbounded Scan | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/dynamodb/ | Amazon DynamoDB ] | | Not supported | Streaming S

Re: Are the Table API Connectors production ready?

2023-03-13 Thread yuxia
Thanks Hang for taking it. Assigned to you~ 

Best regards, 
Yuxia 


发件人: "Hang Ruan"  
收件人: "yuxia"  
抄送: "ravi suryavanshi" , "Yaroslav Tkachenko" 
, "Shammon FY" , "User" 
 
发送时间: 星期一, 2023年 3 月 13日 下午 8:54:49 
主题: Re: Are the Table API Connectors production ready? 

Hi, yuxia, 
I would like to help to complete this task. 

Best, 
Hang 

yuxia < [ mailto:luoyu...@alumni.sjtu.edu.cn | luoyu...@alumni.sjtu.edu.cn ] > 
于2023年3月13日周一 09:32写道: 



Yeah, you're right. We don't provide filtering files with patterns. And 
actually we had already a jira[1] for it. 
I was intended to do this in the past, but don't have much time. Anyone who are 
insterested can take it over. We're 
happy to help review. 

[1] [ https://issues.apache.org/jira/browse/FLINK-17398 | 
https://issues.apache.org/jira/browse/FLINK-17398 ] 

Best regards, 
Yuxia 


发件人: "User" < [ mailto:user@flink.apache.org | user@flink.apache.org ] > 
收件人: "Yaroslav Tkachenko" < [ mailto:yaros...@goldsky.com | 
yaros...@goldsky.com ] >, "Shammon FY" < [ mailto:zjur...@gmail.com | 
zjur...@gmail.com ] > 
抄送: "User" < [ mailto:user@flink.apache.org | user@flink.apache.org ] > 
发送时间: 星期一, 2023年 3 月 13日 上午 12:36:46 
主题: Re: Are the Table API Connectors production ready? 

Thanks a lot, Yaroslav and Shammon. 
I want to use the Filesystem Connector. I tried it works well till it is 
running. If the job is restarted. It processes all the files again. 

Could not find the move or delete option after collecting the files. Also, I 
could not find the filtering using patterns. 

Pattern matching is required as different files exist in the same folder. 

Regards, 
Ravi 
On Friday, 10 March, 2023 at 05:47:27 am IST, Shammon FY < [ 
mailto:zjur...@gmail.com | zjur...@gmail.com ] > wrote: 


Hi Ravi 

Agree with Yaroslav and if you find any problems in use, you can create an 
issue in jira [ 
https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK | 
https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK ] . I have 
used kafka/jdbc/hive in production too, they work well. 

Best, 
Shammon 

On Fri, Mar 10, 2023 at 1:42 AM Yaroslav Tkachenko < [ 
mailto:yaros...@goldsky.com | yaros...@goldsky.com ] > wrote: 


Hi Ravi, 

All of them should be production ready. I've personally used half of them in 
production. 

Do you have any specific concerns? 

On Thu, Mar 9, 2023 at 9:39 AM [ http://ravi_suryavanshi.yahoo.com/ | 
ravi_suryavanshi.yahoo.com ] via user < [ mailto:user@flink.apache.org | 
user@flink.apache.org ] > wrote: 


Hi, 
Can anyone help me here? 

Thanks and regards, 
Ravi 

On Monday, 27 February, 2023 at 09:33:18 am IST, [ 
http://ravi_suryavanshi.yahoo.com/ | ravi_suryavanshi.yahoo.com ] via user < [ 
mailto:user@flink.apache.org | user@flink.apache.org ] > wrote: 


Hi Team, 


In Flink 1.16.0, we would like to use some of the Table API Connectors for 
production. Kindly let me know if the below connectors are production ready or 
only for testing purposes. 

Name | Version | Source | Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/filesystem/ | Filesystem ] | | Bounded and Unbounded Scan, Lookup | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/elasticsearch/ | Elasticsearch ] | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/opensearch/ | Opensearch ] | 1.x & 2.x | Not supported | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kafka/ | Apache Kafka ] | 0.10+ | Unbounded Scan | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/dynamodb/ | Amazon DynamoDB ] | | Not supported | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kinesis/ | Amazon Kinesis Data Streams ] | | Unbounded Scan | Streaming Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/firehose/ | Amazon Kinesis Data Firehose ] | | Not supported | Streaming Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/jdbc/ | JDBC ] | | Bounded Scan, Lookup | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hbase/ | Apache HBase ] | 1.4.x & 2.2.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hive/overview/ | Apache Hive ] 

Thanks and regards 










Re: Are the Table API Connectors production ready?

2023-03-12 Thread yuxia
Yeah, you're right. We don't provide filtering of files with patterns, and 
actually we already have a JIRA [1] for it. 
I intended to do this in the past, but didn't have much time. Anyone who is 
interested can take it over. We're 
happy to help review. 

[1] https://issues.apache.org/jira/browse/FLINK-17398 

Best regards, 
Yuxia 


发件人: "User"  
收件人: "Yaroslav Tkachenko" , "Shammon FY" 
 
抄送: "User"  
发送时间: 星期一, 2023年 3 月 13日 上午 12:36:46 
主题: Re: Are the Table API Connectors production ready? 

Thanks a lot, Yaroslav and Shammon. 
I want to use the Filesystem Connector. I tried it works well till it is 
running. If the job is restarted. It processes all the files again. 

Could not find the move or delete option after collecting the files. Also, I 
could not find the filtering using patterns. 

Pattern matching is required as different files exist in the same folder. 

Regards, 
Ravi 
On Friday, 10 March, 2023 at 05:47:27 am IST, Shammon FY  
wrote: 


Hi Ravi 

Agree with Yaroslav and if you find any problems in use, you can create an 
issue in jira [ 
https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK | 
https://issues.apache.org/jira/issues/?jql=project%20%3D%20FLINK ] . I have 
used kafka/jdbc/hive in production too, they work well. 

Best, 
Shammon 

On Fri, Mar 10, 2023 at 1:42 AM Yaroslav Tkachenko < [ 
mailto:yaros...@goldsky.com | yaros...@goldsky.com ] > wrote: 



Hi Ravi, 

All of them should be production ready. I've personally used half of them in 
production. 

Do you have any specific concerns? 

On Thu, Mar 9, 2023 at 9:39 AM [ http://ravi_suryavanshi.yahoo.com/ | 
ravi_suryavanshi.yahoo.com ] via user < [ mailto:user@flink.apache.org | 
user@flink.apache.org ] > wrote: 


Hi, 
Can anyone help me here? 

Thanks and regards, 
Ravi 

On Monday, 27 February, 2023 at 09:33:18 am IST, [ 
http://ravi_suryavanshi.yahoo.com/ | ravi_suryavanshi.yahoo.com ] via user < [ 
mailto:user@flink.apache.org | user@flink.apache.org ] > wrote: 


Hi Team, 


In Flink 1.16.0, we would like to use some of the Table API Connectors for 
production. Kindly let me know if the below connectors are production ready or 
only for testing purposes. 

Name | Version | Source | Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/filesystem/ | Filesystem ] | | Bounded and Unbounded Scan, Lookup | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/elasticsearch/ | Elasticsearch ] | 6.x & 7.x | Not supported | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/opensearch/ | Opensearch ] | 1.x & 2.x | Not supported | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kafka/ | Apache Kafka ] | 0.10+ | Unbounded Scan | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/dynamodb/ | Amazon DynamoDB ] | | Not supported | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/kinesis/ | Amazon Kinesis Data Streams ] | | Unbounded Scan | Streaming Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/firehose/ | Amazon Kinesis Data Firehose ] | | Not supported | Streaming Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/jdbc/ | JDBC ] | | Bounded Scan, Lookup | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hbase/ | Apache HBase ] | 1.4.x & 2.2.x | Bounded Scan, Lookup | Streaming Sink, Batch Sink 
[ https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hive/overview/ | Apache Hive ] 

Thanks and regards 








Re: CSV File Sink in Streaming Use Case

2023-03-07 Thread yuxia
Hi, as the doc says: 
'The BulkFormat reads and decodes batches of records at a time.' So the Bulk 
format is not bound to columnar formats; a bulk writer for CSV is indeed implemented 
in the Flink code. Actually, you can use either Row or Bulk depending on what 
style you would like to write the data with. 

As for the CSV BulkFormat being missing from the docs and not public in flink-csv, I 
really don't know why. I guess the reason may be that Flink doesn't want to expose it 
in the DataStream API, but only in the Table API. 
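
For the DataStream API, here is a minimal sketch of the Row format approach, where you 
provide your own Encoder that turns each record into one CSV line (MyRecord and its 
fields are hypothetical, just for illustration): 

import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

import java.nio.charset.StandardCharsets;

public class CsvRowFileSink {

    public static FileSink<MyRecord> build(String outputDir) {
        // Each record is written as a single CSV-formatted line.
        Encoder<MyRecord> csvEncoder = (record, out) -> {
            String line = record.getId() + "," + record.getName() + "\n";
            out.write(line.getBytes(StandardCharsets.UTF_8));
        };
        return FileSink.<MyRecord>forRowFormat(new Path(outputDir), csvEncoder).build();
    }

    // Hypothetical POJO, standing in for whatever record type the job produces.
    public static class MyRecord {
        private final long id;
        private final String name;

        public MyRecord(long id, String name) {
            this.id = id;
            this.name = name;
        }

        public long getId() {
            return id;
        }

        public String getName() {
            return name;
        }
    }
}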

Best regards, 
Yuxia 


发件人: "User"  
收件人: "User"  
发送时间: 星期二, 2023年 3 月 07日 下午 7:35:47 
主题: CSV File Sink in Streaming Use Case 

Hi, 

I am working on a Java DataStream application and need to implement a File sink 
with CSV format. 

I see that I have two options here - Row and Bulk ( [ 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/filesystem/#format-types-1
 | 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/filesystem/#format-types-1
 ] ) 

So for CSV file distribution which one should I use? Row or Bulk? 

I think the documentation is confusing for File connectors. Because I can see 
an example for PyFlink which uses a BulkWriter for CSV. But the same class is 
not public in flink-csv. So does Flink not support CSVBulkWriter for Java? 

And for Table API File sink explicitly supports CSV for Row format. But fails 
to mention anything about CSV in DataStream File sink. 

This all is just really confusing. Any leads on this are much appreciated. 

Thanks 



Re: Example of dynamic table

2023-03-07 Thread yuxia
What do you mean by "try the feature of dynamic table"? Do you want to know the 
concept of dynamic tables[1], or User-defined Sources & Sinks[2] with dynamic 
tables?
[1]: 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/
[2]: 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/

Best regards,
Yuxia

----- Original Message -----
From: "Jie Han" 
To: "User" 
Sent: Wednesday, March 8, 2023, 7:54:06 AM
Subject: Example of dynamic table

Hello community!
I want to try the feature of dynamic table but do not find examples in the 
official doc.
Is this part missing?


Re: Is there any API method for dynamic loading of the UDF jar

2023-02-26 Thread yuxia
Flink 1.17, which will be released soon, will support it [1] in the table 
environment [2]. 

There is no difference in performance between TableEnvironment method calls and 
TableEnvironment.executeSql; the API is different but the runtime is the same under 
the hood. You can choose either of them according to your needs/preference. 
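
In the meantime, here is a minimal sketch of registering such a function through 
executeSql, which already supports USING JAR in 1.16; the class name and jar path 
below are hypothetical: 

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterUdfFromJar {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Register a UDF whose implementation lives in a remote jar.
        tEnv.executeSql(
                "CREATE TEMPORARY FUNCTION my_udf AS 'com.example.MyUdf' "
                        + "LANGUAGE JAVA USING JAR 'hdfs:///udfs/my-udf.jar'");
        // Use it like any other function.
        tEnv.executeSql("SELECT my_udf(f0) FROM (VALUES ('a'), ('b')) AS t(f0)").print();
    }
}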

[1] https://issues.apache.org/jira/browse/FLINK-27660 
[2]: 
https://github.com/apache/flink/blob/56b124bcfd661a295ab8772d265c12de25f690ab/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java#L541
 

Best regards, 
Yuxia 


发件人: "neha goyal"  
收件人: "User"  
发送时间: 星期一, 2023年 2 月 27日 上午 2:23:01 
主题: Is there any API method for dynamic loading of the UDF jar 

Hello, 
In Flink 16, CREATE FUNCTION USING JAR functionality has been introduced where 
we can specify the jar resources and the jar can be located in a remote file 
system such as hdfs/s3. I don't see an alternative method for the same 
functionality using the TableEnvironment methods call, for example, 
createTemporarySystemFunction doesn't take any URI. 

Will these methods be provided in the future? 
Is there any difference in performance if we use TableEnvironment method calls 
vs TableEnvironment.executeSql for the same feature? which one is recommended? 

Thanks and regards 



Re: Inconsistent data format of flink-training-repo and learn-flink doc

2023-02-21 Thread yuxia
It seems that in this PR [1], startTime and endTime have been replaced with a single 
eventTime, but the doc was not updated. 
Could you please help create a JIRA [2] for it? And if you're interested in fixing it, 
you're welcome to contribute. 

[1] https://github.com/apache/flink-training/pull/36 
[2] https://issues.apache.org/jira/projects/FLINK/issues 

Best regards, 
Yuxia 


发件人: "Zhongpu Chen"  
收件人: "User"  
发送时间: 星期二, 2023年 2 月 21日 下午 8:49:13 
主题: Inconsistent data format of flink-training-repo and learn-flink doc 

Hi, 
The data format specified in flink-training-repo ( [ 
https://github.com/apache/flink-training/tree/release-1.16 | 
https://github.com/apache/flink-training/tree/release-1.16 ] ) shows that a 
TaxiRide is either a start or an end one with the eventTime. However, the Java 
code in "Data Pipelines & ETL" ( [ 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/learn-flink/etl/
 | 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/learn-flink/etl/
 ] ) tried to get both "startTime" and "endTime". But in fact, those are not 
defined in flink-training-repo. 

```java 
Interval rideInterval = new Interval(ride.startTime, ride.endTime); 
``` 

I think such an inconsistency would puzzle newcomers to Flink. 

-- 
Zhongpu Chen 



Re: Flink SQL support array transform function

2023-02-21 Thread yuxia
Maybe you can try with a non-lambda function. 
But TBH, I haven't seen any Flink UDF that accepts a function as a parameter in my 
previous experience. I'm afraid it's not allowed to pass a function as a 
parameter. 

Best regards, 
Yuxia 


发件人: "Xuekui"  
收件人: "yuxia"  
抄送: "fskmine" , "Caizhi Weng" , "User" 
 
发送时间: 星期二, 2023年 2 月 21日 上午 11:25:48 
主题: Re:Re: Flink SQL support array transform function 

Hi YuXia, 

Thanks for your advice. 

By adding the hint, the type validation can pass. 
But still I can't pass the function to this udf 
Here is my query 

select array_transform(ids, id -> id +1) from tmp_table 

The lambda function id -> id +1 can't be passed because "->" is not supported 
in calcite now. 

Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL 
parse failed. Encountered "- >" at line 3, column 40. 
at 
org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
 
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:74) 
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
 











Original Email 


Sender: "yuxia" < luoyu...@alumni.sjtu.edu.cn >; 

Sent Time: 2023/2/20 10:00 

To: "Xuekui" < baixue...@foxmail.com >; 

Cc recipient: "fskmine" < fskm...@gmail.com >; "Caizhi Weng" < 
tsreape...@gmail.com >; "User" < user@flink.apache.org >; 

Subject: Re: Flink SQL support array transform function 

Hi, Xuekui. 
As said in the exception stack, may be you can try to provide hint for the 
function's parameters. 


class ArrayTransformFunction extends ScalarFunction {

  def eval(@DataTypeHint("ARRAY<BIGINT>") a: Array[Long],
           @DataTypeHint("RAW") function: Long => Long): Array[Long] = {
    a.map(e => function(e))
  }
}
Hope it can help. 
For more detail, please refer to Flink doc[1] 

[1]: [ 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference
 | 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference
 ] 


Best regards, 
Yuxia 


发件人: "Xuekui"  
收件人: "fskmine" , "Caizhi Weng"  
抄送: "User"  
发送时间: 星期四, 2023年 2 月 16日 上午 10:54:05 
主题: Re: Flink SQL support array transform function 

Hi Caizhi, 

I've tried to write UDF to support this function, but I found I can't pass the 
function parameter to udf because the data type of function is not supported. 
An exception throws in SQL validation. 

My UDF code: 
class ArrayTransformFunction extends ScalarFunction {

  def eval(a: Array[Long], function: Long => Long): Array[Long] = {
    a.map(e => function(e))
  }
}

Exception: 
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. An error occurred in the type inference logic of function 
'transform'. 
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
 
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
 
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
 
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77) 
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
 
at SQLTest$.main(SQLTest.scala:44) 
at SQLTest.main(SQLTest.scala) 
Caused by: org.apache.flink.table.api.ValidationException: An error occurred in 
the type inference logic of function 'transform'. 
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
 
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
 
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
 
at java.util.Optional.flatMap(Optional.java:241) 
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
 
at 
org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
 
at 
org.apache.calcite.sql.validate.SqlValidatorImp

Re: Issue with de-serializing CompiledPlan and UNNEST_ROWS in Table API

2023-02-21 Thread yuxia
Hi, Daniel Henneberger. 
Thanks for reporting. It seems like a bug to me. Could you please help create a 
Jira [1] for it? 
As a workaround, is it possible not to use UNNEST? Maybe you can try to use an 
explode function, since the Flink planner rewrites UNNEST to an explode function 
in its implementation [2]. 
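
One rough sketch of that idea (my assumption, not a verified fix for the CompiledPlan 
issue): replace UNNEST with a user-defined table function used via LATERAL TABLE, so 
the plan doesn't reference the internal system function; the function and field names 
below are made up to match your example: 

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class UnnestWorkaround {

    // Emits one row per element of the input array.
    @FunctionHint(output = @DataTypeHint("ROW<nested STRING>"))
    public static class ExplodeRows extends TableFunction<Row> {
        public void eval(@DataTypeHint("ARRAY<ROW<nested STRING>>") Row[] arr) {
            if (arr == null) {
                return;
            }
            for (Row element : arr) {
                collect(element);
            }
        }
    }

    public static void register(TableEnvironment tEnv) {
        tEnv.createTemporarySystemFunction("explode_rows", ExplodeRows.class);
        // Then: SELECT name, nested FROM table1, LATERAL TABLE(explode_rows(arr))
    }
}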

[1] https://issues.apache.org/jira/projects/FLINK/issues/ 
[2] 
https://github.com/apache/flink/blob/bf342d2f67a46e5266c3595734574db270f1b48c/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRule.scala
 

Best regards, 
Yuxia 


发件人: "Daniel Henneberger"  
收件人: "User"  
发送时间: 星期三, 2023年 2 月 22日 上午 5:35:02 
主题: Issue with de-serializing CompiledPlan and UNNEST_ROWS in Table API 

Dear Apache Flink community, 

I could use some help with a serialization issue I'm having while using the 
Table API. Specifically, I'm trying to deserialize a serialized CompiledPlan, 
but I'm running into trouble with the UNNEST_ROWS operation. It seems that the 
CompilePlan deserializer isn't looking up any functions in the 
BuiltInFunctionDefinitions class, which is causing the de-serialization to 
fail. 

Do any of you have experience with this issue or know of a workaround for 
serializing a Table API plan? 

Below is code to replicate. 

Thanks, 
Daniel Henneberger 

private void test() {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    // Create a table of values
    Table table = tEnv.fromValues(createNestedDatatype(),
            Row.of(List.of(Row.of("nested")), "name"));
    tEnv.createTemporaryView("table1", table);

    // Invoke the unnest operation
    Table unnested = tEnv.sqlQuery("SELECT name, nested \n"
            + "FROM table1 CROSS JOIN UNNEST(arr) AS t (nested)");

    StatementSet statementSet = tEnv.createStatementSet();
    statementSet.addInsert(TableDescriptor.forConnector("print").build(), unnested);

    // Serialize the plan
    CompiledPlan plan = statementSet.compilePlan();
    String json = plan.asJsonString();

    // Attempt to load the plan
    // This fails with the error 'Could not resolve internal system function
    // '$UNNEST_ROWS$1'. This is a bug, please file an issue.'
    CompiledPlan plan2 = tEnv.loadPlan(PlanReference.fromJsonString(json));
    plan2.execute().print();
}

private DataType createNestedDatatype() {
    return DataTypes.ROW(
            DataTypes.FIELD("arr", DataTypes.ARRAY(DataTypes.ROW(
                    DataTypes.FIELD("nested", DataTypes.STRING())
            ))),
            DataTypes.FIELD("name", DataTypes.STRING()));
}



Re: Metrics or runtimeContext in global commit

2023-02-19 Thread yuxia
It seems there is no other way to get the runtimeContext in a global commit. For me, I 
think it's reasonable to propose the feature. 
I added the flink dev list for more attention/discussion among the Flink devs.

Best regards,
Yuxia

----- Original Message -----
From: "Tobias Fröhlich" 
To: "User" 
Sent: Tuesday, February 14, 2023, 9:26:34 PM
Subject: Metrics or runtimeContext in global commit

Dear flink team,

I would like to use metrics (which are then written to an influxdb) in the 
method 
org.apache.flink.api.connector.sink2.Committer::commit(Collection>
 committables) that I use for global commit. I use the helper method 
StandardSinkTopologies.addGlobalCommitter(...) to define the post-commit 
topology.

The problem is: When I implement the interface Committer, I cannot get the 
runtimeContext that I need for the metrics, because it is not an Operator.

The only solution I found was by cloning the flink source code and amending it 
in the following way:

 1. declaring an abstract class "CommitterWithRuntimeContext" that 
implements Committer and has:
- an additional field for the runtimeContext
- setter and getter for this field
- an abstract method "void init()"

 2. in the setup() method of GlobalCommitterOperator (which is an operator and 
thus has a runtimeContext) adding the following lines at the end:

if (committer instanceof  CommitterWithRuntimeContext) {
((CommitterWithRuntimeContext) 
committer).setRuntimeContext(getRuntimeContext());
((CommitterWithRuntimeContext) committer).init();
}

I can then implement the method CommitterWithRuntimeContext::init() in our code 
and call the method CommitterWithRuntimeContext::getRuntimeContext() when I 
need the runtimeContext.

Is there another way to get the runtimeContext in a global commit? If not, is 
it justified to propose a feature request for a future release, where the 
global commit method can be implemented in a way that the user has access to 
the runtimeContext?

Best regards and thanks in advance
Tobias Fröhlich


Re: Flink SQL support array transform function

2023-02-19 Thread yuxia
Hi, Xuekui. 
As said in the exception stack, maybe you can try to provide hints for the 
function's parameters. 


class ArrayTransformFunction extends ScalarFunction {

  def eval(@DataTypeHint("ARRAY<BIGINT>") a: Array[Long],
           @DataTypeHint("RAW") function: Long => Long): Array[Long] = {
    a.map(e => function(e))
  }
}
Hope it can help. 
For more detail, please refer to Flink doc[1] 

[1]: [ 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference
 | 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#type-inference
 ] 


Best regards, 
Yuxia 


发件人: "Xuekui"  
收件人: "fskmine" , "Caizhi Weng"  
抄送: "User"  
发送时间: 星期四, 2023年 2 月 16日 上午 10:54:05 
主题: Re: Flink SQL support array transform function 

Hi Caizhi, 

I've tried to write UDF to support this function, but I found I can't pass the 
function parameter to udf because the data type of function is not supported. 
An exception throws in SQL validation. 

My UDF code: 
class ArrayTransformFunction extends ScalarFunction {

  def eval(a: Array[Long], function: Long => Long): Array[Long] = {
    a.map(e => function(e))
  }
}

Exception: 
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. An error occurred in the type inference logic of function 
'transform'. 
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
 
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
 
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
 
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77) 
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
 
at SQLTest$.main(SQLTest.scala:44) 
at SQLTest.main(SQLTest.scala) 
Caused by: org.apache.flink.table.api.ValidationException: An error occurred in 
the type inference logic of function 'transform'. 
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
 
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
 
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
 
at java.util.Optional.flatMap(Optional.java:241) 
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
 
at 
org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1260)
 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1275)
 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1245)
 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
 
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
 
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
 
... 6 more 
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a 
valid type inference for function class 'udf.ArrayTransformFunction'. Please 
check for implementation mistakes and/or provide a corresponding hint. 
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
 
at 
org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
 
at 
org.apache.flink.table.types.extraction.TypeInferenceExtractor.forScalarFunction(TypeInferenceExtractor.java:83)
 
at 
org.apache.flink.table.functions.ScalarFunction.getTypeInference(ScalarFunction.java:143)
 
at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
 
... 17 more 
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting 
a signature to output mapping. 
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
 
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
 
at 
org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
 
at 
org.apache.fli

Re: KafkaSink handling message size produce errors

2023-02-16 Thread yuxia
Hi, Hatem. 
I think there is no way to catch the exception and then ignore it in the current 
implementation of KafkaSink. You may need to extend the KafkaSink yourself. 

Best regards, 
Yuxia 


发件人: "Hatem Mostafa"  
收件人: "User"  
发送时间: 星期四, 2023年 2 月 16日 下午 9:32:44 
主题: KafkaSink handling message size produce errors 

Hello, 
I am writing a flink job that reads and writes into kafka, it is using a window 
operator and eventually writing the result of the window into a kafka topic. 
The accumulated data can exceed the maximum message size after compression on 
the producer level. I want to be able to catch the exception coming from the 
producer and ignore this window. I could not find a way to do that in [ 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-sink
 | KafkaSink ] , is there a way to do so? 

I attached here an example of an error that I would like to handle gracefully. 




This question is similar to one that was asked on stackoverflow [ 
https://stackoverflow.com/questions/52308911/how-to-handle-exceptions-in-kafka-sink
 | here ] but the answer is relevant for older versions of flink. 

Regards, 
Hatem 



Re: Non-Determinism in Table-API with Kafka and Event Time

2023-02-12 Thread yuxia
Hi, Theo.
I'm wondering what the event-time windowed query you are using looks like.
For example, how do you define the watermark?
Considering you read records from the 10 partitions, it may well be that the 
records arrive at the window operator out of order. 
Is it possible that some records still arrive after the watermark has already 
passed their timestamps?

If that's the case, the set of records used to calculate the result may well 
differ between executions, which then leads to non-deterministic results.
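
For reference, here is a minimal sketch of what I mean by the watermark definition 
(table, topic and column names are hypothetical): a bounded-out-of-orderness watermark 
that tolerates records arriving up to 5 seconds late before a window result is emitted.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WatermarkDdlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql(
                "CREATE TABLE events (\n"
                        + "  user_name STRING,\n"
                        + "  event_time TIMESTAMP(3),\n"
                        + "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'events',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'json'\n"
                        + ")");
    }
}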

Best regards,
Yuxia

----- Original Message -----
From: "Theodor Wübker" 
To: "User" 
Sent: Sunday, February 12, 2023, 4:25:45 PM
Subject: Non-Determinism in Table-API with Kafka and Event Time

Hey everyone,

I experience non-determinism in my Table API Program at the moment and (as a 
relatively unexperienced Flink and Kafka user) I can’t really explain to myself 
why it happens. So, I have a topic with 10 Partitions and a bit of Data on it. 
Now I run a simple SELECT * query on this, that moves some attributes around 
and writes everything on another topic with 10 partitions. Then, on this topic 
I run a Event-Time-Windowed Query. Now I experience Non-Determinism: The 
results of the windowed query differ with every execution. 
I thought this might be, because the SELECT query wrote the data to the 
partitioned topic without keys. So I tried it again with the same key I used 
for the original topic. It resulted in the exact same topic structure. Now when 
I run the Event-Time-Windowed query, I get incorrect results (too few 
result-entries). 

I have already read a lot of the Docs on this and can’t seem to figure it out. 
I would much appreciate, if someone could shed a bit of light on this. Is there 
anything in particular I should be aware of, when reading partitioned topics 
and running an event time query on that? Thanks :)


Best,
Theo


Re: Flink Hudi HMS Catalog problem

2023-02-12 Thread yuxia
Hi, Flink provides the HiveCatalog, which can store both native Hive tables and other 
types of Flink tables (more exactly, a DDL mapping). With it, Flink can access Hive 
tables and other Flink tables. 

Does it meet your requirement? 
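
A minimal sketch of registering such a catalog from the Table API (the catalog name 
and the Hive conf directory are assumptions for illustration): 

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RegisterHiveCatalog {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // One catalog can then serve both native Hive (e.g. parquet) tables
        // and Flink DDL mappings such as Hudi tables.
        tEnv.executeSql(
                "CREATE CATALOG myhive WITH (\n"
                        + "  'type' = 'hive',\n"
                        + "  'hive-conf-dir' = '/opt/hive/conf'\n"
                        + ")");
        tEnv.executeSql("USE CATALOG myhive");
    }
}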

Best regards, 
Yuxia 


发件人: "melin li"  
收件人: "User"  
发送时间: 星期三, 2023年 2 月 08日 下午 5:51:34 
主题: Flink Hudi HMS Catalog problem 

Flink SQL reads and writes Hudi and synchronizes Hive tables via the Hudi HMS 
Catalog. If a Hive database has both parquet tables and Hudi tables, two different 
Flink catalogs need to be registered, which causes problems and is not very friendly 
for data analysts to use. Spark does not have this problem: you can use the 
spark_catalog catalog to access both Hudi and parquet tables. I'm not sure whether 
this problem should be solved in Hudi or in Flink? 




Re: Seeking suggestions for ingesting large amount of data from S3

2023-02-12 Thread yuxia
Hi, Eric. 
Thanks for reaching out. 
I'm wondering how you use the Table API to ingest the data. Since "OOM" is too 
general, do you have any clue about the OOM? 
Maybe you can use jmap to see what occupies most of the memory. If you find it, you 
can try to figure out the reason: is it caused by a lack of memory or something else? 

Btw, have you ever tried Flink SQL to ingest the data? Does the OOM still 
happen? 
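
As for ingesting chunk by chunk, here is a rough sketch (an assumption, not a tested 
recipe) of looping over batch INSERT statements so that each job only reads one 
partition; "hudi_table", "source_table" and the partition column "dt" are hypothetical 
names: 

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionByPartitionBackfill {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        String[] dates = {"2023-02-01", "2023-02-02", "2023-02-03"};
        for (String dt : dates) {
            // Each executeSql submits a bounded job; await() blocks until it finishes
            // before the next partition is ingested.
            tEnv.executeSql(
                            "INSERT INTO hudi_table SELECT * FROM source_table WHERE dt = '" + dt + "'")
                    .await();
        }
    }
}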

Best regards, 
Yuxia 


发件人: "Yang Liu"  
收件人: "User"  
发送时间: 星期五, 2023年 2 月 10日 上午 5:10:49 
主题: Seeking suggestions for ingesting large amount of data from S3 

Hi all, 
We are trying to ingest large amounts of data (20TB) from S3 using Flink 
filesystem connector to bootstrap a Hudi table. Data are well partitioned in S3 
by date/time, but we have been facing OOM issues in Flink jobs, so we wanted to 
update the Flink job to ingest the data chunk by chunk (partition by partition) 
by some kind of looping instead of all at once. Curious what’s the recommended 
way to do this in Flink. I believe this should be a common use case, so hope to 
get some ideas here. 

We have been using Table APIs, but open to other APIs. 
Thanks & Regards 
Eric 



Re: Unsubscribe

2023-02-07 Thread yuxia
Hi. 
To unsubscribe, you should send an email to user-unsubscr...@flink.apache.org with 
any content or subject. Please see more in the Flink docs [1]. 

[1] https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list 

Best regards, 
Yuxia 


发件人: "liang ji"  
收件人: "User"  
发送时间: 星期三, 2023年 2 月 08日 下午 2:10:03 
主题: Unsubscribe 




Re: Unsubscribe

2023-02-07 Thread yuxia
Hi, All. 
To unsubscribe, you can send an email to user-unsubscr...@flink.apache.org with 
any content or subject. Please see more in the Flink docs [1]. 

[1] https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list 

Best regards, 
Yuxia 


发件人: "Ragini Manjaiah"  
收件人: "Soumen Choudhury"  
抄送: "User"  
发送时间: 星期三, 2023年 2 月 08日 上午 11:06:30 
主题: Re: Unsubscribe 

Hi Soumen, 
I want to unsubscribe from this mailing list. 

Thanks & Regards 
Ragini Manjaiah 

On Fri, Feb 3, 2023 at 4:07 PM Soumen Choudhury < [ mailto:sou@gmail.com | 
sou@gmail.com ] > wrote: 





-- 
Regards 
Soumen Choudhury 
Cell : +91865316168 
mail to : [ mailto:sou@gmail.com | sou@gmail.com ] 






Re: I want to subscribe users' questions

2023-02-07 Thread yuxia
Maybe you will also be interested in joining the Flink Slack; here is my invite link:
https://join.slack.com/t/apache-flink/shared_invite/zt-1obpql04h-R3o5XM8d~Siyl3KGldkl2Q

Best regards,
Yuxia

----- Original Message -----
From: "guanyuan chen" 
To: "User" , "user-zh" 
Sent: Friday, February 3, 2023, 7:48:55 PM
Subject: I want to subscribe users' questions

Hi,
My name is Guanyuan Chen. I am a big data development engineer in the Tencent WeChat
department, China. I have 4 years of experience in Flink development, and I want to
subscribe to Flink's development news and gladly help others develop Flink jobs.
Thanks a lot.


Re: Unable to do event time window aggregation with Kafka source

2023-02-06 Thread yuxia
Hi, Lucas. 
What do you mean by "unable to do event time window aggregation with watermarkedStream"? 
What exception does it throw? 
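While you check that, one thing that might be worth trying in the code below: hand the
watermark strategy to the Kafka source itself instead of using noWatermarks() plus a
downstream assigner, so watermarks are generated per Kafka split. This is just a sketch
reusing the Event2 type and the getTime() format from your code, not a verified fix:

WatermarkStrategy<Event2> watermarks =
        WatermarkStrategy.<Event2>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner((element, recordTimestamp) -> {
                    try {
                        // Same "yyyy-MM-dd HH:mm:ss" parsing as in your timestamp assigner
                        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                .parse(element.getTime()).getTime();
                    } catch (ParseException e) {
                        throw new RuntimeException(e);
                    }
                });

// Watermarks (and idleness handling, if you add .withIdleness(...)) are then applied per split.
DataStreamSource<Event2> kafkaSource = env.fromSource(source, watermarks, "Kafka Source");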

Best regards, 
Yuxia 


From: "wei_yuze"
To: "User"
Sent: Tuesday, February 7, 2023, 1:43:59 PM
Subject: Unable to do event time window aggregation with Kafka source



Hello! 




I was unable to do event time window aggregation with a Kafka source, but had no problem
with the "fromElements" source. The code is attached below. It has two data sources, named
"streamSource" and "kafkaSource" respectively. The program works well with "streamSource",
but not with "watermarkedStream".


public class WindowReduceTest2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Use the fromElements source
        DataStreamSource<Event2> streamSource = env.fromElements(
                new Event2("Alice", "./home", "2023-02-04 17:10:11"),
                new Event2("Bob", "./cart", "2023-02-04 17:10:12"),
                new Event2("Alice", "./home", "2023-02-04 17:10:13"),
                new Event2("Alice", "./home", "2023-02-04 17:10:15"),
                new Event2("Cary", "./home", "2023-02-04 17:10:16"),
                new Event2("Cary", "./home", "2023-02-04 17:10:16")
        );

        // Use the Kafka source
        JsonDeserializationSchema<Event2> jsonFormat =
                new JsonDeserializationSchema<>(Event2.class);
        KafkaSource<Event2> source = KafkaSource.<Event2>builder()
                .setBootstrapServers(Config.KAFKA_BROKERS)
                .setTopics(Config.KAFKA_TOPIC)
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(jsonFormat)
                .build();
        DataStreamSource<Event2> kafkaSource = env.fromSource(source,
                WatermarkStrategy.noWatermarks(), "Kafka Source");
        kafkaSource.print();

        // Generate watermarks, extracting the timestamp from the data as the event time
        SingleOutputStreamOperator<Event2> watermarkedStream =
                kafkaSource.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event2>forBoundedOutOfOrderness(Duration.ZERO)
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event2>() {
                                    @Override
                                    public long extractTimestamp(Event2 element, long recordTimestamp) {
                                        SimpleDateFormat simpleDateFormat =
                                                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                        Date date = null;
                                        try {
                                            date = simpleDateFormat.parse(element.getTime());
                                        } catch (ParseException e) {
                                            throw new RuntimeException(e);
                                        }
                                        long time = date.getTime();
                                        System.out.println(time);
                                        return time;
                                    }
                                }));

        // Window aggregation
        watermarkedStream.map(new MapFunction<Event2, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event2 value) throws Exception {
                        // Convert the data into a tuple to simplify the computation
                        return Tuple2.of(value.getUser(), 1L);
                    }
                })
                .keyBy(r -> r.f0)
                // Set a tumbling event-time window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1,
                                                       Tuple2<String, Long> value2) throws Exception {
                        // Accumulation rule: when the window closes, send the result downstream
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .print("Aggregated stream");

        env.execute();
    }
}



Notably, if TumblingEventTimeWindows is replaced with TumblingProcessingTimeWindows, the
program works well even with "watermarkedStream".

Thanks for your time! 
Lucas 



Re: Design decisions around flink table store

2023-02-05 Thread yuxia
Hi, Bright. 

Thanks for reaching out. That's a really good question. 
Briefly speaking, the reason is that both Hudi and Iceberg are not efficient for updating. 
Also, the FLIP for flink-table-store explains why it does not use Hudi [1]: 

" 
Why doesn't FileStore use Hudi directly? 

1: Hudi aims to support the update of upsert, so it needs to forcibly define 
the primary key and time column. It is not easy to support all changelog types 
2: The update of Hudi is based on the index (currently there are BloomFilter 
and HBase). The data in the bucket is out of order. Every merge needs to be 
reread and rewritten, which is expensive. We need fast update storage, LSM is 
more suitable. 
" 

Also, I have added Jingsong Li to the mail thread. He is the creator/maintainer of
flink-table-store. Maybe he can provide more details.

[1] https://cwiki.apache.org/confluence/display/Flink/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage#FLIP188:IntroduceBuiltinDynamicTableStorage-UsingHudi


Best regards, 
Yuxia 


From: "graceking lau"
To: "User"
Sent: Monday, February 6, 2023, 9:24:31 AM
Subject: Design decisions around flink table store

Hi there, 

Recently I had a chance to get to know the flink-table-store project. I was 
attracted by the idea behind it at first glance. 

After reading the docs, I've got a question in my head for a while. It's about 
the design of the file storage. 

It looks like it could be implemented on top of other popular open-source libraries rather
than creating a totally new (LSM-tree-based) component. Hudi or Iceberg looks like a good
choice, since they both support saving and querying changelogs.
If it were done like this, there would be no need to create a component for the other related
computation engines (Spark, Hive or Trino), since they are already supported by Hudi or
Iceberg. That looks like a better solution to me than reinventing the wheel.

So, here are my questions. Is there any issue with writing data as Hudi or Iceberg? Why
weren't they chosen in the original design decision?

Looking forward to your answer!

(I don't know if this is a good way to ask questions here, but I didn't find another way yet.
If it's not ok to ask on the mailing list, could someone please point me in the right
direction?)

Best regards, 
Bright. 



Re: How to add permission validation? flink reads and writes hive table。

2023-01-31 Thread yuxia
Hi, melin li. 
Could you please explain a bit more about the unified access check in Flink? 

Best regards, 
Yuxia 


From: "melin li"
To: "User"
Sent: Wednesday, February 1, 2023, 2:39:15 PM
Subject: How to add permission validation? flink reads and writes hive table。



Flink supports both SQL and JAR job types. How can we implement a unified access check in
Flink? Spark supports extensions; Flink lacks extensions.






Re: Custom catalog implementation - getting table schema for computed columns

2023-01-31 Thread yuxia
Hi, 
> about the question: can I assume that ResolvedCatalogTable will always be 
> the runtime type. 

Sorry, I don't really understand your question. Why do you have such an 
assumption? 

Best regards, 
Yuxia 


From: "Krzysztof Chmielewski"
To: "User"
Sent: Saturday, January 21, 2023, 3:13:12 AM
Subject: Re: Custom catalog implementation - getting table schema for computed columns

Ok, 
so now I see that the runtime type of the "table" parameter is ResolvedCatalogTable, which
has the method getResolvedSchema.

So I guess my question is: can I assume that ResolvedCatalogTable will always be the runtime
type?

On Fri, Jan 20, 2023 at 19:27 Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote: 



Hi, 
I'm implementing a custom Catalog where for "create table" I need to get tables 
schema, both column names and types from DDL. 

Now the Catalog's createTable method has "CatalogBaseTable table" argument. 
The CatalogBaseTable has a deprecated "getSchema" and suggest to use 
getUnresolvedSchema instead. 

I was able to resolve schema types for physical columns, but I'm struggling with computed
columns [1]. To be more precise, I'm struggling to get/resolve the type of this field.

I see that all implementations that would be needed to resolve the underlying expression of
UnresolvedComputedColumn are marked as @Internal.

On the other hand, the deprecated "getSchema" has a proper type for this computed column.

I'm wondering now what I should do. Should I use the deprecated API that already has what I
need, or should I use the suggested API and somehow try to resolve the type using @Internal
classes, which also does not seem safe?

I would appreciate any hint here.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/#:~:text=BIGINT%2C%20%60name%60%20STRING)-,Computed%20Columns,-Computed%20columns%20are







Re: Custom catalog implementation - getting table schema for computed columns

2023-01-31 Thread yuxia
Hi. 
Just FYI, I have seen that some catalogs still use the deprecated TableSchema in the Flink
Hive, Iceberg, etc. connectors. But it's in Flink's plan to drop the deprecated table
schema [1]. In the long term, using the new schema API seems the better choice.

If it's for the case of Catalog's createTable method, from the code base [2], the passed
CatalogBaseTable should be an instance of ResolvedCatalogBaseTable, with which you can get
the resolved schema. From the commit history [3], since Flink 1.13 the passed
CatalogBaseTable has been an instance of ResolvedCatalogBaseTable.

I think maybe you can cast it to ResolvedCatalogBaseTable and get the resolved schema. But
please remember that the cast will fail when the Flink version is lower than 1.13, since only
from Flink 1.13 on is the passed CatalogBaseTable an instance of ResolvedCatalogBaseTable.
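For illustration, a minimal sketch of that cast inside Catalog#createTable (assuming
Flink 1.13+; persisting the schema is left out):

@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
        throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
    if (table instanceof ResolvedCatalogBaseTable) {
        ResolvedSchema schema = ((ResolvedCatalogBaseTable<?>) table).getResolvedSchema();
        for (Column column : schema.getColumns()) {
            // Computed columns come back here with their DataType already resolved.
            String name = column.getName();
            DataType type = column.getDataType();
            // ... store name/type in the catalog's own metadata format
        }
    } else {
        throw new CatalogException("Expected a resolved table (Flink 1.13+).");
    }
}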

[1] https://issues.apache.org/jira/browse/FLINK-29072 
[2] 
https://github.com/apache/flink/blob/75a92efd7b35501698e5de253e5231d680830c16/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java#L654
 
[3] https://issues.apache.org/jira/browse/FLINK-21396 

Best regards, 
Yuxia 


From: "Krzysztof Chmielewski"
To: "User"
Sent: Saturday, January 21, 2023, 2:27:25 AM
Subject: Custom catalog implementation - getting table schema for computed columns

Hi, 
I'm implementing a custom Catalog where for "create table" I need to get tables 
schema, both column names and types from DDL. 

Now the Catalog's createTable method has "CatalogBaseTable table" argument. 
The CatalogBaseTable has a deprecated "getSchema" and suggest to use 
getUnresolvedSchema instead. 

I was able to resolve schema types for physical columns, but I'm struggling with computed
columns [1]. To be more precise, I'm struggling to get/resolve the type of this field.

I see that all implementations that would be needed to resolve the underlying expression of
UnresolvedComputedColumn are marked as @Internal.

On the other hand, the deprecated "getSchema" has a proper type for this computed column.

I'm wondering now what I should do. Should I use the deprecated API that already has what I
need, or should I use the suggested API and somehow try to resolve the type using @Internal
classes, which also does not seem safe?

I would appreciate any hint here.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/#:~:text=BIGINT%2C%20%60name%60%20STRING)-,Computed%20Columns,-Computed%20columns%20are



Re: request for link to join

2023-01-29 Thread yuxia
Hi, all. Here is my invitation link: 
https://join.slack.com/t/apache-flink/shared_invite/zt-1obpql04h-R3o5XM8d~Siyl3KGldkl2Q
 

Best regards, 
Yuxia 


From: "P Singh"
To: "Tamir Sagi" , "Wai Chee Yau" , "User"
Sent: Sunday, January 29, 2023, 7:35:21 PM
Subject: Re: request for link to join

Unable to join link asking to contact admin. 

Please do the needful. 

Get Outlook for iOS (https://aka.ms/o0ukef) 

From: Tamir Sagi  
Sent: Sunday, January 29, 2023 1:52:51 PM 
To: Wai Chee Yau ; user@flink.apache.org 
 
Subject: Re: request for link to join 
Welcome Wai, 

https://flink.apache.org/community.html#slack 


From: Wai Chee Yau  
Sent: Sunday, January 29, 2023 9:48 AM 
To: user@flink.apache.org  
Subject: request for link to join 




hi 
can i please get an invite link to join Slack for Flink? 

thanks 






Re: Detect Table options override by Query Dynamic options

2023-01-28 Thread yuxia
Hi, Krzysztof Chmielewski. 
I'm afraid there's no way to detect it in the TableFactory, as the passed catalog table
already contains the options overridden by the query dynamic options, and it seems we have
no flag to identify them in the TableFactory.

Best regards, 
Yuxia 


From: "Krzysztof Chmielewski"
To: "User"
Sent: Friday, December 30, 2022, 12:26:01 AM
Subject: Detect Table options override by Query Dynamic options

Hi, 
I'm working on a custom TableFactory implementation. 
Is there a way to detect, in the TableFactory's createDynamicTableSink/createDynamicTableSource,
which table DDL options were overridden by query dynamic options [1]?

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#dynamic-table-options

Regards, 
Krzysztof Chmielewski 



Re: Using TumblingEventTimeWindows on low traffic kafka topic

2022-12-22 Thread yuxia
Yes, your understanding is correct. To handle this, you can define a watermark strategy that
detects idleness and marks an input as idle.
Please refer to these two documents [1][2] for more details.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#idleness
 
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
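For illustration, a sketch of such a strategy; the event type, the timestamp field, and the
one-minute idle timeout are placeholders you would adapt to your job:

WatermarkStrategy<MyEvent> strategy =
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(
                        Duration.ofMinutes(misAlignmentThresholdMinutes))
                .withTimestampAssigner((event, timestamp) -> event.timestamp)
                // Partitions/subtasks that emit nothing for 1 minute are marked idle and
                // stop holding back the watermark, so event-time windows can still fire.
                .withIdleness(Duration.ofMinutes(1));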
 

Best regards, 
Yuxia 


From: "deepthi s"
To: "User"
Sent: Thursday, December 22, 2022, 9:46:00 AM
Subject: Using TumblingEventTimeWindows on low traffic kafka topic

(Adding subject) 

On Wed, Dec 21, 2022 at 5:41 PM deepthi s <deepthi.sridha...@gmail.com> wrote: 





Hello, I am new to event-time processing and need some help. 



We have a kafka source with very low qps and multiple topic partitions have no 
data for long periods of time. Additionally, data from the source can come out 
of order (within bounds) and the application needs to process the events in 
order per key. So we wanted to try and sort the events in the application. 




I am using BoundedOutOfOrdernessWatermarks for generating the watermarks and 
using TumblingEventTimeWindows to collect the keyed events and sort them in the 
ProcessWindowFunction. I am seeing that the window doesn’t close at all and 
based on my understanding it is because there are no events for some source 
partitions. All operators have the same parallelism as source kafka partition 
count. 



Pseudo code for my processing: 



SingleOutputStreamOperator myStream =
    env.fromSource(
            setupSource(),
            WatermarkStrategy.noWatermarks(),
            "Kafka Source",
            TypeInformation.of(RowData.class))
        .map(rowData -> convertToMyEvent(rowData))
        .assignTimestampsAndWatermarks(WatermarkStrategy
            .forBoundedOutOfOrderness(Duration.ofMinutes(misAlignmentThresholdMinutes))
            .withTimestampAssigner((event, timestamp) -> event.timestamp))
        // Key the events by urn which is the key used for the output kafka topic
        .keyBy((event) -> event.urn.toString())
        // Set up a tumbling window of misAlignmentThresholdMinutes
        .window(TumblingEventTimeWindows.of(Time.of(misAlignmentThresholdMinutes, TimeUnit.MINUTES)))
        .process(new EventTimerOrderProcessFunction())
        .sinkTo(setupSink());



Is my understanding correct that, based on the WatermarkStrategy I have, multiple operators
will keep emitting LONG.MIN_VALUE - threshold if no events are read for those partitions,
causing the downstream keyBy operator to also emit a LONG.MIN_VALUE - threshold watermark
(as the min of all watermarks it sees from its input map operators), so the window doesn't
close at all? If yes, what is the right strategy to handle this? Is there a way to combine
EventTimeTrigger with ProcessingTimeoutTrigger?




-- 
Regards, 
Deepthi 





-- 
Regards, 
Deepthi 



Re: unsubscribe

2022-12-11 Thread yuxia
To unsubscribe, you should send an email to user-unsubscr...@flink.apache.org, as documented
on the Flink official website [1].

[1] https://flink.apache.org/community.html#mailing-lists 

Best regards, 
Yuxia 


From: "Ayush"
To: "User"
Sent: Sunday, December 11, 2022, 9:41:02 PM
Subject: unsubscribe

unsubscribe 



Re: How to set disableChaining like streaming multiple INSERT statements in a StatementSet ?

2022-12-07 Thread yuxia
Could you please post the image of the running job graph in Flink UI? 

Best regards, 
Yuxia 


From: "hjw" <1010445...@qq.com>
To: "User"
Sent: Thursday, December 8, 2022, 12:05:00 AM
Subject: How to set disableChaining like streaming multiple INSERT statements in a StatementSet ?

Hi, 
I create a StatementSet that contains multiple INSERT statements. 
I found that the multiple INSERT tasks are organized into an operator chain when
StatementSet.execute() is invoked.
How can I set disableChaining for multiple INSERT statements in a StatementSet, like in the
streaming API?

env: 
Flink version:1.14.4 








-- 
Best, 
Hjw 



Re: Registering serializer for RowData

2022-12-06 Thread yuxia
Hi, what's the type of the input for the SortOperator? I mean what's the 
TypeInformation? For example, PojoTypeInfo or RowTypeInfo? 
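If it turns out the stream is typed as a generic (Kryo) type, one way to get a RowData
serializer on the DataStream side is to declare the type information explicitly. A sketch
only: the RowType below and the toRowData conversion are placeholders of mine, not from
your workflow.

// Placeholders: adapt the RowType to your actual schema.
RowType rowType = RowType.of(
        new LogicalType[] {new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)},
        new String[] {"id", "name"});

DataStream<RowData> rows =
        input.map(value -> (RowData) toRowData(value))   // toRowData is your own conversion
             .returns(InternalTypeInfo.of(rowType));      // RowDataSerializer instead of Kryo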

Best regards, 
Yuxia 


From: "Ken Krugler"
To: "User"
Sent: Wednesday, December 7, 2022, 9:11:17 AM
Subject: Registering serializer for RowData

Hi there, 

I’m using the Hudi sink to write data, in bulk insert mode, and running into an 
issue where Hudi is unhappy because (I think) Flink is using the Kryo 
serializer for RowData records, instead of something that extends 
AbstractRowDataSerializer. 

It’s this bit of (Hudi) code in SortOperator.java that fails: 

AbstractRowDataSerializer inputSerializer =
    (AbstractRowDataSerializer)
        getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());

And I get: 

Caused by: java.lang.ClassCastException: class 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer cannot be cast 
to class org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer 
(org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer and 
org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer are in 
unnamed module of loader 'app') 
at org.apache.hudi.sink.bulk.sort.SortOperator.open(SortOperator.java:73) 
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
 
… 

So I’m wondering if the Flink table code configures this serializer, and I need 
to do the same in my Java API-based workflow. 

Thanks, 

— Ken 

PS - This is with Flink 1.15.1 and Hudi 0.12.0 

-- 
Ken Krugler 
http://www.scaleunlimited.com 
Custom big data solutions 
Flink, Pinot, Solr, Elasticsearch 






Re: Flink Table Kinesis sink not failing when sink fails

2022-11-29 Thread yuxia
At which code line does the error message happen? Maybe it swallows the exception and then
just logs the error message, in which case the Flink job won't fail since it seems like no
exception happened.

Best regards, 
Yuxia 


From: "Dan Hill"
To: "User"
Sent: Wednesday, November 30, 2022, 8:06:52 AM
Subject: Flink Table Kinesis sink not failing when sink fails

I set up a simple Flink SQL job (Flink v1.14.4) that writes to Kinesis. The job looks healthy
but the records are not being written. I did not give enough IAM permissions to write to
Kinesis. However, the Flink SQL job acts like it's healthy and checkpoints even though the
Kinesis PutRecords call fails. I'd expect this error to kill the Flink job.
I looked through Flink Jira and the Flink user group but didn't see a similar 
issue. 

Is the silent failure a known issue? If the Flink job doesn't fail, it'll be 
hard to detect production issues. 

``` 
2022-11-29 23:30:27,587 ERROR org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader [] - [2022-11-29 23:30:27.578072] [0x1e3b][0x7f12ef8fc700] [error]
[AWS Log: ERROR](AWSClient)HTTP response code: 400
Exception name: AccessDeniedException
Error message: User: arn:aws:sts::055315558257:assumed-role/dev-workers-us-east-1b-202203101433138915000a/i-09e4f747a4bdbb1f0 is not authorized to perform: kinesis:ListShards on resource: arn:aws:kinesis:us-east-1:055315558257:stream/dan-dev-content-metrics because no identity-based policy allows the kinesis:ListShards action
6 response headers:
connection: close
content-length: 379
content-type: application/x-amz-json-1.1
date: Tue, 29 Nov 2022 23:30:27 GMT
x-amz-id-2: q8kuplUOMJILzVU97YA+TYSyy6aozeoST+yws26rOkyzEUUZT0zKBdcMWUAjV/8RrnMeed/+em7CbjpwzGYEANgkwCihZWdC
x-amzn-requestid: e4a39674-66fa-4dcd-b8a3-0e273e5e628a
``` 



Re: Weird Flink SQL error

2022-11-23 Thread yuxia
Hi, Dan. 
I'm wondering what kind of error you expect. IMO, most engines throw parse errors in this
way, telling you that they encountered an unexpected token.

Best regards, 
Yuxia 


From: "Dan Hill"
To: "User"
Sent: Wednesday, November 23, 2022, 1:55:20 PM
Subject: Weird Flink SQL error

Hi. I'm hitting an obfuscated Flink SQL parser error. Is there a way to get 
better errors for Flink SQL? I'm hitting it when I wrap some of the fields on 
an inner Row. 


Works 
CREATE TEMPORARY VIEW `test_content_metrics_view` AS
SELECT
  DATE_FORMAT(TUMBLE_ROWTIME(rowtime, INTERVAL '1' DAY), 'yyyy-MM-dd'),
  platform_id,
  content_id
FROM content_event
GROUP BY
  platform_id,
  content_id,
  TUMBLE(rowtime, INTERVAL '1' DAY)

CREATE TABLE test_content_metrics (
  dt STRING NOT NULL,
  `platform_id` BIGINT,
  `content_id` STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'etl/test_content_metrics',
  'format' = 'json'
)

INSERT INTO `test_content_metrics`
SELECT * FROM `test_content_metrics_view`

Fails 

Wrapping a couple parameters in a Row causes the following exception. 

Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 1, column 119.
Was expecting one of:
")" ...
"," ...

org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:40981)
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:40792)
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:25220)
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression3(FlinkSqlParserImpl.java:19925)
org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.Expression2b(FlinkSqlParserImpl.java:19581)
[...]

CREATE TEMPORARY VIEW `test_content_metrics_view` AS
SELECT
  DATE_FORMAT(TUMBLE_ROWTIME(rowtime, INTERVAL '1' DAY), 'yyyy-MM-dd'),
  ROW(
    platform_id,
    content_id
  )
FROM content_event
GROUP BY
  platform_id,
  content_id,
  TUMBLE(rowtime, INTERVAL '1' DAY)

CREATE TABLE test_content_metrics (
  dt STRING NOT NULL,
  `body` ROW(
    `platform_id` BIGINT,
    `content_id` STRING
  )
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'etl/test_content_metrics',
  'format' = 'json'
)

INSERT INTO `test_content_metrics` 
SELECT * FROM `test_content_metrics_view` 



Re: Flink SQL JSON

2022-11-20 Thread yuxia
Hi! 
Maybe you can map it to Flink's BINARY / VARBINARY type [1]. 

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/#data-type-mapping
 

Best regards, 
Yuxia 


From: "Timothy Bess"
To: "User"
Sent: Saturday, November 19, 2022, 4:10:41 AM
Subject: Flink SQL JSON

Hi Flink Users, 

I use Flink SQL to ETL Kafka event data into tables we use for analysis, and 
I'm currently trying to use it to basically extract a few fields out of some 
JSON, but leave the original blob in a subdocument using the elasticsearch 
connector. 

I see that there is `ROW` and `MAP` but I don't see any data types for 
unstructured JSON blobs. This would be really useful for both `jsonb` columns 
in Postgres and subdocuments in Elasticsearch. Has anyone found a good solution 
for this? 

Thanks, 

Tim 



Re: How to use lookup join sql hint

2022-11-13 Thread yuxia
Could you please show us the detailed exception? You can find it in FLINK_HOME/log.

Best regards, 
Yuxia 


From: "Si-li Liu"
To: "User"
Sent: Saturday, November 12, 2022, 4:27:54 PM
Subject: How to use lookup join sql hint

I'm trying to use this SQL to insert my data into Doris, and my Flink version is 1.16.0,
after checking
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/hints/#join-hints.
Table scrm_admin_dept_user and table scrm_admin_dept use the jdbc connector, and
scrm_admin_user uses the mysql-cdc connector.

insert into dim_scrm_user_dept_doris_sink
select /*+ LOOKUP('table'='scrm_admin_dept_user'), LOOKUP('table'='scrm_admin_dept') */
  a.id, b.dept_id, a.name as name, c.name as dept_name, a.tenant_id
from scrm_admin_user a
join scrm_admin_dept_user b on a.id = b.user_id
join scrm_admin_dept c on b.dept_id = c.id
where a.deleted = false and b.deleted = false and c.deleted = false;

But Flink rejects my SQL with this error message: 
[ERROR] Could not execute SQL statement. Reason: 
org.apache.flink.table.client.gateway.SqlExecutionException: Could not execute SQL statement. 

After I remove this SQL hint, the SQL can be submitted successfully. Did I get anything wrong
with this lookup join hint? Also, I'm new to Flink SQL; does this pipeline diagram mean I
already use a lookup join?

-- 
Best regards 

Sili Liu 



Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

2022-11-07 Thread yuxia
Wow, cool! Thanks for your work.
It'll definitely be helpful for users who want to migrate their batch jobs from the DataSet
API to the DataStream API.

Best regards,
Yuxia

- Original Message -
From: "Etienne Chauchot"
To: "dev" , "User"
Sent: Monday, November 7, 2022, 10:29:54 PM
Subject: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Hi everyone,

In case some of you are interested, I just posted a blog article about 
migrating a real-life batch pipeline from the DataSet API to the 
DataStream API:

https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html

Best

Etienne


Re: Question about UDF randomly processed input row twice

2022-11-03 Thread yuxia
Thanks for your explanation. The execution plan for the SQL `INSERT INTO print_table SELECT *
FROM ( SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE
ET.`id_in_bytes` IS NOT NULL` is:
`
StreamPhysicalSink(table=[default_catalog.default_database.print_table], fields=[id_in_bytes, id])
  StreamPhysicalCalc(select=[RandomUdf(id) AS id_in_bytes, id], where=[IS NOT NULL(RandomUdf(id))])
    StreamPhysicalTableSourceScan(table=[[default_catalog, default_database, datagenTable]], fields=[id])
`
and from the plan we can see that the UDF is called twice in the StreamPhysicalCalc (once in
the projection and once in the filter); as a result, one row appears to be processed twice.
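As a side note, since the UDF uses Math.random(), it is worth declaring it as
non-deterministic. I haven't verified that this changes this particular plan, but it is the
way to tell the planner about the randomness; a sketch (class and annotation values are just
illustrative):

public class RandomUdf extends ScalarFunction {

    public @DataTypeHint("BYTES") byte[] eval(@DataTypeHint("INT") Integer input) {
        // ... same body as in the repro code below
        return null;
    }

    // Declares the function as non-deterministic, so the planner must not assume that
    // two calls with the same argument return the same value.
    @Override
    public boolean isDeterministic() {
        return false;
    }
}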

Best regards, 
Yuxia 


From: "Xinyi Yan"
To: "yuxia"
Cc: "User"
Sent: Friday, November 4, 2022, 5:28:30 AM
Subject: Re: Question about UDF randomly processed input row twice

Ok. The datagen source with the sequence option can reproduce this issue easily, and it also
results in an incorrect result. I have a sequence generated by datagen that goes from 1 to 5,
and I let the UDF randomly return either null or bytes. Surprisingly, not only has the UDF
been executed twice, but the WHERE clause also did not handle the `IS NOT NULL`. This is a
big shock on my side; the `IS NOT NULL` condition in the WHERE clause is a fundamental SQL
feature and should not break. I have updated my findings in FLINK-29855
(https://issues.apache.org/jira/browse/FLINK-29855), and here are the repro steps:

Query: 
INSERT INTO print_table 
 SELECT * FROM ( 
   SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable 
 ) 
AS ET WHERE ET.`id_in_bytes` IS NOT NULL " 

Result: 
+I[ null , 1] 
+I[[50], 2] 
+I[ null , 4] 


UDF 
public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
    byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
    int randomNumber = ((int) (Math.random() * (10 - 1))) + 1;
    LOG.info("[*][*][*] input num is {} and random number is {}. [*][*][*]", intputNum, randomNumber);
    if (randomNumber % 2 == 0) {
        LOG.info("### ### input bytes {} and num {}. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###", results, intputNum);
        return results;
    }
    LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
    return null;
}

Log: 
2022-11-02 13:38:56,765 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:56,766 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:57,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:57,763 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:58,760 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:58,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:59,759 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input 
bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
2022-11-02 13:39:00,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input 
bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
2022-11-02 13:39:01,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:39:01,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:39:02,760 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   

Re: Question about UDF randomly processed input row twice

2022-11-03 Thread yuxia
The datagen source may produce rows with the same values. 

From my side, in Flink a UDF shouldn't process one row twice; otherwise it would be a
critical bug.

Best regards, 
Yuxia 


From: "Xinyi Yan"
To: "User"
Sent: Thursday, November 3, 2022, 6:59:20 AM
Subject: Question about UDF randomly processed input row twice

Hi all, 
I found a weird UDF behavior: a single thread processes the UDF twice; see FLINK-29855
(https://issues.apache.org/jira/browse/FLINK-29855) for more details. Basically, I created a
datagen table with a random integer (1 row per second) and passed this value into the UDF.
Inside the UDF, I simply mod the input number, convert the integer to a byte array, and then
log it for debugging purposes. As you can see, some of the rows have been called twice inside
the UDF. Not sure if this duplicated UDF call is expected, and not sure why it doesn't
consistently produce duplicated calls for all rows. In case of any concern about the local
env setup, I only have 1 task manager and 1 task slot in my local Flink cluster.

Thanks! 

UDF 
public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
    byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
    if (intputNum % 2 == 0) {
        LOG.info("### ### input bytes {} and num {}. ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###", results, intputNum);
        return results;
    }
    LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
    return null;
}

Main class DDLs 
tEnv.executeSql( "CREATE FUNCTION IntInputUdf AS 
'org.apache.flink.playgrounds.spendreport.IntInputUdf' " );        
tEnv.executeSql( "CREATE TABLE datagenTable (\n" + "    id  INT\n" + ") WITH 
(\n" + " 'connector' = 'datagen' ,\n" + " 'number-of-rows' = '100' ,\n" + " 
'rows-per-second' = '1' \n" + ")" );        
tEnv.executeSql( "CREATE TABLE print_table (\n" + "    id_in_bytes  
VARBINARY,\n" + "    id  INT\n" + ") WITH (\n" + " 'connector' = 'print' \n" + 
")" );        
tEnv.executeSql( "INSERT INTO print_table SELECT * FROM ( SELECT 
IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE 
ET.`id_in_bytes` IS NOT NULL" ); 

Logging 
2022-11-02 13:38:58,760 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954 .   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:58,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954 .   ### ### 
DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:38:59,759 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input 
bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437 .
2022-11-02 13:39:00,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** *** input 
bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483 .
2022-11-02 13:39:01,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686 .   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 
2022-11-02 13:39:01,761 INFO  
org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ### input 
bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686 .   ### 
### DEBUG ### ### duplicated call??? ### DEBUG  ### ### 



Re: why select limit so slow on yarn cluster

2022-11-01 Thread yuxia
Such a SQL statement should finish quickly. 
Could you please look at which part costs the most time: starting the job, running the job,
or other stuff?
And how about the network? 
Btw, what are the logical plan and physical plan? You can use `explain select * from
scrm_admin_role limit 10` to show the plan.

Best regards, 
Yuxia 


From: "Si-li Liu"
To: "User"
Sent: Wednesday, November 2, 2022, 12:17:52 AM
Subject: why select limit so slow on yarn cluster


I created a table using Flink SQL on yarn session. 

CREATE TEMPORARY TABLE `scrm_admin_role` ( 
> `id` bigint, 
> `role_name` string, 
> `sort` int, 
> `type` tinyint, 
> `status` tinyint, 
> `tenant_id` bigint, 
> `deleted` BOOLEAN, 
> `create_time` TIMESTAMP, 
> `update_time` TIMESTAMP, 
> PRIMARY KEY (`id`) NOT ENFORCED 
> ) WITH ( 
> 'connector' = 'jdbc', 
> 'url' = 'jdbc:mysql://172.17.16.45:3306/willing_base', 
> 'username' = '*', 
> 'password' = *', 
> 'table-name' = 'scrm_admin_role' 
> ); 

The origin table is very small, about 10k lines. 

Then I tried a select to check the data: select * from scrm_admin_role limit 10; It can
retrieve the result, but very slowly; it took about 2~3 minutes. Could anyone tell me why,
and how I could make it fast?

-- 
Best regards 

Sili Liu 



Re: Could not find any factory for identifier 'filesystem'

2022-11-01 Thread yuxia
The dependency flink-connector-files is needed, e.g. add "org.apache.flink" % "flink-connector-files" % versions.flink to the dependency list.

Best regards, 
Yuxia 


From: "Pavel Penkov"
To: "User"
Sent: Tuesday, November 1, 2022, 6:06:43 PM
Subject: Could not find any factory for identifier 'filesystem'

I'm trying to run a Flink job as a standalone program and getting the following 
error. 
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factory for identifier 'filesystem' that implements 
'org.apache.flink.table.factories.DynamicTableFactory' in the classpath. 

Available factory identifiers are: 

blackhole 
datagen 
kafka 
print 
upsert-kafka 

Here's a list of dependencies (Flink version is 1.16.0) 

libraryDependencies ++= Seq( 
// Flink 
"org.apache.flink" %% "flink-streaming-scala" % versions.flink, 
"org.apache.flink" %% "flink-table-api-scala" % versions.flink, 
"org.apache.flink" %% "flink-table-api-scala-bridge" % versions.flink, 
"org.apache.flink" % "flink-connector-kafka" % versions.flink, 
"org.apache.flink" % "flink-avro-confluent-registry" % versions.flink, 
"org.apache.flink" %% "flink-table-planner" % versions.flink, 
"org.apache.flink" % "flink-avro" % versions.flink, 
"org.apache.flink" % "flink-clients" % versions.flink, 
"org.apache.flink" % "flink-runtime" % versions.flink, 
"org.apache.flink" % "flink-runtime-web" % versions.flink, 
"org.apache.flink" % "flink-parquet" % versions.flink, 
// Hadoop 
"org.apache.hadoop" % "hadoop-client" % versions.hadoop, 
// Misc 
"org.rogach" %% "scallop" % "4.1.0", 
"ch.qos.logback" % "logback-classic" % "1.4.4" 
) 

I've also tried to run it on a host that has Hadoop installed setting 
HADOOP_CLASSPATH and HADOOP_CONF_DIR but the result is the same. 



Re: SQL Lookup join on nested field

2022-10-18 Thread yuxia
AFAIK, there's no plan/ticket for it. If you think it needs to be supported, you can create a
ticket in Jira [1] for it.
[1] https://issues.apache.org/jira/projects/FLINK/summary 

Best regards, 
Yuxia 


From: "Krzysztof Chmielewski"
To: "User"
Sent: Tuesday, October 18, 2022, 3:28:26 PM
Subject: SQL Lookup join on nested field

Hi all, 
I have found an old thread [1] where there was a question about SQL joins on 
nested fields. 

The conclusion was that (quote): 
"temporal table join doesn't support join on a nested join. 
In blink planner, a temporal table join will be translated into lookup join 
which will use the equality condition fields as the lookup keys." 

The thread is from 2020 and I was wondering if there is any plan/ticket about 
adding support for nested fields to lookup join/temporal join 

I've checked this use case on Flink 1.15.2 and it seems still not supported. 

[1] https://lists.apache.org/thread/o3fc5lrqf6dbkl9pm0rp2mqyt7mcnsv3 

Best Regards, 
Krzysztof Chmielewski 



Re: Flink 1.15 Interval Join error after Deduplicate

2022-10-16 Thread Yuxia Luo
The view A tries to do de-duplication using event time, which will still produce update rows.
If you use processing time to do the de-duplication, then view A should only produce
append-only rows.
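A small sketch of what the proc-time variant could look like, reusing the tables from your
example and adding a processing-time column (a sketch under that assumption, not a verified
rewrite of your job):

// Same source columns as in your example, plus a processing-time column.
tEnv.executeSql(
    "CREATE TEMPORARY TABLE source_with_proctime ("
        + "  id INT,"
        + "  name STRING,"
        + "  event_time TIMESTAMP(3),"
        + "  proc AS PROCTIME(),"
        + "  WATERMARK FOR event_time AS event_time"
        + ") WITH ('connector' = 'datagen')");

// Keeping the first row ordered by proc time makes the deduplication append-only,
// so the interval join afterwards no longer sees update/delete changes.
tEnv.executeSql(
    "CREATE TEMPORARY VIEW A AS "
        + "SELECT id, name, event_time FROM ("
        + "  SELECT id, name, event_time,"
        + "         ROW_NUMBER() OVER (PARTITION BY id, name, event_time ORDER BY proc ASC) AS rn"
        + "  FROM source_with_proctime"
        + ") WHERE rn = 1");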

Best regards,
Yuxia



> On October 15, 2022, at 9:50 AM, liebin...@whu.edu.cn wrote:
> 
> I had a problem with Interval Join after using Deduplicate. I'm using Flink 
> version 1.15.
> 
> I want to use Flink's Interval Join for double-stream association, and my 
> first table needs to be de-duplicated. Here is my sample code.
> 
> ```
> CREATE TEMPORARY TABLE `source` (
>  id INT,
>  name STRING,
>  event_time TIMESTAMP(3),
>  WATERMARK FOR event_time AS event_time
> ) WITH (
>  'connector' = 'datagen'
> );
> 
> 
> CREATE TEMPORARY TABLE B (
>  id INT,
>  `start` INT,
>  `end` INT,
>  event_time TIMESTAMP(3),
>  WATERMARK FOR event_time AS event_time
> ) WITH (
>  'connector' = 'datagen'
> );
> 
> create TEMPORARY view A as
> select id, name, event_time from (
>  select id, name, event_time,
>  row_number() over(partition by id, name, event_time order by event_time asc) 
> as rn
>  from source
> )
> where rn = 1;
> 
> SELECT *
> FROM A, B
> WHERE 
>A.id = B.id AND A.id >= B.`start` AND A.id <= B.`end` AND 
>A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND AND 
>B.event_time + INTERVAL '10' SECOND;
> ```
> 
> I keep the first row of data in the de-duplication, so view A should only 
> produce insert rows, but running the SQL above produces the following 
> error.
> 
> ```
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesn't 
> support consuming update and delete changes which is produced by node 
> Deduplicate(keep=[FirstRow], key=[id, name, event_time], order=[ROWTIME])
> ```
> 
> How to perform Interval Join after using Deduplicate?



Re: Flink 1.15 Interval Join error after Deduplicate

2022-10-16 Thread Yuxia Luo
> view A should only produce insert rows
No, the view A will still produce update/delete rows. 

Best regards,
Yuxia



> On October 15, 2022, at 9:50 AM, liebin...@whu.edu.cn wrote:
> 
> view A should only produce insert rows



Re: SQL Changes between 1.14 and 1.15?

2022-10-16 Thread Yuxia Luo
Thanks for raising it. It seems to be a bug introduced by this PR [1]. I have created
FLINK-29651 to track it.

[1] https://github.com/apache/flink/pull/19001
[2] https://issues.apache.org/jira/browse/FLINK-26520

Best regards,
Yuxia



> On October 14, 2022, at 9:19 PM, PACE, JAMES wrote:
> 
> We’ve noticed the following difference in sql when upgrading from flink 
> 1.14.5 to 1.15.2 around characters that are escaped in an sql statement:
>  
> This statement:
>   tableEnvironment.executeSql("select * from testTable WHERE lower(field1) 
> LIKE 'b\"cd\"e%'");
> produces a runtime error in flink 1.15.2, but executes properly in flink 
> 1.14.5
>  
> This can be worked around by escaping the backslash, changing the statement 
> to:
>   tableEnvironment.executeSql("select * from testTable WHERE lower(field1) 
> LIKE 'b\\\"cd\\\"e%'");
>  
> This code illustrates the issue:
>  
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>  
> public class TestCase3 {
> public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
>  
> TestData testData = new TestData();
> testData.setField1("b\"cd\"e");
> DataStream<TestData> stream = env.fromElements(testData);
> stream.print();
> final StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(env);
> tableEnvironment.createTemporaryView("testTable", stream, 
> Schema.newBuilder().build());
>  
> // Works with Flink 1.14.x, flink runtime errors in 1.15.2.  
> Uncomment to see runtime trace
> //tableEnvironment.executeSql("select *, '1' as run from testTable 
> WHERE lower(field1) LIKE 'b\"cd\"e%'").print();
> // Works with 1.15.2
> tableEnvironment.executeSql("select * from testTable WHERE 
> lower(field1) LIKE 'b\\\"cd\\\"e%'").print();
>  
> env.execute("TestCase");
> }
>  
> public static class TestData {
> private String field1;
>  
> public String getField1() { return field1; }
> public void setField1(String field1) { this.field1 = field1; }
> }
> }
>  
> Thanks
> Jim



Re: Sorting by source event time

2022-09-26 Thread yuxia
You can change it to "ORDER BY eventTime" (ascending), and it should work.

You can sort on event time, but it must be in time-ascending order (without a 'limit').
If you still want a descending order, I think you can try setting the internal configuration
`table.exec.sort.non-temporal.enabled` to true.
But remember it's just experimental, and it may bring unexpected behavior.
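For illustration, a sketch of setting that experimental flag on your existing tableEnv before
running the query (I haven't verified the side effects; use with care):

// Sketch: the key is the internal option mentioned above.
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.sort.non-temporal.enabled", "true");

Table resultTable2 = tableEnv.sqlQuery(
        "SELECT * FROM productDetailsView ORDER BY eventTime DESC");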

Best regards,
Yuxia

- Original Message -
From: "Noel OConnor"
To: "User"
Sent: Tuesday, September 27, 2022, 2:10:36 AM
Subject: Sorting by source event time

Hi,
I have a temporary view created from a datastream.

tableEnv.createTemporaryView("productDetailsView", productStream,
Schema.newBuilder()
.columnByMetadata("eventTime",
"TIMESTAMP_LTZ(3)", "rowtime", Boolean.TRUE)
.watermark("eventTime", "SOURCE_WATERMARK()")
.build());


and i'm trying to sort it using the following

Table resultTable2 = tableEnv.sqlQuery(
"SELECT * FROM productDetailsView ORDER BY eventTime DESC");

but I get the following error

Caused by: org.apache.flink.table.api.TableException: Sort on a
non-time-attribute field is not supported.

Can you sort on event time or does it have to be part of the actual
payload data type?
Have I missed something obvious ?

cheers
Noel


Re: Insert into JDBC table

2022-09-12 Thread yuxia
"tEnv.executeSql("INSERT INTO Customers (customer_number, pid_no, name) VALUES 
(4000, 100, 'customer')");" should work. If not work, it seems to be a bug.
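One thing worth ruling out: executeSql submits the INSERT job asynchronously, so if the
program exits right after the call, the job may never finish. A sketch of waiting for it
(same statement as yours):

// Wait for the async INSERT job to finish before the program exits.
// await() throws checked exceptions; declare or handle them in the enclosing method.
TableResult result = tEnv.executeSql(
        "INSERT INTO Customers (customer_number, pid_no, name) VALUES (4000, 100, 'customer')");
result.await();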

>> "Flink dynamic table is just a link to real data"
Yes, it's.

>> Is there any way to create empty table? Or table with some values defined in 
>> Flink?
Maybe you can try create table with Hive dialect[1] which enable you create a 
table in Hive using Flink SQL.
Or you can try flink-table-store.
AFAK, seems we can't create a table with some values defined directly.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hive/hive_dialect/
[2] https://nightlies.apache.org/flink/flink-table-store-docs-master/

Best regards,
Yuxia

- Original Message -
From: "podunk"
To: "User"
Sent: Monday, September 12, 2022, 8:36:54 PM
Subject: Re: Insert into JDBC table

I see I can only insert into a JDBC table with a SELECT from another table, something like:
 
tEnv.executeSql("INSERT INTO Customers SELECT customer_number, pid_no, name 
FROM another_table");

But what if I want to insert a row that I created within Flink? For instance, I did some
calculation and I want to insert a completely new row into the table (it does not exist in
any other table). Something like:

tEnv.executeSql("INSERT INTO Customers (customer_number, pid_no, name) VALUES 
(4000, 100, 'customer')");

?

One more question - a Flink dynamic table is just a link to real data (right?). Is there any
way to create an empty table? Or a table with some values defined in Flink?

Thanks for help,

M.

 
 

Sent: Friday, September 09, 2022 at 3:03 PM
From: pod...@gmx.com
To: user@flink.apache.org
Subject: Insert into JDBC table
Why does this INSERT not insert a row into the table (the JDBC connection works; I can create
the 'Customers' table from the MySQL table)?
 
tEnv.executeSql("CREATE TABLE Customers ("
+ " customer_number INT, "
+ " pid_no INT, "
+ " name STRING, "
+ " PRIMARY KEY (customer_number) NOT ENFORCED"
+ " ) WITH ( "
+ " 'connector' = 'jdbc', "
+ " 'url' = 'jdbc:mysql://localhost:3306/test', "
+ " 'username' = 'some_user', "
+ " 'table-name' = 'customers', "
+ " 'password' = ''"
+ ")");

//This insert does nothing (not even error)
tEnv.executeSql("INSERT INTO Customers (customer_number, pid_no, name) VALUES 
(4000, 100, 'customer')");

According to documentation 
(https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/)
 it should work.
Regards,

Mike


Re: Access to Table environent properties/Job arguents from DynamicTableFactory

2022-09-12 Thread yuxia
Have you ever checked the DynamicTableFactory.Context#getConfiguration? 
Is it something that you're looking for? 
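For illustration, a fragment of how that could be read inside a factory; buildSink is a
placeholder of mine, and the whole snippet is abbreviated, not a complete factory:

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
    // Session / cluster configuration (flink-conf.yaml plus TableConfig overrides)
    ReadableConfig flinkConf = context.getConfiguration();

    // Table-specific options from the DDL WITH clause
    Map<String, String> tableOptions = context.getCatalogTable().getOptions();

    // ... build and return the sink using both
    return buildSink(flinkConf, tableOptions);
}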

Best regards, 
Yuxia 


From: "Krzysztof Chmielewski"
To: "User"
Sent: Saturday, September 10, 2022, 12:51:09 AM
Subject: Access to Table environent properties/Job arguents from DynamicTableFactory

Hi, 
is there a way to access a Table Environment configuration or Job arguments 
from DynamicTableFactory (Sink/Source)? 

I'm guessing no, but I just want to double-check that I'm not missing anything here.
From my understanding, we have access only to the table definition.

Thanks, 
Krzysztof Chmielewski 



Re: get NoSuchMethodError when using flink flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar

2022-08-31 Thread yuxia
How do you use flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar? Do you use the SQL client? Do
you put it in FLINK_HOME/lib?
If it's for the SQL client, I think you can remove the jar from FLINK_HOME/lib and add it in
the Flink SQL client instead, using `ADD JAR 'flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar'`,
and add 'org.apache.commons.' to the parent-first classloader patterns [1]
(classloader.parent-first-patterns.additional).

But I think the better way is to relocate the class. 
[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#classloader-parent-first-patterns-default
 

Best regards, 
Yuxia 


From: "Liting Liu (litiliu)"
To: "User"
Sent: Wednesday, August 31, 2022, 5:14:35 PM
Subject: get NoSuchMethodError when using flink flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar

Hi, I got a NoSuchMethodError when using flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar.
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue. 
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201) 
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
Caused by: java.lang.NoSuchMethodError: 
org.apache.commons.lang3.StringUtils.join([IC)Ljava/lang/String; 
at 
org.apache.flink.table.planner.plan.utils.RankProcessStrategy$UpdateFastStrategy.toString(RankProcessStrategy.java:129)
 
at java.lang.String.valueOf(String.java:2994) 
at java.lang.StringBuilder.append(StringBuilder.java:136) 
at 
org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl.explain(RelDescriptionWriterImpl.java:67)
 
at 
org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl.done(RelDescriptionWriterImpl.java:96)
 
at org.apache.calcite.rel.AbstractRelNode.explain(AbstractRelNode.java:246) 
at 
org.apache.flink.table.planner.plan.nodes.FlinkRelNode.getRelDetailedDescription(FlinkRelNode.scala:50)
 
at 
org.apache.flink.table.planner.plan.nodes.FlinkRelNode.getRelDetailedDescription$(FlinkRelNode.scala:46)
 
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRank.getRelDetailedDescription(StreamPhysicalRank.scala:41)
 
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:701)
 
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitRankStrategies$1(FlinkChangelogModeInferenceProgram.scala:738)
 
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitRankStrategies$1$adapted(FlinkChangelogModeInferenceProgram.scala:730)
 
at scala.collection.Iterator.foreach(Iterator.scala:937) 
at scala.collection.Iterator.foreach$(Iterator.scala:937) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) 
at scala.collection.IterableLike.foreach(IterableLike.scala:70) 
at scala.collection.IterableLike.foreach$(IterableLike.scala:69) 
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitRankStrategies(FlinkChangelogModeInferenceProgram.scala:730)
 
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:489)
 

It seems there is an embedded StringUtils in flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar
which conflicts with another class.

What should I do? 
Do I have to manually exclude StringUtils.class from
flink-sql-connector-hive-2.2.0_2.11-1.14.4.jar?




Re: Failing to maven compile install Flink 1.15

2022-08-18 Thread yuxia
Which mvn version do you use? It's recommended to use Maven 3.2.5. 

Best regards, 
Yuxia 


From: "hjw" <1010445...@qq.com>
To: "User"
Sent: Thursday, August 18, 2022, 10:48:57 PM
Subject: Failing to maven compile install Flink 1.15

I tried to run mvn clean install on the Flink 1.15 parent, but it failed. 
An error happened while compiling flink-clients. 
Error Log: 
Failed to execute goal 
org.apache.maven.plugins:maven-assembly-plugin:2.4:single 
(create-test-dependency) on project flink-clients: Error reading assemblies: 
Error locating assembly descriptor: src/test/assembly/test-assembly.xml 

[1] [INFO] Searching for file location: 
D:\learn\Code\Flink\FlinkSourceCode\Flink-1.15\flink\flink-clients\target\src\test\assembly\test-assembly.xml
 

[2] [INFO] File: 
D:\learn\Code\Flink\FlinkSourceCode\Flink-1.15\flink\flink-clients\target\src\test\assembly\test-assembly.xml
 does not exist. 

[3] [INFO] File: 
D:\learn\Code\Flink\FlinkSourceCode\Flink-1.15\flink\src\test\assembly\test-assembly.xml
 does not exist. 


However, mvn clean package on the Flink 1.15 parent and on flink-clients alone are both
successful.




Re: Failing to compile Flink 1.9 with Scala 2.12

2022-08-18 Thread yuxia
At least for Flink 1.15, it's recommended to use Maven 3.2.5. So I guess maybe you can try
using a lower version of Maven.

Best regards, 
Yuxia 


From: "Milind Vaidya"
To: "Weihua Hu"
Cc: "User"
Sent: Friday, August 19, 2022, 1:26:45 AM
Subject: Re: Failing to compile Flink 1.9 with Scala 2.12

Hi Weihua, 

Thanks for the update. I do understand that, but right now it is not possible 
to update immediately to 1.15, so wanted to know what is the way out. 

- Milind 

On Thu, Aug 18, 2022 at 7:19 AM Weihua Hu <huweihua@gmail.com> wrote: 



Hi 
Flink 1.9 is not updated since 2020-04-24, it's recommended to use the latest 
stable version 1.15.1. 


Best, 
Weihua 


On Thu, Aug 18, 2022 at 5:36 AM Milind Vaidya <kava...@gmail.com> wrote: 


Hi 

Trying to compile and build Flink jars based on Scala 2.12. 

Settings : 
Java 8 
Maven 3.6.3 / 3.8.6 

Many online posts suggest using Java 8 which is already in place. 

Building using Jenkins. Any clues as to how to get rid of it? 


 
 
net.alchim31.maven 
scala-maven-plugin 
3.3.2 
 
 
-nobootcp 
 
 
-Xss2m 
 
 
 


Exception : 

Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.3.2:compile 
(scala-compile-first) on project flink-table-planner-blink_2.12: wrap: 
org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit 
value: 1) -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal 
net.alchim31.maven:scala-maven-plugin:3.3.2:compile (scala-compile-first) on 
project flink-table-planner-blink_2.12: wrap: 
org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit 
value: 1)
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:212)
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
at 
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
at 
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
at 
org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
at 
org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307)
at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193)
at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106)
at org.apache.maven.cli.MavenCli.execute(MavenCli.java:863)
at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:288)
at org.apache.maven.cli.MavenCli.main(MavenCli.java:199)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
at 
org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
Caused by: org.apache.maven.plugin.MojoExecutionException: wrap: 
org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit 
value: 1)
at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:593)
at 
org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
at 
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:207)
... 20 more
Caused by: org.apache.commons.exec.ExecuteException: Process exited with an 
error: 1 (Exit value: 1)
at 
org.apache.commons.exec.DefaultExecutor.executeInternal(DefaultExecutor.java:377)
at 
org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:160)
at 
org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:147)
at 
scala_maven_executions.JavaMainCallerByFork.run(JavaMainCallerByFork.java:89)
at 
scala_maven.ScalaCompilerSupport.compile(ScalaCompilerSupport.java:161)
at 
scala_maven.ScalaCompilerSupport.doExecute(ScalaCompilerSupport.java:99)
at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:585) 








Re: get state from window

2022-08-17 Thread yuxia
Sorry for the misleading answer.
After some investigation, it seems a UDTAGG can only be used via the Flink Table API, not in plain Flink SQL.
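
For reference, below is a minimal sketch (my own illustration, not code from this thread; the "scores" table and its columns are made up) of a user-defined table aggregate function used through the Table API's flatAggregate, which is the part that plain SQL text cannot express:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class Top2Example {

    // Accumulator keeping the two largest scores seen so far.
    public static class Top2Acc {
        public Integer first = Integer.MIN_VALUE;
        public Integer second = Integer.MIN_VALUE;
    }

    // UDTAGG that emits up to two rows per group: (score, rank).
    public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Acc> {

        @Override
        public Top2Acc createAccumulator() {
            return new Top2Acc();
        }

        public void accumulate(Top2Acc acc, Integer score) {
            if (score > acc.first) {
                acc.second = acc.first;
                acc.first = score;
            } else if (score > acc.second) {
                acc.second = score;
            }
        }

        public void emitValue(Top2Acc acc, Collector<Tuple2<Integer, Integer>> out) {
            if (acc.first != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.first, 1));
            }
            if (acc.second != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.second, 2));
            }
        }
    }

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql(
                "CREATE TEMPORARY VIEW scores AS SELECT * FROM (VALUES "
                        + "('alice', 1), ('alice', 5), ('alice', 3), ('bob', 7)) AS T(name, score)");
        tEnv.createTemporarySystemFunction("Top2", Top2.class);

        // flatAggregate is only available on the Table API, not in SQL text.
        Table result = tEnv.from("scores")
                .groupBy($("name"))
                .flatAggregate(call("Top2", $("score")).as("score", "rank"))
                .select($("name"), $("score"), $("rank"));

        result.execute().print();
        // The same Table could also be written to a Kafka sink table via result.executeInsert(...).
    }
}
```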

Best regards,
Yuxia

- 原始邮件 -
发件人: "yuxia" 
收件人: "user-zh" 
抄送: "User" 
发送时间: 星期四, 2022年 8 月 18日 上午 10:21:12
主题: Re: get state from window

> does flink sql support UDTAGG?
Yes, Flink SQL supports UDTAGG.

Best regards,
Yuxia

- 原始邮件 -
发件人: "曲洋" 
收件人: "user-zh" , "User" 
发送时间: 星期四, 2022年 8 月 18日 上午 10:03:24
主题: get state from window

Hi dear engineers,

I have one question:  does flink streaming support getting the state.I override 
the open method in the map processor,initializing
Hi dear engineers,


One small question:  does flink sql support UDTAGG? (user-defined table 
aggregate function), seems only supported in flink table api? If not supported 
in flink sql, how can I define an aggregated udf which could output multiple 
rows to kafka.


Thanks for your help!
Thanks for your help!


Re: get state from window

2022-08-17 Thread yuxia
> does flink sql support UDTAGG?
Yes, Flink SQL supports UDTAGG.

Best regards,
Yuxia

- 原始邮件 -
发件人: "曲洋" 
收件人: "user-zh" , "User" 
发送时间: 星期四, 2022年 8 月 18日 上午 10:03:24
主题: get state from window

Hi dear engineers,

I have one question:  does flink streaming support getting the state.I override 
the open method in the map processor,initializing
Hi dear engineers,


One small question:  does flink sql support UDTAGG? (user-defined table 
aggregate function), seems only supported in flink table api? If not supported 
in flink sql, how can I define an aggregated udf which could output multiple 
rows to kafka.


Thanks for your help!
Thanks for your help!


Re: without DISTINCT unique lines show up many times in FLINK SQL

2022-08-17 Thread yuxia
Seems it's the same problem to the problem discussed in [1]

[1]:https://lists.apache.org/thread/3lvkd8hryb1zdxs3o8z65mrjyoqzs88l

Best regards,
Yuxia

- 原始邮件 -
发件人: "Marco Villalobos" 
收件人: "User" 
发送时间: 星期三, 2022年 8 月 17日 下午 12:56:44
主题: without DISTINCT unique lines show up many times in FLINK SQL

Hello everybody,

When I perform this simple set of queries, a unique line from the source file
shows up many times.

I have verified many times that a unique line in the source shows up as much as 
100 times in the select statement.

Is this the correct behavior for Flink 1.15.1?

FYI, it does show the correct results when I perform a DISTINCT query.

Here is the SQL:


CREATE TABLE historical_raw_source_template(
`file.path`  STRING NOT NULL METADATA,
`file.name`  STRING NOT NULL METADATA,
`file.size`  BIGINT NOT NULL METADATA,
`file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,
lineSTRING
  ) WITH (
'connector' = 'filesystem',   -- required: specify the connector
'format' = 'raw'  -- required: file system connector 
requires to specify a format
  );


CREATE TABLE historical_raw_source
  WITH (
'path' = 's3://raw/'  -- required: path to a directory
  ) LIKE historical_raw_source_template;


SELECT
`file.modification-time` AS modification_time,
`file.path` AS file_path,
line
  FROM
  historical_raw_source


Re: Is this a Batch SQL Bug?

2022-08-17 Thread yuxia
Thanks for raising it. Yes, you're right. It's indeed a bug.
The problem is that the RowData produced by LineBytesInputFormat is reused, but 
DeserializationSchemaAdapter#Reader only does a shallow copy of the produced data, 
so the final result will always be the last row's value.
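
To illustrate the effect, here is a simplified sketch of the object-reuse pitfall (my own illustration; it does not use the actual Flink classes):

```java
import java.util.ArrayList;
import java.util.List;

public class ReusedRecordPitfall {

    // Stand-in for a mutable, reused record such as the reused RowData instance.
    static class MutableRow {
        String line;
    }

    public static void main(String[] args) {
        MutableRow reused = new MutableRow();            // the input format reuses one instance
        List<MutableRow> collected = new ArrayList<>();

        for (String line : new String[] {"one", "two", "ten"}) {
            reused.line = line;                          // the record is overwritten in place
            collected.add(reused);                       // "shallow copy": only the reference is kept
        }

        // Every buffered element points at the same object, so this prints "ten" three times.
        collected.forEach(row -> System.out.println(row.line));
    }
}
```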

Could you please help create a jira to track it?

Best regards,
Yuxia

- 原始邮件 -
发件人: "Marco Villalobos" 
收件人: "User" 
发送时间: 星期四, 2022年 8 月 18日 上午 6:08:33
主题: Is this a Batch SQL Bug?

Given this program:

```java
package mvillalobos.bug;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class IsThisABatchSQLBug {

   public static void main(String[] args) {
  final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
  env.setRuntimeMode(RuntimeExecutionMode.BATCH);
  final StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(env);
  tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
"`file.path`  STRING NOT NULL METADATA,\n" +
"`file.name`  STRING NOT NULL METADATA,\n" +
"`file.size`  BIGINT NOT NULL METADATA,\n" +
"`file.modification-time` TIMESTAMP_LTZ(3) NOT NULL 
METADATA,\n" +
"lineSTRING\n" +
"  ) WITH (\n" +
"'connector' = 'filesystem', \n" +
"'format' = 'raw'\n" +
"  );");
  tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
"  WITH (\n" +
"'path' = 
'/Users/minmay/dev/mvillalobos/historical/data'\n" +
"  ) LIKE historical_raw_source_template;");

  final TableResult output = 
tableEnv.from("historical_raw_source").select($("line")).execute();
  output.print();
   }
}
```

and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data' 
directory:

```text
one
two
three
four
five
six
seven
eight
nine
ten
```

The print results are:
```text
+++
| +I |ten |
| +I |ten |
| +I |ten |
| +I |ten |
| +I |ten |
| +I |ten |
| +I |ten |
| +I |ten |
| +I |ten |
| +I |ten |
+++
10 rows in set
```

I was expecting all rows to print. If this is not a bug, then what am I 
misunderstanding?

I do notice that the transient field:

private transient RecordCollector collector;

in 
org.apache.flink.connector.file.table.DeserializationSchemaAdapter.LineBytesInputFormat

becomes empty on each iteration, as though it failed to serialize correctly.

Regardless, I don't know what's wrong.  Any advice would deeply help.

Marco A. Villalobos


Re: Metrics & Monitoring in Flink SQL

2022-07-17 Thread yuxia
I'm afraid there's no document for it. But there's a FLIP [1] that defines the 
metrics a connector (source/sink) should expose. And I think the official 
connectors cover most of these metrics where possible. 
[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
 

Best regards, 
Yuxia 


发件人: "casel.chen"  
收件人: "yuxia"  
抄送: "Salva Alcántara" , "User"  
发送时间: 星期日, 2022年 7 月 17日 下午 12:00:11 
主题: Re:Re: Metrics & Monitoring in Flink SQL 



How to get all metrics of those connectors shipped inline with flink release? 
any document? 
















At 2022-07-13 11:05:43, "yuxia"  wrote: 


With Flink SQL, You can define your own source/sink metrics [1], but you can't 
define the metrics for the intermediate operators. 

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/
 

Best regards, 
Yuxia 


发件人: "Salva Alcántara"  
收件人: "User"  
发送时间: 星期三, 2022年 7 月 13日 上午 1:33:30 
主题: Metrics & Monitoring in Flink SQL 

I have a question regarding Flink SQL, which I'm lately getting into. So far, 
my experience is with the DataStream API mostly. In that context, it's easy for 
me to generate metrics for my operators. However, I'm just wondering which 
level of control there is regarding monitoring & metrics when working with 
Flink SQL. Is it possible to define "metrics for your queries"? Whatever that 
means and assuming that it makes any sense :laughing:. At least I should be 
able to generate typical metrics for common connectors, e.g., messages 
read/produced & things like accumulated lag for the case of the kafka 
connector, to put an example. Sorry for the vagueness, but I could not find a 
section for metrics & monitoring within the Flink SQL docs. 
Any guidance would be appreciated! 

Salva 






Re: Metrics & Monitoring in Flink SQL

2022-07-12 Thread yuxia
With Flink SQL, You can define your own source/sink metrics [1], but you can't 
define the metrics for the intermediate operators. 

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/
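
As a small sketch (my own example; the metric name is an assumption, and a custom table sink's runtime implementation is often a SinkFunction like this one), registering your own counter looks roughly as follows:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CountingSink<T> extends RichSinkFunction<T> {

    private transient Counter recordsWritten;

    @Override
    public void open(Configuration parameters) {
        // Registers a custom metric that shows up under this operator's metric group.
        recordsWritten = getRuntimeContext().getMetricGroup().counter("recordsWritten");
    }

    @Override
    public void invoke(T value, Context context) {
        recordsWritten.inc();
        // ... write the value to the external system here ...
    }
}
```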
 

Best regards, 
Yuxia 


发件人: "Salva Alcántara"  
收件人: "User"  
发送时间: 星期三, 2022年 7 月 13日 上午 1:33:30 
主题: Metrics & Monitoring in Flink SQL 

I have a question regarding Flink SQL, which I'm lately getting into. So far, 
my experience is with the DataStream API mostly. In that context, it's easy for 
me to generate metrics for my operators. However, I'm just wondering which 
level of control there is regarding monitoring & metrics when working with 
Flink SQL. Is it possible to define "metrics for your queries"? Whatever that 
means and assuming that it makes any sense :laughing:. At least I should be 
able to generate typical metrics for common connectors, e.g., messages 
read/produced & things like accumulated lag for the case of the kafka 
connector, to put an example. Sorry for the vagueness, but I could not find a 
section for metrics & monitoring within the Flink SQL docs. 
Any guidance would be appreciated! 

Salva 



Re: Parsing a JSON array string as a Flink SQL Array data type

2022-07-12 Thread yuxia
I'm afraid there's no built-in function for this at hand. But you can write a 
UDF [1] to convert the JSON array string to a Flink array.
[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/
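
For example, a minimal sketch of such a UDF (my own illustration; it assumes Jackson is available on the classpath, and the function and column names are made up):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public class JsonArrayToArray extends ScalarFunction {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Turns a JSON array string such as '["a","b"]' into an ARRAY<STRING>.
    @DataTypeHint("ARRAY<STRING>")
    public String[] eval(String jsonArray) {
        if (jsonArray == null) {
            return null;
        }
        try {
            JsonNode node = MAPPER.readTree(jsonArray);
            String[] result = new String[node.size()];
            for (int i = 0; i < node.size(); i++) {
                result[i] = node.get(i).asText();
            }
            return result;
        } catch (Exception e) {
            throw new RuntimeException("Invalid JSON array: " + jsonArray, e);
        }
    }
}
```

After registering it, e.g. tEnv.createTemporarySystemFunction("JSON_ARRAY_TO_ARRAY", JsonArrayToArray.class), the result can be fed into UNNEST, e.g. SELECT t.item FROM src CROSS JOIN UNNEST(JSON_ARRAY_TO_ARRAY(json_col)) AS t (item).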

Best regards,
Yuxia

- 原始邮件 -
发件人: "Abhishek Rai" 
收件人: "User" , "Jordan Hannel" 
发送时间: 星期三, 2022年 7 月 13日 上午 3:02:15
主题: Parsing a JSON array string as a Flink SQL Array data type

Hello!

I'm trying to use the new JSON functions in Flink 1.15 to parse JSON
input data.  In particular, using JSON_QUERY, I'm able to extract out
JSON array elements from a larger JSON record.  However, this function
returns the JSON array as a string.

I'd like to run this array through the SQL UNNEST operator, which
takes an ARRAY data type as an input, not a string.  So how do I
convert a string encoded JSON array to a Flink SQL Array data type, so
I can use it with UNNEST?

Unfortunately, I'm not able to locate any documentation on this.

Thanks for your help!
Abhishek


Re: Does Table API connector, csv, has some option to ignore some columns

2022-07-12 Thread yuxia
For the JSON format, you only need to define the partial set of columns to be selected 
in the Flink DDL. 
But for the CSV format, it's not supported. If the CSV file has no header, how could 
the incomplete set of columns defined in the Flink DDL be mapped to the original 
fields in the CSV file? Thus, you need to declare all columns so that the mapping can 
be done. If there is a header, the mapping would be possible and it should meet 
your requirement; however, the current implementation hasn't considered such 
a case. 



Best regards, 
Yuxia 


发件人: "podunk"  
收件人: "User"  
发送时间: 星期二, 2022年 7 月 12日 下午 5:13:05 
主题: Re: Re: Does Table API connector, csv, has some option to ignore some 
columns 

This is really surprising. 
When you import data from a file, you really rarely need to import everything 
from that file. Most often it is several columns. 
So the program that reads the file should be able to do this - this is the ABC 
of working with data. 
Often the suggestion is "you can write your script". Sure. I can. I can write 
the entire program here - from scratch. 
But I use a ready-made program to avoid writing my scripts. 
Sent: Tuesday, July 12, 2022 at 12:24 AM 
From: "Alexander Fedulov"  
To: pod...@gmx.com 
Cc: "user"  
Subject: Re: Re: Does Table API connector, csv, has some option to ignore some 
columns 
Hi podunk, 
no, this is currently not possible: 
> Currently, the CSV schema is derived from table schema. [1] 
So the Table schema is used to define how Jackson CSV parses the lines and 
hence needs to be complete. 
[1] [ 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/
 | 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/
 ] 
Best, 
Alexander Fedulov 
On Mon, Jul 11, 2022 at 5:43 PM < [ mailto:pod...@gmx.com | pod...@gmx.com ] > 
wrote: 



No, I did not mean. 
I said 'Does Table API connector, CSV, has some option to ignore some columns 
in source file?' 
Sent: Monday, July 11, 2022 at 5:28 PM 
From: "Xuyang" < [ mailto:xyzhong...@163.com | xyzhong...@163.com ] > 
To: [ mailto:pod...@gmx.com | pod...@gmx.com ] 
Cc: [ mailto:user@flink.apache.org | user@flink.apache.org ] 
Subject: Re:Re: Does Table API connector, csv, has some option to ignore some 
columns 


Hi, did you mean `insert into table1 select col1, col2, col3 ... from table2`? 



If this doesn't meet your requirement, what about using UDF to custom what you 
want in runtime. 




-- 
Best! 
Xuyang 




在 2022-07-11 16:10:00, [ mailto:pod...@gmx.com | pod...@gmx.com ] 写道: 

I want to control what I insert in table not what I get from table. 
Sent: Monday, July 11, 2022 at 3:37 AM 
From: "Shengkai Fang" < [ mailto:fskm...@gmail.com | fskm...@gmail.com ] > 
To: [ mailto:pod...@gmx.com | pod...@gmx.com ] 
Cc: "user" < [ mailto:user@flink.apache.org | user@flink.apache.org ] > 
Subject: Re: Does Table API connector, csv, has some option to ignore some 
columns 
Hi. 
In Flink SQL, you can select the column that you wants in the query. For 
example, you can use 
``` 
SELECT col_a, col_b FROM some_table; 
``` 
Best, 
Shengkai 
< [ mailto:pod...@gmx.com | pod...@gmx.com ] > 于2022年7月9日周六 01:48写道: 


Does Table API connector, CSV, has some option to ignore some columns in source 
file? 
For instance read only first, second, nine... but not the others? 
Or any other trick? 
CREATE TABLE some_table ( some_id BIGINT , ... ) WITH ( 'format' = 'csv' , ... 
) 










Re: How can I convert a DataSet into a Table?

2022-07-10 Thread yuxia
I'm afraid we have no way to do such conversion in Flink 1.15. 

But for your case, which is to read from a CSV file with the Table API, you can try 
the following: 

tEnv.createTemporaryTable("csvInput", 
    TableDescriptor.forConnector("filesystem") 
        .schema(schema) 
        .option("path", "/path/to/file") 
        .format(FormatDescriptor.forFormat("csv") 
            .option("field-delimiter", "|") 
            .build()) 
        .build()); 

Table table1 = tEnv.from("csvInput").xxx 

See more in the Flink doc[1] 
[1]: [ 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/#table-api
 | 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/#table-api
 ] 


Best regards, 
Yuxia 


发件人: "podunk"  
收件人: "User"  
发送时间: 星期三, 2022年 7 月 06日 上午 5:09:54 
主题: How can I convert a DataSet into a Table? 

My code is: 
package flinkTest2; 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.api.java.tuple.Tuple2; 
import org.apache.flink.table.api.EnvironmentSettings; 
import org.apache.flink.table.api.Table; 
import org.apache.flink.table.api.TableEnvironment; 
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
public class flinkTest2 { 
public static void main(String[] args) throws Exception { 
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 

// read a CSV file with five fields, taking only two of them 
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("c:/CSV/file") 
.includeFields("10010") // take the first and the fourth field 
.types(String.class, Double.class); 

//register and create table 
EnvironmentSettings settings = EnvironmentSettings 
.newInstance() 
//.inStreamingMode() 
.inBatchMode() 
.build(); 
TableEnvironment tEnv = TableEnvironment.create(settings); 
//Insert CSV content into table, define column names and read some rows from it 
} 
} 
What to do create table, insert DataSet csvInput into table and read some rows 
from it (into text file)? 
Thanks for help 
Mike 



Re: How can I convert a DataSet into a Table?

2022-07-05 Thread yuxia
What's the version of Flink you are using? 
In Flink 1.13, you can use BatchTableEnvironment#fromDataSet() to do that. But 
since Flink 1.14, the method has been removed. 

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/api/bridge/java/BatchTableEnvironment.html#fromDataSet-org.apache.flink.api.java.DataSet-java.lang.String-
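
A minimal sketch for Flink 1.13 (my own example; it assumes the legacy flink-table-api-java-bridge DataSet bridge is on the classpath, which is no longer the case in later releases):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class DataSetToTableExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // Stand-in for the CSV DataSet from the original question.
        DataSet<Tuple2<String, Double>> csvInput =
                env.fromElements(Tuple2.of("a", 1.0), Tuple2.of("b", 2.0));

        // Name the tuple fields and turn the DataSet into a Table.
        Table table = tEnv.fromDataSet(csvInput, $("name"), $("score"));

        // Convert back to a DataSet of Rows just to show the round trip.
        tEnv.toDataSet(table, Row.class).print();
    }
}
```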
 

Best regards, 
Yuxia 


发件人: pod...@gmx.com 
收件人: "User"  
发送时间: 星期三, 2022年 7 月 06日 上午 5:09:54 
主题: How can I convert a DataSet into a Table? 

My code is: 
package flinkTest2; 
import org.apache.flink.api.java.DataSet; 
import org.apache.flink.api.java.ExecutionEnvironment; 
import org.apache.flink.api.java.tuple.Tuple2; 
import org.apache.flink.table.api.EnvironmentSettings; 
import org.apache.flink.table.api.Table; 
import org.apache.flink.table.api.TableEnvironment; 
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
public class flinkTest2 { 
public static void main(String[] args) throws Exception { 
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 

// read a CSV file with five fields, taking only two of them 
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("c:/CSV/file") 
.includeFields("10010") // take the first and the fourth field 
.types(String.class, Double.class); 

//register and create table 
EnvironmentSettings settings = EnvironmentSettings 
.newInstance() 
//.inStreamingMode() 
.inBatchMode() 
.build(); 
TableEnvironment tEnv = TableEnvironment.create(settings); 
//Insert CSV content into table, define column names and read some rows from it 
} 
} 
What to do create table, insert DataSet csvInput into table and read some rows 
from it (into text file)? 
Thanks for help 
Mike 



Re: ContinuousFileMonitoringFunction retrieved invalid state.

2022-06-30 Thread yuxia
I'm not sure why it happened. But from the Flink source code, it seems the job tried to 
restore from an invalid state: the state actually contains more than one value, while 
Flink expects it to contain one value or none. 

Best regards, 
Yuxia 


发件人: "Vishal Surana"  
收件人: "User"  
发送时间: 星期五, 2022年 7 月 01日 上午 5:28:07 
主题: ContinuousFileMonitoringFunction retrieved invalid state. 

My job is unable to restore state after savepoint due to the following 
exception. Seems to be a rare exception as I haven't found any forum discussing 
it. Please advise. 

java.lang.IllegalArgumentException: ContinuousFileMonitoringFunction retrieved 
invalid state. 
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) 
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:167)
 ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
 ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
 ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:94)
 ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
 ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
 ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
 ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
 ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
 ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
 ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
 ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) 
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) 
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT] 
at java.lang.Thread.run(Thread.java:829) ~[?:?] 

-- 
Regards, 
Vishal 



Re: StreamingFileSink & checkpoint tuning

2022-06-30 Thread yuxia
The streaming file sink writes to S3 while processing elements, but only as temporary 
(in-progress) files. Only after a successful checkpoint (more exactly, once it receives 
the notification of a successful checkpoint) will it commit the temporary files written 
since the last successful checkpoint. 
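
On the tuning side of the original question, here is a small sketch of the relevant checkpoint knobs (my own example; the values are illustrative assumptions, not recommendations):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60_000L);                                  // checkpoint every 1 min
        env.getCheckpointConfig().setCheckpointTimeout(5 * 60_000L);       // allow more time for alignment
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);  // don't restart on the first timeout
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);  // breathing room between checkpoints
    }
}
```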

Best regards, 
Yuxia 


发件人: "Xin Ma"  
收件人: "User"  
发送时间: 星期四, 2022年 6 月 30日 下午 11:05:51 
主题: StreamingFileSink & checkpoint tuning 

Hi, 

I recently encountered an issue while using StreamingFileSink. 
I have a flink job consuming records from various sources and write to s3 with 
streaming file sink. But the job sometimes fails due to checkpoint timeout, and 
the root cause is checkpoint alignment failure as there is data skewness 
between different data sources. 

I don't want to enable unaligned checkpointing but prefer to do some checkpoint 
tuning first. 

My current checkpoint interval is 1 min and timeout is also 1 min. I wanna 
increase tolerable checkpoint failure number to 5, as I believe the unaligned 
subtasks will definitely update their watermark in 5 minutes. My question is, 
will streaming file sink still writes to s3 even if the checkpoint fails or 
just wait until next successful checkpoint? (as if we don't tolerate checkpoint 
failure, the job will simply restart from last successful checkpoint) 


Thanks. 

Best, 
Kevin 



Re: The methodlogy behind the join in Table API and Datastream

2022-06-29 Thread yuxia
> any way I can both receive the message of both update. 
I think you may need an outer join [1]. 

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#outer-equi-join
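
For illustration, a runnable sketch (my own example with made-up in-memory tables) of a FULL OUTER JOIN, which keeps rows from both sides even when there is no match:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OuterJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        tEnv.executeSql("CREATE TEMPORARY VIEW t1 AS "
                + "SELECT * FROM (VALUES (1, 'a'), (2, 'b')) AS T(id, v)");
        tEnv.executeSql("CREATE TEMPORARY VIEW t2 AS "
                + "SELECT * FROM (VALUES (2, 'x'), (3, 'y')) AS T(id, v)");

        // Unmatched rows from either side are emitted with NULL padding on the other side.
        tEnv.executeSql("SELECT t1.id AS id1, t1.v AS v1, t2.id AS id2, t2.v AS v2 "
                + "FROM t1 FULL OUTER JOIN t2 ON t1.id = t2.id")
            .print();
    }
}
```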
 

Best regards, 
Yuxia 


发件人: "lan tran"  
收件人: "User"  
发送时间: 星期三, 2022年 6 月 29日 下午 6:04:30 
主题: The methodlogy behind the join in Table API and Datastream 



Hi team, 

I have the question about the methodology behind the joining using SQL-Client 
and DataStream. 

I have a scenario like this: I have two tables, t1 and t2; I consume the WAL log from 
them and send it to Kafka. Next, I join the two tables together and convert the result 
into a changelog stream. Therefore, if one of the tables is updated, there will be 
output messages. 

This is how it works if I use the SQL Client to join the two tables together. 
However, according to the docs, the Table API runs on top of DataStream under the hood, 
so I wonder what this would look like if I used the DataStream API instead of the 
Table API. 

In the DataStream API, I currently use connect to join the two streams, converting t2 
into a broadcast stream and keeping t1 as the main stream. When I update t1, the 
updated record is emitted; but when I update t2, no output is produced (even though the 
broadcast state is updated). Therefore, is there any way I can receive a message for 
both kinds of updates? Do I have to keep state for t1 (the main stream), or do I have 
to change the way I join? 

Best, 
Quynh 





Sent from [ https://go.microsoft.com/fwlink/?LinkId=550986 | Mail ] for Windows 





Re: Overhead on managing Timers with large number of keys

2022-06-29 Thread yuxia
The short answer is yes. In any case, Flink will spend time/CPU to invoke the 
timers. 

Best regards, 
Yuxia 


发件人: "Surendra Lalwani"  
收件人: "User"  
发送时间: 星期三, 2022年 6 月 29日 下午 3:52:32 
主题: Overhead on managing Timers with large number of keys 

Hi Team, 
I am working on the States and using KeyedStreams and Process Functions. I want 
to store the number of customers in the state and also I am registering onTimer 
for all the customer_ids. I wanted to understand if we register something 
around 70 Million Keys and we have onTimer registered for those keys. Will 
there be any overhead and if yes what will be the overhead, will it impact 
processing time? 

Thanks and Regards , 
Surendra Lalwani 



IMPORTANT NOTICE: This e-mail, including any attachments, may contain 
confidential information and is intended only for the addressee(s) named above. 
If you are not the intended recipient(s), you should not disseminate, 
distribute, or copy this e-mail. Please notify the sender by reply e-mail 
immediately if you have received this e-mail in error and permanently delete 
all copies of the original message from your system. E-mail transmission cannot 
be guaranteed to be secure as it could be intercepted, corrupted, lost, 
destroyed, arrive late or incomplete, or contain viruses. Company accepts no 
liability for any damage or loss of confidential information caused by this 
email or due to any virus transmitted by this email or otherwise. 


Re: How to make current application cdc

2022-06-26 Thread yuxia
> I mean CDC should be handled on the Kafka side. 
What do you mean by that? Do you mean that Kafka should store the messages 
in a CDC format like Debezium [1], Canal [2], Maxwell [3], or OGG [4]? 

> Or should I need to use Table API 

I'm afraid not. It seems you can still use the Flink DataStream API, as the Table API 
makes no difference for your case. 

BTW, you can try flink cdc [5] 

[1] [ 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/debezium/
 | 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/debezium/
 ] 
[2] [ 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/
 | 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/canal/
 ] 
[3] [ 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/maxwell/
 | 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/maxwell/
 ] 
[4] [ 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/ogg/
 | 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/ogg/
 ] 
[5] [ https://ververica.github.io/flink-cdc-connectors/ | 
https://ververica.github.io/flink-cdc-connectors/ ] 


Best regards, 
Yuxia 


发件人: "Sid"  
收件人: "User"  
发送时间: 星期六, 2022年 6 月 25日 下午 6:32:22 
主题: How to make current application cdc 

Hello, 

I have a current flow where the data from the Flink-Kafka connector is captured 
and processed using Flink Datastream API and stored in Kafka topics. However, I 
would like to make it CDC enabled. I went through an article where it was 
mentioned that it should be handled on the Kafka side while capturing the data. 
I mean CDC should be handled on the Kafka side. Or should I need to use Table 
API? 
So, any ideas/links are much appreciated as I am trying to understand these 
concepts. 

TIA, 
Sid 



Re: Apache Flink - Reading data from Scylla DB

2022-06-13 Thread yuxia
It seems you may need to implement a custom connector for Scylla DB, as I haven't 
found an existing one. 
Hope the docs [1][2] can help you implement your own connector. 
[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/
 
[2] https://flink.apache.org/2021/09/07/connector-table-sql-api-part1.html 

Best regards, 
Yuxia 


发件人: "Himanshu Sareen"  
收件人: "User"  
发送时间: 星期二, 2022年 6 月 14日 上午 11:29:38 
主题: Apache Flink - Reading data from Scylla DB 

Team, 

I'm looking for a solution to Consume/Read data from Scylla DB into Apache 
Flink. 

If anyone can guide me or share pointers it will be helpful. 

Regards, 
Himanshu 



Re: Could not find a suitable table factory for 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath.

2022-06-08 Thread yuxia
Have you unzipped your project jar to make sure the class HiveParserFactory 
exists? 
Best regards, 
Yuxia 


发件人: "顾斌杰"  
收件人: luoyu...@alumni.sjtu.edu.cn 
抄送: "User"  
发送时间: 星期三, 2022年 6 月 08日 下午 5:11:33 
主题: Re: Could not find a suitable table factory for 
'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. 

can refer to this: 
[ 
https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto
 | 
https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto
 ] 

On 6/8/2022 16:04 , [ mailto:luoyu...@alumni.sjtu.edu.cn | 
yuxia ] wrote: 



Have you put the flink-sql-connector-hive jar into your FLINK_HOME/lib? 
Also make sure your JM/TM have the jar on their classpath. 

Best regards, 
Yuxia 


发件人: "顾斌杰"  
收件人: "User"  
发送时间: 星期三, 2022年 6 月 08日 下午 3:19:19 
主题: Re: Could not find a suitable table factory for 
'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. 


The following is part of the code : 

String createKafkaSql = "create table if not exists x" + 
"(x\n" + 
",update_time timestamp(3) comment '11'\n" + 
",watermark for update_time as update_time - interval '20' second)\n" + 
"with ('connector' = 'kafka'\n" + 
",'topic' = '" + topic + "'\n" + 
",'properties.bootstrap.servers' = '" + bootstrapServers + "'\n" + 
",'properties.group.id' = 'flink_sql_tyc_company_info'\n" + 
",'scan.startup.mode' = 'earliest-offset'\n" + 
",'format' = 'json','json.fail-on-missing-field' = 
'false','json.ignore-parse-errors' = 'true')"; 
tEnv.executeSql(createKafkaSql); 


tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); 
String CreateHiveSql = "create table if not exists " + 
"()\n" + 
"partitioned by (op_day string comment '111')\n" + 
"stored as orc\n" + 
"tblproperties('partition.time-extractor.timestamp-pattern'='$op_day'\n" + 
",'sink.partition-commit.trigger'='partition-time'\n" + 
",'sink.partition-commit.delay'='1h'\n" + 
",'sink.partition-commit.policy.kind'='metastore,success-file')"; 
tEnv.executeSql(CreateHiveSql); 


tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); 
String insert = "insert into x\n" + 
"select `x" + 
",date_format(update_time,'-MM-dd')\n" + 
"from x"; 
tEnv.executeSql(insert); 
On 6/8/2022 15:14 , [ mailto:binjie...@paat.com | 顾斌杰 ] 
wrote: 




Flink version: 1.13 





When executed in the local environment (windows), there is no exception. 


When starting the project with flink web ui, I get the following error: 





Server Response: 
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute 
application. 
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
 
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not execute application. 
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
 ... 7 more 

Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute 
application. 
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
 
at 
org.apache.flink.client.

Re: Could not find a suitable table factory for 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath.

2022-06-08 Thread yuxia
Have you put the flink-sql-connector-hive jar into your FLINK_HOME/lib? 
Also make sure your JM/TM have the jar on their classpath. 

Best regards, 
Yuxia 


发件人: "顾斌杰"  
收件人: "User"  
发送时间: 星期三, 2022年 6 月 08日 下午 3:19:19 
主题: Re: Could not find a suitable table factory for 
'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. 


The following is part of the code : 

String createKafkaSql = "create table if not exists x" + 
"(x\n" + 
",update_time timestamp(3) comment '11'\n" + 
",watermark for update_time as update_time - interval '20' second)\n" + 
"with ('connector' = 'kafka'\n" + 
",'topic' = '" + topic + "'\n" + 
",'properties.bootstrap.servers' = '" + bootstrapServers + "'\n" + 
",'properties.group.id' = 'flink_sql_tyc_company_info'\n" + 
",'scan.startup.mode' = 'earliest-offset'\n" + 
",'format' = 'json','json.fail-on-missing-field' = 
'false','json.ignore-parse-errors' = 'true')"; 
tEnv.executeSql(createKafkaSql); 


tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); 
String CreateHiveSql = "create table if not exists " + 
"()\n" + 
"partitioned by (op_day string comment '111')\n" + 
"stored as orc\n" + 
"tblproperties('partition.time-extractor.timestamp-pattern'='$op_day'\n" + 
",'sink.partition-commit.trigger'='partition-time'\n" + 
",'sink.partition-commit.delay'='1h'\n" + 
",'sink.partition-commit.policy.kind'='metastore,success-file')"; 
tEnv.executeSql(CreateHiveSql); 


tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); 
String insert = "insert into x\n" + 
"select `x" + 
",date_format(update_time,'-MM-dd')\n" + 
"from x"; 
tEnv.executeSql(insert); 
On 6/8/2022 15:14 , [ mailto:binjie...@paat.com | 顾斌杰 ] 
wrote: 





Flink version: 1.13 





When executed in the local environment (windows), there is no exception. 


When starting the project with flink web ui, I get the following error: 





Server Response: 
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute 
application. 
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
 
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822) 
at 
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
 
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
 
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748) 

Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not execute application. 
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
 
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
 
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
 ... 7 more 

Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute 
application. 
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
 
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
 
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
 
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 ... 7 more 

Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: Could not find a suitable table factory for 
'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath. 
Reason: Required context properties mismatch. The following properties are 
requested: table.sql-dialect=hive The following factories have been considered: 
org.apache.flink.table.planner.delegation.Defau

Re: slack invite link

2022-06-05 Thread yuxia
I have sent the invitation to the email address shmily...@gmail.com. Please 
check your email! Looking forward to your joining. 

Best regards, 
Yuxia 


发件人: "shmily"  
收件人: "User"  
发送时间: 星期日, 2022年 6 月 05日 下午 4:55:11 
主题: slack invite link 

hi, 
can someone please send me a slack invite link, the one provided by the 
community has expired~ 

many thanks! 



Re: Can we resume a job from a savepoint from Java api?

2022-06-01 Thread yuxia
Hope the unit test 
SavepointITCase#testCanRestoreWithModifiedStatelessOperators [1] in the Flink repo 
can help you. 


[1] [ 
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java#L1228
 | 
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java#L1228
 ] 
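
As a rough sketch of one way to do it (my own assumption, not code taken from that test; the savepoint path is hypothetical), the restore path can be passed through the configuration the environment is created with:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ResumeFromSavepointSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same effect as `flink run -s <path>`: restore the job from this savepoint.
        conf.setString("execution.savepoint.path", "file:///tmp/savepoints/savepoint-abc123");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromElements(1, 2, 3).uid("numbers").print();
        env.execute("resume-from-savepoint-sketch");

        // The referenced test instead builds a JobGraph, sets
        // SavepointRestoreSettings.forPath(savepointPath) on it, and submits it to a
        // MiniCluster, which achieves the same thing in a test setup.
    }
}
```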

Best regards, 
Yuxia 


发件人: "Qing Lim"  
收件人: "User"  
发送时间: 星期三, 2022年 6 月 01日 下午 7:46:59 
主题: Can we resume a job from a savepoint from Java api? 



Hi, is it possible to resume a job from a savepoint in Java code? 



I wish to test failure recovery in my test code, I am thinking to simulate 
failure recovery by saving state to a save point and the recover from it, is 
this possible with local MiniCluster setup? 



Kind regards 




This e-mail and any attachments are confidential to the addressee(s) and may 
contain information that is legally privileged and/or confidential. If you are 
not the intended recipient of this e-mail you are hereby notified that any 
dissemination, distribution, or copying of its content is strictly prohibited. 
If you have received this message in error, please notify the sender by return 
e-mail and destroy the message and all copies in your possession. 


To find out more details about how we may collect, use and share your personal 
information, please see [ https://www.mwam.com/privacy-policy | 
https://www.mwam.com/privacy-policy ] . This includes details of how calls you 
make to us may be recorded in order for us to comply with our legal and 
regulatory obligations. 


To the extent that the contents of this email constitutes a financial 
promotion, please note that it is issued only to and/or directed only at 
persons who are professional clients or eligible counterparties as defined in 
the FCA Rules. Any investment products or services described in this email are 
available only to professional clients and eligible counterparties. Persons who 
are not professional clients or eligible counterparties should not rely or act 
on the contents of this email. 


Marshall Wace LLP is authorised and regulated by the Financial Conduct 
Authority. Marshall Wace LLP is a limited liability partnership registered in 
England and Wales with registered number OC302228 and registered office at 
George House, 131 Sloane Street, London, SW1X 9AT. If you are receiving this 
e-mail as a client, or an investor in an investment vehicle, managed or advised 
by Marshall Wace North America L.P., the sender of this e-mail is communicating 
with you in the sender's capacity as an associated or related person of 
Marshall Wace North America L.P. ("MWNA"), which is registered with the US 
Securities and Exchange Commission ("SEC") as an investment adviser. 
Registration with the SEC does not imply that MWNA or its employees possess a 
certain level of skill or training. 


Re: Status of File Sink Common (flink-file-sink-common)

2022-05-30 Thread yuxia
I'm afraid not. I can still find it in the main repository [1].
[1] 
https://github.com/apache/flink/tree/master/flink-connectors/flink-file-sink-common

Best regards,
Yuxia

- 原始邮件 -
发件人: "Jun Qin" 
收件人: "User" 
发送时间: 星期二, 2022年 5 月 31日 上午 5:24:10
主题: Status of File Sink Common (flink-file-sink-common) 

Hi,  

Has File Sink Common (flink-file-sink-common) been dropped? If so, since which 
version? I do not seem to find anything related in the release notes of 1.13.x, 
1.14.x and 1.15.0.

Thanks
Jun


Re: Large backpressure and slow checkpoints in StateFun

2022-05-30 Thread yuxia
Maybe you can use jstack or a flame graph to analyze where the bottleneck is. 
BTW, for generating flame graphs, arthas [1] is a good tool. 

[1] https://github.com/alibaba/arthas 

Best regards, 
Yuxia 


发件人: "Christopher Gustafson"  
收件人: "User"  
发送时间: 星期一, 2022年 5 月 30日 下午 2:29:19 
主题: Large backpressure and slow checkpoints in StateFun 



Hi, 




I am running some benchmarks using StateFun and have encountered a problem with 
backpressure and slow checkpoints that I can't figure out the reason for, and 
was hoping that someone might have an idea of what is causing it. My setup is 
the following: 



I am running the Shopping Cart application from the StateFun playground. The 
job is submitted as an uber jar to an existing Flink Cluster with 3 
TaskManagers and 1 JobManager. The functions are served using the Undertow 
example from the documentation and I am using Kafka ingresses and egresses. My 
workload is only at 1000 events/s. Everything is run in separate GCP VMs. 




The issue is with very long checkpoints, which I assume is caused by a 
backpressured ingress caused by the function dispatcher operator not being able 
to handle the workload. The only thing that has helped so far is to increase 
the parallelism of the job, but it feels like the still is some other 
bottleneck that is causing the issues. I have seen other benchmarks reaching 
much higher throughput than 1000 events/s, without more CPU or memory resources 
than I am using. 




Any ideas of bottlenecks or ways to figure them out are greatly appreciated. 




Best Regards, 

Christopher Gustafson 



Re: Exception when running Java UDF with Blink table planner

2022-05-26 Thread yuxia
It seems an exception is thrown when Flink tries to deserialize the object output 
by your UDF. So, is the object produced by your UDF serializable? Does the 
object/class contain any lambda functions? 

Best regards, 
Yuxia 


发件人: "Tom Thornton"  
收件人: "User"  
发送时间: 星期五, 2022年 5 月 27日 上午 6:47:04 
主题: Exception when running Java UDF with Blink table planner 

We are migrating from the legacy table planner to the Blink table planner. 
Previously we had a UDF defined like this that worked without issue: 
public class ListToString extends DPScalarFunction { 
    public String eval(List list) { 
        return "foo"; 
    } 
} 
Since moving to the Blink table planner, we are receiving this error: 

Caused by: org.apache.flink.table.api.ValidationException: Given parameters of 
function 'ListToString' do not match any signature. 
Actual: (java.lang.String[]) 
Expected: (java.util.List) 

We refactored the UDF to take as input an Object[] to match what is received 
from Blink: 
public class ListToString extends DPScalarFunction { 
    public String eval(Object[] arr) { 
        return "foo"; 
    } 
} 
Now the UDF always fails (including for the simplified example above where we 
return a constant string regardless of input). For example, when we run on a 
query like this one: 
SELECT ListToString(`col1`) as col1_string FROM `table` 
Produces an IndexOutOfBoundsException: 
Caused by: java.lang.IndexOutOfBoundsException: Index 115 out of bounds for 
length 0 
at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64) 
at 
java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
 
at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248) 
at java.base/java.util.Objects.checkIndex(Objects.java:372) 
at java.base/java.util.ArrayList.get(ArrayList.java:459) 
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
 
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) 
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) 
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
 
at 
org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:570)
 
at 
org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
 
at 
org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:700)
 
at 
org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:683)
 
at 
org.apache.flink.table.data.util.DataFormatConverters.arrayDataToJavaArray(DataFormatConverters.java:1175)
 
at 
org.apache.flink.table.data.util.DataFormatConverters.access$200(DataFormatConverters.java:104)
 
at 
org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1128)
 
at 
org.apache.flink.table.data.util.DataFormatConverters$ObjectArrayConverter.toExternalImpl(DataFormatConverters.java:1070)
 
at 
org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:406)
 
at StreamExecCalc$337.processElement(Unknown Source) 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
 
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 
at SourceConversion$328.processElement(Unknown Source) 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
 
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
 
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:757)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:732)
 
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:712)
 
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
 
at 
org.apache.fl

Re: length value for some classes extending LogicalType.

2022-05-25 Thread yuxia
IMO, the behavior depends on how you convert your string data from the external 
system into Flink's internal data format, and vice versa. 

I think it's more like a hint that tells sources and sinks how to convert the string 
data between the external system and Flink. 


Best regards, 
Yuxia 


发件人: "Krzysztof Chmielewski"  
收件人: "User"  
发送时间: 星期三, 2022年 5 月 25日 下午 5:29:10 
主题: length value for some classes extending LogicalType. 

Hi, 
some classes extending LogicalType.java such as VarCharType, BinaryType, 
CharType and few others have an optional argument "length". If not specified, 
length is set to default value which is 1. 

I would like to ask, what are the implications of that? What can happen if I 
use the default length value 1 but the actual length of the data will be bigger 
than 1? 

For example: 
RowType.of("col1", new CharType()) <- this will use default length value 1. 

Regards, 
Krzysztof Chmielewski 



Re: OutputTag alternative with pyflink 1.15.0

2022-05-23 Thread yuxia
Yes, you're right. 

Fortunately, the master branch already supports it [1][2], but it hasn't been released yet. If 
you want to use output tags in Python with 1.15, you can apply the patch [2] to 
your Flink 1.15 and build it yourself [3]. 
BTW, if you don't want to bother building, you can use the Java/Scala API; see the 
sketch after the references below. 


[1] [ 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/
 | 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/
 ] 
[2] [ https://github.com/apache/flink/pull/19453 | 
https://github.com/apache/flink/pull/19453 ] 
[3] [ 
https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/ | 
https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/ ] 
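
For reference, a minimal sketch of the Java side-output API (my own example; the data and tag name are made up):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Anonymous subclass so the tag keeps its type information.
        final OutputTag<String> rejected = new OutputTag<String>("rejected") {};

        SingleOutputStreamOperator<Integer> parsed =
                env.fromElements("1", "2", "oops", "3")
                        .process(new ProcessFunction<String, Integer>() {
                            @Override
                            public void processElement(String value, Context ctx, Collector<Integer> out) {
                                try {
                                    out.collect(Integer.parseInt(value));   // main output
                                } catch (NumberFormatException e) {
                                    ctx.output(rejected, value);            // side output
                                }
                            }
                        });

        DataStream<String> rejectedStream = parsed.getSideOutput(rejected);
        rejectedStream.print("rejected");
        parsed.print("parsed");
        env.execute("side-output-sketch");
    }
}
```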

Best regards, 
Yuxia 


发件人: "Lakshya Garg"  
收件人: "User"  
发送时间: 星期一, 2022年 5 月 23日 下午 12:02:26 
主题: OutputTag alternative with pyflink 1.15.0 

Hi Everyone, 
I see that in pyflink 1.15.0 there isnt support for Output tag to redirect the 
messages to other output streams. Is this understanding right? 
If yes, What can be the alternative for this? any example or reference link 
would be helpful. 

Lakshya 



Re: Incorrect checkpoint id used when job is recovering

2022-05-19 Thread yuxia
There's a similar issue, FLINK-19816 [1]. 

[1] [ https://issues.apache.org/jira/browse/FLINK-19816 | 
https://issues.apache.org/jira/browse/FLINK-19816 ] 

Best regards, 
Yuxia 


发件人: "tao xiao"  
收件人: "User"  
发送时间: 星期四, 2022年 5 月 19日 下午 9:16:34 
主题: Re: Incorrect checkpoint id used when job is recovering 

Hi team, 

Can anyone shed some light? 

On Sat, May 14, 2022 at 8:56 AM tao xiao < [ mailto:xiaotao...@gmail.com | 
xiaotao...@gmail.com ] > wrote: 



Hi team, 

Does anyone have any ideas? 

On Thu, May 12, 2022 at 9:20 PM tao xiao < [ mailto:xiaotao...@gmail.com | 
xiaotao...@gmail.com ] > wrote: 


Forgot to mention the Flink version is 1.13.2 and we use kubernetes native mode 

On Thu, May 12, 2022 at 9:18 PM tao xiao < [ mailto:xiaotao...@gmail.com | 
xiaotao...@gmail.com ] > wrote: 


Hi team, 
I met a weird issue when a job tries to recover from JM failure. The success 
checkpoint before JM crashed is 41205 

``` 
{"log":"2022-05-10 14:55:40,663 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed 
checkpoint 41205 for job  (9453840 bytes in 
1922 ms).\n","stream":"stdout","time":"2022-05-10T14:55:40.663286893Z"} 
``` 
However JM tries to recover the job with an old checkpoint 41051 which doesn't 
exist that leads to unrecoverable state 

``` 
"2022-05-10 14:59:38,949 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying 
to retrieve checkpoint 41051.\n" 
``` 

Full log attached 

-- 
Regards, 
Tao 





-- 
Regards, 
Tao 




-- 
Regards, 
Tao 




-- 
Regards, 
Tao 



Re: Incompatible data types while using firehose sink

2022-05-12 Thread yuxia
The Firehose connector implements Sink V2, which was introduced in Flink 1.15. But the 
method inputStream#sinkTo(xxx) only accepts Sink V1 in Flink 1.13. 

If you still want to use Firehose in Flink 1.13, I guess you may need to implement an 
adapter that translates Sink V2 into Sink V1 (similar to the SinkV1Adapter in Flink 
1.15), or rewrite some code of the Firehose connector to migrate it to Sink V1. 

Best regards, 
Yuxia 


发件人: "Zain Haider Nemati"  
收件人: "Martijn Visser"  
抄送: "yu'an huang" , "User"  
发送时间: 星期四, 2022年 5 月 12日 下午 3:36:46 
主题: Re: Incompatible data types while using firehose sink 

Hi, Appreciate your response. 
My flink version is 1.13. 
Is there any other way to sink data to kinesis without having to update to 1.15 

On Thu, May 12, 2022 at 12:25 PM Martijn Visser < [ 
mailto:martijnvis...@apache.org | martijnvis...@apache.org ] > wrote: 



I'm guessing this must be Flink 1.15 since Firehose was added in that version 
:) 

On Thu, 12 May 2022 at 08:41, yu'an huang < [ mailto:h.yuan...@gmail.com | 
h.yuan...@gmail.com ] > wrote: 


Hi, 

Your code is working fine in my computer. What is the Flink version you are 
using. 






On 12 May 2022, at 3:39 AM, Zain Haider Nemati < [ 
mailto:zain.hai...@retailo.co | zain.hai...@retailo.co ] > wrote: 

Hi Folks, 
Getting this error when sinking data to a firehosesink, would really appreciate 
some help ! 

DataStream inputStream = env.addSource(new FlinkKafkaConsumer<>("xxx", 
new SimpleStringSchema(), properties)); 

Properties sinkProperties = new Properties(); 
sinkProperties.put(AWSConfigConstants.AWS_REGION, "xxx"); 
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx"); 
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx"); 
KinesisFirehoseSink kdfSink = KinesisFirehoseSink.builder() 
.setFirehoseClientProperties(sinkProperties) 
.setSerializationSchema(new SimpleStringSchema()) 
.setDeliveryStreamName("xxx") 
.setMaxBatchSize(350) 
.build(); 



inputStream.sinkTo(kdfSink); 

incompatible types: 
org.apache.flink.connector.firehose.sink.KinesisFirehoseSink 
cannot be converted to 
org.apache.flink.api.connector.sink.Sink 











Re: http stream as input data source

2022-05-12 Thread yuxia
The quick answer is no. 
There's no HTTP data stream source available out of the box. 
You can implement one yourself. Here [1] is a guide on how to implement a 
user-defined source & sink; a small sketch follows the references below. 

Btw, there's a JIRA for an HTTP sink [2], but it is marked as Won't Fix. 

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/
 
[2]: https://issues.apache.org/jira/browse/FLINK-8047. 
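
For example, a very small sketch of such a user-defined source (my own illustration; the endpoint is hypothetical, it simply polls the URL periodically, and it provides no checkpointing or exactly-once guarantees):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class PollingHttpSource implements SourceFunction<String> {

    private final String endpoint;        // e.g. "http://localhost:8080/data" (hypothetical)
    private final long pollIntervalMs;
    private volatile boolean running = true;

    public PollingHttpSource(String endpoint, long pollIntervalMs) {
        this.endpoint = endpoint;
        this.pollIntervalMs = pollIntervalMs;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // Fetch the whole response body and emit it as one record.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(new URL(endpoint).openStream(), StandardCharsets.UTF_8))) {
                String body = reader.lines().collect(Collectors.joining("\n"));
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(body);
                }
            }
            Thread.sleep(pollIntervalMs);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    // usage: env.addSource(new PollingHttpSource("http://localhost:8080/data", 5_000L)).print();
}
```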

Best regards, 
Yuxia 


发件人: "Harald Busch"  
收件人: "User"  
发送时间: 星期四, 2022年 5 月 12日 上午 11:37:29 
主题: http stream as input data source 

Hi, 
is there a http data stream as data source ? 
I only see socketTextStream and other predefined stream sources. 
It seems that I have to use fromCollection, fromElements ... and prepare the 
collection for myself. 
Thanks 
Regards 


Re: unsubscribe

2022-05-11 Thread yuxia
To unsubscribe, you can send an email to user-unsubscr...@flink.apache.org with 
any subject. 

Best regards, 
Yuxia 


发件人: "Henry Cai"  
收件人: "User"  
发送时间: 星期四, 2022年 5 月 12日 上午 1:14:43 
主题: unsubscribe 

unsubscribe 



Re: How can I set job parameter in flink sql

2022-05-11 Thread yuxia
Hi, AFAIK, you can't get a parameter set via the Flink SQL client in a UDF. 

If you still want to get the parameters in your UDF, you can use the following 
code to set them: 

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Map<String, String> parameters = new HashMap<>();
parameters.put("black_list_path", "/root/list.properties");
env.getConfig().setGlobalJobParameters(Configuration.fromMap(parameters));

Then, you can get the parameter using 
context.getJobParameter("black_list_path", "/config/list.properties"); in the UDF. 

Best regards, 
Yuxia 


发件人: "wang" <24248...@163.com> 
收件人: "User" , "user-zh"  
发送时间: 星期三, 2022年 5 月 11日 下午 2:44:20 
主题: How can I set job parameter in flink sql 

Hi dear engineer, 

I want to override the function open() in my UDF, like: 


public class BlackListConvertFunction extends ScalarFunction { 

    @Override 
    public void open(FunctionContext context) throws Exception { 
        String path = context.getJobParameter("black_list_path", "/config/list.properties"); 
        System.out.println(path); 
    } 

    public Double eval(String scores) { 
        // some logics 
        return 0.0; 
    } 
} 





In the open() function, I want to fetch the configured value of "black_list_path" and 
simply print it out. I configure this value in the ./sql-client.sh console: 

SET black_list_path = /root/list.properties 

Then I run the UDF, but what gets printed is /config/list.properties (the default 
value I set in context.getJobParameter("black_list_path", "/config/list.properties")), 
not /root/list.properties which I set in the ./sql-client.sh console. 

So could you please show me the correct way to set black_list_path in SQL? 
Thanks so much! 


Thanks && Regards, 
Hunk 








Re: Unable to start sql-client when putting flink-table-planner_2.12-1.15.0.jar to lib folder

2022-05-08 Thread yuxia
Not exactly; the Flink distribution just doesn't include the Scala API by default. 

To use Scala, you can pack both flink-table-api-scala and flink-table-api-scala-bridge 
into your job jar and keep the Flink distribution structure, meaning 
flink-table-planner_2.12-1.15.0.jar stays under the opt folder and 
flink-table-planner-loader stays under /lib. That way you can choose whatever Scala 
version you prefer. 

Alternatively, you can swap flink-table-planner_2.12-1.15.0.jar and 
flink-table-planner-loader, but then you are bound to Flink's 2.12 Scala version. 


Best regards, 
Yuxia 


发件人: "Jeff Zhang"  
收件人: "yuxia"  
抄送: "User"  
发送时间: 星期日, 2022年 5 月 08日 下午 3:50:33 
主题: Re: Unable to start sql-client when putting 
flink-table-planner_2.12-1.15.0.jar to lib folder 

Thanks Yuxia, that works. Does that mean that for one Flink distribution I can 
either use Java or Scala? If so, it seems not user friendly. 



On Sun, May 8, 2022 at 10:40 AM yuxia < [ mailto:luoyu...@alumni.sjtu.edu.cn | 
luoyu...@alumni.sjtu.edu.cn ] > wrote: 



Hi, you can move the flink-table-planner-loader to the /opt. See more in [ 
https://issues.apache.org/jira/browse/FLINK-25128 | 
https://issues.apache.org/jira/browse/FLINK-25128 ] 


Best regards, 
Yuxia 


发件人: "Jeff Zhang" < [ mailto:zjf...@gmail.com | zjf...@gmail.com ] > 
收件人: "User" < [ mailto:user@flink.apache.org | user@flink.apache.org ] > 
发送时间: 星期六, 2022年 5 月 07日 下午 10:05:55 
主题: Unable to start sql-client when putting flink-table-planner_2.12-1.15.0.jar 
to lib folder 

Hi folks, 
It looks like Flink 1.15 changed its binary distribution to become Scala-free. 
The flink-table-planner_2.12-1.15.0.jar is put under the opt folder. Now I 
would like to use it for my Scala Flink app, so I moved it to the lib folder, 
but after that I cannot start sql-client. Is this expected? Here's the error I 
see: 

-
 
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue. 
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201) 
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) 
Caused by: org.apache.flink.table.api.TableException: Could not instantiate the 
executor. Make sure a planner module is on the classpath 
at 
org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:163)
 
at 
org.apache.flink.table.client.gateway.context.ExecutionContext.createTableEnvironment(ExecutionContext.java:111)
 
at 
org.apache.flink.table.client.gateway.context.ExecutionContext.<init>(ExecutionContext.java:66)
 
at 
org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:247)
 
at 
org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:87)
 
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:87)
 
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88) 
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) 
... 1 more 
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories 
for identifier 'default' that implement 
'org.apache.flink.table.delegation.ExecutorFactory' found in the classpath. 

Ambiguous factory classes are: 

org.apache.flink.table.planner.delegation.DefaultExecutorFactory 
org.apache.flink.table.planner.loader.DelegateExecutorFactory 
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:553)
 
at 
org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:154)
 
... 8 more 



-- 
Best Regards 

Jeff Zhang 






-- 
Best Regards 

Jeff Zhang 





Re: How to return JSON Object from UDF

2022-05-06 Thread yuxia
Does DataTypeHint with bridgedTo meet your requirements? 
For example: 
' 
public @DataTypeHint( 
        value = "RAW", 
        bridgedTo = JSONObject.class, 
        rawSerializer = JSONObjectSerializer.class) 
JSONObject eval(String str) { 
    return JSONObject.parseObject(str); 
} 
' 
You may need to provide a class like JSONObjectSerializer that extends 
TypeSerializerSingleton. 
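For completeness, below is a minimal sketch of what such a serializer could look like, 
assuming fastjson's com.alibaba.fastjson.JSONObject and simply round-tripping through the 
JSON string form; the class and snapshot names are made up for illustration and it is not 
tuned for performance: 

``` 
import java.io.IOException;

import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import com.alibaba.fastjson.JSONObject;

// Serializes a JSONObject by delegating to Flink's StringSerializer on its JSON text.
public class JSONObjectSerializer extends TypeSerializerSingleton<JSONObject> {

    public static final JSONObjectSerializer INSTANCE = new JSONObjectSerializer();

    @Override
    public boolean isImmutableType() {
        return false;
    }

    @Override
    public JSONObject createInstance() {
        return new JSONObject();
    }

    @Override
    public JSONObject copy(JSONObject from) {
        return JSONObject.parseObject(from.toJSONString());
    }

    @Override
    public JSONObject copy(JSONObject from, JSONObject reuse) {
        return copy(from);
    }

    @Override
    public int getLength() {
        return -1; // variable length
    }

    @Override
    public void serialize(JSONObject record, DataOutputView target) throws IOException {
        StringSerializer.INSTANCE.serialize(record.toJSONString(), target);
    }

    @Override
    public JSONObject deserialize(DataInputView source) throws IOException {
        return JSONObject.parseObject(StringSerializer.INSTANCE.deserialize(source));
    }

    @Override
    public JSONObject deserialize(JSONObject reuse, DataInputView source) throws IOException {
        return deserialize(source);
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        StringSerializer.INSTANCE.copy(source, target);
    }

    @Override
    public TypeSerializerSnapshot<JSONObject> snapshotConfiguration() {
        return new JSONObjectSerializerSnapshot();
    }

    // Snapshot class used for serializer compatibility checks on restore.
    public static final class JSONObjectSerializerSnapshot
            extends SimpleTypeSerializerSnapshot<JSONObject> {
        public JSONObjectSerializerSnapshot() {
            super(() -> INSTANCE);
        }
    }
}
``` 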


Best regards, 
Yuxia 


发件人: "Surendra Lalwani"  
收件人: "User"  
发送时间: 星期五, 2022年 5 月 06日 下午 4:40:19 
主题: How to return JSON Object from UDF 

Hi Team, 

I am using Flink 1.13.6. I have created a UDF and I want to return a JSONObject 
(or basically an Object) from it, but it doesn't seem to work, as there is no data 
type hint compatible with Object. In earlier Flink versions, before DataTypeHint 
existed, it used to work. Any help would be appreciated. 

Thanks and Regards , 
Surendra Lalwani 


