?????? ??????Flink SQL?????????????????

2020-07-07 Thread 1193216154
hi Jark Wu.
Does table.exec.source.idle-timeout mark an idle partition so that it no longer holds back the watermark? We consume a Kafka topic where some partitions receive no data, and the event-time watermark stops advancing.




------ Original message ------
From: "Jark Wu" — https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout

On Tue, 7 Jul 2020 at 17:35, noake 
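For reference, a minimal sketch of the option Jark points to, in SQL Client form (the quoting of SET keys/values varies across Flink versions):

SET 'table.exec.source.idle-timeout' = '30s';
-- a source partition that delivers no records for 30s is marked idle and
-- temporarily excluded from watermark calculation, so the active partitions
-- can advance the watermark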

?????? ??????Flink SQL?????????????????

2020-07-07 Thread 1193216154
Thanks Jark. Should I open a Flink JIRA issue for this?




------ Original message ------
From: "Jark Wu" — https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout

?????? ??????Flink SQL?????????????????

2020-07-08 Thread 1193216154
  https://issues.apache.org/jira/browse/FLINK-18523


------ Original message ------
From: "Jark Wu" — https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-source-idle-timeout

?????? Flink SQL??????????????????????(??????)????????????

2020-07-08 Thread cs
join




------ Original message ------
From: "godfrey he"

flink sql????????????

2020-08-09 Thread op
Hi,
We enabled mini-batch in a Flink SQL job:

  val config = tableConfig.getConfiguration()
  config.setString("table.exec.mini-batch.enabled", "true")
  config.setString("table.exec.mini-batch.allow-latency", "5s")
  config.setString("table.exec.mini-batch.size", "20")

We run with FsStateBackend/RocksDBStateBackend and checkpointing enabled, and the checkpoint size keeps growing. Is this expected?
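For reference, the same three options in SQL Client form (a sketch; SET quoting varies across versions):

SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5s';  -- must be > 0 when mini-batch is enabled
SET 'table.exec.mini-batch.size' = '20';           -- max rows buffered per mini-batch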

?????? flink sql????????????

2020-08-09 Thread op
We also set the min/max idle state retention:

val tConfig = tableEnv.getConfig
tConfig.setIdleStateRetentionTime(Time.minutes(5), Time.minutes(10))

On 1.11.0 the SQL is a GROUP BY sessionid with COUNT(*), so each sessionid holds one row of state. Does mini-batch interact with the idle state retention here?


------ Original message ------
From: "user-zh"



?????? ?????? flink sql????????????

2020-08-10 Thread op
hi
grouby count(*)??




------ Original message ------
From: "user-zh"



????????????flink sql ????????????????

2020-09-13 Thread xuzh
Dear all:
Is there a recommended way to package and submit Flink SQL jobs? I found these two projects:
https://github.com/wuchong/flink-sql-submit
https://github.com/springMoon/sqlSubmit


flink sql????????

2020-09-23 Thread ang
hi,
We use Flink SQL to consume Kafka with event time and a 5s watermark delay:

WATERMARK FOR ts AS ts - INTERVAL '5' SECOND

Records more than 5s late are dropped. The DataStream API has allowed lateness; is there an equivalent for SQL?


Flink1.10.1
 

?????? flink sql????????

2020-09-23 Thread ang
Thanks Benchao. So this is controlled through configuration rather than expressed in the SQL itself?


 




------ Original message ------
From: "user-zh"

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

ang <806040...@qq.com> wrote on 2020-09-23, 4:24 PM:

> hi,
> We use Flink SQL to consume Kafka with event time and a 5s watermark delay:
> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> Records more than 5s late are dropped. The DataStream API has allowed
> lateness; is there an equivalent for SQL?
>
>
> Flink 1.10.1



-- 

Best,
Benchao Li

flink sql??????????????????????

2020-11-26 Thread ??????????
Hello,
Does Flink SQL have a built-in percentile aggregate function?

flink sql??????????????????????

2020-12-02 Thread ??????????
Hello,
Does Flink SQL have a built-in percentile aggregate function?

??????flink sql????????????????

2020-12-19 Thread ?Y??????????????????
For a Kafka source, do the csv/json/avro formats all require declaring the schema by hand, or can it be derived from the format's own schema?




------ Original message ------
From: "user-zh"



flink sql ????????????

2021-01-31 Thread ???????L
Our Flink SQL timestamps are off by 8 hours (we are in UTC+8). How do we configure the timezone?

flink sql????????

2021-01-31 Thread ???????L
streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8"))
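The same setting is also exposed as a configuration option; a sketch in SQL Client form (a region id such as 'Asia/Shanghai' is usually safer than a raw offset because it follows DST rules):

SET 'table.local-time-zone' = 'Asia/Shanghai';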

?????? flink sql????????

2021-01-31 Thread ???????L
We tried streamTableEnv.getConfig().setLocalTimeZone(ZoneId.of("+8")), but the results are still shifted.




------ Original message ------
From: "user-zh"



flink sql

2021-02-01 Thread ???????L
hi, on Flink 1.12, how do we mix Flink SQL with the DataStream API (converting between Table and DataStream)?

Flink??????????????????????sql????????????

2021-03-04 Thread ????
    
How do we run SQL against a registered stream in Flink, and can several SQL statements be executed one after another in the same job?

      example:
      tEnv.registerDataStream("tableName", dataStream, "id, name, age, time");
      Table result = tEnv.sqlQuery("SQL");

What is the right way to execute the SQL statements?

?????? flink sql??????????????????

2021-06-16 Thread ??????
If you want a web IDE for Flink SQL as an alternative to the SQL CLI, take a look at dlink:
 https://github.com/DataLinkDC/dlink




------ Original message ------
From: "todd": for submitting Flink jobs programmatically, see https://github.com/todd5167/flink-spark-submiter — it drives the Flink client API directly, with implementations loaded via SPI.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

?????? Flink sql ????????????????

2021-08-04 Thread ????
For a lookup join, the ON condition must include the lookup key; does the same apply in batch mode?




------ Original message ------
From: "user-zh"
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join

carlc 

flink sql????????????

2021-09-28 Thread z
hi??kafkaflink
 
sqlmysql??Aid??tsjoin??
??011:55A??0:01A??join??
??10+515??

??????Flink sql ??????????

2022-02-09 Thread ??????
https://github.com/DataLinkDC/dlink
??


--  --
??: 
   "user-zh"



?????? Flink sql ??????????

2022-02-09 Thread ??????
??




--  --
??: 
   "user-zh"

https://github.com/DataLinkDC/dlink
> ??
>
>
> --  --
> 
??:   
 
"user-zh"   
 

flink sql

2023-03-02 Thread 小昌同学
Hi all, how can I get the DataStream disableOperatorChaining behavior in Flink SQL?


小昌同学 <ccc0606fight...@163.com>

flink sql ????????

2020-03-19 Thread hiliuxg
hi all,
For Flink SQL's count(distinct), is the distinct state kept as a bitmap, or as a Java Set of the distinct keys?

FLINK SQL????????????????????

2020-03-23 Thread ??????
Hello,
We consume JSON messages from Kafka such as:
{"id":5,"price":40,"timestamp":1584942626828,"type":"math"}
{"id":2,"price":70,"timestamp":1584942629638,"type":"math"}
{"id":2,"price":70,"timestamp":1584942634951,"type":"math"}


The timestamp field is a 13-digit epoch in milliseconds. How do we use it as event time in SQL? Our table definition:
  - name: bookpojo
type: source-table
connector: 
  property-version: 1
  type: kafka
  version: "universal"
  topic: pojosource
  startup-mode: earliest-offset
  properties:
zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
group.id: testGroup
format: 
  property-version: 1
  type: json
  schema: "ROW

Flink SQL????????

2020-05-14 Thread Senior.Hu
Hi All,
We parse a Flink DML statement (a join with a temporal table) using FlinkSqlParserImpl.FACTORY. The original query contains:

  LEFT JOIN
        side_room FOR SYSTEM_TIME AS OF a1.proctime as a2
    ON
        a1.rowkey_room = a2.rowkey

After unparsing, an extra LATERAL appears:

  LEFT JOIN LATERAL `side_room` FOR SYSTEM_TIME AS OF `a1`.`proctime` AS `a2` ON `a1`.`rowkey_room` = `a2`.`rowkey`

Feeding that SQL back into Flink SQL fails with:
  Caused by: org.apache.flink.table.api.SqlParserException: SQL parse 
failed. Encountered "`side_room`" at line 7, column 19.
  Was expecting one of:
    "TABLE" ...
    "(" ...
  
 SqlParser.Config
 private final SqlParser.Config config = SqlParser.configBuilder()
            
.setParserFactory(FlinkSqlParserImpl.FACTORY)
            .setQuoting(Quoting.BACK_TICK)
            .setUnquotedCasing(Casing.UNCHANGED)
            .setQuotedCasing(Casing.UNCHANGED)
            .setCaseSensitive(true)
            .build();
  
Any idea how to avoid the injected LATERAL?

?????? Flink SQL????????

2020-05-18 Thread Senior.Hu
1.10




--  --
??: "Jark Wu"

??????flink sql ??????????????

2020-06-02 Thread 1048262223
Hi,
You can aggregate into 5-minute buckets with a UDF in the GROUP BY, e.g.:

select floor_day_hour_5minutes(`timestamp`), count(1)
from source_table
group by floor_day_hour_5minutes(`timestamp`)

Note that this query produces a retract stream, so configure idle state retention; see:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html


Best,
Yichao Yang
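If the source declares an event-time attribute, a tumbling window is an alternative sketch that emits append-only results and drops window state once each window fires (ts is assumed to be a declared event-time column):

SELECT TUMBLE_START(ts, INTERVAL '5' MINUTE) AS window_start, COUNT(1) AS cnt
FROM source_table
GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE);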


------ Original message ------
From: "steven chen"

??????flink sql ??????????????

2020-06-02 Thread 1530130567
Hi
   
   
  
 ??EventTime??




--  --
??: "steven chen"

Flink SQL ??????????????

2020-06-04 Thread 1048262223
Hi all,

Environment:
Flink version: 1.10
Planner: old planner / blink planner

A minimal Flink SQL case:

source: Tuple2.of(1, "{\"name\": \"a\"}")

query: select a.id, a.name, a.name from (select id, body_json_to_map(name) as name from data) a

sink: print

udf: body_json_to_map

public Map

Flink sql ????????????

2020-06-10 Thread op
hi,
We set idle state retention on the query config and run the following (abridged):

val confg = bstEnv.queryConfig.withIdleStateRetentionTime(Time.minutes(10), Time.minutes(25))

val q1 = bstEnv.sqlQuery(
  """select createTime,feedid from source
|where circleName is not null
|and circleName not in('','_')
|and action = 'C_FEED_EDIT_SEND'
|""".stripMargin)
 bstEnv.createTemporaryView("sourcefeed",q1)
val q2=bstEnv.sqlQuery(
  """select feedid,postfeedid,action from source
|where circleName is not null
|and circleName not in('','_')
|and action in('C_PUBLISH','C_FORWARD_PUBLISH')
|""".stripMargin)

bstEnv.createTemporaryView("postfeed",q2)
bstEnv.sqlQuery(
  """
|select count(b.postfeedid) from
|sourcefeed a
|join postfeed b
|on a.feedid=b.postfeedid
  """.stripMargin).toRetractStream[Row](confg).print("")
// After 25 minutes we replay the same id from the web console and it still joins, so the join state does not seem to have been cleaned. Why?

??Flink Sql ????????????????????????????

2020-06-27 Thread ????????
Hi all:

In Flink SQL, table A has columns c1, c2, c3. After adding a new column c4 we would like the column order to be c1, c4, c2, c3 rather than having c4 appended at the end. Is that possible?

Thanks.

?????? ??Flink Sql ????????????????????????????

2020-06-28 Thread ????????
Hi,

Exactly: the table has c1, c2, c3 today, and after ALTER TABLE ... ADD c4 we want the resulting order to be c1, c4, c2, c3.
Thanks.






------ Original message ------
From: "Jark Wu"

????flink sql????

2020-06-30 Thread zya
Hi,
Our Flink SQL job reads from Kafka and sinks to MySQL. Can the MySQL sink upsert by key (update on duplicate key)? We are on Flink 1.10.

?????? ????flink sql????

2020-06-30 Thread zya
sinksink





 




------ Original message ------
From: "Benchao Li"

?????? ????flink sql????

2020-07-01 Thread zya
We use the blink planner with a processing-time temporal table join.

The SQL:

insert into mysql_sink select C.log_id, C.vic from (select A.log_id, case when B.cnt>0 and A.server>0 then B.cnt+1 else A.server end as vic from (select log_id, server, PROCTIME() as proctime from kafka_source) A left join mysql_source for SYSTEM_TIME AS OF A.proctime AS B on A.log_id=B.log_id ) C group by log_id,vic

We followed: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html

Can Redis be used as a temporal table source/sink as well?

Thanks~


 




------ Original message ------
From: "Benchao Li"

flink sql??????????????????

2020-07-05 Thread op
Our SQL is a two-level aggregation:

select day,
           count(id),
           sum(v1) from
(
select 
          day ,
          id ,
          sum(v1) v1 from source
      group by day,
                   id
)t


group by day



tConfig.setIdleStateRetentionTime(Time.minutes(1440),Time.minutes(1450))


We expected state for old day/id keys to expire after the retention window, yet the checkpoint size keeps growing. Flink version 1.10.0.

??Flink sql 1.10.0??????

2020-07-26 Thread ????????
Hi,all:

Flink 1.10.0  sql???
.

flink sql eos

2020-08-05 Thread sllence
Hi all,

Is it the case that Flink SQL currently has no way to enable global end-to-end exactly-once (EOS)?

At the moment only Kafka implements TwoPhaseCommitSinkFunction, and the Kafka DDL has no property for setting Semantic to EXACTLY_ONCE.

Could we support more transactional connectors, expose a global end-to-end consistency switch at the Flink SQL level, and verify for each connector whether it supports EXACTLY_ONCE?

If global EXACTLY_ONCE is enabled and every connector in use supports EXACTLY_ONCE, can the whole application then achieve end-to-end exactly-once?



flink sql count????

2020-09-27 Thread zya
Hi,
How do we write a conditional count in Flink SQL? In Hive we use count(if(name like '%', 1, null)), but the equivalent in Flink SQL does not behave as expected around NULLs.
flink1.10.1 blink
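Two ways to express the conditional count that sidestep the NULL handling, as a sketch ('%foo%' is a placeholder pattern; the real one is elided in the original mail):

select sum(case when name like '%foo%' then 1 else 0 end) as cnt from my_table;
-- or, where the planner supports the standard FILTER clause:
select count(*) filter (where name like '%foo%') as cnt from my_table;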
 

??????flink sql count????

2020-09-27 Thread zya
You could also use sum(case when ... then 1 else 0 end), as you would in MySQL.
 




------ Original message ------
From: "user-zh"



??????flink sql count????

2020-09-27 Thread zya





------ Original message ------
From: "user-zh"



??????flink sql count????

2020-09-27 Thread zya
??


------ Original message ------
From: "user-zh"



??????flink sql count????

2020-09-27 Thread zya
??




------ Original message ------
From: "user-zh"



flink sql ????mysql????

2020-09-28 Thread lemon
hi,
Our MySQL sink table has about 20 columns. Does INSERT INTO have to supply every column, or can we write only a subset, e.g. insert into mysql select a, b, c from kafka, leaving the remaining sink columns to their defaults?
flink1.10.1 blink

?????? flink sql count????

2020-09-29 Thread lemon
The count should return 0 rather than no row when nothing matches. In Flink you can combine the conditional count with a WHERE clause, e.g.:
 select count(if(name like '%', 1, null)) where name like '%' or name like '%' group by **


------ Original message ------
From: "user-zh"
http://apache-flink.147419.n8.nabble.com/

????????flink sql???????? result????????

2020-10-17 Thread ??????
Hello,
Our Flink SQL job produces an unexpected result. The SQL is here:

https://paste.ubuntu.com/p/Mx96MWjQ83/

result1 is not what we expect. Any pointers?


flink sql ????mysql ??????????

2020-11-25 Thread ??????
Hi All,
In IDEA, reading MySQL through flink-jdbc-connector, env.executeSql("select * from my_table").print() produces no output. Flink 1.11.2.

??????flink sql ????mysql ??????????

2020-11-25 Thread ??????
mysql




------ Original message ------
From: "??"
   


?????? flink sql ????mysql ??????????

2020-11-25 Thread ??????
Hi,
Here is the full program (note the execute calls at the end):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useAnyPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, settings);

bsTableEnv.getConfig().getConfiguration().set(
      ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
bsTableEnv.getConfig().getConfiguration().set(
      ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));

String ddl = "CREATE TABLE meson_budget_data (\n" +
                "  ID BIGINT,\n" +
                "  Gangtise_Code STRING,\n" +
                "  InnerCode INT,\n" +
                "  Tradingday DATE, \n" +
                "  ClosePrice DOUBLE,\n" +
                "  Close_3day DOUBLE,\n" +
                "  Close_5day DOUBLE,\n" +
                "  Close_10day DOUBLE,\n" +
                "  Close_20day DOUBLE,\n" +
                "  Close_60day DOUBLE,\n" +
                "  Close_120day DOUBLE,\n" +
                "  Close_250day DOUBLE,\n" +
                "  PRIMARY KEY (ID) NOT ENFORCED\n" +
                "  ) WITH(\n" +
                "    'connector' = 'jdbc',\n" +
                "    'url' = 'jdbc:mysql://gangtisedb.mysql.database.chinacloudapi.cn_3306',\n" +
                "    'table-name' = 'gangtise_budget.meson_budget_data',\n" +
                // "    'username' = 'root',\n" +
                // "    'password' = '123456',\n" +
                "    'driver' = 'com.mysql.jdbc.Driver'\n" +
                "  )";

bsTableEnv.executeSql(ddl);
TableResult mtr = bsTableEnv.executeSql("select * from meson_budget_data");
mtr.print();

env.execute("sql test");
// bsTableEnv.execute("sql test");



Thanks.


------ Original message ------
From: "user-zh"
http://apache-flink.147419.n8.nabble.com/

?????? flink sql ????mysql ??????????

2020-11-25 Thread ??????
Hi
With DEBUG logging enabled I can see the MySQL connection being made, but there is still no output.




------ Original message ------
From: "user-zh"



flink sql 1.11.1

2020-12-01 Thread zzy
The problem is as follows. Flink version 1.11.1, running Flink SQL in the SQL Client.


The SQL statements:
CREATE TABLE sls_log_sz_itsp (
request STRING,
http_bundleId STRING,
upstream_addr STRING,
http_appid STRING,
bodyUserId STRING,
http_sequence STRING,
http_version STRING,
response_body STRING,
uri STRING,
bytes_sent STRING,
http_userId STRING,
http_cityId STRING,
http_user_agent STRING,
http_deviceType STRING,
record_time STRING,
rt AS TO_TIMESTAMP(DATE_FORMAT(record_time,'-MM-dd HH:mm:ss')),
WATERMARK FOR rt AS rt - INTERVAL '5' SECOND,
request_time STRING,
request_body STRING,
request_length STRING,
nginx_id STRING,
proxy_add_x_forwarded_for STRING,
http_deviceId STRING,
host STRING,
upstream_response_time STRING,
status STRING
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'sls',
'connector.properties.zookeeper.connect' = 
'hadoop85:2181,hadoop86:2181,hadoop87:2181',
'connector.properties.bootstrap.servers' = 
'hadoop85:9092,hadoop86:9092,hadoop87:9092',
'connector.properties.group.id' = 'log-sz-itsp',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json'
);



CREATE TABLE sz_itsp_test(
request STRING,
request_count BIGINT NOT NULL,
window_end TIMESTAMP(3)
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 
'jdbc:mysql://hadoop85:3306/test?useSSL=false&autoReconnect=true',
'connector.table' = 'sz_itsp_test',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = '00',
'connector.write.flush.max-rows' = '1',
'connector.write.flush.interval' = '2s',
'connector.write.max-retries' = '3'
);


INSERT INTO sz_itsp_test
SELECT
request,
count(request) request_count,
TUMBLE_END(rt, INTERVAL '5' MINUTE) AS window_end
FROM sls_log_sz_itsp
WHERE nginx_id = 'sz-itsp' AND nginx_id IS NOT NULL
GROUP BY TUMBLE(rt, INTERVAL '5' MINUTE), request
;


The SQL Client fails with the following error:
Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
Caused by: java.lang.RuntimeException: Error running SQL job.
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:608)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:529)
at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:537)
at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:605)
... 8 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:999)
at 
org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$14(FutureUtils.java:427)
at java.util.concurrent.Executors$RunnableAda

Flink SQL????source ????

2020-12-02 Thread zz
hi,
Our source table consumes a Kafka topic with 6 partitions, and the job INSERTs the result into MySQL. After repointing the source table at a topic with 18 partitions, the job no longer behaves as expected. Does anything need to change for the source to pick up all 18 partitions?

flink sql????????UpsertStreamTableSink

2020-12-07 Thread ??????????
Hello,
On Flink 1.10, writing to HBase fails with:
Exception in thread "main" org.apache.flink.table.api.TableException: 
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated.

    s"""
   |INSERT INTO ${databaseName}.response_time_sink
   |SELECT
   |  rowkey,
   |  ROW(`day`, `time`, initialize_route_avg_time, update_detour_avg_time, 
replace_avg_time, deviate_avg_time) AS cf
   |FROM
   |(
   |select CONCAT_WS('_',CAST(`time` AS VARCHAR),distance_range) 
rowkey,`day`,`time`,
   |  MAX(CASE req_type WHEN '0' THEN num else 0 END) 
initialize_route_avg_time,
   |  MAX(CASE req_type WHEN '1' THEN num else 0 END) 
update_detour_avg_time,
   |  MAX(CASE req_type WHEN '2' THEN num else 0 END) replace_avg_time,
   |  MAX(CASE req_type WHEN '3' THEN num else 0 END) deviate_avg_time
   |from
   |  (SELECT DATE_FORMAT(TIMESTAMPADD(HOUR, 8, TUMBLE_START(proctime, 
INTERVAL '10' SECOND)), '-MM-dd') `day`,
   |  UNIX_TIMESTAMP(DATE_FORMAT(TIMESTAMPADD(HOUR, 8, 
TUMBLE_START(proctime, INTERVAL '10' SECOND)), '-MM-dd HH:mm:ss')) * 1000 
AS `time`,
   |  req_type,
   |  (CASE WHEN ResponseRemainingMile<=50 THEN '1'
   | WHEN ResponseRemainingMile> 50 AND ResponseRemainingMile<= 
250 THEN '2'
   | WHEN ResponseRemainingMile> 250 AND ResponseRemainingMile<= 
500 THEN '3'
   | WHEN ResponseRemainingMile> 500 THEN '4' END) as 
distance_range,
   |  CAST(AVG(`value`) AS INT) num
   |FROM
   |  ${databaseName}.metric_stream
   |WHERE
   |  metric = 'http_common_request'
   |GROUP BY
   |  TUMBLE(proctime, INTERVAL '10' SECOND),req_type,(CASE WHEN 
ResponseRemainingMile<=50 THEN '1'
   |WHEN ResponseRemainingMile> 50 AND ResponseRemainingMile<= 250 
THEN '2'
   |WHEN ResponseRemainingMile> 250 AND ResponseRemainingMile<= 500 
THEN '3'
   |WHEN ResponseRemainingMile> 500 THEN '4' END)) 
   |  group by CONCAT_WS('_',CAST(`time` AS 
VARCHAR),distance_range),`day`,`time`
   |) a
   |""".stripMargin    


flink-sql????-??????????state

2020-12-17 Thread ??????
We have a Flink SQL job consuming CDC streams and its state keeps growing. Is setting the idle state retention as below the right way to bound the state size?
val config: TableConfig = tabEnv.getConfig
config.setIdleStateRetention(Duration.ofHours(1))
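For reference, newer Flink versions (1.13+) expose the same knob as a plain configuration option, so it can also be set from the SQL Client; a sketch:

SET 'table.exec.state.ttl' = '1h';
-- state not accessed for 1 hour becomes eligible for cleanup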

??????????flink-sql????????????????????????????State????

2021-01-05 Thread ??????
Flink on YARN, with env.setStateBackend(new MemoryStateBackend()). The job filters on DeliveryTime > current_date and writes from a dwd-layer table through the upsert-kafka connector. Will the job's state keep growing in this setup? The SQL:
|  select
|   TO_DATE(cast(doi.DeliveryTime as String),'-MM-dd') 
as  days,
|   doi.UserId,
|   count(doi.Code) as   SendTime,
|   sum(doi.PayAmount / 100) as SendCashcharge,
|   sum(doi.PayAmount / 100 - ChargeAmount / 100 + 
UseBalance / 100) as  SendCashuse,
|   sum(doi.CashMoney / 100)as  SendCash
|from dwd_order_info doi
|where doi.DeliveryTime >cast(current_date AS TIMESTAMP) 
and doi.OrderType = 29 and doi.Status >= 50 and doi.Status <> 60
|group by TO_DATE(cast(doi.DeliveryTime as 
String),'-MM-dd'), doi.UserId


flink sql hop????????????????????

2021-01-14 Thread bigdata
Hello,
Flink 1.10.1. The HOP window results below do not match what we expect; query and output follow:
SELECT
|DATE_FORMAT(TIMESTAMPADD(HOUR, 8, HOP_START(proctime, INTERVAL '$slide' 
SECOND, INTERVAL '$size' MINUTE)), '-MM-dd HH:mm:ss') start_time,
|UNIX_TIMESTAMP(DATE_FORMAT(TIMESTAMPADD(HOUR, 8, HOP_START(proctime, 
INTERVAL '$slide' SECOND, INTERVAL '$size' MINUTE)), '-MM-dd HH:mm:ss')) * 
1000 AS `time`,
|CAST(COUNT(distinct drive_id) AS INT) num
|  FROM
|${databaseName}.log_stream
|  WHERE
|req_type = '1' and navigation_flag=' '
|  GROUP BY
|HOP(proctime, INTERVAL '$slide' SECOND, INTERVAL '$size' MINUTE)
3> 2021-01-15 13:43:20,161068940,1
4> 2021-01-15 13:43:30,161068941,2
1> 2021-01-15 13:43:40,161068942,3
2> 2021-01-15 13:43:50,161068943,3
3> 2021-01-15 13:44:00,161068944,3
4> 2021-01-15 13:44:10,161068945,3
1> 2021-01-15 13:44:20,161068946,3
2> 2021-01-15 13:44:30,161068947,3
3> 2021-01-15 13:44:40,161068948,3
4> 2021-01-15 13:44:50,161068949,3
1> 2021-01-15 13:45:00,161068950,3
2> 2021-01-15 13:45:10,161068951,3
3> 2021-01-15 13:45:20,161068952,3

Re: flink sql

2021-02-04 Thread HunterXHunter
I implemented this: I added an EMIT-style clause to the SQL grammar, e.g.:

"select " +
"msg," +
"count(1) cnt" +
" from test" +
" where msg = 'hello' " +
" group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " +
" EMIT \n" +
"  WITH DELAY '10' SECOND BEFORE WATERMARK,\n" +
"  WITHOUT DELAY AFTER WATERMARK";

This triggers the window computation every 10 seconds. The design follows Alibaba Cloud's EMIT semantics.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


?????? flink sql??????????????io??????????

2021-03-05 Thread ????
??




------ Original message ------
From: "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/legacy.html#temporal-table-function-join
flink sql?? temporal-table join io

casel.chen 

Flink SQL CodeGenException

2021-05-15 Thread sherlock c
Flink version: 1.12.0 

Running a Flink SQL join between a stream table and a dimension table fails (the stream SQL and the dimension SQL each run fine on their own). The error stack:

Exception in thread "main" java.lang.RuntimeException: 
org.apache.flink.table.planner.codegen.CodeGenException: Unable to find common 
type of GeneratedExpression(field$18,isNull$17,,STRING,None) and 
ArrayBuffer(GeneratedExpression(((int) 4),false,,INT NOT NULL,Some(4)), 
GeneratedExpression(((int) 8),false,,INT NOT NULL,Some(8))).
at com.hmd.stream.SqlSubmit.main(SqlSubmit.java:47)
Caused by: org.apache.flink.table.planner.codegen.CodeGenException: Unable to 
find common type of GeneratedExpression(field$18,isNull$17,,STRING,None) and 
ArrayBuffer(GeneratedExpression(((int) 4),false,,INT NOT NULL,Some(4)), 
GeneratedExpression(((int) 8),false,,INT NOT NULL,Some(8))).
at 
org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.$anonfun$generateIn$2(ScalarOperatorGens.scala:307)
at scala.Option.orElse(Option.scala:289)
at 
org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens$.generateIn(ScalarOperatorGens.scala:307)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:724)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:507)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$visitCall$2(ExprCodeGenerator.scala:526)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:517)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:56)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:155)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:143)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.produceProjectionCode$1(CalcCodeGenerator.scala:143)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateProcessCode(CalcCodeGenerator.scala:190)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:59)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:84)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:84)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.scala:38)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:59)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:57)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLookupJoin.translateToPlan(StreamExecLookupJoin.scala:38)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCal

??????Flink SQL ????????????DynamoDB

2021-06-14 Thread Asahi Lee
https://flink-packages.org/packages/streaming-flink-dynamodb-connector




------ Original message ------
From: "user-zh"



Re:?????? flink sql??????????????????

2021-06-21 Thread Michael Ran
Does it run on k8s?
On 2021-06-16 18:22:29, "??" <809097...@qq.com> wrote:
>If you want a web IDE for Flink SQL as an alternative to the SQL CLI, take a
>look at dlink:
> https://github.com/DataLinkDC/dlink
>
>
>
>
>--  --
>??: "todd": 2021??6??16??(??) 5:48
>??: "user-zh": Re: flink sql??
>
>
>
>Flink 
>apihttps://github.com/todd5167/flink-spark-submiter??
>
>spi
>
>??
>   - ??Flink 
>client
>??
>   - ??
>   - ??
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


?????? flink sql streaming????????????????????????

2021-09-07 Thread ??????
??


------ Original message ------
From: "user-zh"



flink??sql??hdfs????????????????

2021-11-17 Thread ??????
Flink SQL writes files to HDFS named like part-c4a19762-bde3-4f37-8b3c-b92d182b450c-task-0-file-0. Is there a way to customize the names of the files Flink SQL produces on HDFS?
Thanks.


 

??????flink-sql??????kafka ??????????????????????

2022-03-22 Thread ??????
------ Original message ------
From: "user-zh"



flink sql????????????GroupWindowAggregate????????????????

2022-06-30 Thread ????
Hi, on Flink 1.12, the SQL below (a view consumed twice in a UNION ALL) produces two GroupWindowAggregate operators in the plan. Can the plan be made to share a single GroupWindowAggregate?
CREATE TEMPORARY TABLE RawSource (
`key` STRING,
`accessNum` INT,
`status` STRING,
rowTime TIMESTAMP(3),
WATERMARK FOR rowTime AS rowTime - INTERVAL '10' SECOND
) WITH (
'connector' = 'datagen'
);

CREATE TEMPORARY TABLE TrashSink (
`tag` STRING,
`key` STRING,
`value` BIGINT
) WITH (
'connector' = 'blackhole'
);

CREATE TEMPORARY VIEW AccView AS SELECT
COUNT(*) AS accAll,
COUNT(*) FILTER (WHERE status in ('error')) AS accError,
`key`
FROM RawSource
GROUP BY TUMBLE(rowTime, INTERVAL '60' SECOND),`key`;

INSERT INTO TrashSink SELECT * FROM (
SELECT 'accAll', `key`, accAll FROM AccView
UNION ALL
SELECT 'accErr', `key`, accError FROM AccView
);

Re: flink sql

2023-03-03 Thread 小昌同学
Great, thanks a lot!


小昌同学 <ccc0606fight...@163.com>
 Replied Message 
| From | 17610775726<17610775...@163.com> |
| Date | 3/3/2023 15:55 |
| To | user-zh@flink.apache.org |
| Cc | user-zh |
| Subject | Re:flink sql |
Hi



This can be done by setting pipeline.operator-chaining = false.
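In SQL Client form that is, as a sketch:

SET 'pipeline.operator-chaining' = 'false';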


Best
JasonLee


 Replied Message 
| From | 小昌同学 |
| Date | 03/3/2023 15:50 |
| To | user-zh |
| Subject | flink sql |
Hi all, how can I get the DataStream disableOperatorChaining behavior in Flink SQL?


小昌同学 <ccc0606fight...@163.com>

??Flink SQL??????????env.yaml

2019-03-31 Thread ??????



We are trying out the Flink SQL Client with an environment file (env.yaml), but parsing fails as shown below. (The input data comes from Hive, delimited by '\036'.)


[root@server2 bin]# /home/hadoop/flink-1.7.2/bin/sql-client.sh embedded -e 
/home/hadoop/flink_test/env.yaml
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
No default environment specified.
Searching for '/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
Reading default environment from: 
file:/home/hadoop/flink-1.7.2/conf/sql-client-defaults.yaml
Reading session environment from: file:/home/hadoop/flink_test/env.yaml




Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Could not parse environment file. Cause: YAML decoding problem: while parsing a 
block collection
 in 'reader', line 2, column 2:
 - name: MyTableSource
 ^
expected <block end>, but found BlockMappingStart
 in 'reader', line 17, column 3:
  schema:
  ^
 (through reference chain: 
org.apache.flink.table.client.config.Environment["tables"])
at 
org.apache.flink.table.client.config.Environment.parse(Environment.java:146)
at 
org.apache.flink.table.client.SqlClient.readSessionEnvironment(SqlClient.java:162)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:90)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)










--env.yaml
tables:
 - name: MyTableSource
   type: source-table
   update-mode: append
   connector:
 type: filesystem
 path: "/home/hadoop/flink_test/input.csv"
   format:
type: csv
fields:
- name: MyField1
  type: INT
- name: MyField2
  type: VARCHAR
line-delimiter: "\n"
comment-prefix: "#"
  schema:
- name: MyField1
type: INT
- name: MyField2
type: VARCHAR
 - name: MyCustomView
   type: view
   query: "SELECT MyField2 FROM MyTableSource"
# Execution properties allow for changing the behavior of a table program.
execution:
 type: streaming # required: execution mode either 'batch' or 'streaming'
 result-mode: table # required: either 'table' or 'changelog'
 max-table-result-rows: 100 # optional: maximum number of maintained rows in
 # 'table' mode (100 by default, smaller 1 means unlimited)
 time-characteristic: event-time # optional: 'processing-time' or 'event-time' 
(default)
 parallelism: 1 # optional: Flink's parallelism (1 by default)
 periodic-watermarks-interval: 200 # optional: interval for periodic 
watermarks(200 ms by default)
 max-parallelism: 16 # optional: Flink's maximum parallelism (128by default)
 min-idle-state-retention: 0 # optional: table program's minimum idle state time
 max-idle-state-retention: 0 # optional: table program's maximum idle state time
 restart-strategy: # optional: restart strategy
   type: fallback # "fallback" to global restart strategy by
default
# Deployment properties allow for describing the cluster to which table 
programsare submitted to.
deployment:
  response-timeout: 5000

?????? ??Flink SQL??????????env.yaml

2019-04-01 Thread ????
------ Original message ------
From: "Zhenghua Gao"
Date: 2019-04-01 15:40
To: "user-zh"
Subject: Re: Flink SQL env.yaml



This is a YAML syntax problem: the error points at 'schema:' (line 17, column 3), which is indented as a sibling of the '- name: ...' list entry instead of as a key inside it.
You can validate the file with an online YAML parser [1]; see [2][3] for the YAML syntax rules.

[1] http://nodeca.github.io/js-yaml/
[2] http://www.ruanyifeng.com/blog/2016/07/yaml.html
[3] https://en.wikipedia.org/wiki/YAML

*Best Regards,*
*Zhenghua Gao*


On Mon, Apr 1, 2019 at 11:51 AM ?? <469663...@qq.com> wrote:

> [quoted message trimmed]

?????? ??Flink SQL??????????env.yaml

2019-04-01 Thread ????
??


------ Original message ------
From: "Zhenghua Gao", 2019-04-01 15:40
To: "user-zh"
Subject: Re: Flink SQL env.yaml

> [quoted message trimmed]

?????? ??Flink SQL??????????env.yaml

2019-04-01 Thread ??????
------ Original message ------
From: ""<1543332...@qq.com>, 2019-04-01 15:46
Subject: Re: Flink SQL env.yaml

> [quoted thread trimmed]

Flink sql count ??????????????

2020-02-26 Thread ??????
Hello:

Per the docs, Flink's count is declared as
        count([all] Expression | distinct Expression1 [, Expression2])
but running a multi-column distinct count such as
        select count(distinct a, b) from mytable
fails in SQL codegen with: No matching accumulate methods for Aggregation function ... CountAggFunction with Parameters (java.lang.String, java.lang.Long)
Seen on Flink 1.8 and 1.9.

??????Flink sql count ??????????????

2020-02-26 Thread apache22
Hi,
count(distinct ...) takes a single expression; for two columns a, b you can write
count(distinct concat(a,b))
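As a sketch, adding a separator (e.g. via CONCAT_WS) avoids pairs like ('ab','c') and ('a','bc') colliding into the same concatenated key:

select count(distinct concat_ws('|', a, b)) from mytable;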


apache22 <apach...@163.com>

On 2020-02-26 18:21, the original poster wrote:
> [quoted message trimmed]

flink SQL ??OverWindow

2020-04-09 Thread ??????
Does Flink SQL support OVER windows? Are there any examples?


Flink SQL????????????proctime()????????????????

2020-04-10 Thread Night_xing
Flink version: 1.10.0

Using the Blink planner from Java with a CSV source, declaring a proctime column via the connect() API fails.

The code:
tableEnv.connect(new FileSystem()
.path("file:///Users/test/csv/demo.csv")
)
.withFormat(new Csv())
.withSchema(
new Schema()
.field("id", DataTypes.STRING())
.field("name", DataTypes.STRING())
.field("user_action_time", 
DataTypes.TIMESTAMP(3)).proctime()
)
.registerTableSource("csv_table");

The error:
Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Unsupported property keys:
schema.#.proctime

The DDL route, declaring proctime with PROCTIME(), works fine:
tableEnv.sqlUpdate("create table csv_table(" +
"id string," +
"name string," +
"user_action_time as PROCTIME()" +
") with (" +
" 'connector.type' = 'filesystem'," +
" 'connector.path' = 'file:///Users/test/csv/demo.csv'," +
" 'format.type' = 'csv'" +
")");

?????????????? Flink SQL State ????????

2020-05-26 Thread ????
hi!

How large will the state of this Flink SQL job grow, and is it cleaned up over time?

The SQL reads from Kafka and writes to Kafka, aggregating in 5-minute tumbling windows by server and reason, with COUNT(DISTINCT role_id).

Does the window state get dropped once each window fires, or do we need extra configuration?

The statements:

CREATE TABLE source_kafka (
  dtime string,
  wm as cast(dtime as TIMESTAMP(3)),
  server string,
  reason string,
  role_id string,
  WATERMARK FOR wm AS wm - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.zookeeper.connect' = 'xxx',
  'connector.properties.group.id' = 'xxx',
  'format.type' = 'json',
)
-

CREATE TABLE sink_kafka (
  window_time string,
  server string,
  reason string,
  role_id_distinct_cnt BIGINT,
  log_cnt BIGINT
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.zookeeper.connect' = 'xxx',
  'format.type' = 'json'
)
-

INSERT INTO sink_kafka 
SELECT 
 DATE_FORMAT(TUMBLE_START(wm, INTERVAL '5' MINUTE), '-MM-dd HH:mm:ss') AS 
window_time,
 server,
 reason,
 COUNT(DISTINCT role_id) AS role_id_distinct_cnt,
 COUNT(1) AS log_cnt
FROM source_kafka 
GROUP BY TUMBLE(wm, INTERVAL '5' MINUTE),server,reason

?????? ?????????????? Flink SQL State ????????

2020-05-26 Thread ????
Hi,

Thanks!

------ Original message ------
From: "Benchao Li" — https://issues.apache.org/jira/browse/FLINK-17942

LakeShen

??????Flink SQL UDF ????????

2020-06-08 Thread 1048262223
Hi,

For protobuf, you can derive the TypeInformation from the pb schema (descriptor) and hand it to the source via env.addSource(...).returns(TypeInformation), so downstream operators see properly typed fields and the UDF does not have to carry the type knowledge itself.

We solved this with a UDF that parses the pb payload.

Best,
Yichao Yang




------ Original message ------
From: "forideal"

?????? Flink SQL UDF ????????

2020-06-08 Thread 1048262223
Hi,

+1. There are avro/json formats already; for pb I have written a flink-protobuf format, on GitHub:
https://github.com/yangyichao-mango/flink-protobuf
It can be used as a pb format for Flink SQL.

Best,
Yichao Yang




------ Original message ------
From: "Jark Wu"

?????? Flink SQL UDF ????????

2020-06-09 Thread kcz
udfudf




------ Original message ------
From: "Benchao Li"

?????? Flink SQL UDF ????????

2020-06-09 Thread 1048262223
Hi


Map


Best,
Yichao Yang




------ Original message ------
From: "kcz" <573693...@qq.com>, 2020-06-09 4:49 PM, to "user-zh"

?????? Flink SQL UDF ????????

2020-06-09 Thread kcz
Returning a Map works now. Thanks!




------ Original message ------
From: "1048262223" <1048262...@qq.com>, 2020-06-09 4:51 PM, to "user-zh"

??Flink SQL????NULL??????????????????

2020-06-25 Thread ????????
Hi all:

In Flink SQL, do <> / != predicates silently drop rows where the column is NULL? Is this consistent with Hive's behavior?

Thanks.

????????Flink SQL????NULL??????????????????

2020-06-26 Thread Yichao Yang
Hi,

This was discussed on the list before; see [1].

[1] http://apache-flink.147419.n8.nabble.com/flink-sql-null-false-td3640.html#a3658

Best,
Yichao Yang




------ Original message ------
From: ""<153488...@qq.com>, 2020-06-25 11:31, to "user-zh"

flink sql if ????????????

2020-06-30 Thread kcz
flink-1.10.1, blink_planner.
An IF expression fails to type-check:
Cannot apply 'IF' to arguments of type 'IF(

?????? flink sql if ????????????

2020-06-30 Thread kcz
tks 




------ Original message ------
From: "Benchao Li"

flink sql ??????kafka??????????????????????key??

2020-07-07 Thread op
hi,
Can Flink SQL read or write the Kafka record key? The Kafka connector options only seem to cover the value.

 

?????? flink sql ??????kafka??????????????????????key??

2020-07-07 Thread op





------ Original message ------
From: "Leonard Xu" — see FLIP-107:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records

> On 2020-07-07 17:01, op <520075...@qq.com> wrote:
>
> hi,
> Can Flink SQL read or write the Kafka record key? The Kafka connector
> options only seem to cover the value.

Flink SQL question

2020-07-27 Thread air23
Hello,



Re: flink sql eos

2020-08-05 Thread Leonard Xu
Hi

> At the moment only Kafka implements TwoPhaseCommitSinkFunction, and the Kafka DDL
> has no property for setting Semantic to EXACTLY_ONCE

Besides Kafka, the filesystem connector also supports EXACTLY ONCE; the Kafka DDL support landed in 1.12 [1].

> If global EXACTLY_ONCE is enabled and every connector in use supports EXACTLY_ONCE,
> can the whole application achieve end-to-end exactly-once?

Yes.

Best,
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-15221
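On 1.12+ the Kafka sink DDL option looks roughly like this (a sketch; the option was named 'sink.semantic' in 1.12/1.13 and later superseded by 'sink.delivery-guarantee', so check the docs for your version):

CREATE TABLE kafka_sink (
  id BIGINT,
  msg STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'out',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json',
  'sink.semantic' = 'exactly-once'
);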


Flink SQL No Watermark

2020-08-11 Thread forideal
Hi all, a question:

I have a SQL job doing a session window. When it consumes a low-volume topic it generates watermarks fine; consuming a high-volume topic, it never generates a watermark and the UI permanently shows "No Watermark". I have no lead on how to debug this.

Flink 1.10, Blink planner, EventTime mode. The Kafka messages do carry event timestamps, and other jobs can generate watermarks from the same data.

The SQL:


  DDL:
   create table test(
   user_id varchar,
   action varchar,
   event_time TIMESTAMP(3),
   WATERMARK FOR event_time AS event_time - INTERVAL '10' 
SECOND
   ) with();


  DML:
insert into
  console
select
  user_id,
  f_get_str(bind_id) as id_list
from
  (
select
  action as bind_id,
  user_id,
  event_time
from
  (
SELECT
  user_id,
  action,
  PROCTIME() as proc_time,
  event_time
FROM
  test
  ) T
where
  user_id is not null
  and user_id <> ''
  and CHARACTER_LENGTH(user_id) = 24
  ) T
group by
  SESSION(event_time, INTERVAL '10' SECOND),
  user_id
 
Best forideal
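One thing worth ruling out for the high-volume topic: if it has more partitions and any partition is idle, the per-partition watermark of the idle one holds the operator watermark down forever. A sketch of the mitigation (option available since roughly Flink 1.11):

SET 'table.exec.source.idle-timeout' = '30s';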


Flink SQL question

2020-08-27 Thread air23
Hello. The JDBC sink works fine when I run locally from IDEA, but running on the server it fails as below.

Could you help me see what the problem is? This is version 1.11.


'connector'='jdbc'
'password'=''
'sink.buffer-flush.interval'='10s'
'sink.buffer-flush.max-rows'='500'
'table-name'='flink_test3'
'url'='jdbc:mysql://**:4000/test'
'username'='root'
at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
at 
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
at com.air.Tidb1.main(Tidb1.java:132)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a 
connector using option ''connector'='jdbc''.
at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
... 34 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factory for identifier 'jdbc' that implements 
'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.


Available factory identifiers are:


blackhole
print
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 35 more

flink-sql 1.11??????????????????checkpoint??

2020-09-08 Thread ??????
We call enableCheckpointing, but the job does not seem to produce usable checkpoints. What else is needed?

?????? flink-sql 1.11??????????????????checkpoint??

2020-09-08 Thread ??????
Related question: should we take a savepoint when cancelling, and restore from it on restart?


------ Original message ------
From: "user-zh"



Flink SQL permissions

2020-09-11 Thread 163

Does Flink SQL support permission checks on metadata? For example, when using the Hive catalog, does it enforce Hive's permission checks? If not supported today, is it being considered?


