Hadoop Error on ECS Fargate

2023-07-13 Thread Wang, Mengxi X via user
Hi community,

We are getting this Kerberos error when using Hadoop as the file system in an
ECS Fargate deployment.

Caused by: org.apache.hadoop.security.KerberosAuthException: failure to login: 
javax.security.auth.login.LoginException: java.lang.NullPointerException: 
invalid null input: name

Caused by: javax.security.auth.login.LoginException: 
java.lang.NullPointerException: invalid null input: name

We don't actually need Kerberos authentication, so I added properties to
flink-conf.yaml to disable Hadoop Kerberos authentication, and I can see in the
logs that they have been picked up. The errors still persist, though. Can
anybody help, please?

Best wishes,
Mengxi Wang


This message is confidential and subject to terms at: 
https://www.jpmorgan.com/emaildisclaimer including on confidential, privileged 
or legal entity information, malicious content and monitoring of electronic 
messages. If you are not the intended recipient, please delete this message and 
notify the sender immediately. Any unauthorized use is strictly prohibited.


Part files generated in reactive mode

2023-07-04 Thread Wang, Mengxi X via user
Hi,

We want to process one 2GB file and the output should also be a single 2GB 
file, but after we enabled reactive mode it generated several hundred small 
output files instead of one 2GB file. Can anybody help please?

Best wishes,
Mengxi Wang




yarn per????????????rocksDB??????????localdir--v1.10.1

2021-01-19 Thread x
flinkLocal DB files directory XXX 
does not exist and cannot be created

??????????????????allowedLateness??????????????????????????????????????????????????????

2020-11-29 Thread x
??
.keyBy.window(TumblingEventTimeWindows.of(Time.minutes(1))).allowedLateness(Time.hours(1)).aggregate(new
 AggregateFunction,new ProcessWindowFunctionBloomFilter)

??????kafka broker ????????????????????????????

2020-10-26 Thread x
kafka0.10.1.1??flink1.10.1
??
2020-10-24 21:52:44,053 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-12, groupId=onlineTag] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 10: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.DisconnectException.
2020-10-24 21:53:13,254 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-20, groupId=onlineTag] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 11: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.DisconnectException.

?????? ????????????????????????????????????????

2020-10-19 Thread x
??KeyedProcessFunctionProcessWindowFunction.




----
??: 
   "user-zh"



??????????????????????????????????????

2020-10-19 Thread x
1224




--
??: 
   "user-zh"

<584680...@qq.com;
:2020??10??15??(??) 3:47
??:"user-zh"

????????????????????????????????????????

2020-10-19 Thread x
??v1.10.1
AggregateFunction+ProcessWindowFunction??ProcessWindowFunction??+ProcessWindowFunction??clear??23??59??00??
override def clear(ctx: Context): Unit = {
  val dt = new SimpleDateFormat("HH:mm:00").format(ctx.window.getStart)
  if (dt.equals("23:59:00")) {
    state.clear()
  }
}
??keyBykey

?????? ????savepoint

2020-09-04 Thread x





----
??: 
   "x"  
  <35907...@qq.com;
:2020??9??3??(??) 12:22
??:"user-zh"https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html

????~

x <35907...@qq.com ??2020??9??3?? 11:30??

 /flink/flink-1.10.1/bin/flink cancel -s
 hdfs://nameservice1/user/flink_1.10.1/flink-savepoints
 f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
 Unrecognized option: -yid

?????? ????savepoint

2020-09-02 Thread x
??V1.10.1??


----
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html

????~

x <35907...@qq.com ??2020??9??3?? 11:30??

 /flink/flink-1.10.1/bin/flink cancel -s
 hdfs://nameservice1/user/flink_1.10.1/flink-savepoints
 f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
 Unrecognized option: -yid

????savepoint

2020-09-02 Thread x
/flink/flink-1.10.1/bin/flink cancel -s 
hdfs://nameservice1/user/flink_1.10.1/flink-savepoints 
f95e51fd12a83907b6ca8c52eb2614ae -yid application_1583831804206_1106301
Unrecognized option: -yid

?????? ProcessWindowFunction??????clear??????????????????-v1.10.1

2020-08-27 Thread x
??8??
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
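For reference, the -8 hour offset in the window assigner above moves the daily window boundaries from midnight UTC to midnight UTC+8. The sketch below is plain Java, not Flink code. The class and method names are made up for illustration, and the formula is the one Flink's TimeWindow.getWindowStartWithOffset uses in the 1.10 line as far as I know, so treat that as an assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class WindowStartSketch {

    // Start of the tumbling window that contains timestampMs.
    // Assumed to mirror Flink 1.10's TimeWindow.getWindowStartWithOffset.
    static long windowStart(long timestampMs, long offsetMs, long sizeMs) {
        return timestampMs - (timestampMs - offsetMs + sizeMs) % sizeMs;
    }

    public static void main(String[] args) {
        long day = Duration.ofDays(1).toMillis();
        long minus8h = Duration.ofHours(-8).toMillis();
        ZoneOffset utc8 = ZoneOffset.ofHours(8);

        // Hypothetical event at 02:00 local time (UTC+8) on 2020-08-27.
        long ts = ZonedDateTime.of(2020, 8, 27, 2, 0, 0, 0, utc8)
                .toInstant().toEpochMilli();

        // With the -8h offset the window starts at local midnight.
        long withOffset = windowStart(ts, minus8h, day);
        System.out.println(Instant.ofEpochMilli(withOffset).atOffset(utc8));
        // prints 2020-08-27T00:00+08:00

        // Without an offset the window starts at midnight UTC (08:00 local).
        long noOffset = windowStart(ts, 0L, day);
        System.out.println(Instant.ofEpochMilli(noOffset).atOffset(utc8));
        // prints 2020-08-27T08:00+08:00
    }
}
```

The point of the comparison is that daily windows without the offset would close at 08:00 local time, which is rarely what a per-day statistic wants.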




----
??: 
   "x"  
  <35907...@qq.com;
:2020??8??27??(??) 2:00
??:"user-zh@flink.apache.org"

?????? ProcessWindowFunction??????clear??????????????????-v1.10.1

2020-08-27 Thread x
10??1??.window(TumblingEventTimeWindows.of(Time.minutes(10))).trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))10


----
??: 
   "user-zh"



??????????????????UV??????????????MapState??BloomFilter,??checkpoint????????????????????

2020-08-26 Thread x
UV??MapStateBloomFilter??,checkpoint??bloomMapState

ProcessWindowFunction??????clear??????????????????-v1.10.1

2020-08-25 Thread x
ProcessWindowFunction??clear
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)??
.window(TumblingEventTimeWindows.of(Time.days(1)))
.trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new ProcessWindowFunction[IN,OUT,KEY,TimeWindow]{
  private var state: MapState[String,Boolean] = _
  override def open
  override def process
  override def clear(ctx: Context): Unit = {
    state.clear()
  }
})

Re: Help: real-time cumulative UV statistics with Flink SQL 1.10

2020-07-07 Thread x
The approach you describe isn't supported in v1.10.1, is it? As far as I can
see, the method takes only a single String parameter:
void sqlUpdate(String stmt);


-- Original message --
From: "seeksst"
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time

> x <35907418@qq.com> wrote on Mon, 6 Jul 2020 at 11:15 AM:
>
> > The version is 1.10.1, and the final sink step does indeed perform a
> > count distinct inside a window. Does a single count distinct inside a
> > window anywhere in the computation stop all expired state from being
> > cleaned up automatically? In practice the state for my window step is
> > small; the state for the GROUP BY DATE_FORMAT(rowtm, '-MM-dd') query is
> > large. The code is as follows:
> > val rt_totaluv_view : Table = tabEnv.sqlQuery(
> >   """
> >     SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT userkey) uv
> >     FROM source
> >     GROUP BY DATE_FORMAT(rowtm, '-MM-dd')
> >   """)
> > tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)
> >
> > val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
> >   .filter( line => line._1 == true ).map( line => line._2 )
> >
> > val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )
> >
> > tabEnv.sqlUpdate(
> >   s"""
> >     INSERT INTO mysql_totaluv
> >     SELECT _1,MAX(_2)
> >     FROM $totaluvTabTmp
> >     GROUP BY _1
> >   """)
> >
> > -- Original message --
> > From: "Benchao Li" <libenchao@apache.org>
> > Sent: Friday, 3 July 2020, 9:47 PM
> > To: "user-zh" <user-zh@flink.apache.org>
> > Subject: Re: Help: real-time cumulative UV statistics with Flink SQL 1.10
> >
> > Which version are you using? There was a similar problem before [1]:
> > doing a count distinct inside a window triggers it. It has been fixed
> > in 1.11.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-17942
> >
> > x <35907418@qq.com> wrote on Fri, 3 Jul 2020 at 4:34 PM:
> >
> > > Hello, after my program has been running for a while, I see that the
> > > checkpoint files keep growing, so the state is apparently not
> > > expiring. I configured
> > > tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)),
> > > so in principle the state for keys dated the previous day should
> > > expire the next day.
> > >
> > > -- Original message --
> > > From: "Jark Wu" <imjark@gmail.com>
> > > Sent: Thursday, 18 June 2020, 12:16 PM
> > > To: "user-zh" <user-zh@flink.apache.org>
> > > Subject: Re: Help: real-time cumulative UV statistics with Flink SQL 1.10
> > >
> > > Yes, I think that works as a workaround.
> > >
> > > On Thu, 18 Jun 2020 at 10:34, x <35907418@qq.com> wrote:
> > >
> > > > For 1.10, I implemented it by converting the table to a stream and
> > > > then back to a table. Does that look reasonable to you?
> > > > val resTmpTab: Table = tabEnv.sqlQuery(
> > > >   """
> > > >     SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT userkey) uv
> > > >     FROM user_behavior
> > > >     GROUP BY DATE_FORMAT(ts, '-MM-dd')
> > > >   """)
> > > >
> > > > val resTmpStream=tabEnv.toRetractStream[(String,Long)](resTmpTab)
> > > >   .filter(line=>line._1==true).map(line=>line._2)
> > > >
> > > > val res= tabEnv.fromDataStream(resTmpStream)

Re: Help: real-time cumulative UV statistics with Flink SQL 1.10

2020-07-06 Thread x
??blink
val settings =
  EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()





-- Original message --
From: "Benchao Li"
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time


Re: Help: real-time cumulative UV statistics with Flink SQL 1.10

2020-07-05 Thread x
sorry,group agg.
tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7))??.


-- Original message --
From: "Benchao Li"
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time


Re: Help: real-time cumulative UV statistics with Flink SQL 1.10

2020-07-05 Thread x
The version is 1.10.1, and the final sink step does indeed perform a count
distinct inside a window. Does a single count distinct inside a window
anywhere in the computation stop all expired state from being cleaned up
automatically? In practice the state for my window step is small; the state
for the GROUP BY DATE_FORMAT(rowtm, '-MM-dd') query is large. The code is as
follows:
val rt_totaluv_view : Table = tabEnv.sqlQuery(
  """
    SELECT MAX(DATE_FORMAT(rowtm, '-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT userkey) uv
    FROM source
    GROUP BY DATE_FORMAT(rowtm, '-MM-dd')
  """)
tabEnv.createTemporaryView("rt_totaluv_view",rt_totaluv_view)

val totaluvTmp = tabEnv.toRetractStream[(String,Long)](rt_totaluv_view)
  .filter( line => line._1 == true ).map( line => line._2 )

val totaluvTabTmp = tabEnv.fromDataStream( totaluvTmp )

tabEnv.sqlUpdate(
  s"""
    INSERT INTO mysql_totaluv
    SELECT _1,MAX(_2)
    FROM $totaluvTabTmp
    GROUP BY _1
  """)

-- Original message --
From: "Benchao Li"
https://issues.apache.org/jira/browse/FLINK-17942


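Re: Help: real-time cumulative UV statistics with Flink SQL 1.10

2020-07-03 Thread x
Hello, after my program has been running for a while, I see that the
checkpoint files keep growing, so the state is apparently not expiring.
I configured
tableConfig.setIdleStateRetentionTime(Time.minutes(2),Time.minutes(7)), so in
principle the state for keys dated the previous day should expire the next day.


-- Original message --
From: "Jark Wu"
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html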

Re: Help: real-time cumulative UV statistics with Flink SQL 1.10

2020-06-17 Thread x
For 1.10, I implemented it by converting the table to a stream and then back
to a table. Does that look reasonable to you?
val resTmpTab: Table = tabEnv.sqlQuery(
  """
    SELECT MAX(DATE_FORMAT(ts, '-MM-dd HH:mm:00')) time_str,COUNT(DISTINCT userkey) uv
    FROM user_behavior
    GROUP BY DATE_FORMAT(ts, '-MM-dd')
  """)

val resTmpStream = tabEnv.toRetractStream[(String,Long)](resTmpTab)
  .filter(line => line._1 == true).map(line => line._2)

val res = tabEnv.fromDataStream(resTmpStream)
tabEnv.sqlUpdate(
  s"""
    INSERT INTO rt_totaluv
    SELECT _1,MAX(_2)
    FROM $res
    GROUP BY _1
  """)


-- Original message --
From: "Jark Wu"
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html


Re: Help: real-time cumulative UV statistics with Flink SQL 1.10

2020-06-17 Thread x

-- Original message --
From: "Jark Wu"
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html


Re: Help: real-time cumulative UV statistics with Flink SQL 1.10

2020-06-16 Thread x
??"??"UV??
sink??
tm uv
2020/06/17 13:46:00 1
2020/06/17 13:47:00 2
2020/06/17 13:48:00 3
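The sample sink rows above read like a running count of distinct users within one day, reported once per minute. As a rough illustration of that computation only, here is a plain-Java sketch. It is not Flink code, the class name, method name and input events are hypothetical, and it assumes events arrive in time order.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CumulativeUvSketch {

    // Each event is {minute, userKey}, with minute formatted as
    // "yyyy/MM/dd HH:mm:00". Returns minute -> distinct users seen so far
    // on that day, in first-seen order of the minutes.
    static Map<String, Integer> cumulativeUv(List<String[]> events) {
        Map<String, Set<String>> usersPerDay = new HashMap<>();
        Map<String, Integer> uvPerMinute = new LinkedHashMap<>();
        for (String[] event : events) {
            String minute = event[0];
            String day = minute.substring(0, 10); // "yyyy/MM/dd"
            Set<String> seen =
                    usersPerDay.computeIfAbsent(day, d -> new HashSet<>());
            seen.add(event[1]);
            // Later events in the same minute overwrite the earlier count.
            uvPerMinute.put(minute, seen.size());
        }
        return uvPerMinute;
    }

    public static void main(String[] args) {
        // Made-up input: u1 appears twice and is counted once.
        List<String[]> events = Arrays.asList(
                new String[] {"2020/06/17 13:46:00", "u1"},
                new String[] {"2020/06/17 13:47:00", "u1"},
                new String[] {"2020/06/17 13:47:00", "u2"},
                new String[] {"2020/06/17 13:48:00", "u3"});
        System.out.println(cumulativeUv(events));
        // prints {2020/06/17 13:46:00=1, 2020/06/17 13:47:00=2, 2020/06/17 13:48:00=3}
    }
}
```

The per-day set is what grows without bound in a streaming job unless old days are expired, which is the state-size concern raised elsewhere in this thread.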


group by ??


-- Original message --
From: "Benchao Li"
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html


Help: real-time cumulative UV statistics with Flink SQL 1.10

2020-06-16 Thread x
??0??UV??UV??
CREATE VIEW uv_per_10min AS
SELECT
  MAX(DATE_FORMAT(proctime, '-MM-dd HH:mm:00')) OVER w AS time_str,
  COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);


??
PARTITION BY DATE_FORMAT(rowtm, '-MM-dd') ??
PS??1.10??DDL??CREATE VIEW??


Flink event-time window operation is triggered when the watermark is less than the end of the window

2018-11-29 Thread X L
Please refer to the Stack Overflow question.
Thanks.

-- 
Thanks.

·
Lx
wlxwol...@gmail.com