Re: Question about processing a 3-level List data type in parquet

2020-11-04 Thread Naehee Kim
Hi Jingsong, Thanks for the feedback. Could you let me know the concept and timeline of BulkFormat/ParquetBulkFormat, and how they differ from ParquetInputFormat? Our use case is backfill: processing parquet files in case any data issue is found in the normal processing of kafka input. Thus, …

flink-1.11 writing to hive fails

2020-11-04 Thread nashcen
flink reads from kafka and writes to hive; it used to run fine, and it still runs fine in IDEA. But after packaging it into a jar and submitting it to the flink cluster, it fails with the error below. What could be the cause? 2020-11-05 15:34:36 org.apache.flink.connectors.hive.FlinkHiveException: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive RecordWriter at …

Re: flink 1.11.2 cep rocksdb performance tuning

2020-11-04 Thread Peihui He
hi,
@Override
public UV get(UK userKey) throws IOException, RocksDBException {
    byte[] rawKeyBytes = serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
    byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
    return (rawValueBytes == null ? null : …

Re: Re: union all loses part of the data

2020-11-04 Thread 夜思流年梦
The flink version is 1.11. On 2020-11-05 00:02:12, "hailongwang" <18868816...@163.com> wrote: >Hi liaobiao, > > >What is your flink version? >Based on your SQL, if the version is <= 1.10, the MetaDataHandler will identify the keys after your group by as the upsert key, which leads to rows overwriting each other. >Can you check whether that is what happens in your result? > > >Best, >Hailong Wang > > > > >On 2020-11-04 …

Re: Reply: union all loses part of the data

2020-11-04 Thread 夜思流年梦
Oh, sorry. I had split this SQL into two parts, so when restoring the original SQL I did not remove the semicolon ';'. When the union actually ran there was no semicolon in the middle, otherwise it could not even have been submitted. On 2020-11-05 10:00:01, "史 正超" <792790...@qq.com> wrote: >Above your union all, at the end of the first select of the insert into, there is a semicolon ';'. >In that case I suspect the second select never executes. Another issue: even if you remove the semicolon, I suspect the data will be overwritten, because I have recently worked with a union all scenario as well. >I think you should assemble your sql like this: …

Re: Re: cdc code error

2020-11-04 Thread Jark Wu
Check which version flink-cdc-connectors uses and use the same kafka-connect version. On Thu, 5 Nov 2020 at 14:07, hl9...@126.com wrote: > There was indeed a kafka dependency in the pom that conflicted; removing it fixed the problem. Thanks. > Also, if I want to use kafka, I have to pull in the kafka dependency, which will conflict again. Is there a way to solve that? > > > > hl9...@126.com > > From: Jark Wu > Sent: 2020-11-05 11:55 > To: user-zh > Subject: Re: cdc code error >

Re: Re: flink tm cpu cores setting

2020-11-04 Thread zjfpla...@hotmail.com
You mean passing it as a launch parameter instead of setting it in flink-conf.yaml? zjfpla...@hotmail.com From: JasonLee Sent: 2020-11-05 13:59 To: user-zh Subject: Re: Re: flink tm cpu cores setting hi You can set it like this: -yD yarn.containers.vcores=<your value> - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: cdc code error

2020-11-04 Thread hl9...@126.com
There was indeed a kafka dependency in the pom that conflicted; removing it fixed the problem. Thanks. Also, if I want to use kafka, I have to pull in the kafka dependency, which will conflict again. Is there a way to solve that? hl9...@126.com From: Jark Wu Sent: 2020-11-05 11:55 To: user-zh Subject: Re: cdc code error There is probably another version of the kafka-connect jar in the environment, causing a dependency conflict. On Thu, 5 Nov 2020 at 11:35, hl9...@126.com wrote: > flink version 1.11.2, has anyone run into this problem? >

Re: Re: flink tm cpu cores setting

2020-11-04 Thread zjfpla...@hotmail.com
Setting it in flink-conf.yaml has no effect in 1.8. zjfpla...@hotmail.com From: JasonLee Sent: 2020-11-05 13:49 To: user-zh Subject: Re: flink tm cpu cores setting hi Just set the yarn.containers.vcores parameter. - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: flink tm cpu cores setting

2020-11-04 Thread zjfpla...@hotmail.com
I have set this in flink-conf.yaml and it did not work. zjfpla...@hotmail.com From: JasonLee Sent: 2020-11-05 13:49 To: user-zh Subject: Re: flink tm cpu cores setting hi Just set the yarn.containers.vcores parameter. - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink tm cpu cores setting

2020-11-04 Thread JasonLee
hi Just set the yarn.containers.vcores parameter. - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink tm cpu cores setting

2020-11-04 Thread zjfpla...@hotmail.com
In flink on yarn per-job mode, can the number of cpu cores for the tm be set? zjfpla...@hotmail.com

Re: Question about using a UDAF in an OVER window

2020-11-04 Thread Tianwang Li
Is there an article that covers expired-state cleanup? Does the user need to configure a TTL themselves? For example, I have a TOP-N computation; how do I clean up expired data (or is it handled automatically)? SELECT cnt, word, time_hour FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY time_hour ORDER BY cnt desc) AS rownum FROM test_word_count) WHERE rownum <= 100; Benchao Li wrote on Mon, Sep 14, 2020 at 13:03: > Hi, > > It looks like you did not implement `retract` …
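For reference, expired state in Flink SQL jobs is driven by the idle state retention setting rather than a per-query TTL; below is a minimal sketch of configuring it with the Flink 1.11 Table API (the retention values are illustrative):

```java
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleStateRetentionSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // State for a key that receives no updates for longer than the minimum
        // retention time becomes eligible for cleanup; pick values that cover
        // how long a time_hour partition can still receive updates.
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(24), Time.hours(36));
    }
}
```

The Top-N operator itself only keeps the current top rows per partition in state, so retention mainly matters for partitions (here, stale time_hour values) that stop receiving data.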

Re: Re: Problems encountered using flink-cdc-mysql

2020-11-04 Thread Jark Wu
The ClassNotFound exception in your flink sql client above is probably because your flink cluster was not restarted (you restarted the sql client, but the cluster probably was not). Is the API test a separate problem? Could you share your API test code? Best, Jark On Mon, 2 Nov 2020 at 11:53, yangxusun9 wrote: > The dependencies I import when using the API are as follows, > > > > org.apache.flink >

Re: Re: flink sql cdc job submission error

2020-11-04 Thread Jark Wu
The stack trace looks incomplete; the real root cause is not included. Also, the image still does not show up; please paste text instead. On Tue, 3 Nov 2020 at 11:06, flink小猪 <18579099...@163.com> wrote: > > > The program finished with the following exception: > > >

Re: checkpoint timeout when using flink-CDC

2020-11-04 Thread Jark Wu
The fourth item in this article mentions it: https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ On Tue, 3 Nov 2020 at 12:50, 丁浩浩 <18579099...@163.com> wrote: > 1. I found that the checkpoint of the small table completes very quickly, but the table with about 200k rows never completes until it times out. The data volume is not large, but the processing speed seems too slow: writes to mysql are only about 200 rows/s. >
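As a side note on the slow mysql writes: the Flink 1.11 JDBC table connector batches writes through its buffer-flush options, which usually helps a lot compared to row-by-row writes. A sketch with made-up schema and connection values:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcBatchingSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Buffer up to 1000 rows or 1 second per flush, whichever comes first.
        tEnv.executeSql(
                "CREATE TABLE mysql_sink (" +
                "  id INT," +
                "  cnt BIGINT," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/mydb'," +
                "  'table-name' = 'my_table'," +
                "  'sink.buffer-flush.max-rows' = '1000'," +
                "  'sink.buffer-flush.interval' = '1s'" +
                ")");
    }
}
```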

Re: flink 1.11 cdc: how to convert a DataStream into a flink sql cdc table?

2020-11-04 Thread Jark Wu
Link to the deduplication docs: https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html#%E5%8E%BB%E9%87%8D On Thu, 5 Nov 2020 at 12:01, Jark Wu wrote: > 1. Registering a DataStream of RowData is currently not supported, because RowData is recognized as an unstructured type. > 2. Registering a cdc stream is not supported either; that is, DataStream -> Table currently only supports insert-only >
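For reference, a minimal runnable sketch of registering an insert-only stream and applying the deduplication syntax from the linked docs (types and names are illustrative):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class InsertOnlyDedupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // An insert-only stream: every element is treated as an INSERT row.
        DataStream<Tuple2<Integer, String>> orders =
                env.fromElements(Tuple2.of(1, "CREATED"), Tuple2.of(1, "PAID"));

        // Register it with a processing-time attribute for the dedup query.
        tEnv.createTemporaryView("orders", orders,
                $("id"), $("status"), $("proctime").proctime());

        // Deduplication: keep only the latest row per id.
        Table latest = tEnv.sqlQuery(
                "SELECT id, status FROM ("
                        + " SELECT *, ROW_NUMBER() OVER ("
                        + "   PARTITION BY id ORDER BY proctime DESC) AS rn"
                        + " FROM orders)"
                        + " WHERE rn = 1");

        // The result is an updating table, so convert it as a retract stream.
        tEnv.toRetractStream(latest, Row.class).print();
        env.execute("dedup sketch");
    }
}
```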

Re: cdc code error

2020-11-04 Thread Jark Wu
There is probably another version of the kafka-connect jar in the environment, causing a dependency conflict. On Thu, 5 Nov 2020 at 11:35, hl9...@126.com wrote: > flink version 1.11.2, has anyone run into this problem? > > > > hl9...@126.com > > From: hl9...@126.com > Sent: 2020-11-04 16:43 > To: user-zh > Subject: cdc code error > Hi,all: > I ran the demo code from the ververica/flink-cdc-connectors git repo and got an error: >

Re: I have some interesting result with my test code

2020-11-04 Thread Jark Wu
Hi Kevin, Could you share the code of how you register the FlinkKafkaConsumer as a table? Regarding your initialization of FlinkKafkaConsumer, I would recommend setStartFromEarliest() to guarantee it consumes all the records in the partitions. Regarding the flush(), it seems it is in the foreach …
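For reference, a minimal sketch of the consumer setup suggested above (topic, servers, and group id are placeholders):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class EarliestConsumerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test-group");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // Ignore committed offsets and read every partition from the beginning,
        // so the test sees all records.
        consumer.setStartFromEarliest();

        env.addSource(consumer).print();
        env.execute("earliest consumer sketch");
    }
}
```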

Re: Dependency injection and flink.

2020-11-04 Thread Dan Diephouse
Just want to chime in here that it would be fantastic to have a way to do DI in Flink. Ideally the injected services themselves don't get serialized at all, since they're just singletons in our case. E.g. we have an API client that looks up data from our API and caches it for all the functions that …

Re: A question about flink sql retract stream

2020-11-04 Thread Jark Wu
Thanks Henry for the detailed example; I will explain why there are so many records at time 5. The retraction mechanism in Flink SQL is triggered per record, so there is record amplification in your case. At time 5, the LAST_VALUE aggregation for stream a will first emit -(1, 12345, 0) and …
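To make this easy to observe, here is a small self-contained sketch (assuming the blink planner's LAST_VALUE) that prints the retract/add pairs described above:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class RetractionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Tuple2<Integer, String>> updates =
                env.fromElements(Tuple2.of(1, "CREATED"), Tuple2.of(1, "PAID"));
        tEnv.createTemporaryView("t", updates, $("id"), $("status"));

        Table agg = tEnv.sqlQuery(
                "SELECT id, LAST_VALUE(status) AS status FROM t GROUP BY id");

        // The second input row triggers a retraction (false, old row) followed
        // by an addition (true, new row): the per-record amplification.
        tEnv.toRetractStream(agg, Row.class).print();
        env.execute("retraction sketch");
    }
}
```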

Reply: cdc code error

2020-11-04 Thread hl9...@126.com
flink version 1.11.2, has anyone run into this problem? hl9...@126.com From: hl9...@126.com Sent: 2020-11-04 16:43 To: user-zh Subject: cdc code error Hi,all: I ran the demo code from the ververica/flink-cdc-connectors git repo and got an error: 2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out (1/1)] WARN …

Re: Multi-stream SQL-like processing

2020-11-04 Thread Jark Wu
Yes. The dynamism might be a problem. Does Kafka Connect support discovering new tables and synchronizing them dynamically? Best, Jark On Thu, 5 Nov 2020 at 04:39, Krzysztof Zarzycki wrote: > Hi Jark, thanks for joining the discussion! > I understand your point of view that SQL environment is

flink 1.11.2 cep rocksdb performance tuning

2020-11-04 Thread Peihui He
hi I am using flink 1.11.2 cep for some pattern matching, and found that as soon as rocksdb is enabled as the state backend, backpressure appears and cpu usage is 10x what it was before.
private void bufferEvent(IN event, long currentTime) throws Exception {
    long currentTs = System.currentTimeMillis();
    List<IN> elementsForTimestamp = elementQueueState.get(currentTime);
    if (elementsForTimestamp == …
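Not a fix for the per-record (de)serialization cost that RocksDB adds, but a common first tuning step is enabling incremental checkpoints and one of RocksDB's predefined option profiles. A minimal sketch with a placeholder checkpoint path:

```java
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // true = incremental checkpoints: only changed SST files are uploaded.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("file:///tmp/checkpoints", true);

        // Predefined profiles bundle block-cache and write-buffer settings for
        // a storage class; pick the one matching the disks holding local state.
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);

        env.setStateBackend(backend);
        env.fromElements(1, 2, 3).print();
        env.execute("rocksdb tuning sketch");
    }
}
```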

flink 1.11 cdc: how to convert a DataStream into a flink sql cdc table?

2020-11-04 Thread jindy_liu
I currently have two DataStream streams that are turned into one DataStream via a mapfunction. How can this DataStream be converted into a table so it can be processed with flink sql? *(Note: because the mapfunction imposes some ordering constraints on the streams, I currently cannot define the table directly with flink sql cdc!!!)* *My current approach reports an error:* StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings); DataStreamSource json1 // canal json format …

Reply: union all loses part of the data

2020-11-04 Thread 史 正超
Above your union all, at the end of the first select of the insert into, there is a semicolon ';'. In that case I suspect the second select never executes. Another issue: even if you remove the semicolon, I suspect the data will be overwritten, because I have recently worked with a union all scenario as well. I think you should assemble your sql like this:
```sql
INSERT INTO xxx
SELECT d1, d2, count(1)
FROM (
    SELECT * FROM a
    UNION ALL
    SELECT * FROM b
)
GROUP BY …
```

union stream vs multiple operators

2020-11-04 Thread Alexey Trenikhun
Hello, I have two Kafka topics ("A" and "B") which provide structurally similar data but with different load patterns, for example hundreds of records per second in the first topic and 10 records per second in the second. Events are processed using the same algorithm and output to a common sink; currently …

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-04 Thread Fuyao Li
Hi Flink Users and Community, For the first part of the question, the 12 hour time difference was caused by a time-extraction bug of my own; I can get the time translated correctly now. The type cast problem has some workarounds. My major blocker right now is that the onTimer part is …

Re: Error while retrieving the leader gateway after making Flink config changes

2020-11-04 Thread Claude M
This issue had to do with the update strategy for the Flink deployment. When I changed it to the following, it worked:
strategy:
  type: RollingUpdate
  rollingUpdate:
    maxSurge: 0
    maxUnavailable: 1
On Tue, Nov 3, 2020 at 1:39 PM Robert Metzger wrote: > Thanks a lot for …

Re: LEGACY('STRUCTURED_TYPE' to pojo

2020-11-04 Thread Rex Fenley
Thank you for the info! Is there a timetable for when the next version with this change might be released? On Wed, Nov 4, 2020 at 2:44 AM Timo Walther wrote: > Hi Rex, > > sorry for the late reply. POJOs will have much better support in the > upcoming Flink versions because they have been fully …

Re: flink1.11.0 sql custom UDAF with composite type reports Incompatible types

2020-11-04 Thread hailongwang
Hi wind, Judging from this line of the stack trace: ` at org.apache.calcite.sql.fun.SqlDotOperator.deriveType(SqlDotOperator.java:101) `, an incompatible type is detected during type inference while validating row.startTime or row.duration; please check whether the usage is correct. Best, Hailong Wang On 2020-11-04 16:29:37, "wind.fly@outlook.com" wrote: >Hi,all >

Re: union all loses part of the data

2020-11-04 Thread hailongwang
Hi liaobiao, What is your flink version? Based on your SQL, if the version is <= 1.10, the MetaDataHandler will identify the keys after your group by as the upsert key, which leads to rows overwriting each other. Can you check whether that is what happens in your result? Best, Hailong Wang On 2020-11-04 17:20:23, "夜思流年梦" wrote: >Hi developers: > I have this scenario: computing each department's total revenue and the total revenue of all departments. I planned to union all the two SQL parts, but in practice found that the union >

A question about flink sql retract stream

2020-11-04 Thread Henry Dai
Dear flink developers, I have a question about flink sql; it gives me a lot of trouble, and I would appreciate some help. Let's assume we have two data streams, `order` and `order_detail`, which come from mysql binlog. Table `order` schema: id int primary key …

Re: flink 1.11.1 web ui: Records Sent and other metrics on the source operator's detail page are always 0

2020-11-04 Thread hailongwang
Hi Asahi, For metrics such as Records Sent, Flink only counts internal metrics; there are no such metrics for the Source input and the Sink output. Your job has probably been chained into a single operator, so there are no metrics to show. If you really need to see them, you can pick them under the Metrics tab in the UI, or set different parallelisms for the operators so they are not chained together. PS: for production, chaining is recommended; it is an optimization applied when translating the StreamGraph into the JobGraph, and it reduces the overhead of network transfer as well as serialization and deserialization. Best, …
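A quick sketch of the two ways to break a chain mentioned above, for when the per-operator numbers are really needed (not recommended in production):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Option 1: disable chaining globally for the whole job.
        env.disableOperatorChaining();

        env.fromElements(1, 2, 3)
                .map(i -> i * 2).returns(Types.INT)
                // Option 2: break the chain around a single operator instead.
                .disableChaining()
                .print();

        env.execute("chaining sketch");
    }
}
```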

A question of Flink SQL aggregation

2020-11-04 Thread Henry Dai
Hi, Let's assume we have two streams, an order stream and an order detail stream, which come from mysql binlog. Table `order` schema: id primary key, order_id and order_status. Table `order_detail` schema: id primary key, order_id and quantity. One order item has several order_detail items. If we have …

Re: flink 1.11.1 web ui: Records Sent and other metrics on the source operator's detail page are always 0

2020-11-04 Thread
Try setting different parallelisms for the upstream and downstream operators and see whether it displays correctly. admin <17626017...@163.com> wrote on Wed, Nov 4, 2020 at 20:10: > Hi, > What does your job's DAG look like? Possible causes: > 1. The source received no data at all, or sent nothing downstream > 2. The source is chained with the downstream operator, so nothing shows up > > > On Nov 4, 2020 at 20:03, Asahi Lee <978466...@qq.com> wrote: > > > > Hello! > > My flink program runs fine, but on the web >

a couple of memory questions

2020-11-04 Thread Colletta, Edward
Using Flink 1.9.2 with FsStateBackend, Session cluster. 1. Does heap state get cleaned up when a job is cancelled? We have jobs that we run on a daily basis. We start each morning and cancel each evening. We noticed that the process size does not seem to shrink. We are looking at the

Re: Increase in parallelism has very bad impact on performance

2020-11-04 Thread Arvid Heise
Hi Sidney, could you describe how your aggregation works and what your current pipeline looks like? Is the aggregation partially applied before shuffling the data? I'm a bit lost on what aggregation without keyBy looks like. A decrease in throughput may also be a result of more overhead and less …

Re: flink 1.11.1 web ui: Records Sent and other metrics on the source operator's detail page are always 0

2020-11-04 Thread admin
Hi, What does your job's DAG look like? Possible causes: 1. The source received no data at all, or sent nothing downstream 2. The source is chained with the downstream operator, so nothing shows up > On Nov 4, 2020 at 20:03, Asahi Lee <978466...@qq.com> wrote: > > Hello! > My flink program runs fine, but when I look at the source operator's detail info on the web ui monitoring page, metrics such as Records Sent are always 0. What is the problem?

1.11.1 reports OutOfMemoryError: Metaspace

2020-11-04 Thread Asahi Lee
Hello, I am using flink sql and writing to mysql via JDBC; the job fails with OutOfMemoryError: Metaspace. It seems related to the mysql connection class …

flink 1.11.1 web ui: Records Sent and other metrics on the source operator's detail page are always 0

2020-11-04 Thread Asahi Lee
Hello! My flink program runs fine, but when I look at the source operator's detail info on the web ui monitoring page, metrics such as Records Sent are always 0. What is the problem?

Re: Error submitting a flink sql job

2020-11-04 Thread admin
Hi, Are you using flink 1.11, calling tableEnv.executeSql, and then also calling TableEnvironment.execute or StreamExecutionEnvironment.execute at the end? See [1]. [1] https://blog.csdn.net/weixin_41608066/article/details/107769826 > On Nov 4, 2020 at 19:20, 丁浩浩 …
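In other words, in Flink 1.11 executeSql() on an INSERT statement submits the job by itself, and a later execute() call finds an empty topology. A minimal sketch using the built-in datagen and print connectors:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExecuteSqlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql("CREATE TABLE src (n BIGINT) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE snk (n BIGINT) WITH ('connector' = 'print')");

        // This call plans and submits the job on its own. Do not also call
        // TableEnvironment.execute() or StreamExecutionEnvironment.execute();
        // with no other operators defined, they fail with "No operators ...".
        tEnv.executeSql("INSERT INTO snk SELECT n FROM src");
    }
}
```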

Re: Unsubscribe

2020-11-04 Thread hailongwang
Hi wangleigis, To unsubscribe, send an email to user-zh-unsubscr...@flink.apache.org. For more details see [1]. [1] https://flink.apache.org/community.html#mailing-lists Best, Hailong Wang On 2020-11-04 17:59:45, "wangleigis" wrote: > > > >Unsubscribe > > > > >-- > >Wishing you smooth work and all the best! > >

Error submitting a flink sql job

2020-11-04 Thread 丁浩浩
This job reads mysql CDC, joins the streams, and writes the result to mysql. Every submission reports the error below, yet the job is submitted to the cluster correctly and runs successfully. What is the reason? The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators …

Re: Increase in parallelism has very bad impact on performance

2020-11-04 Thread Sidney Feiner
You're right, this is a scaling problem (for me that means performance). As for what you were saying about data skew, that could be it, so I tried running the job without keyBy: I wrote an aggregator that accumulates the events per country, and then a FlatMap that takes that map and …

Unsubscribe

2020-11-04 Thread wangleigis
Unsubscribe -- Wishing you smooth work and all the best!

Re: Missing help about run-application action in Flink CLI client

2020-11-04 Thread Flavio Pompermaier
Here it is: https://issues.apache.org/jira/browse/FLINK-19969 Best, Flavio On Wed, Nov 4, 2020 at 11:08 AM Kostas Kloudas wrote: > Could you also post the ticket here @Flavio Pompermaier and we will > have a look before the upcoming release. > > Thanks, > Kostas > > On Wed, Nov 4, 2020 at

Re: LEGACY('STRUCTURED_TYPE' to pojo

2020-11-04 Thread Timo Walther
Hi Rex, sorry for the late reply. POJOs will have much better support in the upcoming Flink versions because they have been fully integrated with the new table type system mentioned in FLIP-37 [1] (e.g. support for immutable POJOs and nested DataTypeHints etc). For queries, scalar, and

Re: Flink kafka - Message Prioritization

2020-11-04 Thread Aljoscha Krettek
I'm afraid there's nothing in Flink that would make this possible right now. Have you thought about if this would be possible by using the vanilla Kafka Consumer APIs? I'm not sure that it's possible to read messages with prioritization using their APIs. Best, Aljoscha On 04.11.20 08:34,

Re: Missing help about run-application action in Flink CLI client

2020-11-04 Thread Kostas Kloudas
Could you also post the ticket here @Flavio Pompermaier and we will have a look before the upcoming release. Thanks, Kostas On Wed, Nov 4, 2020 at 10:58 AM Chesnay Schepler wrote: > > Good find, this is an oversight in the CliFrontendParser; no help is > printed for the run-application action.

Re: Does Flink operators synchronize states?

2020-11-04 Thread Yuta Morisawa
Hi Arvid, Thank you for your detailed answer. Reading it, I finally realized that I did not understand the difference between the micro-batch model and the continuous (one-by-one) processing model well. I am familiar with the micro-batch model but not with the continuous one, so I will search some …

Re: Missing help about run-application action in Flink CLI client

2020-11-04 Thread Chesnay Schepler
Good find, this is an oversight in the CliFrontendParser; no help is printed for the run-application action. Can you file a ticket? On 11/4/2020 10:53 AM, Flavio Pompermaier wrote: Hello everybody, I was looking into currently supported application-modes when submitting a Flink job so I tried

Missing help about run-application action in Flink CLI client

2020-11-04 Thread Flavio Pompermaier
Hello everybody, I was looking into the currently supported application modes when submitting a Flink job, so I tried to use the CLI help (I'm using Flink 11.0), but I can't find any help about the action "run-application" at the moment...am I wrong? Is there any JIRA to address this missing …

Reply: flink1.11.0 sql custom UDAF with composite type reports Incompatible types

2020-11-04 Thread wind.fly....@outlook.com
One addition: in the sql, dt is of type timestamp(3) and is also the watermark. From: wind.fly@outlook.com Sent: 2020-11-04 17:29 To: user-zh@flink.apache.org Subject: flink1.11.0 sql custom UDAF with composite type reports Incompatible types Hi,all I am using flink 1.11.0, with the following custom udaf: public class GetContinuousListenDuration …

Re: flink 1.11 on k8s native session cluster mode: configmap not found

2020-11-04 Thread Fy
Hello, I ran into the same problem. MountVolume.SetUp failed for volume "flink-config-volume" : configmap "flink-config-flink-mm" not found Back-off restarting failed container Looking at the configmaps in the corresponding namespace, flink-config-flink-mm already exists, yet the JobManager pod keeps retrying and cannot serve requests. -- Sent from: …

flink1.11.0 sql custom UDAF with composite type reports Incompatible types

2020-11-04 Thread wind.fly....@outlook.com
Hi,all I am using flink 1.11.0, with the following custom udaf:
public class GetContinuousListenDuration extends AggregateFunction {
    private static final DateTimeFormatter dateTimeFormatter =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");

    @Override
    @DataTypeHint("ROW")
    public Row …
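For comparison, a hypothetical minimal UDAF that declares a composite ROW result with the Flink 1.11 type-annotation API (field names mirror the thread; the logic is purely illustrative):

```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

public class ContinuousDurationSketch
        extends AggregateFunction<Row, ContinuousDurationSketch.Acc> {

    // Accumulator as a plain POJO with public fields.
    public static class Acc {
        public String startTime;
        public long duration;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, String time) {
        if (acc.startTime == null) {
            acc.startTime = time;
        }
        acc.duration += 1;
    }

    // Declare the composite result type explicitly so the planner does not
    // have to guess the row's field names and types.
    @Override
    @DataTypeHint("ROW<startTime STRING, duration BIGINT>")
    public Row getValue(Acc acc) {
        return Row.of(acc.startTime, acc.duration);
    }
}
```

Field access on the result is then written as (agg_result).startTime in SQL, which is exactly the expression that the SqlDotOperator frame in the stack trace is validating.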

cdc code error

2020-11-04 Thread hl9...@126.com
Hi,all: I ran the demo code from the ververica/flink-cdc-connectors git repo and got an error: 2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out (1/1)] WARN org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Sink: Print to Std. Out (1/1) (7c3ccf7686ccfb33254e8cb785cd339d) switched from …

Re: Multi-stream SQL-like processing

2020-11-04 Thread Jark Wu
Hi Krzysztof, This is a very interesting idea. I think SQL is not a suitable tool for this use case, because SQL is a structured query language where the table schema is fixed and never changes during job running. However, I think it can be a configuration tool project on top of Flink SQL. The

Re: Question about kafka table connector eventTime

2020-11-04 Thread WeiXubin
Hi, I have a simple example built with the Datastream API that consumes Kafka (Event Time) data and does windowed aggregation. The Kafka data format is like {"word":"a","count":1,"time":1604286564}; maybe this demo helps you.
public class MyExample {
    public static void main(String[] args) throws Exception {
        // create the environment
        StreamExecutionEnvironment env = …
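As a complement, a compact runnable sketch of the event-time wiring in Flink 1.11, with an in-memory stand-in for the Kafka source (the 5-second out-of-orderness bound is illustrative):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the Kafka source: (word, count, epochSeconds).
        DataStream<Tuple3<String, Integer, Long>> source = env.fromElements(
                Tuple3.of("a", 1, 1604286564L), Tuple3.of("a", 2, 1604286565L));

        source
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(
                                        Duration.ofSeconds(5))
                                // The payload carries seconds; Flink expects millis.
                                .withTimestampAssigner((e, ts) -> e.f2 * 1000L))
                .keyBy(e -> e.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sum(1)
                .print();

        env.execute("event-time sketch");
    }
}
```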