flink-1.11 error when writing to Hive

2020-11-04 Post by nashcen



The Flink job reads from Kafka and writes to Hive. It used to run fine, and it still runs correctly in IDEA, but after packaging it into a jar and submitting it to the Flink cluster it fails with the error below. What could be the cause?

2020-11-05 15:34:36
org.apache.flink.connectors.hive.FlinkHiveException:
org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create
Hive RecordWriter
at
org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:159)
at
org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:47)
at
org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:257)
at
org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:230)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
at
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
at
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
at
org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at StreamExecCalc$43.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at StreamExecCalc$19.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused 

Re: flink 1.11.2 cep rocksdb performance tuning

2020-11-04 Post by Peihui He
hi,


@Override
public UV get(UK userKey) throws IOException, RocksDBException {
    byte[] rawKeyBytes =
        serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
    byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

    return (rawValueBytes == null ? null :
        deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
}

@Override
public void put(UK userKey, UV userValue) throws IOException, RocksDBException {

    byte[] rawKeyBytes =
        serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
    byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);

    backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
}

Tracing through the source shows that RocksDBMapState serializes and deserializes on every get and put... that is most likely why these calls are so expensive.
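For illustration only, here is a sketch of one direction that is sometimes considered when the per-event read-modify-write of a MapState<Long, List<IN>> becomes the bottleneck: keep the buffered events in a ListState of (timestamp, event) pairs instead. With the RocksDB backend, ListState#add appends through RocksDB's merge operator and only serializes the new element, so the hot path no longer deserializes and rewrites the whole list; the full list is only materialized when a watermark fires. This is not a drop-in replacement for CepOperator's buffering logic, and the field/method names below (bufferedEvents, onWatermark, processMatchesFor) are made up for the sketch; state-descriptor setup in open() is omitted.

// Fields/methods of a customized operator -- a sketch only.
private ListState<Tuple2<Long, IN>> bufferedEvents;  // (event timestamp, event)

private void bufferEvent(IN event, long currentTime) throws Exception {
    // Serializes only this element; no read-modify-write of the whole list.
    bufferedEvents.add(Tuple2.of(currentTime, event));
}

private void onWatermark(long watermark) throws Exception {
    // Only here is the full list deserialized: group by timestamp, process what
    // is ripe, and write back the rest.
    Map<Long, List<IN>> byTimestamp = new TreeMap<>();
    Iterable<Tuple2<Long, IN>> buffered = bufferedEvents.get();
    if (buffered != null) {
        for (Tuple2<Long, IN> e : buffered) {
            byTimestamp.computeIfAbsent(e.f0, ts -> new ArrayList<>()).add(e.f1);
        }
    }
    List<Tuple2<Long, IN>> remaining = new ArrayList<>();
    for (Map.Entry<Long, List<IN>> entry : byTimestamp.entrySet()) {
        if (entry.getKey() <= watermark) {
            processMatchesFor(entry.getKey(), entry.getValue()); // hypothetical
        } else {
            for (IN ev : entry.getValue()) {
                remaining.add(Tuple2.of(entry.getKey(), ev));
            }
        }
    }
    bufferedEvents.update(remaining);
}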

Peihui He wrote on Thu, Nov 5, 2020 at 11:05 AM:

> hi
>
> I'm using Flink 1.11.2 CEP for some pattern matching. As soon as RocksDB is enabled as the state backend, the job back-pressures and CPU usage is about 10x what it was before.
>
> private void bufferEvent(IN event, long currentTime) throws Exception {
>     long currentTs = System.currentTimeMillis();
>     List<IN> elementsForTimestamp = elementQueueState.get(currentTime);
>     if (elementsForTimestamp == null) {
>         this.bufferEventGetNullhistogram.update(System.currentTimeMillis() - currentTs);
>         elementsForTimestamp = new ArrayList<>();
>     } else {
>         this.bufferEventGethistogram.update(System.currentTimeMillis() - currentTs);
>     }
>     elementsForTimestamp.add(event);
>     long secondCurrentTs = System.currentTimeMillis();
>     elementQueueState.put(currentTime, elementsForTimestamp);
>     this.bufferEventPuthistogram.update(System.currentTimeMillis() - secondCurrentTs);
>     this.bufferEventhistogram.update(System.currentTimeMillis() - currentTs);
> }
>
> By overriding CepOperator and adding some metrics, I found:
>
> this.bufferEventhistogram = metrics.histogram("buffer_event_delay", new
> DescriptiveStatisticsHistogram(1000));
> this.bufferEventGethistogram = metrics.histogram("buffer_event_get_delay", new
> DescriptiveStatisticsHistogram(1000));
> this.bufferEventGetNullhistogram = 
> metrics.histogram("buffer_event_get_null_delay", new
> DescriptiveStatisticsHistogram(1000));
> this.bufferEventPuthistogram = metrics.histogram("buffer_event_put_delay", new
> DescriptiveStatisticsHistogram(1000));
>
> get and put are where the time goes; a single bufferEvent call can take up to 200 ms.
> Judging from the RocksDB metrics there is not much flush or compaction activity.
>
> [image: image.png]
> [image: image.png]
>
> I also tuned things following the article https://www.jianshu.com/p/2e61c2c83c57, but it didn't help much -- the job is still back-pressured.
> I also looked at the similar question http://apache-flink.147419.n8.nabble.com/rocksDB-td1785.html
> , but my SST files are quite small.
> Could anyone explain why get and put are so expensive, and is there a good way to optimize this? Thanks.
>
> Best Wishes.
>
>
>
> Best Wishes.
>
>


Re: Re: union all losing part of the data

2020-11-04 Post by 夜思流年梦









The Flink version is 1.11.








On 2020-11-05 00:02:12, "hailongwang" <18868816...@163.com> wrote:
>Hi liaobiao,
>
>
>Which Flink version are you on?
>Based on your SQL, on versions <= 1.10 the metadata handler will derive the keys after your GROUP BY as the upsert
>key, which can lead to results overwriting each other.
>Could you check whether that is what you are seeing?
>
>
>Best,
>Hailong Wang
>
>
>
>
>On 2020-11-04 17:20:23, "夜思流年梦" wrote:
>>Hi developers:
>>   Scenario: compute the total revenue of each department and the total revenue of all departments, and UNION ALL the two queries. In practice part of the data is lost with UNION
>> ALL -- either the per-department numbers come out low, or the all-department total comes out low.
>>   If the two SELECTs of the UNION ALL are split out and inserted into the same table separately, the data is correct. Is this a bug, or am I using it incorrectly?
>>
>>
>>
>>
>>Original SQL:
>>
>>
>>insert into dws_
>>
>>
>>select 
>>0 as id
>>,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as bigint) as ftime
>>,case 
>>when dept_name like '%XX%' then 'X1'
>>when dept_name = 'xXX' then 'X2'
>>else 'X3' end as paytype
>>,count(orderid) as paynum_h 
>>,round(sum(amt)) as paymoney_h 
>>from dwd_XXX
>>where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
>>group by
>>DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH'), 
>>case 
>>when dept_name like '%XX%' then 'X1'
>>when dept_name = 'xXX' then 'X2'
>>else 'X3' end ;
>>
>>
>>
>>
>>union all
>>
>>
>>
>>
>>select 0 as id
>>,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as int) as ftime
>>,'all' as paytype
>>,count(orderid) as paynum_h  
>>,round(sum(amt)) as paymoney_h  
>>from dwd_XXX
>>where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
>>group by DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') ;


Re: Reply: union all losing part of the data

2020-11-04 Post by 夜思流年梦






Oh, sorry -- I had split that SQL into two parts, so when I put the original SQL back together I forgot to remove the semicolon;


when the UNION ALL was actually submitted there was no semicolon in the middle, otherwise it could not have been submitted at all.











On 2020-11-05 10:00:01, "史 正超" <792790...@qq.com> wrote:
>At the end of the first SELECT of your INSERT INTO -- right above the UNION ALL -- there is a semicolon ';'.
>With that there, I suspect the second SELECT never executes. Another point: even with the semicolon removed, I suspect the data will get overwritten; I have a similar UNION ALL use case myself.
>I think you should assemble your SQL like this:
>```sql
>
>Insert into xxx
>Select
>   d1,
>   d2,
>   count(1)
>From (
>   Select * from a
>   Union all
>   Select * from b
>)
>Group by d1, d2
>
>```
>
>Sent from Mail for Windows 10
>
>From: 夜思流年梦
>Sent: November 4, 2020, 18:21
>To: user-zh@flink.apache.org
>Subject: union all losing part of the data
>
>Hi developers:
>   Scenario: compute the total revenue of each department and the total revenue of all departments, and UNION ALL the two queries. In practice part of the data is lost with UNION
> ALL -- either the per-department numbers come out low, or the all-department total comes out low.
>   If the two SELECTs of the UNION ALL are split out and inserted into the same table separately, the data is correct. Is this a bug, or am I using it incorrectly?
>
>
>
>
>Original SQL:
>
>
>insert into dws_
>
>
>select
>0 as id
>,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as bigint) as ftime
>,case
>when dept_name like '%XX%' then 'X1'
>when dept_name = 'xXX' then 'X2'
>else 'X3' end as paytype
>,count(orderid) as paynum_h
>,round(sum(amt)) as paymoney_h
>from dwd_XXX
>where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
>group by
>DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH'),
>case
>when dept_name like '%XX%' then 'X1'
>when dept_name = 'xXX' then 'X2'
>else 'X3' end ;
>
>
>
>
>union all
>
>
>
>
>select 0 as id
>,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as int) as ftime
>,'all' as paytype
>,count(orderid) as paynum_h
>,round(sum(amt)) as paymoney_h
>from dwd_XXX
>where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
>group by DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') ;
>


Re: Re: cdc code error

2020-11-04 Post by Jark Wu
You can check which kafka-connect version flink-cdc-connectors uses and then depend on that same version.
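For example, one possible way to do this with Maven (a sketch only -- the artifacts to pin and the `kafka.connect.version` property are assumptions; determine the version flink-cdc-connectors actually pulls in with `mvn dependency:tree` and set the property to that value):

<dependencyManagement>
    <dependencies>
        <!-- Force the same kafka-connect version that flink-cdc-connectors uses. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>${kafka.connect.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-json</artifactId>
            <version>${kafka.connect.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>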

On Thu, 5 Nov 2020 at 14:07, hl9...@126.com  wrote:

> It was indeed a kafka dependency in the pom causing the conflict; removing it fixed the problem. Thanks a lot.
> One more question: if I want to use Kafka I have to add the kafka dependency back, and it will conflict again -- is there a way around that?
>
>
>
> hl9...@126.com
>
> From: Jark Wu
> Sent: 2020-11-05 11:55
> To: user-zh
> Subject: Re: cdc code error
> There is probably another version of the kafka-connect jar in the environment, causing a dependency conflict.
>
>
> On Thu, 5 Nov 2020 at 11:35, hl9...@126.com  wrote:
>
> > Flink version 1.11.2 -- has anyone run into this problem?
> >
> >
> >
> > hl9...@126.com
> >
> > From: hl9...@126.com
> > Sent: 2020-11-04 16:43
> > To: user-zh
> > Subject: cdc code error
> > Hi, all:
> > I'm running the demo code from the ververica/flink-cdc-connectors git repo and it fails with:
> > 2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out
> > (1/1)] WARN  org.apache.flink.runtime.taskmanager.Task  - Source: Custom
> > Source -> Sink: Print to Std. Out (1/1)
> (7c3ccf7686ccfb33254e8cb785cd339d)
> > switched from RUNNING to FAILED.
> > java.lang.AbstractMethodError:
> > org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V
> > at
> >
> org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
> > at
> >
> org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
> > at io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:583)
> > at io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:80)
> > at
> >
> io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)
> > at
> >
> io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)
> > at
> >
> io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
> > at
> >
> com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
> >
> > Source code:
> > public class CdcTest {
> > public static void main(String[] args) throws Exception {
> > SourceFunction sourceFunction =
> > MySQLSource.builder()
> > .hostname("localhost")
> > .port(3306)
> > .databaseList("sohay") // monitor all tables under
> > inventory database
> > .username("root")
> > .password("123456")
> > .deserializer(new StringDebeziumDeserializationSchema())
> > // converts SourceRecord to String
> > .build();
> >
> > StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> >
> > env.addSource(sourceFunction).print().setParallelism(1); // use
> > parallelism 1 for sink to keep message ordering
> >
> > env.execute();
> > }
> > }
> >
> >
> >
> > hl9...@126.com
> >
>


Re: Re: setting flink tm cpu cores

2020-11-04 Post by zjfpla...@hotmail.com
You mean passing it as a startup parameter, rather than setting it in flink-conf.yaml?



zjfpla...@hotmail.com
 
From: JasonLee
Sent: 2020-11-05 13:59
To: user-zh
Subject: Re: Re: setting flink tm cpu cores
hi
You can set it like this: -yD yarn.containers.vcores=<your value>
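For example, when submitting a per-job YARN application it could be passed on the command line like this (the job jar and main class below are made up for the example):

flink run -m yarn-cluster -yD yarn.containers.vcores=4 -c com.example.MyJob ./my-job.jar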
 
 
 
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: cdc code error

2020-11-04 Post by hl9...@126.com
It was indeed a kafka dependency in the pom causing the conflict; removing it fixed the problem. Thanks a lot.
One more question: if I want to use Kafka I have to add the kafka dependency back, and it will conflict again -- is there a way around that?



hl9...@126.com
 
From: Jark Wu
Sent: 2020-11-05 11:55
To: user-zh
Subject: Re: cdc code error
There is probably another version of the kafka-connect jar in the environment, causing a dependency conflict.
 
 
On Thu, 5 Nov 2020 at 11:35, hl9...@126.com  wrote:
 
> Flink version 1.11.2 -- has anyone run into this problem?
>
>
>
> hl9...@126.com
>
> From: hl9...@126.com
> Sent: 2020-11-04 16:43
> To: user-zh
> Subject: cdc code error
> Hi, all:
> I'm running the demo code from the ververica/flink-cdc-connectors git repo and it fails with:
> 2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out
> (1/1)] WARN  org.apache.flink.runtime.taskmanager.Task  - Source: Custom
> Source -> Sink: Print to Std. Out (1/1) (7c3ccf7686ccfb33254e8cb785cd339d)
> switched from RUNNING to FAILED.
> java.lang.AbstractMethodError:
> org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V
> at
> org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
> at
> org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
> at io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:583)
> at io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:80)
> at
> io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)
> at
> io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)
> at
> io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
> at
> com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>
> Source code:
> public class CdcTest {
> public static void main(String[] args) throws Exception {
> SourceFunction sourceFunction =
> MySQLSource.builder()
> .hostname("localhost")
> .port(3306)
> .databaseList("sohay") // monitor all tables under
> inventory database
> .username("root")
> .password("123456")
> .deserializer(new StringDebeziumDeserializationSchema())
> // converts SourceRecord to String
> .build();
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.addSource(sourceFunction).print().setParallelism(1); // use
> parallelism 1 for sink to keep message ordering
>
> env.execute();
> }
> }
>
>
>
> hl9...@126.com
>


Re: Re: setting flink tm cpu cores

2020-11-04 Post by JasonLee
hi
You can set it like this: -yD yarn.containers.vcores=<your value>



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: setting flink tm cpu cores

2020-11-04 Post by zjfpla...@hotmail.com
In 1.8, setting it in flink-conf.yaml has no effect.



zjfpla...@hotmail.com
 
From: JasonLee
Sent: 2020-11-05 13:49
To: user-zh
Subject: Re: setting flink tm cpu cores
hi, just set the yarn.containers.vcores parameter
 
 
 
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: setting flink tm cpu cores

2020-11-04 Post by zjfpla...@hotmail.com
I did set this in flink-conf.yaml and it didn't work.



zjfpla...@hotmail.com
 
From: JasonLee
Sent: 2020-11-05 13:49
To: user-zh
Subject: Re: setting flink tm cpu cores
hi, just set the yarn.containers.vcores parameter
 
 
 
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: setting flink tm cpu cores

2020-11-04 Post by JasonLee
hi, just set the yarn.containers.vcores parameter



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


setting flink tm cpu cores

2020-11-04 Post by zjfpla...@hotmail.com
In flink on yarn per-job mode, can the number of CPU cores for the TM be configured?



zjfpla...@hotmail.com


Re: Problem using a UDAF in an OVER window

2020-11-04 Post by Tianwang Li
Is there an article that explains how expired state gets cleaned up? Does the user need to configure a TTL themselves?

For example, I have a Top-N computation -- how should expired data be cleaned up there? (Or is it handled automatically?)

SELECT cnt, word, time_hour
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY time_hour ORDER BY cnt desc) AS rownum
FROM test_word_count)
WHERE rownum <= 100;



Benchao Li wrote on Mon, Sep 14, 2020 at 1:03 PM:

> Hi,
>
> It looks like you haven't implemented the `retract` method. Normally, when an OVER window expires data, it runs a retract computation for the expired records.
> So you need to implement the retract method correctly.
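As an illustration of that advice, a minimal sketch of what retract could look like for the AggDistinctDetail accumulator quoted below. This is an assumption about the intended semantics -- if the same value can appear more than once inside the window, a Map<String, Integer> of reference counts would be needed instead of a plain Set:

public void retract(Details acc, String val) {
    // Called when a record falls out of the 24h OVER window range; undo its contribution.
    if (acc.set != null) {
        acc.set.remove(val);
    }
}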
>
> chen310 <1...@163.com> wrote on Mon, Sep 14, 2020 at 10:01 AM:
>
> > Flink version 1.11.1
> >
> > I implemented a UDAF aggregate function that merges certain fields within a window into a single string. Code as follows:
> >
> > public class AggDistinctDetail extends AggregateFunction<String, AggDistinctDetail.Details> {
> >     private static final Logger logger =
> >         LoggerFactory.getLogger(AggDistinctDetail.class);
> >
> >     public static class Details {
> >         public Set<String> set;
> >     }
> >
> >     @Override
> >     public Details createAccumulator() {
> >         return new Details();
> >     }
> >
> >     @Override
> >     public String getValue(Details acc) {
> >         return JSON.toJSONString(acc.set);
> >     }
> >
> >     public void accumulate(Details acc, String val) {
> >         if (acc.set == null) {
> >             acc.set = new HashSet<>();
> >         }
> >         acc.set.add(val);
> >     }
> >
> >     public void retract(Details acc, String val) {
> >         //now, agg detail don't need support retraction
> >     }
> >
> >     public void merge(Details acc, Iterable<Details> it) {
> >         Iterator<Details> iter = it.iterator();
> >         if (acc.set == null) {
> >             acc.set = new HashSet<>();
> >         }
> >         while (iter.hasNext()) {
> >             Details a = iter.next();
> >             acc.set.addAll(a.set);
> >         }
> >     }
> >
> >     public void resetAccumulator(Details acc) {
> >         acc.set = null;
> >     }
> > }
> >
> > The UDAF is used on an OVER window partitioned by realIp, with a 24-hour range window based on the event time
> > requestDateTime from the message. It collects all userIds seen for a realIp within the window and outputs the aggregated userId string as detail.
> >
> > drop function if exists UDF_InfoDistinctMerge;
> > create function UDF_InfoDistinctMerge AS
> > 'com.binance.risk.flink.udf.AggDistinctDetail';
> >
> > select
> > realIp ,
> > UDF_InfoDistinctMerge(userId) over w1 as userSet
> > from source_table
> > window w1 as (partition by realIp order by requestDateTime asc RANGE
> > BETWEEN
> > INTERVAL '24' hour preceding AND CURRENT ROW) ;
> >
> > In actual testing, the aggregated string userSet keeps growing: even data older than the 24-hour window keeps being aggregated into userSet, which is not what I expected.
> >
> > Question:
> > Is there a problem with the UDAF implementation above, or is this a bug for UDAFs on OVER windows?
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>
>
> --
>
> Best,
> Benchao Li
>


-- 
**
 tivanli
**


Re: Re: Problems encountered using flink-cdc-mysql

2020-11-04 Post by Jark Wu
The ClassNotFound exception in your flink sql client above is most likely because the flink cluster was not restarted (the sql
client was restarted, but the cluster probably was not).

Is the API test a separate issue? Could you share your API test code?

Best,
Jark

On Mon, 2 Nov 2020 at 11:53, yangxusun9  wrote:

> The dependencies I import when using the API are as follows,
> <dependencies>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-clients_${scala.binary.version}</artifactId>
>         <version>${flink.version}</version>
>     </dependency>
>     <dependency>
>         <groupId>com.alibaba.ververica</groupId>
>         <artifactId>flink-connector-mysql-cdc</artifactId>
>         <version>1.1.0</version>
>     </dependency>
> </dependencies>
> where the Flink version is 1.11.1 with Scala 2.11.
> *When the job first starts it can read the data*, but as soon as I perform a change on the table in MySQL, the program throws: Encountered change event for
> table test.order_info whose schema isn't known to this connector
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: Error when submitting a flink sql cdc job

2020-11-04 Post by Jark Wu
The stack trace looks incomplete -- the real root cause isn't in the excerpt.
Also, the images still don't come through; please paste them as text.

On Tue, 3 Nov 2020 at 11:06, flink小猪 <18579099...@163.com> wrote:

> 
>
>  The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Unable to create a sink for writing table
> 'default_catalog.default_database.test_wide_performance_ceres_can_renewal'.
>
>
> Table options are:
>
>
> 'connector'='jdbc'
>
> 'password'='123456'
>
> 'sink.buffer-flush.interval'='2s'
>
> 'sink.buffer-flush.max-rows'='1000'
>
> 'table-name'='test_wide_performance_ceres_can_renewal'
>
> 'url'='jdbc:mysql://localhost:3306/stat'
>
> 'username'='root'
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> create a sink for writing table
> 'default_catalog.default_database.test_wide_performance_ceres_can
>
>
> That is the stack trace.
>
>
>
>
> On 2020-11-03 11:00:44, "flink小猪" <18579099...@163.com> wrote:
> >
> >
> >
> >
> >
> >
> >But I did upload the images
> >
> >
> >
> >
> >
> >On 2020-11-03 10:41:57, "flink小猪" <18579099...@163.com> wrote:
> >>When I submit the flink sql job to the cluster, it reports the following error.
> >>At first it looked like a missing dependency jar, but when I open the jar with a decompiler the dependency is there. So what is the problem, and how should I resolve it?
>
>
>
>
>
>


Re: Checkpoint timeout when using flink-CDC

2020-11-04 Post by Jark Wu
Point 4 of this article touches on it: https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ
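Regarding the question below about raising the checkpoint timeout: it can be set programmatically on the CheckpointConfig, or via execution.checkpointing.timeout in flink-conf.yaml. A minimal sketch (the interval and timeout values are arbitrary examples):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);                                  // checkpoint every 60 s
env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);   // allow up to 30 minutes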

On Tue, 3 Nov 2020 at 12:50, 丁浩浩 <18579099...@163.com> wrote:

> 1. I noticed that the checkpoint for the small table completes quickly; only the tables with around 200k rows never finish before the timeout. The data volume isn't large, but the
> processing speed seems too slow -- roughly 200 rows/s written to MySQL.
> 2. It seems that on the initial full snapshot, CDC has to finish processing the whole table before a checkpoint can complete (afterwards the incremental checkpoints should just be binlog
> offsets),
> so when my table is fairly large the checkpoint can never complete. Can I increase the checkpoint timeout? I don't seem to find that option in the documented configuration.
>
> > On Nov 3, 2020, at 8:54 AM, zhisheng wrote:
> >
> > hi
> >
> > I'd suggest investigating two things:
> >
> > 1. Check the checkpoint size -- is it very large?
> >
> > 2. Check whether the job is back-pressured. Under back-pressure checkpoints rarely succeed; in that case fix the back-pressure first.
> >
> > Best
> > zhisheng
> >
> > 丁浩浩 <18579099...@163.com> wrote on Mon, Nov 2, 2020 at 4:08 PM:
> >
> >> I'm just evaluating the flink sql cdc feature. My requirement is a LEFT JOIN across three CDC tables; since all three tables change, CDC is used for them.
> >> The first two tables have about 200k rows each and the last one only a few dozen rows. I join the three tables into a wide table and write it to MySQL.
> >> Each time, after roughly 100k rows have been written the job fails; the logs show a checkpoint timeout. The state backend is RocksDB. My question:
> >> the data volume is not that large, so why does the checkpoint time out, i.e. fail to complete within 10 minutes?
> >>
> >>
>
>


Re: flink 1.11 cdc: how do I convert a DataStream into a flink sql cdc table?

2020-11-04 Post by Jark Wu
Attaching the deduplication docs link:
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html#%E5%8E%BB%E9%87%8D

On Thu, 5 Nov 2020 at 12:01, Jark Wu  wrote:

> 1. Registering a DataStream of RowData is currently not supported, because RowData is treated as an unstructured type.
> 2. Registering a cdc stream is not supported either; that is, DataStream -> Table only supports insert-only
> streams and cannot recognize a cdc stream. This feature is planned for 1.13.
>
> For your scenario there are a few options:
> 1. If your stream only contains inserts and updates, no deletes: register the DataStream as an insert-only Table first, then use
> Flink SQL's deduplication syntax [1] to keep the last record per pk.
> 2. If your stream contains deletes, you will have to develop your own sql connector and implement the cdc
> capture plus the "mapfunction imposes some ordering constraints on the stream" logic inside your source.
>
> Best,
> Jark
>
>
>
> On Thu, 5 Nov 2020 at 10:07, jindy_liu <286729...@qq.com> wrote:
>
>> I currently have two DataStreams that are combined into one DataStream through a mapfunction.
>> How can this DataStream be converted into a table so it can be worked on with flink sql?
>> *(Note: because the mapfunction imposes some ordering constraints on the stream, I currently cannot define the table directly with flink sql cdc!!!)*
>>
>> *My current approach fails with an error:*
>>
>> StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv,
>> fsSettings);
>>
>> DataStreamSource json1 // canal json format
>> DataStreamSource json2  // canal json format
>> ConnectedStreams connect=
>> caliber_cdc_json.connect(caliber_snapshot_json); //connect
>> DataStream snapshot_cdc_stream = connect.flatMap(
>> new SnapshotCdcCoRichFlatMapFunction()
>> ); // join the two streams
>>
>> //3, register the table and emit its data directly
>> Table snapshot_cdc_table = fsTableEnv.fromDataStream(snapshot_cdc_stream);
>> fsTableEnv.createTemporaryView("test", snapshot_cdc_table);
>>
>> String output = "CREATE TABLE test_mirror (\n" +
>> "`id` INT,\n" +
>> "`name` VARCHAR(255),\n" +
>> "`time` TIMESTAMP(3),\n" +
>> "PRIMARY KEY(id) NOT ENFORCED\n" +
>> ") WITH (\n" +
>> "  'connector' = 'print'\n" +
>> ")";
>>
>>  //4, app logic
>> String op = "INSERT into test_mirror SELECT * from test";
>> fsTableEnv.executeSql(output);
>> fsTableEnv.executeSql(op);
>>
>>
>> *But the job submission fails with this error:*
>> serializationSchema:root
>>  |-- id: INT NOT NULL
>>  |-- name: VARCHAR(255)
>>  |-- time: TIMESTAMP(3)
>>  |-- status: INT
>>  |-- CONSTRAINT PK_3386 PRIMARY KEY (id)
>>
>> snapshot_cdc_table:UnnamedTable$0
>> ++
>> | table name |
>> ++
>> | UnnamedTable$0 |
>> |   test |
>> |test_mirror |
>> ++
>> 3 rows in set
>>
>> 
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method
>> caused an error: A raw type backed by type information has no serializable
>> string representation. It needs to be resolved into a proper raw type.
>> at
>>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>> at
>>
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>> at
>> org.apache.flink.client.cli.CliFrontend$$Lambda$58/1706292388.call(Unknown
>> Source)
>> at
>>
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>> *Caused by: org.apache.flink.table.api.TableException: A raw type backed
>> by
>> type information has no serializable string representation. It needs to be
>> resolved into a proper raw type.*
>> at
>>
>> org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
>> at
>>
>> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
>> at
>>
>> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
>> at
>>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at
>>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> at
>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at
>>
>> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImpli

Re: flink 1.11 cdc: how do I convert a DataStream into a flink sql cdc table?

2020-11-04 Post by Jark Wu
1. Registering a DataStream of RowData is currently not supported, because RowData is treated as an unstructured type.
2. Registering a cdc stream is not supported either; that is, DataStream -> Table only supports insert-only
streams and cannot recognize a cdc stream. This feature is planned for 1.13.

For your scenario there are a few options:
1. If your stream only contains inserts and updates, no deletes: register the DataStream as an insert-only Table first, then use
Flink SQL's deduplication syntax [1] to keep the last record per pk (see the sketch after this message).
2. If your stream contains deletes, you will have to develop your own sql connector and implement the cdc
capture plus the "mapfunction imposes some ordering constraints on the stream" logic inside your source.

Best,
Jark
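As an illustration of option 1, the deduplication pattern from [1] applied to a hypothetical registered table `test` with primary key `id` and a processing-time attribute `proctime` (both names are assumptions) could look like:

SELECT id, name, `time`
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime DESC) AS row_num
    FROM test
)
WHERE row_num = 1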



On Thu, 5 Nov 2020 at 10:07, jindy_liu <286729...@qq.com> wrote:

> I currently have two DataStreams that are combined into one DataStream through a mapfunction.
> How can this DataStream be converted into a table so it can be worked on with flink sql?
> *(Note: because the mapfunction imposes some ordering constraints on the stream, I currently cannot define the table directly with flink sql cdc!!!)*
>
> *My current approach fails with an error:*
>
> StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv,
> fsSettings);
>
> DataStreamSource json1 // canal json format
> DataStreamSource json2  // canal json format
> ConnectedStreams connect=
> caliber_cdc_json.connect(caliber_snapshot_json); //connect
> DataStream snapshot_cdc_stream = connect.flatMap(
> new SnapshotCdcCoRichFlatMapFunction()
> ); // join the two streams
>
> //3, register the table and emit its data directly
> Table snapshot_cdc_table = fsTableEnv.fromDataStream(snapshot_cdc_stream);
> fsTableEnv.createTemporaryView("test", snapshot_cdc_table);
>
> String output = "CREATE TABLE test_mirror (\n" +
> "`id` INT,\n" +
> "`name` VARCHAR(255),\n" +
> "`time` TIMESTAMP(3),\n" +
> "PRIMARY KEY(id) NOT ENFORCED\n" +
> ") WITH (\n" +
> "  'connector' = 'print'\n" +
> ")";
>
>  //4, app logic
> String op = "INSERT into test_mirror SELECT * from test";
> fsTableEnv.executeSql(output);
> fsTableEnv.executeSql(op);
>
>
> *But the job submission fails with this error:*
> serializationSchema:root
>  |-- id: INT NOT NULL
>  |-- name: VARCHAR(255)
>  |-- time: TIMESTAMP(3)
>  |-- status: INT
>  |-- CONSTRAINT PK_3386 PRIMARY KEY (id)
>
> snapshot_cdc_table:UnnamedTable$0
> ++
> | table name |
> ++
> | UnnamedTable$0 |
> |   test |
> |test_mirror |
> ++
> 3 rows in set
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: A raw type backed by type information has no serializable
> string representation. It needs to be resolved into a proper raw type.
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at
> org.apache.flink.client.cli.CliFrontend$$Lambda$58/1706292388.call(Unknown
> Source)
> at
>
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> *Caused by: org.apache.flink.table.api.TableException: A raw type backed by
> type information has no serializable string representation. It needs to be
> resolved into a proper raw type.*
> at
>
> org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
> at
>
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
> at
>
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
>
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
> at
>
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
> at scala.Option.map(Option.scala:146)
>  

Re: cdc code error

2020-11-04 Post by Jark Wu
There is probably another version of the kafka-connect jar in the environment, causing a dependency conflict.


On Thu, 5 Nov 2020 at 11:35, hl9...@126.com  wrote:

> Flink version 1.11.2 -- has anyone run into this problem?
>
>
>
> hl9...@126.com
>
> From: hl9...@126.com
> Sent: 2020-11-04 16:43
> To: user-zh
> Subject: cdc code error
> Hi, all:
> I'm running the demo code from the ververica/flink-cdc-connectors git repo and it fails with:
> 2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out
> (1/1)] WARN  org.apache.flink.runtime.taskmanager.Task  - Source: Custom
> Source -> Sink: Print to Std. Out (1/1) (7c3ccf7686ccfb33254e8cb785cd339d)
> switched from RUNNING to FAILED.
> java.lang.AbstractMethodError:
> org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V
> at
> org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
> at
> org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
> at io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:583)
> at io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:80)
> at
> io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)
> at
> io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)
> at
> io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
> at
> com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
>
> Source code:
> public class CdcTest {
> public static void main(String[] args) throws Exception {
> SourceFunction sourceFunction =
> MySQLSource.builder()
> .hostname("localhost")
> .port(3306)
> .databaseList("sohay") // monitor all tables under
> inventory database
> .username("root")
> .password("123456")
> .deserializer(new StringDebeziumDeserializationSchema())
> // converts SourceRecord to String
> .build();
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.addSource(sourceFunction).print().setParallelism(1); // use
> parallelism 1 for sink to keep message ordering
>
> env.execute();
> }
> }
>
>
>
> hl9...@126.com
>


Reply: cdc code error

2020-11-04 Post by hl9...@126.com
Flink version 1.11.2 -- has anyone run into this problem?



hl9...@126.com
 
From: hl9...@126.com
Sent: 2020-11-04 16:43
To: user-zh
Subject: cdc code error
Hi, all:
I'm running the demo code from the ververica/flink-cdc-connectors git repo and it fails with:
2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out 
(1/1)] WARN  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source 
-> Sink: Print to Std. Out (1/1) (7c3ccf7686ccfb33254e8cb785cd339d) switched 
from RUNNING to FAILED.
java.lang.AbstractMethodError: 
org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
at io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:583)
at io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:80)
at 
io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)
at 
io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)
at 
io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
at 
com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
 
Source code:
public class CdcTest {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("sohay") // monitor all tables under inventory database
                .username("root")
                .password("123456")
                .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(sourceFunction).print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }
}
 
 
 
hl9...@126.com


flink 1.11.2 cep rocksdb performance tuning

2020-11-04 Post by Peihui He
hi

I'm using Flink 1.11.2 CEP for some pattern matching. As soon as RocksDB is enabled as the state backend, the job back-pressures and CPU usage is about 10x what it was before.

private void bufferEvent(IN event, long currentTime) throws Exception {
    long currentTs = System.currentTimeMillis();
    List<IN> elementsForTimestamp = elementQueueState.get(currentTime);
    if (elementsForTimestamp == null) {
        this.bufferEventGetNullhistogram.update(System.currentTimeMillis() - currentTs);
        elementsForTimestamp = new ArrayList<>();
    } else {
        this.bufferEventGethistogram.update(System.currentTimeMillis() - currentTs);
    }
    elementsForTimestamp.add(event);
    long secondCurrentTs = System.currentTimeMillis();
    elementQueueState.put(currentTime, elementsForTimestamp);
    this.bufferEventPuthistogram.update(System.currentTimeMillis() - secondCurrentTs);
    this.bufferEventhistogram.update(System.currentTimeMillis() - currentTs);
}

By overriding CepOperator and adding some metrics, I found:

this.bufferEventhistogram = metrics.histogram("buffer_event_delay", new
DescriptiveStatisticsHistogram(1000));
this.bufferEventGethistogram = metrics.histogram("buffer_event_get_delay", new
DescriptiveStatisticsHistogram(1000));
this.bufferEventGetNullhistogram =
metrics.histogram("buffer_event_get_null_delay", new
DescriptiveStatisticsHistogram(1000));
this.bufferEventPuthistogram = metrics.histogram("buffer_event_put_delay", new
DescriptiveStatisticsHistogram(1000));

get and put are where the time goes; a single bufferEvent call can take up to 200 ms.
Judging from the RocksDB metrics there is not much flush or compaction activity.

[image: image.png]
[image: image.png]

I also tuned things following the article https://www.jianshu.com/p/2e61c2c83c57, but it didn't help much -- the job is still back-pressured.
I also looked at the similar question http://apache-flink.147419.n8.nabble.com/rocksDB-td1785.html
, but my SST files are quite small.
Could anyone explain why get and put are so expensive, and is there a good way to optimize this? Thanks.

Best Wishes.



Best Wishes.


Choosing a monitoring metric to alert when a flink job dies

2020-11-04 Post by bradyMk
Hi all, I've built Flink monitoring with grafana + prometheus and now want grafana to fire an alert when a flink job dies, but I'm not sure which metric to use. I tried monitoring flink_jobmanager_job_uptime with the rule: alert when max_over_time(flink_jobmanager_job_uptime[1m])
-
min_over_time(flink_jobmanager_job_uptime[1m]) is <= 0. But right after a job starts this produces false alarms. Is there a better approach?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink 1.11 cdc: how do I convert a DataStream into a flink sql cdc table?

2020-11-04 Post by jindy_liu
I currently have two DataStreams that are combined into one DataStream through a mapfunction.
How can this DataStream be converted into a table so it can be worked on with flink sql?
*(Note: because the mapfunction imposes some ordering constraints on the stream, I currently cannot define the table directly with flink sql cdc!!!)*

*My current approach fails with an error:*

StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv,
fsSettings);

DataStreamSource json1 // canal json format
DataStreamSource json2  // canal json format
ConnectedStreams connect=
caliber_cdc_json.connect(caliber_snapshot_json); //connect
DataStream snapshot_cdc_stream = connect.flatMap(
new SnapshotCdcCoRichFlatMapFunction()
); // join the two streams

//3, register the table and emit its data directly
Table snapshot_cdc_table = fsTableEnv.fromDataStream(snapshot_cdc_stream);
fsTableEnv.createTemporaryView("test", snapshot_cdc_table);

String output = "CREATE TABLE test_mirror (\n" +
"`id` INT,\n" +
"`name` VARCHAR(255),\n" +
"`time` TIMESTAMP(3),\n" +
"PRIMARY KEY(id) NOT ENFORCED\n" +
") WITH (\n" +
"  'connector' = 'print'\n" +
")";

 //4, app logic
String op = "INSERT into test_mirror SELECT * from test";
fsTableEnv.executeSql(output);
fsTableEnv.executeSql(op);


*But the job submission fails with this error:*
serializationSchema:root
 |-- id: INT NOT NULL
 |-- name: VARCHAR(255)
 |-- time: TIMESTAMP(3)
 |-- status: INT
 |-- CONSTRAINT PK_3386 PRIMARY KEY (id)

snapshot_cdc_table:UnnamedTable$0
++
| table name |
++
| UnnamedTable$0 |
|   test |
|test_mirror |
++
3 rows in set


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: A raw type backed by type information has no serializable
string representation. It needs to be resolved into a proper raw type.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at
org.apache.flink.client.cli.CliFrontend$$Lambda$58/1706292388.call(Unknown
Source)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
*Caused by: org.apache.flink.table.api.TableException: A raw type backed by
type information has no serializable string representation. It needs to be
resolved into a proper raw type.*
at
org.apache.flink.table.types.logical.TypeInformationRawType.asSerializableString(TypeInformationRawType.java:101)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$2.apply(TableSinkUtils.scala:92)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:92)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
at scala.Option.map(Option.scala:146)
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(Iterab

Reply: union all losing part of the data

2020-11-04 Post by 史 正超
At the end of the first SELECT of your INSERT INTO -- right above the UNION ALL -- there is a semicolon ';'.
With that there, I suspect the second SELECT never executes. Another point: even with the semicolon removed, I suspect the data will get overwritten; I have a similar UNION ALL use case myself.
I think you should assemble your SQL like this:
```sql

Insert into xxx
Select
   d1,
   d2,
   count(1)
From (
   Select * from a
   Union all
   Select * from b
)
Group by d1, d2

```
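For reference, a minimal sketch of the original two aggregations written as a single INSERT over one UNION ALL. The sink table name is abbreviated here as dws_xxx because it is truncated in the original post, and casting ftime to BIGINT in both branches (so the two SELECTs have identical column types) as well as the absence of a semicolon between the branches are assumptions about the intended query:

insert into dws_xxx
select 0 as id
,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as bigint) as ftime
,case
when dept_name like '%XX%' then 'X1'
when dept_name = 'xXX' then 'X2'
else 'X3' end as paytype
,count(orderid) as paynum_h
,round(sum(amt)) as paymoney_h
from dwd_XXX
where write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
group by DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH'),
case
when dept_name like '%XX%' then 'X1'
when dept_name = 'xXX' then 'X2'
else 'X3' end

union all

select 0 as id
,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as bigint) as ftime
,'all' as paytype
,count(orderid) as paynum_h
,round(sum(amt)) as paymoney_h
from dwd_XXX
where write_time >= DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
group by DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH');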

Sent from Mail for Windows 10

From: 夜思流年梦
Sent: November 4, 2020, 18:21
To: user-zh@flink.apache.org
Subject: union all losing part of the data

Hi developers:
   Scenario: compute the total revenue of each department and the total revenue of all departments, and UNION ALL the two queries. In practice part of the data is lost with UNION
ALL -- either the per-department numbers come out low, or the all-department total comes out low.
   If the two SELECTs of the UNION ALL are split out and inserted into the same table separately, the data is correct. Is this a bug, or am I using it incorrectly?




Original SQL:


insert into dws_


select
0 as id
,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as bigint) as ftime
,case
when dept_name like '%XX%' then 'X1'
when dept_name = 'xXX' then 'X2'
else 'X3' end as paytype
,count(orderid) as paynum_h
,round(sum(amt)) as paymoney_h
from dwd_XXX
where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
group by
DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH'),
case
when dept_name like '%XX%' then 'X1'
when dept_name = 'xXX' then 'X2'
else 'X3' end ;




union all




select 0 as id
,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as int) as ftime
,'all' as paytype
,count(orderid) as paynum_h
,round(sum(amt)) as paymoney_h
from dwd_XXX
where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
group by DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') ;



Re: flink 1.11.0 sql: custom UDAF with a composite type reports Incompatible types

2020-11-04 Post by hailongwang
Hi wind,


 Judging from this line of the error stack: ` at
org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101) `
,
 the incompatible type is detected during type inference while validating row.startTime or row.duration; please double-check that the usage is correct.


Best,
Hailong Wang




On 2020-11-04 16:29:37, "wind.fly@outlook.com" wrote:
>Hi, all
> I'm using Flink 1.11.0 and defined the following UDAF:
>
>
>public class GetContinuousListenDuration extends AggregateFunction<Row, ContinuousListenDuration> {
>
>private static final DateTimeFormatter dateTimeFormatter = 
> DateTimeFormatter.ofPattern("-MM-dd HH:mm");
>
>@Override
>@DataTypeHint("ROW")
>public Row getValue(ContinuousListenDuration acc) {
>return Row.of(acc.getStartTime(), acc.getDuration());
>}
>
>@Override
>public ContinuousListenDuration createAccumulator() {
>return new ContinuousListenDuration();
>}
>
>public void accumulate(ContinuousListenDuration acc, 
> @DataTypeHint("TIMESTAMP(3)") LocalDateTime dt, Boolean isListening) {
>// logic omitted here
>}
>}
>
>The aggregation takes Timestamp(3) and Boolean as parameters; getValue returns the ROW type shown above; the function is registered as get_continuous_listen_duration; and the SQL that calls it is as follows:
>
>insert into
>  report.result
>select
>  id,
>  city_code,
>  get_continuous_listen_duration(
>dt,
>(order_no is null)
>or (trim(order_no) = '')
>  ).startTime as start_time,
>  get_continuous_listen_duration(
>dt,
>(order_no is null)
>or (trim(order_no) = '')
>  ).duration as duration
>from
>  (
>select
>  o.id,
>  o.dt,
>  o.order_no,
>  r.city_code
>from
>  (
>select
>  req [1] as id,
>  dt,
>  proctime,
>  req [2] as order_no
>from
>  tmp_v
>where
>  extra [1] is null
>  or extra [1] <> 'false'
>  ) o
>  JOIN dim.right FOR SYSTEM_TIME AS OF o.proctime AS r ON r.id = o.id
>  ) a
>group by
>  id,
>  city_code
>having
>  get_continuous_listen_duration(
>dt,
>(order_no is null)
>or (trim(order_no) = '')
>  ).duration >= 2
>
>At runtime the following exception occurs:
>
>Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Incompatible 
>types
>at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method) ~[?:1.8.0_171]
>at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  ~[?:1.8.0_171]
>at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  ~[?:1.8.0_171]
>at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
> ~[?:1.8.0_171]
>at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) 
> ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550) 
> ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at 
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) 
> ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at 
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) 
> ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at 
> org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101) 
> ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
>  ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
>  ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) 
> ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
>  ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
>  ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at 
> org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46) 
> ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1980)
>  ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateHavingClause(SqlValidatorImpl.java:4214)
>  ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3515)
>  ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>  ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>  ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>  ~[flink-table_2.11-1.11.0.jar:1.11.0]
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValid

Re: union all losing part of the data

2020-11-04 Post by hailongwang
Hi liaobiao,


Which Flink version are you on?
Based on your SQL, on versions <= 1.10 the metadata handler will derive the keys after your GROUP BY as the upsert
key, which can lead to results overwriting each other.
Could you check whether that is what you are seeing?


Best,
Hailong Wang




On 2020-11-04 17:20:23, "夜思流年梦" wrote:
>Hi developers:
>   Scenario: compute the total revenue of each department and the total revenue of all departments, and UNION ALL the two queries. In practice part of the data is lost with UNION
> ALL -- either the per-department numbers come out low, or the all-department total comes out low.
>   If the two SELECTs of the UNION ALL are split out and inserted into the same table separately, the data is correct. Is this a bug, or am I using it incorrectly?
>
>
>
>
>Original SQL:
>
>
>insert into dws_
>
>
>select 
>0 as id
>,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as bigint) as ftime
>,case 
>when dept_name like '%XX%' then 'X1'
>when dept_name = 'xXX' then 'X2'
>else 'X3' end as paytype
>,count(orderid) as paynum_h 
>,round(sum(amt)) as paymoney_h 
>from dwd_XXX
>where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
>group by
>DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH'), 
>case 
>when dept_name like '%XX%' then 'X1'
>when dept_name = 'xXX' then 'X2'
>else 'X3' end ;
>
>
>
>
>union all
>
>
>
>
>select 0 as id
>,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as int) as ftime
>,'all' as paytype
>,count(orderid) as paynum_h  
>,round(sum(amt)) as paymoney_h  
>from dwd_XXX
>where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
>group by DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') ;


Re: flink 1.11.1 web ui: on the source operator's detail page, records sent and other metrics stay at 0

2020-11-04 Post by hailongwang
Hi Asahi,
For metrics such as Records Sent, Flink only counts internal traffic; there are no such metrics for the source's input or the sink's
output.
So your job has probably been chained into a single operator, which is why there are no numbers. If you really need to see them, you can select the metrics under the Metrics tab in the UI.
Alternatively, give the operators different parallelisms so they are not chained together;
PS: in production, chaining is recommended -- it is an optimization applied when the StreamGraph is turned into the JobGraph, reducing network transfer overhead as well as serialization and deserialization.
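For illustration, a sketch of breaking the chain while debugging so the records-sent/received counters between operators become visible (the operator names are made up; remove this again for production, as noted above):

// Break the chain at one operator only:
env.addSource(new MyKafkaSource())            // hypothetical source
   .map(new MyMapper()).disableChaining()     // this operator starts a new chain
   .addSink(new MySink());                    // hypothetical sink

// Or disable chaining for the whole job while investigating:
env.disableOperatorChaining();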


Best,
Hailong Wang

On 2020-11-04 19:03:02, "Asahi Lee" <978466...@qq.com> wrote:
>Hi!
>     My Flink program runs fine, but when I look at the source operator's detail page in the web ui monitoring, the Records
>Sent and other metrics are always 0. What is the problem?


Re: 1.11.1 reports OutOfMemoryError: Metaspace

2020-11-04 Post by hailongwang
Hi Asahi,


Which Flink version are you using?
For the Metaspace OOM: if usage stabilizes at some level and you really do need more memory, you can raise the
`taskmanager.memory.jvm-metaspace.size` option [1].
As for the root cause, you need to look into why there are so many invalid connections -- perhaps no data is sent for a long time so the connection
becomes invalid, and when it is used again it is re-created, leaving process-level cached invalid Connection objects; a thread keeps checking those objects, so they can never be GC'ed. The handling of
whether a connection is invalid is covered by an issue [2] that has already been resolved.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-jvm-metaspace-size
[2] https://issues.apache.org/jira/browse/FLINK-16681
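For example, in flink-conf.yaml (512m is only an illustrative value; size it to where the metaspace usage actually stabilizes):

taskmanager.memory.jvm-metaspace.size: 512m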


Best,
Hailong Wang
在 2020-11-04 19:08:37,"Asahi Lee" <978466...@qq.com> 写道:
>你好!
>      我是用flink sql,通过JDBC源读取mysql表数据,多次提交后,报OutOfMemoryError: 
>Metaspace.错误,最后分析是mysql驱动包中有个守护线程一直在检测清理失效的connection,导致有线程一直存在,class资源无法释放导致的,请问这个如何处理?


Re: flink 1.11.1 web ui: on the source operator's detail page, records sent and other metrics stay at 0

2020-11-04 Post by
Try setting different parallelisms for the upstream and downstream operators and see whether the numbers show up correctly.

admin <17626017...@163.com> wrote on Wed, Nov 4, 2020 at 8:10 PM:

> Hi,
> What does your job's DAG look like? Possible reasons:
> 1. The source never received any data, or never sent anything downstream
> 2. The source and the downstream operators are chained together, so the numbers don't show
>
> > On Nov 4, 2020 at 8:03 PM, Asahi Lee <978466...@qq.com> wrote:
> >
> > Hi!
> >      My Flink program runs fine, but when I look at the source operator's detail page in the web
> ui monitoring, the Records Sent and other metrics are always 0. What is the problem?
>
>


Re: flink 1.11.1 web ui: on the source operator's detail page, records sent and other metrics stay at 0

2020-11-04 Post by admin
Hi,
What does your job's DAG look like? Possible reasons:
1. The source never received any data, or never sent anything downstream
2. The source and the downstream operators are chained together, so the numbers don't show

> On Nov 4, 2020 at 8:03 PM, Asahi Lee <978466...@qq.com> wrote:
> 
> Hi!
>      My Flink program runs fine, but when I look at the source operator's detail page in the web
> ui monitoring, the Records Sent and other metrics are always 0. What is the problem?



1.11.1 reports OutOfMemoryError: Metaspace

2020-11-04 Post by Asahi Lee
Hi!
      I'm using flink sql and reading a MySQL table through the JDBC source. After submitting the job several times it fails with OutOfMemoryError:
Metaspace. The analysis is that the MySQL driver has a daemon thread that keeps checking for and cleaning up invalid connections, so a thread stays alive and the classes can never be released. How should this be handled?

flink 1.11.1 web ui: on the source operator's detail page, records sent and other metrics stay at 0

2020-11-04 Post by Asahi Lee
Hi!
     My Flink program runs fine, but when I look at the source operator's detail page in the web
ui monitoring, the Records Sent and other metrics are always 0. What is the problem?

Re: Error when submitting a flink sql job

2020-11-04 Post by admin
Hi,
Are you on Flink
1.11, calling tableEnv.executeSql and then also calling TableEnvironment.execute or StreamExecutionEnvironment.execute at the end?
You can refer to [1]

[1]https://blog.csdn.net/weixin_41608066/article/details/107769826 
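A minimal sketch of the 1.11 pattern (the table names and DDL below are placeholders): executeSql submits the INSERT job by itself, so there must be no trailing env.execute()/tableEnv.execute() unless DataStream operators were also defined.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.executeSql("CREATE TABLE source_table ( ... ) WITH ( ... )");   // placeholder DDL
tableEnv.executeSql("CREATE TABLE sink_table ( ... ) WITH ( ... )");     // placeholder DDL
tableEnv.executeSql("INSERT INTO sink_table SELECT * FROM source_table");
// No env.execute() here: with no DataStream operators defined it would fail with
// "No operators defined in streaming topology. Cannot execute."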


> On Nov 4, 2020 at 7:20 PM, 丁浩浩 <18579099...@163.com> wrote:
> 
> This job reads MySQL CDC streams, joins them and writes the result back to MySQL. Every submission prints this error, yet the job is still submitted to the cluster correctly and runs successfully.
> I'd like to know what the reason is.
> 
> The program finished with the following exception:
> 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: No operators defined in streaming topology. Cannot execute.
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>   at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>   at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: java.lang.IllegalStateException: No operators defined in streaming 
> topology. Cannot execute.
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
>   at 
> com.gaotu.data.performance.flink.job.sql.CeresCanRenewalWide.main(CeresCanRenewalWide.java:150)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>   ... 8 more
> bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/stop-cluster.sh 
> Stopping taskexecutor daemon (pid: 92273) on host bjhldeMacBook-Pro.local.
> Stopping standalonesession daemon (pid: 92004) on host 
> bjhldeMacBook-Pro.local.
> bjhldeMacBook-Pro:flink-1.11.2 dinghh$ vim conf/flink-conf.yaml 
> bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-x
> -bash: bin/start-x: No such file or directory
> bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-cluster.sh 
> Starting cluster.
> Starting standalonesession daemon on host bjhldeMacBook-Pro.local.
> Starting taskexecutor daemon on host bjhldeMacBook-Pro.local.



Re: unsubscribe

2020-11-04 Post by hailongwang
Hi wangleigis,


To unsubscribe, send an email to user-zh-unsubscr...@flink.apache.org. For more details see [1] [1]
https://flink.apache.org/community.html#mailing-lists


Best,
Hailong Wang

On 2020-11-04 17:59:45, "wangleigis" wrote:
>
>
>
>Unsubscribe
>
>
>
>
>--
>
>Best wishes for your work and everything else!
>
>


Error when submitting a flink sql job

2020-11-04 Post by 丁浩浩
This job reads MySQL CDC streams, joins them and writes the result back to MySQL. Every submission prints this error, yet the job is still submitted to the cluster correctly and runs successfully.
I'd like to know what the reason is.

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: No operators defined in streaming topology. Cannot execute.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.IllegalStateException: No operators defined in streaming 
topology. Cannot execute.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
at 
com.gaotu.data.performance.flink.job.sql.CeresCanRenewalWide.main(CeresCanRenewalWide.java:150)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 8 more
bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/stop-cluster.sh 
Stopping taskexecutor daemon (pid: 92273) on host bjhldeMacBook-Pro.local.
Stopping standalonesession daemon (pid: 92004) on host bjhldeMacBook-Pro.local.
bjhldeMacBook-Pro:flink-1.11.2 dinghh$ vim conf/flink-conf.yaml 
bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-x
-bash: bin/start-x: No such file or directory
bjhldeMacBook-Pro:flink-1.11.2 dinghh$ bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host bjhldeMacBook-Pro.local.
Starting taskexecutor daemon on host bjhldeMacBook-Pro.local.

Unsubscribe

2020-11-04 Post by wangleigis



Unsubscribe




--

Wishing you smooth work and all the best!




UNION ALL loses part of the data

2020-11-04 Post by 夜思流年梦
Hi developers,
   The scenario: compute the total revenue per department and the total revenue across all departments, and combine the two queries with UNION ALL. In practice, some data goes missing when UNION ALL is used: either the per-department figures or the all-department total comes up short.
   If the two queries of the UNION ALL are split out and each inserts into the same table independently, the data is correct (a sketch of that workaround follows the SQL below). Is this a bug, or am I using it incorrectly?




Original SQL:


insert into dws_


select 
0 as id
,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as bigint) as ftime
,case 
when dept_name like '%XX%' then 'X1'
when dept_name = 'xXX' then 'X2'
else 'X3' end as paytype
,count(orderid) as paynum_h 
,round(sum(amt)) as paymoney_h 
from dwd_XXX
where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
group by
DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH'), 
case 
when dept_name like '%XX%' then 'X1'
when dept_name = 'xXX' then 'X2'
else 'X3' end ;




union all




select 0 as id
,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as int) as ftime
,'all' as paytype
,count(orderid) as paynum_h  
,round(sum(amt)) as paymoney_h  
from dwd_XXX
where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd')
group by DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') ;
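
A hedged sketch of the workaround described above (not the poster's actual code): submit the per-department and the all-departments aggregations as two separate INSERT statements into the same sink table within one job, using the Flink 1.11 StatementSet API. The table names, schemas, and the datagen/print connectors here are placeholders standing in for dwd_XXX and the dws_ sink.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class TwoInsertsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Placeholder source and sink tables.
        tEnv.executeSql("CREATE TABLE dwd_orders (dept_name STRING, orderid STRING, amt DOUBLE)"
                + " WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE dws_result (paytype STRING, paynum_h BIGINT, paymoney_h DOUBLE)"
                + " WITH ('connector' = 'print')");

        // Two independent INSERTs into the same table, submitted together as a single job.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql("INSERT INTO dws_result"
                + " SELECT dept_name, COUNT(orderid), ROUND(SUM(amt)) FROM dwd_orders GROUP BY dept_name");
        set.addInsertSql("INSERT INTO dws_result"
                + " SELECT 'all', COUNT(orderid), ROUND(SUM(amt)) FROM dwd_orders");
        set.execute();
    }
}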

Re: flink 1.11.0 SQL custom UDAF with a composite type reports Incompatible types

2020-11-04 Post by wind.fly....@outlook.com
One addition: in the SQL, dt is of type TIMESTAMP(3) and is also the watermark column.

From: wind.fly@outlook.com 
Sent: 2020-11-04 17:29
To: user-zh@flink.apache.org 
Subject: flink 1.11.0 SQL custom UDAF with a composite type reports Incompatible types

Hi all,
I'm using Flink 1.11.0 and have defined the following UDAF:


public class GetContinuousListenDuration extends AggregateFunction<Row, ContinuousListenDuration> {

private static final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

@Override
@DataTypeHint("ROW")
public Row getValue(ContinuousListenDuration acc) {
return Row.of(acc.getStartTime(), acc.getDuration());
}

@Override
public ContinuousListenDuration createAccumulator() {
return new ContinuousListenDuration();
}

public void accumulate(ContinuousListenDuration acc, 
@DataTypeHint("TIMESTAMP(3)") LocalDateTime dt, Boolean isListening) {
// logic omitted here
}
}
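
For reference, a hedged sketch (not part of the original message) of how this UDAF would be registered under the name used in the SQL below, using the Flink 1.11 API; the ContinuousListenDuration accumulator class is assumed to be a POJO carrying startTime and duration fields:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RegisterUdafSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Register the aggregate function so SQL can call it as get_continuous_listen_duration.
        tEnv.createTemporarySystemFunction(
                "get_continuous_listen_duration", GetContinuousListenDuration.class);
    }
}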

The aggregation takes TIMESTAMP(3) and Boolean as its arguments, getValue returns a ROW, and the function is registered under the name get_continuous_listen_duration. The SQL that calls it is as follows:

insert into
  report.result
select
  id,
  city_code,
  get_continuous_listen_duration(
dt,
(order_no is null)
or (trim(order_no) = '')
  ).startTime as start_time,
  get_continuous_listen_duration(
dt,
(order_no is null)
or (trim(order_no) = '')
  ).duration as duration
from
  (
select
  o.id,
  o.dt,
  o.order_no,
  r.city_code
from
  (
select
  req [1] as id,
  dt,
  proctime,
  req [2] as order_no
from
  tmp_v
where
  extra [1] is null
  or extra [1] <> 'false'
  ) o
  JOIN dim.right FOR SYSTEM_TIME AS OF o.proctime AS r ON r.id = o.id
  ) a
group by
  id,
  city_code
having
  get_continuous_listen_duration(
dt,
(order_no is null)
or (trim(order_no) = '')
  ).duration >= 2

The following exception occurs at runtime:

Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Incompatible 
types
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
Method) ~[?:1.8.0_171]
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 ~[?:1.8.0_171]
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 ~[?:1.8.0_171]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
~[?:1.8.0_171]
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1980)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateHavingClause(SqlValidatorImpl.java:4214)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3515)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorIm

Re: Flink 1.11 on K8s native session cluster mode reports configmap not found

2020-11-04 Post by Fy
Hello, I ran into the same problem:
MountVolume.SetUp failed for volume "flink-config-volume" : configmap
"flink-config-flink-mm" not found
Back-off restarting failed container
Checking the configmaps in that namespace shows flink-config-flink-mm already exists, but the
JobManager pod keeps retrying and cannot serve requests.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink 1.11.0 SQL custom UDAF with a composite type reports Incompatible types

2020-11-04 Post by wind.fly....@outlook.com
Hi all,
I'm using Flink 1.11.0 and have defined the following UDAF:


public class GetContinuousListenDuration extends AggregateFunction<Row, ContinuousListenDuration> {

private static final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

@Override
@DataTypeHint("ROW")
public Row getValue(ContinuousListenDuration acc) {
return Row.of(acc.getStartTime(), acc.getDuration());
}

@Override
public ContinuousListenDuration createAccumulator() {
return new ContinuousListenDuration();
}

public void accumulate(ContinuousListenDuration acc, 
@DataTypeHint("TIMESTAMP(3)") LocalDateTime dt, Boolean isListening) {
// logic omitted here
}
}

The aggregation takes TIMESTAMP(3) and Boolean as its arguments, getValue returns a ROW, and the function is registered under the name get_continuous_listen_duration. The SQL that calls it is as follows:

insert into
  report.result
select
  id,
  city_code,
  get_continuous_listen_duration(
dt,
(order_no is null)
or (trim(order_no) = '')
  ).startTime as start_time,
  get_continuous_listen_duration(
dt,
(order_no is null)
or (trim(order_no) = '')
  ).duration as duration
from
  (
select
  o.id,
  o.dt,
  o.order_no,
  r.city_code
from
  (
select
  req [1] as id,
  dt,
  proctime,
  req [2] as order_no
from
  tmp_v
where
  extra [1] is null
  or extra [1] <> 'false'
  ) o
  JOIN dim.right FOR SYSTEM_TIME AS OF o.proctime AS r ON r.id = o.id
  ) a
group by
  id,
  city_code
having
  get_continuous_listen_duration(
dt,
(order_no is null)
or (trim(order_no) = '')
  ).duration >= 2

The following exception occurs at runtime:

Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Incompatible 
types
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
Method) ~[?:1.8.0_171]
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 ~[?:1.8.0_171]
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 ~[?:1.8.0_171]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) 
~[?:1.8.0_171]
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1980)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateHavingClause(SqlValidatorImpl.java:4214)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3515)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) 
~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
 ~[flink-table_2.11-1.11.0.jar:1.11.0]
at 
org.apache.

CDC code reports an error

2020-11-04 Post by hl9...@126.com
Hi all,
I'm running the demo code from the ververica/flink-cdc-connectors Git repository and get this error:
2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out 
(1/1)] WARN  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source 
-> Sink: Print to Std. Out (1/1) (7c3ccf7686ccfb33254e8cb785cd339d) switched 
from RUNNING to FAILED.
java.lang.AbstractMethodError: 
org.apache.kafka.connect.json.JsonSerializer.configure(Ljava/util/Map;Z)V
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
at org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:583)
at io.debezium.embedded.EmbeddedEngine.<init>(EmbeddedEngine.java:80)
at 
io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:301)
at 
io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:217)
at 
io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
at 
com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

Source code:
public class CdcTest {
public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("localhost")
.port(3306)
.databaseList("sohay") // monitor all tables under inventory 
database
.username("root")
.password("123456")
.deserializer(new StringDebeziumDeserializationSchema()) // 
converts SourceRecord to String
.build();

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(sourceFunction).print().setParallelism(1); // use 
parallelism 1 for sink to keep message ordering

env.execute();
}
}



hl9...@126.com


Re: Question about event time with the Kafka table connector

2020-11-04 Post by WeiXubin
Hi, here is a simple example built with the DataStream API that consumes Kafka data with event
time and performs a windowed aggregation. The Kafka messages look like {"word":"a","count":1,"time":1604286564}. See whether this demo helps.

public class MyExample {

public static void main(String[] args) throws Exception {
// create the execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// set the time characteristic to event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// watermark strategy: allow events up to 2 seconds out of order
WatermarkStrategy<WC> watermarkStrategy = WatermarkStrategy
.<WC>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new
SerializableTimestampAssigner<WC>() {
@Override
public long extractTimestamp(WC wc, long l) {
return wc.getEventTime() * 1000;
}
});

// Kafka configuration
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "Kafka地址:9092");
properties.setProperty("group.id", "test");

env.addSource(new FlinkKafkaConsumer<>("flinktest1", new
JSONKeyValueDeserializationSchema(true), properties).setStartFromLatest())
// map the JSON records into WC objects
.map(new MapFunction<ObjectNode, WC>() {
@Override
public WC map(ObjectNode jsonNode) throws Exception {
JsonNode valueNode = jsonNode.get("value");
WC wc = new
WC(valueNode.get("word").asText(),valueNode.get("count").asInt(),valueNode.get("time").asLong());
return wc;
}
})
// assign the watermark strategy
.assignTimestampsAndWatermarks(watermarkStrategy)
.keyBy(WC::getWord)
// window setup: a 5-second tumbling event-time window
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// allowed lateness of 2 seconds
.allowedLateness(Time.seconds(2))
.reduce(new ReduceFunction<WC>() {
@Override
public WC reduce(WC wc, WC t1) throws Exception {
return new WC(wc.getWord(), wc.getCount() +
t1.getCount());
}
})
.print();

env.execute();
}


static class WC {
public String word;
public int count;
public long eventTime;

public long getEventTime() {
return eventTime;
}

public void setEventTime(long eventTime) {
this.eventTime = eventTime;
}

public String getWord() {
return word;
}

public void setWord(String word) {
this.word = word;
}

public int getCount() {
return count;
}

public void setCount(int count) {
this.count = count;
}

public WC(String word, int count) {
this.word = word;
this.count = count;
}

public WC(String word, int count,long eventTime) {
this.word = word;
this.count = count;
this.eventTime = eventTime;
}
  
@Override
public String toString() {
return "WC{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/