?????? ????kafka????????????
??100,0100??100 0,100??(??100)??? -- -- ??: "user-zh"
回复:关于flink sql并行度问题的请教
Hi, HiveTableSource默认会根据数据大小自行分配并发,所以和你设置的最大并发冲突了,你可以设置table. exec. hive. infer-source-parallelism: false来关闭这个功能 Best, Faaron Zheng 在2020年09月04日 15:29,me 写道: val tableConfig = tableEnv.getConfig.getConfiguration tableConfig.setString("table.exec.resource.default-parallelism","4") 已经加了table的并行度设置,但是提示小于104并行度不让执行 Vertex Source: HiveTableSource()'s parallelism (104) is higher than the max parallelism (4). Please lower the parallelism or increase the max parallelism 原始邮件 发件人: me 收件人: user-zh 发送时间: 2020年9月4日(周五) 15:18 主题: 关于flink sql并行度问题的请教 我的代码中使用flinksql,本机idea中测试没有问题,放到集群上跑之后,占用了全部40个slots,并且并行度为114,程序也一直卡着没有输出,是一个加入了时间窗口的flink sql 我再代码中全局设置了,dataStreamEnv.setParallelism(4) dataStreamEnv.setMaxParallelism(4) 但是感觉完全不起作用,请问怎么去限制flink sql的并行度?
回复:flink sql client 如何同时执行多条 sql 语句
Hi, sql-client目前应该是没有这个能力的,它是交互式执行的,我们之前在sql-client的基础上改过一个类似beeline -e/-f的脚本,主要修改的提交任务的地方。 Best, Faaron Zheng 在2020年09月04日 17:04,LittleFall 写道: 我有一个 sql 文件,它里面有不少 flink sql 的创建表的语句和查询语句,现在我想要通过 sql client 提交这些任务,却只能一句一句的复制粘贴。如果同时复制粘贴多条语句就会报错,在 flink sql client 中使用 source xxx.sql 也会报错。 请问用什么样的方法可以一次性执行多条语句呢? -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复:消费kafka数据乱序问题
换句话说,写进kafka的数据是同一个用户的两条,余额分别是0和100,就是我之前那个计算过程,这个计算操作在Oracle完成,我只负责把余额,也就是0或者100更新到kudu对应的余额字段,因为消费数据可能会乱序,先更新100,再更新0,这样导致存在kudu的数据是0,正确的数据最终应该是100 ---原始邮件--- 发件人: "wwj"
flink ???? StreamingFileSink ??catalog??????????????
hi, all?? DataStream APIkafka??DataStream ds1?? tableEnvhive catalog?? tableEnv.registerCatalog(catalogName, catalog); tableEnv.useCatalog(catalogName); ??ds1??table Table sourcetable = tableEnv.fromDataStream(ds1); String souceTableName = "music_source"; tableEnv.createTemporaryView(souceTableName, sourcetable); hive CREATE TABLE `dwd_music_copyright_test`( `url` string COMMENT 'url', `md5` string COMMENT 'md5', `utime` bigint COMMENT '', `title` string COMMENT '??', `singer` string COMMENT '??', `company` string COMMENT '', `level` int COMMENT '??.0??,1??acrcloud??,3??') PARTITIONED BY ( `dt` string, `hour` string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 'hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test' TBLPROPERTIES ( 'connector'='HiveCatalog', 'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00', 'sink.partition-commit.delay'='1 min', 'sink.partition-commit.policy.kind'='metastore,success-file', 'sink.partition-commit.trigger'='partition-time', 'sink.rolling-policy.check-interval'='30s', 'sink.rolling-policy.rollover-interval'='1min', 'sink.rolling-policy.file-size'='1MB'); ??step3??dwd_music_copyright_test flink:1.11 kafka:1.1.1 hadoop:2.6.0 hive:1.2.0 hive catalog??hour=02??hour=03?? show partitions rt_dwd.dwd_music_copyright_test; | dt=2020-08-29/hour=00 | | dt=2020-08-29/hour=01 | | dt=2020-08-29/hour=04 | | dt=2020-08-29/hour=05 | hdfs?? $ hadoop fs -du -h /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/ 4.5 K 13.4 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=00 2.0 K 6.1 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=01 1.7 K 5.1 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=02 1.3 K 3.8 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=03 3.1 K 9.2 K /user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=04 ??add partition flink WebUI??checkpoint??StreamingFileCommitter?? ?? exactly-once??sink??catalog EXACTLY_ONCE??kafkaisolation.level=read_committed??enable.auto.commit=false??EXACTLY_ONCE?? streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
1.11????????????????????????????????????????????????????????????????
?? ??StreamTableEnvironment.from("") ??package kafka; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class Test3 { public static void main(String[] args) { // StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); // ?? String inTablePath = "CREATE TABLE datagen ( " + " id INT, " + " total string, " + " ts AS localtimestamp, " + " WATERMARK FOR ts AS ts " + ") WITH ( " + " 'connector' = 'datagen', " + " 'rows-per-second'='5', " + " 'fields.id.min'='1', " + " 'fields.id.max'='10', " + " 'fields.total.length'='10' " + ")"; // ?? bsTableEnv.executeSql(inTablePath); Table table = bsTableEnv.sqlQuery("select id, total, 12 as col_1 from datagen"); bsTableEnv.createTemporaryView("table1", table); Table table1 = bsTableEnv.from("table1"); System.out.println(table1); // ??table1 Table queryT = bsTableEnv.sqlQuery("select table1.id, 1 as b from table1"); System.out.println(queryT.getSchema()); bsTableEnv.sqlQuery("select table1.id from " + bsTableEnv.from("table1")); } }
Re: 消费kafka数据乱序问题
两个方法 1. kafka 里面可以 keyby, partition 里面都是有序的, 所以每个用户处理都是有序的 2. 就是你说的在 flink 里面做乱序处理 宁吉浩 于2020年9月4日周五 下午5:56写道: > 我也遇到了和你一样的问题,也是两条数据有因果关系,必须有严格的先后顺序,我这边的业务不像银行那么严格; > 我的解决办法是把迟到数据丢弃,然后进行业务计算; > 另起一个程序把数据缓存在内存里,对数据排序,然后再度修正计算; > 之前的实时+离线数仓用的办法,代码开发一次,但还要跑两次; > > > -- > 发件人:smq <374060...@qq.com> > 发送时间:2020年9月4日(星期五) 17:35 > 收件人:wwj ; user-zh > 主 题:回复:消费kafka数据乱序问题 > > > 换句话说,写进kafka的数据是同一个用户的两条,余额分别是0和100,就是我之前那个计算过程,这个计算操作在Oracle完成,我只负责把余额,也就是0或者100更新到kudu对应的余额字段,因为消费数据可能会乱序,先更新100,再更新0,这样导致存在kudu的数据是0,正确的数据最终应该是100 > > ---原始邮件--- > 发件人: "wwj" 发送时间: 2020年9月4日(周五) 下午5:10 > 收件人: "smq"<374060...@qq.com>; > 主题: 回复:消费kafka数据乱序问题 > > > > “假如说先处理了存钱,存上之后余额是100-100+100=100” 这句话没看懂,存上之后余额不应该是 100+100=200 吗? > > > > > > > > > 原始邮件 > > > 发件人:"smq"< 374060...@qq.com >; > > 发件时间:2020/9/4 16:40 > > 收件人:"user-zh"< user-zh@flink.apache.org >; > > 主题:消费kafka数据乱序问题 > > > > 大家好 > > 现在碰到一个关于处理乱序的问题,业务场景是银行余额的更新,数据源是kafka,有一个账户余额字段,sink到kudu,更新客户余额. > > 如果对于同一个账户的多笔操作出现乱序,可能会导致客户余额不对。比如一个客户账户有100块,先消费100,然后存100,按照正常的数据处理顺序,帐户余额应该还是100的,假如说先处理了存钱,存上之后余额是100-100+100=100,然后处理消费100的那条数据,由于消费在前,这时这条数据对应的余额是100-100=0。 > 这样的话就出现了统计余额错误,请问想要按照事件时间处理数据是不是可以先keyby,然后用watermark.
Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow
Hi, all 当指定partition的时候这个问题通过path 也没法解决了 CREATE TABLE MyUserTable ( column_name1 INT, column_name2 STRING, dt string,) PARTITIONED BY (dt) WITH ( 'connector' = 'filesystem', -- required: specify the connector 'path' = 'file:///path/to/whatever', -- required: path to a directory 'format' = 'json', -- required: file system connector) select * from MyUserTable limit 10; job 会一直卡在一个地方 [image: image.png] 这种改怎么解决呢? Peihui He 于2020年9月4日周五 下午6:02写道: > hi, all > 我这边用flink sql client 创建表的时候 > > CREATE TABLE MyUserTable ( > column_name1 INT, > column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH ( > 'connector' = 'filesystem', -- required: specify the connector > 'path' = 'file:///path/to/whatever', -- required: path to a directory > 'format' = 'json', -- required: file system connector) > > 当path后面多一个"/"时, 比如: 'path' = 'file:///path/to/whatever/' > sql client 提交job会很慢,最后会报错 > > Caused by: org.apache.flink.runtime.rest.util.RestClientException: > [Internal server error., org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has > already been submitted. at > org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:280) > at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source) at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at > akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at > akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at > akka.actor.Actor$class.aroundReceive(Actor.scala:517) at > akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at > akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at > akka.actor.ActorCell.invoke(ActorCell.scala:561) at > akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at > akka.dispatch.Mailbox.run(Mailbox.scala:225) at > akka.dispatch.Mailbox.exec(Mailbox.scala:235) at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > End of exception on server side>] at > org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) > at > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) > at > java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > > > flink session cluster job 页面基本上都打不开,要过好久才可以。最后看到job 确实提交成功了。 > > 这种情况不知道有没有遇到过? > > Best Wishes. > > >
sql-client checkpoint sql-client
想尝试采用flink-cdc 来清洗数据,但是尝试下来,困于几点: ① 使用sql-client 开启checkpoint ,如果程序挂掉,该如何接着checkpoint,继续执行相应程序。尤其是在执行group by或者是count等操作时该如何办? ② 如果以上方式不行,是否可以采用写代码的形式,重启时指定checkpoint,但还是采用flink-cdc的方式去消费 | | 引领 | | yrx73...@163.com | 签名由网易邮箱大师定制
flink sql 1.11.1 FileSystem SQL Connector path directory slow
hi, all 我这边用flink sql client 创建表的时候 CREATE TABLE MyUserTable ( column_name1 INT, column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH ( 'connector' = 'filesystem', -- required: specify the connector 'path' = 'file:///path/to/whatever', -- required: path to a directory 'format' = 'json', -- required: file system connector) 当path后面多一个"/"时, 比如: 'path' = 'file:///path/to/whatever/' sql client 提交job会很慢,最后会报错 Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., ] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) flink session cluster job 页面基本上都打不开,要过好久才可以。最后看到job 确实提交成功了。 这种情况不知道有没有遇到过? Best Wishes.
回复:消费kafka数据乱序问题
我也遇到了和你一样的问题,也是两条数据有因果关系,必须有严格的先后顺序,我这边的业务不像银行那么严格; 我的解决办法是把迟到数据丢弃,然后进行业务计算; 另起一个程序把数据缓存在内存里,对数据排序,然后再度修正计算; 之前的实时+离线数仓用的办法,代码开发一次,但还要跑两次; -- 发件人:smq <374060...@qq.com> 发送时间:2020年9月4日(星期五) 17:35 收件人:wwj ; user-zh 主 题:回复:消费kafka数据乱序问题 换句话说,写进kafka的数据是同一个用户的两条,余额分别是0和100,就是我之前那个计算过程,这个计算操作在Oracle完成,我只负责把余额,也就是0或者100更新到kudu对应的余额字段,因为消费数据可能会乱序,先更新100,再更新0,这样导致存在kudu的数据是0,正确的数据最终应该是100 ---原始邮件--- 发件人: "wwj"
??????????????flink??????????????????
?? Flink+drools drools 2020-9-4 | | | | hold_li...@163.com | ?? ??2020??8??6?? 10:26??samuel@ubtrobot.com ?? flink ,?? ??mysql??json {"times":5} ---5?? {"temperature": 80} ---80 1)kafka 2)flinkkafka?? ?? 1. 2.??flink CEP?? 3.??
Re: 请指教一个关于时间窗的问题,非常感谢!
可否发下是哪个配置,有相关的文档吗? superainbower 于2020年9月4日周五 下午5:24写道: > 1.11的版本已经加入了 新的配置项,避免了数据倾斜导致某个分区没数据 而不触发计算的问题 > > > | | > superainbower > | > | > superainbo...@163.com > | > 签名由网易邮箱大师定制 > > > 在2020年09月4日 15:11,taochanglian 写道: > 确实是这样,比如有你多个partition,但是只有1个partition里面有数据,wm就不会执行计算。需要保证多个parititon数据都有数据。 > > 举个例子,我这里在做测试的时候,1个topic10个partition,由于测试环境,按照key > hash,数据只进入1个partition,就不会触发计算。后来让这个key轮询到10个parition都有数据,就可以触发计算了。 > > 在 2020/9/4 13:14, Benchao Li 写道: > 如果你有多个partition,并且有一些partition没有数据,这个时候的确是会存在这个问题。 > 要处理这种情况,可以了解下idle source[1] > > [1] > > https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources > > samuel@ubtrobot.com 于2020年9月3日周四 下午3:41写道: > > 补充一下环境信息: > > 有点类似以下问题: > 在1.11版本测试flink sql时发现一个问题,用streaming api > 消费kafka,使用eventtime,再把stream转table,进行sql聚合,发现当kafka topic是多个分区时,flink webui > watermarks 显示No Watermark,聚合计算也迟迟不触发计算,但当kafka > topic只有一个分区时却能这个正常触发计算,watermarks也显示正常。 > > 不确定是否是因为kafka多分区引起的? > > > > 发件人: samuel@ubtrobot.com > 发送时间: 2020-09-03 09:23 > 收件人: user-zh > 主题: 回复: 回复:请指教一个关于时间窗的问题,非常感谢! > 谢谢回复! > > 是Flink1.11.1的版本 > > 以下是代码: > package com.ubtechinc.dataplatform.flink.etl.exception.monitor;/* > * Licensed to the Apache Software Foundation (ASF) under one > * or more contributor license agreements. See the NOTICE file > * distributed with this work for additional information > * regarding copyright ownership. The ASF licenses this file > * to you under the Apache License, Version 2.0 (the > * "License"); you may not use this file except in compliance > * with the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > > import org.apache.commons.collections.map.HashedMap; > import org.apache.flink.api.common.eventtime.WatermarkStrategy; > import org.apache.flink.api.common.functions.FlatMapFunction; > import org.apache.flink.api.common.serialization.SimpleStringEncoder; > import org.apache.flink.api.common.serialization.SimpleStringSchema; > import org.apache.flink.api.common.state.BroadcastState; > import org.apache.flink.api.common.state.MapStateDescriptor; > import org.apache.flink.api.common.state.ReadOnlyBroadcastState; > import org.apache.flink.api.common.typeinfo.BasicTypeInfo; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.java.tuple.Tuple4; > import org.apache.flink.api.java.utils.ParameterTool; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.core.fs.Path; > import org.apache.flink.runtime.state.StateBackend; > import org.apache.flink.runtime.state.filesystem.FsStateBackend; > import org.apache.flink.streaming.api.CheckpointingMode; > import org.apache.flink.streaming.api.TimeCharacteristic; > import org.apache.flink.streaming.api.datastream.BroadcastStream; > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.datastream.DataStreamSource; > import > org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; > import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import > > org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; > import org.apache.flink.streaming.api.functions.co > .BroadcastProcessFunction; > import > org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; > import > org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; > import > > org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; > import org.apache.flink.streaming.api.functions.source.RichSourceFunction; > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; > import > org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.util.Collector; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > import com.alibaba.fastjson.JSON; > import com.alibaba.fastjson.JSONArray; > import com.alibaba.fastjson.JSONObject; > import com.alibaba.fastjson.parser.Feature; > import com.ubtechinc.dataplatform.flink.util.AES256; > import com.ubtechinc.dataplatform.flink.util.ConstantStr; > import com.ubtechinc.dataplatform.flink.util.MailUtils; > import com.ubtechinc.dataplatform.flink.util.SmsUtil; > import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner; > > import java.sql.DriverManager; > import java.sql.PreparedStatement; > import java.sql.ResultSet; > import com.mysql.jdbc.Connection;
回复: 请指教一个关于时间窗的问题,非常感谢!
1.11的版本已经加入了 新的配置项,避免了数据倾斜导致某个分区没数据 而不触发计算的问题 | | superainbower | | superainbo...@163.com | 签名由网易邮箱大师定制 在2020年09月4日 15:11,taochanglian 写道: 确实是这样,比如有你多个partition,但是只有1个partition里面有数据,wm就不会执行计算。需要保证多个parititon数据都有数据。 举个例子,我这里在做测试的时候,1个topic10个partition,由于测试环境,按照key hash,数据只进入1个partition,就不会触发计算。后来让这个key轮询到10个parition都有数据,就可以触发计算了。 在 2020/9/4 13:14, Benchao Li 写道: 如果你有多个partition,并且有一些partition没有数据,这个时候的确是会存在这个问题。 要处理这种情况,可以了解下idle source[1] [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources samuel@ubtrobot.com 于2020年9月3日周四 下午3:41写道: 补充一下环境信息: 有点类似以下问题: 在1.11版本测试flink sql时发现一个问题,用streaming api 消费kafka,使用eventtime,再把stream转table,进行sql聚合,发现当kafka topic是多个分区时,flink webui watermarks 显示No Watermark,聚合计算也迟迟不触发计算,但当kafka topic只有一个分区时却能这个正常触发计算,watermarks也显示正常。 不确定是否是因为kafka多分区引起的? 发件人: samuel@ubtrobot.com 发送时间: 2020-09-03 09:23 收件人: user-zh 主题: 回复: 回复:请指教一个关于时间窗的问题,非常感谢! 谢谢回复! 是Flink1.11.1的版本 以下是代码: package com.ubtechinc.dataplatform.flink.etl.exception.monitor;/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import org.apache.commons.collections.map.HashedMap; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; import org.apache.flink.streaming.api.functions.co .BroadcastProcessFunction; import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.parser.Feature; import com.ubtechinc.dataplatform.flink.util.AES256; import com.ubtechinc.dataplatform.flink.util.ConstantStr; import com.ubtechinc.dataplatform.flink.util.MailUtils; import com.ubtechinc.dataplatform.flink.util.SmsUtil; import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import com.mysql.jdbc.Connection; import java.sql.Timestamp; import java.text.MessageFormat; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; /** * 使用广播实现动态的配置更新 */ public class ExceptionAlertHour4{ private static final L
flink sql client 如何同时执行多条 sql 语句
我有一个 sql 文件,它里面有不少 flink sql 的创建表的语句和查询语句,现在我想要通过 sql client 提交这些任务,却只能一句一句的复制粘贴。如果同时复制粘贴多条语句就会报错,在 flink sql client 中使用 source xxx.sql 也会报错。 请问用什么样的方法可以一次性执行多条语句呢? -- Sent from: http://apache-flink.147419.n8.nabble.com/
消费kafka数据乱序问题
大家好 现在碰到一个关于处理乱序的问题,业务场景是银行余额的更新,数据源是kafka,有一个账户余额字段,sink到kudu,更新客户余额. 如果对于同一个账户的多笔操作出现乱序,可能会导致客户余额不对。比如一个客户账户有100块,先消费100,然后存100,按照正常的数据处理顺序,帐户余额应该还是100的,假如说先处理了存钱,存上之后余额是100-100+100=100,然后处理消费100的那条数据,由于消费在前,这时这条数据对应的余额是100-100=0。 这样的话就出现了统计余额错误,请问想要按照事件时间处理数据是不是可以先keyby,然后用watermark.
Re: Re: Re: Re: pyflink-udf 问题反馈
Hi, 推荐你使用ddl来声明你上下游用的connector ``` table_env.execute_sql(""" CREATE TABLE output ( data STRING ARRAY ) WITH ( 'connector' = 'filesystem', -- required: specify the connector 'path' = 'file:///tmp/test.csv', -- required: path to a directory 'format' = 'json', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' ) """) table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result() ``` Best, Xingbo whh_960101 于2020年9月4日周五 下午3:46写道: > 您好,我是想让输出insert_into到目标表中,具体如下: > st_env=StreamExecutionEnvironment.get_execution_environment() > st_env.connect了一个source table(table包含a字段), > 然后 > | st_env.connect(FileSystem().path('tmp')) \ | > | | .with_format(OldCsv() | > | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ | > | | .with_schema(Schema() | > | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ | > | | .create_temporary_table('sink') | > connect了一个sink表,format、schema都是DataTypes.ARRAY() > 然后我定义了一个udf > > @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING())) > def func(a): > rec_list = a.split(',') > res_arr = np.arrary(rec_list,dtype=str) > return res_arr > st_env.register_function("func", func) > st_env.from_path("source").select("func(a)").insert_into("sink") > 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串,不是我res_arr里面的内容,如果我单独返回一个值,比如return > res_arr[0],tmp文件里面的字符串就是正确。 > 我想要得到array,该怎么解决? > > > > > > > > > > > > > > > > > > > > 在 2020-09-04 15:17:38,"Xingbo Huang" 写道: > >Hi, > > > >你是调试的时候想看结果吗? > >你可以直接table.to_pandas()来看结果,或者用print connector来看。 > > > >个人觉得to_pandas最简单,比如你可以试试下面的例子 > > > >``` > >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b']) > > > >@udf(input_types=DataTypes.STRING(), > >result_type=DataTypes.ARRAY(DataTypes.STRING())) > >def func(a): > > return np.array([a, a, a], dtype=str) > > > >table_env.register_function("func", func) > > > >table.select("func(b)").to_pandas() > >``` > >然后,你可以看看官方文档[1],让你快速上手PyFlink > > > >Best, > >Xingbo > > > >[1] > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html > > > >whh_960101 于2020年9月4日周五 下午2:50写道: > > > >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗 > >> 我的udf输出了一个numpy.array(dtype = str), > >> result_type设的是DataTypes.ARRAY(DataTypes.STRING()) > >> > >> > 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING())) > >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容 > >> 请问这个问题该怎么解决? > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2020-09-04 10:35:03,"Xingbo Huang" 写道: > >> >Hi, > >> > > >> > >> > >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事 > >> > > >> >Best, > >> >Xingbo > >> > > >> >whh_960101 于2020年9月4日周五 上午9:26写道: > >> > > >> >> > >> >> > >> > 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值 > >> >> udf定义如下: > >> >> > >> >> > >> > @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING()) > >> >> def fun(data): > >> >> b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错 > >> >> > >> >> > >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错 > >> >> 希望您能给我提供好的解决办法,万分感谢! > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" 写道: > >> >> >Hi, > >> >> > > >> >> >我觉得你从头详细描述一下你的表结构。 > >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的, > >> >> > >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。 > >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1] > >> >> > > >> >> >[1] > >> >> > > >> >> > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions > >> >> > > >> >> >Best, > >> >> >Xingbo > >> >> > > >> >> > 于2020年9月3日周四 下午9:45写道: > >> >> > > >> >> >> > 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法 > >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(), > >> DataTypes.STRING()] > >> >> >> > >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗 > >> >> >> 或者正确写法是什么样的,感谢解答! > >> >> >> > >> >> >> > >> >> >> | | > >> >> >> whh_960101 > >> >> >> | > >> >> >> | > >> >> >> 邮箱:whh_960...@163.com > >> >> >> | > >> >> >> > >> >> >> 签名由 网易邮箱大师 定制 > >> >> >> > >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道: > >> >> >> Hi, > >> >> >> input_types定义的是每一个列的具体类型。 > >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你 > >> >> >> 正确的写法是 > >> >> >> > >> >> >>input_types=[DataTypes.STRING(), DataTypes.STRING(), > >> >> DataTypes.STRING()] > >> >> >> > >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的) > >> >> >>input_types=DataTypes.Row([DataTypes.FIELD("a", > >> DataTypes.STRING
Re:Re: Re: Re: pyflink-udf 问题反馈
您好,我是想让输出insert_into到目标表中,具体如下: st_env=StreamExecutionEnvironment.get_execution_environment() st_env.connect了一个source table(table包含a字段), 然后 | st_env.connect(FileSystem().path('tmp')) \ | | | .with_format(OldCsv() | | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ | | | .with_schema(Schema() | | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ | | | .create_temporary_table('sink') | connect了一个sink表,format、schema都是DataTypes.ARRAY() 然后我定义了一个udf @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING())) def func(a): rec_list = a.split(',') res_arr = np.arrary(rec_list,dtype=str) return res_arr st_env.register_function("func", func) st_env.from_path("source").select("func(a)").insert_into("sink") 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串,不是我res_arr里面的内容(假如res_arr=['1','2','3']),如果我单独返回一个值,比如return res_arr[0],tmp文件就显示'1'。 我想要得到array,该怎么解决? 在 2020-09-04 15:17:38,"Xingbo Huang" 写道: >Hi, > >你是调试的时候想看结果吗? >你可以直接table.to_pandas()来看结果,或者用print connector来看。 > >个人觉得to_pandas最简单,比如你可以试试下面的例子 > >``` >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b']) > >@udf(input_types=DataTypes.STRING(), >result_type=DataTypes.ARRAY(DataTypes.STRING())) >def func(a): > return np.array([a, a, a], dtype=str) > >table_env.register_function("func", func) > >table.select("func(b)").to_pandas() >``` >然后,你可以看看官方文档[1],让你快速上手PyFlink > >Best, >Xingbo > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html > >whh_960101 于2020年9月4日周五 下午2:50写道: > >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗 >> 我的udf输出了一个numpy.array(dtype = str), >> result_type设的是DataTypes.ARRAY(DataTypes.STRING()) >> >> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING())) >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容 >> 请问这个问题该怎么解决? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-09-04 10:35:03,"Xingbo Huang" 写道: >> >Hi, >> > >> >> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事 >> > >> >Best, >> >Xingbo >> > >> >whh_960101 于2020年9月4日周五 上午9:26写道: >> > >> >> >> >> >> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值 >> >> udf定义如下: >> >> >> >> >> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING()) >> >> def fun(data): >> >> b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错 >> >> >> >> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错 >> >> 希望您能给我提供好的解决办法,万分感谢! >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" 写道: >> >> >Hi, >> >> > >> >> >我觉得你从头详细描述一下你的表结构。 >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的, >> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。 >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1] >> >> > >> >> >[1] >> >> > >> >> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions >> >> > >> >> >Best, >> >> >Xingbo >> >> > >> >> > 于2020年9月3日周四 下午9:45写道: >> >> > >> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法 >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(), >> DataTypes.STRING()] >> >> >> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗 >> >> >> 或者正确写法是什么样的,感谢解答! >> >> >> >> >> >> >> >> >> | | >> >> >> whh_960101 >> >> >> | >> >> >> | >> >> >> 邮箱:whh_960...@163.com >> >> >> | >> >> >> >> >> >> 签名由 网易邮箱大师 定制 >> >> >> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道: >> >> >> Hi, >> >> >> input_types定义的是每一个列的具体类型。 >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你 >> >> >> 正确的写法是 >> >> >> >> >> >>input_types=[DataTypes.STRING(), DataTypes.STRING(), >> >> DataTypes.STRING()] >> >> >> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的) >> >> >>input_types=DataTypes.Row([DataTypes.FIELD("a", >> DataTypes.STRING()), >> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c", >> >> >> DataTypes.STRING())]) >> >> >> >> >> >> Best, >> >> >> Xingbo >> >> >> >> >> >> whh_960101 于2020年9月3日周四 下午9:03写道: >> >> >> >> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid >> >> >> > input_type:input_type should be DataType but contain >> RowField(RECID, >> >> >> > VARCHAR) >> >> >> > 我的pyflink版本:1.11.1 >> >> >> >> >> >>
Re:Re: Re: Re: pyflink-udf 问题反馈
您好,我是想让输出insert_into到目标表中,具体如下: st_env=StreamExecutionEnvironment.get_execution_environment() st_env.connect了一个source table(table包含a字段), 然后 | st_env.connect(FileSystem().path('tmp')) \ | | | .with_format(OldCsv() | | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ | | | .with_schema(Schema() | | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ | | | .create_temporary_table('sink') | connect了一个sink表,format、schema都是DataTypes.ARRAY() 然后我定义了一个udf @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING())) def func(a): rec_list = a.split(',') res_arr = np.arrary(rec_list,dtype=str) return res_arr st_env.register_function("func", func) st_env.from_path("source").select("func(a)").insert_into("sink") 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串,不是我res_arr里面的内容,如果我单独返回一个值,比如return res_arr[0],tmp文件里面的字符串就是正确。 我想要得到array,该怎么解决? 在 2020-09-04 15:17:38,"Xingbo Huang" 写道: >Hi, > >你是调试的时候想看结果吗? >你可以直接table.to_pandas()来看结果,或者用print connector来看。 > >个人觉得to_pandas最简单,比如你可以试试下面的例子 > >``` >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b']) > >@udf(input_types=DataTypes.STRING(), >result_type=DataTypes.ARRAY(DataTypes.STRING())) >def func(a): > return np.array([a, a, a], dtype=str) > >table_env.register_function("func", func) > >table.select("func(b)").to_pandas() >``` >然后,你可以看看官方文档[1],让你快速上手PyFlink > >Best, >Xingbo > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html > >whh_960101 于2020年9月4日周五 下午2:50写道: > >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗 >> 我的udf输出了一个numpy.array(dtype = str), >> result_type设的是DataTypes.ARRAY(DataTypes.STRING()) >> >> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING())) >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容 >> 请问这个问题该怎么解决? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-09-04 10:35:03,"Xingbo Huang" 写道: >> >Hi, >> > >> >> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事 >> > >> >Best, >> >Xingbo >> > >> >whh_960101 于2020年9月4日周五 上午9:26写道: >> > >> >> >> >> >> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值 >> >> udf定义如下: >> >> >> >> >> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING()) >> >> def fun(data): >> >> b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错 >> >> >> >> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错 >> >> 希望您能给我提供好的解决办法,万分感谢! >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" 写道: >> >> >Hi, >> >> > >> >> >我觉得你从头详细描述一下你的表结构。 >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的, >> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。 >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1] >> >> > >> >> >[1] >> >> > >> >> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions >> >> > >> >> >Best, >> >> >Xingbo >> >> > >> >> > 于2020年9月3日周四 下午9:45写道: >> >> > >> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法 >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(), >> DataTypes.STRING()] >> >> >> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗 >> >> >> 或者正确写法是什么样的,感谢解答! >> >> >> >> >> >> >> >> >> | | >> >> >> whh_960101 >> >> >> | >> >> >> | >> >> >> 邮箱:whh_960...@163.com >> >> >> | >> >> >> >> >> >> 签名由 网易邮箱大师 定制 >> >> >> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道: >> >> >> Hi, >> >> >> input_types定义的是每一个列的具体类型。 >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你 >> >> >> 正确的写法是 >> >> >> >> >> >>input_types=[DataTypes.STRING(), DataTypes.STRING(), >> >> DataTypes.STRING()] >> >> >> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的) >> >> >>input_types=DataTypes.Row([DataTypes.FIELD("a", >> DataTypes.STRING()), >> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c", >> >> >> DataTypes.STRING())]) >> >> >> >> >> >> Best, >> >> >> Xingbo >> >> >> >> >> >> whh_960101 于2020年9月3日周四 下午9:03写道: >> >> >> >> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid >> >> >> > input_type:input_type should be DataType but contain >> RowField(RECID, >> >> >> > VARCHAR) >> >> >> > 我的pyflink版本:1.11.1 >> >> >> >> >> >>
回复:关于flink sql并行度问题的请教
val tableConfig = tableEnv.getConfig.getConfiguration tableConfig.setString("table.exec.resource.default-parallelism","4") 已经加了table的并行度设置,但是提示小于104并行度不让执行 Vertex Source: HiveTableSource()'s parallelism (104) is higher than the max parallelism (4). Please lower the parallelism or increase the max parallelism 原始邮件 发件人: me 收件人: user-zh 发送时间: 2020年9月4日(周五) 15:18 主题: 关于flink sql并行度问题的请教 我的代码中使用flinksql,本机idea中测试没有问题,放到集群上跑之后,占用了全部40个slots,并且并行度为114,程序也一直卡着没有输出,是一个加入了时间窗口的flink sql 我再代码中全局设置了,dataStreamEnv.setParallelism(4) dataStreamEnv.setMaxParallelism(4) 但是感觉完全不起作用,请问怎么去限制flink sql的并行度?
关于flink sql并行度问题的请教
我的代码中使用flinksql,本机idea中测试没有问题,放到集群上跑之后,占用了全部40个slots,并且并行度为114,程序也一直卡着没有输出,是一个加入了时间窗口的flink sql 我再代码中全局设置了,dataStreamEnv.setParallelism(4) dataStreamEnv.setMaxParallelism(4) 但是感觉完全不起作用,请问怎么去限制flink sql的并行度?
Re: Re: Re: pyflink-udf 问题反馈
Hi, 你是调试的时候想看结果吗? 你可以直接table.to_pandas()来看结果,或者用print connector来看。 个人觉得to_pandas最简单,比如你可以试试下面的例子 ``` table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b']) @udf(input_types=DataTypes.STRING(), result_type=DataTypes.ARRAY(DataTypes.STRING())) def func(a): return np.array([a, a, a], dtype=str) table_env.register_function("func", func) table.select("func(b)").to_pandas() ``` 然后,你可以看看官方文档[1],让你快速上手PyFlink Best, Xingbo [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html whh_960101 于2020年9月4日周五 下午2:50写道: > 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗 > 我的udf输出了一个numpy.array(dtype = str), > result_type设的是DataTypes.ARRAY(DataTypes.STRING()) > > 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING())) > 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容 > 请问这个问题该怎么解决? > > > > > > > > > > > > > > > > > > 在 2020-09-04 10:35:03,"Xingbo Huang" 写道: > >Hi, > > > > >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事 > > > >Best, > >Xingbo > > > >whh_960101 于2020年9月4日周五 上午9:26写道: > > > >> > >> > 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值 > >> udf定义如下: > >> > >> > @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING()) > >> def fun(data): > >> b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错 > >> > >> > >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错 > >> 希望您能给我提供好的解决办法,万分感谢! > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2020-09-03 22:23:28,"Xingbo Huang" 写道: > >> >Hi, > >> > > >> >我觉得你从头详细描述一下你的表结构。 > >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的, > >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。 > >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1] > >> > > >> >[1] > >> > > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions > >> > > >> >Best, > >> >Xingbo > >> > > >> > 于2020年9月3日周四 下午9:45写道: > >> > > >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法 > >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(), > DataTypes.STRING()] > >> >> > >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗 > >> >> 或者正确写法是什么样的,感谢解答! > >> >> > >> >> > >> >> | | > >> >> whh_960101 > >> >> | > >> >> | > >> >> 邮箱:whh_960...@163.com > >> >> | > >> >> > >> >> 签名由 网易邮箱大师 定制 > >> >> > >> >> 在2020年09月03日 21:14,Xingbo Huang 写道: > >> >> Hi, > >> >> input_types定义的是每一个列的具体类型。 > >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你 > >> >> 正确的写法是 > >> >> > >> >>input_types=[DataTypes.STRING(), DataTypes.STRING(), > >> DataTypes.STRING()] > >> >> > >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的) > >> >>input_types=DataTypes.Row([DataTypes.FIELD("a", > DataTypes.STRING()), > >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c", > >> >> DataTypes.STRING())]) > >> >> > >> >> Best, > >> >> Xingbo > >> >> > >> >> whh_960101 于2020年9月3日周四 下午9:03写道: > >> >> > >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid > >> >> > input_type:input_type should be DataType but contain > RowField(RECID, > >> >> > VARCHAR) > >> >> > 我的pyflink版本:1.11.1 > >> >> > >> >
Re: 回复:请指教一个关于时间窗的问题,非常感谢!
确实是这样,比如有你多个partition,但是只有1个partition里面有数据,wm就不会执行计算。需要保证多个parititon数据都有数据。 举个例子,我这里在做测试的时候,1个topic10个partition,由于测试环境,按照key hash,数据只进入1个partition,就不会触发计算。后来让这个key轮询到10个parition都有数据,就可以触发计算了。 在 2020/9/4 13:14, Benchao Li 写道: 如果你有多个partition,并且有一些partition没有数据,这个时候的确是会存在这个问题。 要处理这种情况,可以了解下idle source[1] [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources samuel@ubtrobot.com 于2020年9月3日周四 下午3:41写道: 补充一下环境信息: 有点类似以下问题: 在1.11版本测试flink sql时发现一个问题,用streaming api 消费kafka,使用eventtime,再把stream转table,进行sql聚合,发现当kafka topic是多个分区时,flink webui watermarks 显示No Watermark,聚合计算也迟迟不触发计算,但当kafka topic只有一个分区时却能这个正常触发计算,watermarks也显示正常。 不确定是否是因为kafka多分区引起的? 发件人: samuel@ubtrobot.com 发送时间: 2020-09-03 09:23 收件人: user-zh 主题: 回复: 回复:请指教一个关于时间窗的问题,非常感谢! 谢谢回复! 是Flink1.11.1的版本 以下是代码: package com.ubtechinc.dataplatform.flink.etl.exception.monitor;/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import org.apache.commons.collections.map.HashedMap; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; import org.apache.flink.streaming.api.functions.co .BroadcastProcessFunction; import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.parser.Feature; import com.ubtechinc.dataplatform.flink.util.AES256; import com.ubtechinc.dataplatform.flink.util.ConstantStr; import com.ubtechinc.dataplatform.flink.util.MailUtils; import com.ubtechinc.dataplatform.flink.util.SmsUtil; import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import com.mysql.jdbc.Connection; import java.sql.Timestamp; import java.text.MessageFormat; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; /** * 使用广播实现动态的配置更新 */ public class ExceptionAlertHour4{ private static final Logger LOG = LoggerFactory.getLogger(ExceptionAlertHour4.class); public static void main(String[] args