Does the blink SQL architecture automatically clean up expired state?

2019-09-11 Post by 守护



    tableEnv.registerDataStream("testCountTable", waterMarkStream,
      'curuserid, 'timelong, 'rowtime.rowtime)

    val result = tableEnv.sqlQuery(
      s"SELECT COUNT(0) AS pv, COUNT(DISTINCT curuserid) AS uv, " +
        s"TUMBLE_END(rowtime, INTERVAL '10' MINUTE) " +
        s"FROM testCountTable GROUP BY TUMBLE(rowtime, INTERVAL '10' MINUTE)")

    val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](result)

    // Note: splitting Row.toString on "," only works because none of the
    // three fields contains a comma in its string form.
    val data = dsRow.map { w =>
      val fields = w.toString.split(",")
      s"""{"pv":"${fields(0)}","uv":"${fields(1)}","rowtime":"${fields(2)}"}"""
    }
    data.print()
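A note on the map above: splitting Row.toString() on commas breaks as soon as any field's string form contains a comma; reading the fields positionally (Row.getField(i) in Flink) and formatting them is more robust. A minimal stdlib-only sketch of just the formatting step, with hypothetical values:

```java
public class PvUvJson {
    // Build the JSON payload from the three fields directly instead of
    // re-parsing the Row's toString() output.
    static String toJson(String pv, String uv, String rowtime) {
        return String.format("{\"pv\":\"%s\",\"uv\":\"%s\",\"rowtime\":\"%s\"}",
                pv, uv, rowtime);
    }

    public static void main(String[] args) {
        System.out.println(toJson("3", "2", "2019-09-11 10:00:00.0"));
    }
}
```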





------------------ Original message ------------------
From: "Jark Wu"

Re: Does the blink SQL architecture automatically clean up expired state?

2019-09-11 Post by Jark Wu
Hi,
Could you share the SQL?

In theory, blink sql windows clean up their state automatically.

> On Sep 11, 2019, at 18:56, 守护 <346531...@qq.com> wrote:
> 
> Hi all,
> A question: in flink 1.9, using a blink SQL statement with a time window and FsStateBackend as the state backend, I see the State keep growing, and expired state is not deleted after the window passes. Does the blink architecture simply not support automatic window state cleanup, or am I using it wrong? In my tests, flink SQL on 1.9 does not have this problem.



Re: ConcurrentModificationException when writing to ES from Flink

2019-09-11 Post by 王佩
It's not a problem in the code; the code does not remove elements while iterating over a List. From the stack trace, the error is thrown from the org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer class.
The BufferingNoOpRequestIndexer class is not thread-safe.
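(The two failure modes discussed in this thread can be told apart with plain java.util: removing through the live Iterator is safe within one thread, while any structural modification that bypasses the iterator, whether from the same thread or another one, fails fast with the same ConcurrentModificationException as in the stack trace below. A minimal single-threaded sketch:)

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class CmeDemo {
    // Safe removal: go through the Iterator, never through the List.
    static List<Integer> dropEvens(List<Integer> xs) {
        Iterator<Integer> it = xs.iterator();
        while (it.hasNext()) {
            if (it.next() % 2 == 0) {
                it.remove();
            }
        }
        return xs;
    }

    public static void main(String[] args) {
        System.out.println(dropEvens(new ArrayList<>(Arrays.asList(1, 2, 3, 4))));
        // Unsafe removal: modifying the list behind a live iterator
        // triggers the fail-fast check on the next iteration step.
        List<Integer> ys = new ArrayList<>(Arrays.asList(1, 2, 3));
        try {
            for (Integer y : ys) {
                if (y == 1) ys.remove(y);   // bypasses the iterator
            }
        } catch (java.util.ConcurrentModificationException e) {
            System.out.println("CME as expected");
        }
    }
}
```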

wang jinhai wrote on Tue, Sep 10, 2019, at 4:36 PM:

> This doesn't look like a Flink problem. Your code removes elements from a List while iterating it, which causes this. The fix is to iterate with an Iterator and call iterator.remove().
>
> On 2019/9/10 4:18 PM, 王佩 wrote:
>
> Writing to ES with Flink, I hit the following ConcurrentModificationException:
>
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Job kafka_to_es (ba125eebbe5d09c7d224c7f2a05143b8) switched from state RUNNING to FAILING.
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
> at java.util.ArrayList$Itr.next(ArrayList.java:851)
> at org.apache.flink.streaming.connectors.elasticsearch.BufferingNoOpRequestIndexer.processBufferedRequests(BufferingNoOpRequestIndexer.java:64)
> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:387)
> at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:307)
> at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:745)
> 2019-09-10 08:13:14 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph transitionState 1417 Source: KafkaSource (1/7) (4ac0c0f04e782a8c0f6d102b2e524860) switched from RUNNING to CANCELING.
> 2

RE: How to express early or delayed window firing in flink sql

2019-09-11 Post by 苏 欣
Thanks for the explanation. For handling late window data, can that be configured with the setIdleStateRetentionTime method?



Sent from Mail for Windows 10




From: Benchao Li
Sent: Wednesday, September 11, 2019 6:38:44 PM
To: user-zh@flink.apache.org
Subject: Re: How to express early or delayed window firing in flink sql

The community's 1.9 blink-planner does not support this at the parser level yet; it can be configured through the global config:
table.exec.emit.early-fire.enabled
table.exec.emit.early-fire.delay

Give it a try.
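As a sketch, the two options are ordinary entries in the blink planner's global configuration (the delay value format here is an assumption; check the 1.9 blink planner docs):

```
table.exec.emit.early-fire.enabled=true
# emit an early result periodically while the window is still open
table.exec.emit.early-fire.delay=1 min
```

In code they would be set on the TableEnvironment's TableConfig, e.g. via tEnv.getConfig().getConfiguration().setString(...).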

苏 欣 wrote on Wed, Sep 11, 2019, at 11:45 AM:

> The Blink documentation describes an EMIT Strategy: syntax like WITH DELAY '1' MINUTE BEFORE WATERMARK or EMIT WITHOUT DELAY AFTER WATERMARK is supposed to control window firing.
> But when I use this syntax the job fails with a SQL parse error. Is there any way to control window firing from SQL?
> Table result = tEnv.sqlQuery("select " +
>         "count(*) " +
>         "from dept group by tumble(crt_time, INTERVAL '10' SECOND) " +
>         "WITH DELAY '1' MINUTE BEFORE WATERMARK");
> The error:
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> 
> ERR_ID:
>  SQL-00120001
> CAUSE:
>  SQL parse failed:
>  Encountered "WITH" at line 1, column 75.
>  Was expecting one of:
>  
>  "ORDER" ...
>  "LIMIT" ...
>  "OFFSET" ...
>  "FETCH" ...
>  "," ...
>
>
>

--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Does the blink SQL architecture automatically clean up expired state?

2019-09-11 Post by 守护
Hi all,
A question: in flink 1.9, using a blink SQL statement with a time window and FsStateBackend as the state backend, I see the State keep growing, and expired state is not deleted after the window passes. Does the blink architecture simply not support automatic window state cleanup, or am I using it wrong? In my tests, flink SQL on 1.9 does not have this problem.

Re: How to express early or delayed window firing in flink sql

2019-09-11 Post by Benchao Li
The community's 1.9 blink-planner does not support this at the parser level yet; it can be configured through the global config:
table.exec.emit.early-fire.enabled
table.exec.emit.early-fire.delay

Give it a try.

苏 欣 wrote on Wed, Sep 11, 2019, at 11:45 AM:

> The Blink documentation describes an EMIT Strategy: syntax like WITH DELAY '1' MINUTE BEFORE WATERMARK or EMIT WITHOUT DELAY AFTER WATERMARK is supposed to control window firing.
> But when I use this syntax the job fails with a SQL parse error. Is there any way to control window firing from SQL?
> Table result = tEnv.sqlQuery("select " +
>         "count(*) " +
>         "from dept group by tumble(crt_time, INTERVAL '10' SECOND) " +
>         "WITH DELAY '1' MINUTE BEFORE WATERMARK");
> The error:
> Exception in thread "main" org.apache.flink.table.api.SqlParserException:
> 
> ERR_ID:
>  SQL-00120001
> CAUSE:
>  SQL parse failed:
>  Encountered "WITH" at line 1, column 75.
>  Was expecting one of:
>  
>  "ORDER" ...
>  "LIMIT" ...
>  "OFFSET" ...
>  "FETCH" ...
>  "," ...
>
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


RE: Building flink 1.9: flink-table-api-java fails to compile

2019-09-11 Post by venn
Thanks a lot. Upgrading the jdk to 1.8.0_111 solved it.



-----Original Message-----
From: user-zh-return-1139-wxchunjhyy=163@flink.apache.org on behalf of Zili Chen
Sent: Wednesday, September 11, 2019 10:35 AM
To: user-zh
Subject: Re: Building flink 1.9: flink-table-api-java fails to compile

This looks like a JDK bug:
https://stackoverflow.com/questions/25523375/java8-lambdas-and-exceptions

Could you bump your JDK patch version? I don't see this problem on 8.0.212.
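(For context, the construct that trips the older javac is a lambda supplier passed to Optional.orElseThrow, whose generic throws-clause inference was buggy in early JDK 8 updates. A stdlib-only sketch of the same shape, with hypothetical names, which compiles and runs on a patched JDK:)

```java
import java.util.Optional;

public class OrElseThrowDemo {
    // Same shape as the failing Flink code: the exception type thrown by
    // the lambda must be inferred for orElseThrow's throws clause.
    static String lookup(Optional<String> name) {
        return name.orElseThrow(() ->
                new IllegalStateException("Undefined function"));
    }

    public static void main(String[] args) {
        System.out.println(lookup(Optional.of("myUdf")));
    }
}
```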

Best,
tison.


venn wrote on Wed, Sep 11, 2019, at 10:26 AM:

> Hi all, a question about building Flink 1.9: compiling flink-table-api-java fails whenever the code contains ".orElseThrow(() -> new ValidationException("Undefined function: " + lookupCall.getUnresolvedName()));". My jdk version is 1.8.0_91. How should I handle this?
>
> The error:
>
>
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile
> (default-compile) on project flink-table-api-java: Compilation failure
>
> [ERROR]
>
> /home/venn/git/flink/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/CalculatedTableFactory.java:[90,53] unreported exception X; must be caught or declared to be thrown
>
> [ERROR]
>
> [ERROR] -> [Help 1]
>
> [ERROR]
>
> [ERROR] To see the full stack trace of the errors, re-run Maven with
> the -e switch.
>
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
>
> [ERROR]
>
> [ERROR] For more information about the errors and possible solutions,
> please read the following articles:
>
> [ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
>
> [ERROR]
>
> [ERROR] After correcting the problems, you can resume the build with
> the command
>
> [ERROR]   mvn  -rf :flink-table-api-java
>
>
>
>