Do we have a date_sub function in Flink SQL?

2021-08-25 Thread 1095193...@qq.com
Hi
   I want to subtract 1 day from the current date in Flink SQL. Do we have a
function like date_sub()?
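
A minimal sketch of what might work with Flink SQL's built-in temporal functions
(the table and column names here are made up):

    -- subtract one day from the current date
    SELECT CURRENT_DATE - INTERVAL '1' DAY AS yesterday FROM some_table;
    -- or, equivalently, with the built-in TIMESTAMPADD function
    SELECT TIMESTAMPADD(DAY, -1, CURRENT_DATE) AS yesterday FROM some_table;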



1095193...@qq.com


How to use 'CREATE FUNCTION' statements to create parameterized functions?

2021-08-03 Thread 1095193...@qq.com
Hi community,
For a parameterized function like:
public static class SubstringFunction extends ScalarFunction {

  private boolean endInclusive;

  // the constructor argument parameterizes this function instance
  public SubstringFunction(boolean endInclusive) {
    this.endInclusive = endInclusive;
  }

  public String eval(String s, Integer begin, Integer end) {
    return s.substring(begin, endInclusive ? end + 1 : end);
  }
}
we can register this function by passing a function instance instead of the
function class:
env.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(true));
How can I register or create this parameterized function with a 'CREATE FUNCTION'
statement?
The standard 'CREATE FUNCTION' statement in the Flink docs [1] is:
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION 
  [IF NOT EXISTS] [catalog_name.][db_name.]function_name 
  AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
We can only pass a function_name in the CREATE FUNCTION statement. Is there a way
to pass a function instance (a parameterized function) in this statement?

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#create-function
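
For reference, a plain (non-parameterized) registration using this DDL would look
roughly like the following (the class path is hypothetical); the statement only
accepts a class identifier, so constructor arguments such as endInclusive cannot
be supplied here:

    CREATE TEMPORARY SYSTEM FUNCTION SubstringFunction
      AS 'com.example.udf.SubstringFunction' LANGUAGE JAVA;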
 



1095193...@qq.com


How to convert flat JSON to nested complex JSON in Flink SQL?

2021-07-09 Thread 1095193...@qq.com
Hi community,
I receive JSON messages from Kafka, need to convert the flat JSON to nested JSON,
and send them back to Kafka.
Message received from Kafka: {"id":"001","name":"wang"}
Message to send back to Kafka: {"employee":{"id":"001","name":"wang"}}
How can I do this in Flink SQL?
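
One possible sketch, assuming both topics use the JSON format and that the nested
field can be declared as a ROW type in the sink schema (connector options are
abbreviated, and a CAST to the sink's ROW type may be needed depending on the
Flink version):

    -- flat source read from Kafka
    CREATE TABLE flat_source (
      id   STRING,
      name STRING
    ) WITH ('connector' = 'kafka', 'format' = 'json' /* ... */);

    -- nested sink written back to Kafka
    CREATE TABLE nested_sink (
      employee ROW<id STRING, name STRING>
    ) WITH ('connector' = 'kafka', 'format' = 'json' /* ... */);

    -- wrap the flat columns into the nested field with the ROW constructor
    INSERT INTO nested_sink SELECT ROW(id, name) FROM flat_source;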



1095193...@qq.com


Re: Re: How to convert local time to ISO 8601 time in Flink SQL?

2021-06-28 Thread 1095193...@qq.com
Hi Xu,
 I use Elasticsearch as the sink. If a timestamp field is sent into Elasticsearch
without any timezone information, it will be assumed to be UTC
(Coordinated Universal Time) [1].

[1] 
https://www.elastic.co/blog/converting-local-time-to-iso-8601-time-in-elasticsearch


1095193...@qq.com
 
From: Leonard Xu
Sent: 2021-06-29 10:49
To: 1095193290
Cc: user
Subject: Re: How to convert local time to ISO 8601 time in Flink SQL?
Hi,

Unfortunately, Flink SQL doesn't support the TIMESTAMP WITH TIME ZONE type yet [1];
maybe you can try writing a UDF to convert the timestamp field '2021-06-29 09:00:00'
to a String (with a representation like '2021-06-29T09:00:00+08:00').

And could you share your scenario for using the TIMESTAMP WITH TIME ZONE type?

[1]https://issues.apache.org/jira/browse/FLINK-20869
Best,
Leonard
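
A rough sketch of the UDF suggested above, assuming the input arrives as a local
timestamp string in 'yyyy-MM-dd HH:mm:ss' and the offset is fixed at +08:00 (the
class name and the hard-coded offset are only illustrative):

    import java.time.LocalDateTime;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;
    import org.apache.flink.table.functions.ScalarFunction;

    // Formats a local timestamp string as ISO 8601 with a fixed offset.
    public class ToIso8601Function extends ScalarFunction {

      private static final DateTimeFormatter INPUT =
          DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

      public String eval(String localTs) {
        LocalDateTime ldt = LocalDateTime.parse(localTs, INPUT);
        // e.g. '2021-06-29 09:00:00' -> '2021-06-29T09:00:00+08:00'
        return ldt.atOffset(ZoneOffset.of("+08:00"))
                  .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME);
      }
    }

It could then be registered with something like
env.createTemporarySystemFunction("TO_ISO8601", ToIso8601Function.class) and
called as TO_ISO8601(ts_field) in the query.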

On 2021-06-29 at 09:56, 1095193...@qq.com wrote:

Hi community,
Now I have a timestamp field in the format 'yyyy-MM-dd HH:mm:ss', such as
'2021-06-29 09:00:00'. How can I convert this field to ISO 8601 time with an
offset, such as '2021-06-29T09:00:00+08:00'?
Thanks.



1095193...@qq.com



How to convert local time to ISO 8601 time in Flink SQL?

2021-06-28 Thread 1095193...@qq.com
Hi community,
Now I have a timestamp field in the format 'yyyy-MM-dd HH:mm:ss', such as
'2021-06-29 09:00:00'. How can I convert this field to ISO 8601 time with an
offset, such as '2021-06-29T09:00:00+08:00'?
Thanks.



1095193...@qq.com


Re: Re: Flink SQL regular join not working as expected.

2021-06-04 Thread 1095193...@qq.com
Thanks @JING ZHANG @Yun Gao. I will use a processing-time temporal join. BTW,
will reading the changelog of a JDBC source be supported in the future when it
works as the right stream of a regular join?



1095193...@qq.com
 
From: JING ZHANG
Date: 2021-06-04 18:32
To: Yun Gao
CC: 1095193...@qq.com; user
Subject: Re: Flink SQL regular join not working as expected.
Hi,
The JDBC source only takes a snapshot and sends all data in the snapshot downstream
when it works as the right stream of a regular join; it cannot produce a changelog
stream. After you update the field 'target' from '56.32.15.55:8080' to
'56.32.15.54:8080', the JDBC source will not send new data downstream.

You could try using Upsert Kafka [1] as the right side of the regular join and set
`source` as the primary key.

BTW, if you use a Processing-Time Temporal Join [2] in your case, you will always
join against the latest version of the dimension table, but updates on the
dimension table will not trigger the join because the table is only used for
lookups by key.
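
A rough sketch of what that processing-time temporal (lookup) join could look like
with the tables from this thread, assuming test1 declares a processing-time
attribute (the proc_time column is an assumption):

    -- test1 would need a processing-time column, e.g. proc_time AS PROCTIME()
    SELECT t1.source, t2.target
    FROM test1 AS t1
    JOIN test2 FOR SYSTEM_TIME AS OF t1.proc_time AS t2
      ON t1.source = t2.source;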

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins

Best regards,
JING ZHANG


Yun Gao wrote on Friday, June 4, 2021, at 5:07 PM:
Hi, 

I'm not an expert on Table/SQL, but it seems to me that for regular
joins, Flink will not re-read the dimension
table after it has read it fully the first time. If you want to always join
the records with the latest version of the
dimension table, you may need to use temporal joins [1].

Best,
Yun


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#temporal-joins


--
From: 1095193...@qq.com <1095193...@qq.com>
Send Time: 2021 Jun. 4 (Fri.) 16:45
To: user
Subject: Flink SQL regular join not working as expected.

Hi
   I am working on joining a Kafka stream with a Postgres dimension table.
According to
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/
"Regular joins are the most generic type of join in which any new record, or
changes to either side of the join, are visible and affect the entirety of the
join result."
   However, in my test, changing a record in the dimension table does not affect
the result of the join. My test steps:
   1. Create the Kafka table:
      CREATE TABLE test1 ( source STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'test' ... )
   2. Create the dimension table:
      CREATE TABLE test2 ( source STRING, target STRING ) WITH ( 'connector' = 'jdbc' ... )
      One record prepared in the dimension table:
      source              |  target
      172.16.1.109:8080   |  56.32.15.55:8080
   3. Regular join SQL:
      SELECT test1.source, test2.target FROM test1 JOIN test2 ON test1.source = test2.source
   4. Feed data into Kafka:
      {"source":"172.16.1.109:8080"}
      Flink outputs the result as expected: +I[172.16.1.109:8080, 56.32.15.55:8080]
   5. Change the field 'target' from '56.32.15.55:8080' to '56.32.15.54:8080' in the
      dimension table:
      source              |  target
      172.16.1.109:8080   |  56.32.15.54:8080
   6. Feed data into Kafka again:
      {"source":"172.16.1.109:8080"}
      Flink still outputs a result unaffected by the change to the dimension table:
      +I[172.16.1.109:8080, 56.32.15.55:8080]
      Expected result: +I[172.16.1.109:8080, 56.32.15.54:8080]
Could you give me some suggestions on why the regular join result is not affected
by changes to the dimension table in my test? Appreciation.



1095193...@qq.com



Flink SQL regular join not working as expected.

2021-06-04 Thread 1095193...@qq.com
Hi
   I am working on joining a Kafka stream with a Postgres dimension table.
According to
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/
"Regular joins are the most generic type of join in which any new record, or
changes to either side of the join, are visible and affect the entirety of the
join result."
   However, in my test, changing a record in the dimension table does not affect
the result of the join. My test steps:
   1. Create the Kafka table:
      CREATE TABLE test1 ( source STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'test' ... )
   2. Create the dimension table:
      CREATE TABLE test2 ( source STRING, target STRING ) WITH ( 'connector' = 'jdbc' ... )
      One record prepared in the dimension table:
      source              |  target
      172.16.1.109:8080   |  56.32.15.55:8080
   3. Regular join SQL:
      SELECT test1.source, test2.target FROM test1 JOIN test2 ON test1.source = test2.source
   4. Feed data into Kafka:
      {"source":"172.16.1.109:8080"}
      Flink outputs the result as expected: +I[172.16.1.109:8080, 56.32.15.55:8080]
   5. Change the field 'target' from '56.32.15.55:8080' to '56.32.15.54:8080' in the
      dimension table:
      source              |  target
      172.16.1.109:8080   |  56.32.15.54:8080
   6. Feed data into Kafka again:
      {"source":"172.16.1.109:8080"}
      Flink still outputs a result unaffected by the change to the dimension table:
      +I[172.16.1.109:8080, 56.32.15.55:8080]
      Expected result: +I[172.16.1.109:8080, 56.32.15.54:8080]
Could you give me some suggestions on why the regular join result is not affected
by changes to the dimension table in my test? Appreciation.



1095193...@qq.com


Flink SQL task failure recovery does not work.

2021-05-12 Thread 1095193...@qq.com
Hi team,
   Following the Task Failure Recovery document
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/
 , I have set the state.checkpoints.dir parameter in flink-conf.yaml:

state.checkpoints.dir: hdfs://172.16.1.192:9000/flink-checkpoints
state.savepoints.dir: hdfs://172.16.1.192:9000/flink-savepoints

However, the Flink SQL task still throws the exception
org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy. Apparently, no restart strategy is enabled. Do we
have to enable other configuration besides state.checkpoints.dir? Thanks for
any suggestions.
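
A sketch of the flink-conf.yaml entries that are typically needed in addition to
state.checkpoints.dir (the values shown are arbitrary examples, not verified
against this setup):

    # enable periodic checkpointing so there is state to restore from
    execution.checkpointing.interval: 60s
    # choose an explicit restart strategy instead of the default no-restart behaviour
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10s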



1095193...@qq.com


How to recover the last count when using a CUMULATE window after restarting a Flink SQL job?

2021-05-07 Thread 1095193...@qq.com
Hi
   I have tried the CUMULATE window function in Flink 1.13 SQL to accumulate data
from Kafka. When I restart a cumulate-window SQL job, the last count state is not
restored and the count accumulates from 1 again. Are there any solutions that can
help recover the last count state when restarting a Flink SQL job?
Thank you
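
A hedged sketch of the usual pattern for keeping window state across restarts:
stop the job with a savepoint and resubmit the same query from that savepoint
(the job id, jar, and paths below are placeholders):

    # stop the running job and write a savepoint
    bin/flink stop --savepointPath hdfs:///flink-savepoints <job-id>
    # resubmit, restoring state (including the running counts) from the savepoint
    bin/flink run -s hdfs:///flink-savepoints/savepoint-xxxx <job-jar-and-arguments>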


1095193...@qq.com


How to split a column value into multiple rows in Flink SQL?

2021-05-07 Thread 1095193...@qq.com
Hi
  For example, a table like this:
   A    B    C
   --------------------
   a1   b1   c1,c2,c3
  How can 'c1,c2,c3' be split into multiple rows like this in Flink SQL:
   A    B    C
   --------------------
   a1   b1   c1
   a1   b1   c2
   a1   b1   c3
Thank you
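
A sketch of one common approach, a user-defined table function that splits on
commas and is applied with LATERAL TABLE (the class, function, and table names
below are illustrative):

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;

    // Emits one row per comma-separated element of the input string.
    @FunctionHint(output = @DataTypeHint("ROW<c STRING>"))
    public class SplitFunction extends TableFunction<Row> {
      public void eval(String str) {
        for (String s : str.split(",")) {
          collect(Row.of(s));
        }
      }
    }

After registering it (for example as split_func), the query could look like:

    SELECT A, B, c
    FROM my_table, LATERAL TABLE(split_func(C)) AS T(c);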



1095193...@qq.com


How to consume and send data with two different Kerberos-certified Kafka clusters in one Flink task?

2021-05-07 Thread 1095193...@qq.com
Hi
   By setting the security.kerberos.* configuration, we can connect to one
Kerberos-certified Kafka cluster in a Flink SQL task. How can we consume from and
produce to two different Kerberos-certified Kafka clusters in one Flink SQL task?
Kafka allows multiple SASL-authenticated Java clients in a single JVM process:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-83+-+Allow+multiple+SASL+authenticated+Java+clients+in+a+single+JVM+process.
How can this be achieved in a Flink SQL task?
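
One direction that might work, relying on the Kafka SQL connector's properties.*
passthrough so each table carries its own SASL/JAAS settings instead of the global
security.kerberos.* options (the column list, keytab paths, principals, and realms
below are placeholders, and this is an unverified sketch):

    CREATE TABLE source_a ( ... ) WITH (
      'connector' = 'kafka',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.sasl.mechanism' = 'GSSAPI',
      'properties.sasl.kerberos.service.name' = 'kafka',
      'properties.sasl.jaas.config' = 'com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/path/a.keytab" principal="userA@REALM.A";'
      -- topic/format options omitted
    );

A second table pointing at the other cluster would carry its own keytab and
principal in the same way, and the Kerberos setup (krb5.conf) still has to allow
both realms.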


1095193...@qq.com


How to implement an FTP connector with Flink Table/SQL support?

2021-02-01 Thread 1095193...@qq.com
Hi
 I have investigated the relevant documentation and code for Flink connectors. Flink
supports the local filesystem and several pluggable file systems, which do not
include FTP. Could you give me some suggestions on how to make Flink read data from
FTP? One way I have learned of is implementing an FTP connector according to the
user-defined Sources & Sinks documentation. Are there any other ways to read data
from FTP? I appreciate any suggestions.



1095193...@qq.com