Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
Hi, 我想请问下使用 streamExecutionEnv.execute("from kafka sink hbase"),通过这种方式可以给Job指定名称。 而当使用streamTableEnv.executeSql(sql)之后似乎无法给Job定义名称。 请问有什么解决方案吗?谢谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re:Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
Hi, 我想请教下,使用streamExecutionEnv.execute("from kafka sink hbase") 是可以指定Job的名称。 而当改用streamTableEnv.executeSql(sql)的方式时,似乎无法定义Job的名称。 请问有什么解决的方法吗? 在 2020-07-08 16:07:17,"Jingsong Li" 写道: >Hi, > >你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。 > >所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")" >并没有真正的物理节点。你不用再调用了。 > >Best, >Jingsong > >On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach wrote: > >> >> >> >> 代码结构改成这样的了: >> >> >> >> >> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment >> >> val blinkEnvSettings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >> >> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv, >> blinkEnvSettings) >> >> >> >> >> >> streamExecutionEnv.execute("from kafka sink hbase") >> >> >> >> >> 还是报一样的错 >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-07-08 15:40:41,"夏帅" 写道: >> >你好, >> >可以看看你的代码结构是不是以下这种 >> >val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment >> >val bsSettings = >> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build >> >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings) >> > .. >> >tableEnv.execute("") >> >如果是的话,可以尝试使用bsEnv.execute("") >> >1.11对于两者的execute代码实现有改动 >> > >> > >> >-- >> >发件人:Zhou Zach >> >发送时间:2020年7月8日(星期三) 15:30 >> >收件人:Flink user-zh mailing list >> >主 题:flink Sql 1.11 executeSql报No operators defined in streaming topology >> > >> >代码在flink >> 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常: >> >Exception in thread "main" java.lang.IllegalStateException: No operators >> defined in streaming topology. Cannot generate StreamGraph. >> >at >> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) >> >at >> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) >> >at >> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) >> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79) >> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala) >> > >> > >> >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。 >> > >> > >> > >> > >> >query: >> >streamTableEnv.executeSql( >> > """ >> >| >> >|CREATE TABLE `user` ( >> >|uid BIGINT, >> >|sex VARCHAR, >> >|age INT, >> >|created_time TIMESTAMP(3), >> >|WATERMARK FOR created_time as created_time - INTERVAL '3' >> SECOND >> >|) WITH ( >> >|'connector.type' = 'kafka', >> >|'connector.version' = 'universal', >> >|-- 'connector.topic' = 'user', >> >|'connector.topic' = 'user_long', >> >|'connector.startup-mode' = 'latest-offset', >> >|'connector.properties.group.id' = 'user_flink', >> >|'format.type' = 'json', >> >|'format.derive-schema' = 'true' >> >|) >> >|""".stripMargin) >> > >> > >> > >> > >> > >> > >> >streamTableEnv.executeSql( >> > """ >> >| >> >|CREATE TABLE user_hbase3( >> >|rowkey BIGINT, >> >|cf ROW(sex VARCHAR, age INT, created_time VARCHAR) >> >|) WITH ( >> >|'connector.type' = 'hbase', >> >|'connector.version' = '2.1.0', >> >|'connector.table-name' = 'user_hbase2', >> >|'connector.zookeeper.znode.parent' = '/hbase', >> >|'connector.write.buffer-flush.max-size' = '10mb', >> >|'connector.write.buffer-flush.max-rows' = '1000', >> >|'connector.write.buffer-flush.interval' = '2s' >> >|) >> >|""".stripMargin) >> > >> > >> >streamTableEnv.executeSql( >> > """ >> >| >> >|insert into user_hbase3 >> >|SELECT uid, >> >| >> >| ROW(sex, age, created_time ) as cf >> >| FROM (select uid,sex,age, cast(created_time as VARCHAR) as >> created_time from `user`) >> >| >> >|""".stripMargin) >> > >> > >> > >> > >> > >> > >> > >> > >> > > >-- >Best, Jingsong Lee
Re: Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology
感谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink1.11查询结果每秒入库到mysql数量很少
Hi, 你可以尝试改写url,加上rewritebatchedstatements=true,如下: jdbc:mysql://198.2.2.71:3306/bda?useSSL=false&rewritebatchedstatements=true MySQL Jdbc驱动在默认情况下会无视executeBatch()语句,把期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,直接造成较低的性能。把rewriteBatchedStatements参数置为true, 驱动才会帮你批量执行SQL。 祝好 weixubin -- Sent from: http://apache-flink.147419.n8.nabble.com/
关于Flink1.11 CSV Format的一些疑问
Hi,我在Flink1.11版本,使用filesystem connector的时候,读取csv文件并输出到另外一个csv文件遇到了些问题,问题如下: 问题1:sink 的path指定具体输出文件名,但是输出的结果是 文件夹形式 问题2:在flink1.11的文档中没有找到csv的 ignore-first-line 忽略第一行这个配置 测试数据 11101322000220200517145507667060666706;9 11101412000220200515163257249700624970;9 11101412010220200514163709315410631541;9 11101712050220200516173624737150673715;9 11101312000220200516184127322880632288;9 CREATE TABLE source_table ( face_id STRING, p_id STRING ) WITH ( 'connector' = 'filesystem', 'path' = 'E:\label_file.csv', 'format' = 'csv', 'csv.field-delimiter'=';' ); CREATE TABLE sink_table ( face_id STRING, p_id STRING ) WITH ( 'connector' = 'filesystem', 'path' = 'E:\label_file2.csv', 'csv.disable-quote-character' ='true', 'format' = 'csv', 'csv.field-delimiter'=';' ); INSERT INTO sink_table SELECT face_id,p_id FROM source_table; -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于Flink1.11 CSV Format的一些疑问
感谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink1.10 sink to mysql SocketException
Hi, 你可以看下这个 Jira , https://issues.apache.org/jira/browse/FLINK-16681,似乎与你问题相关。 由于长时间没有数据导致 connection 断开问题,该问题已经在1.11版本修复。 Best, Weixubin -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: kafka table connector eventTime的问题
Hi,我这有一个使用Datastream开发简单例子,接收Kafka(Event Time)数据并进行开窗聚合。Kafka数据格式如:{"word":"a","count":1,"time":1604286564},可以看看该Demo对你是否有所帮助。 public class MyExample { public static void main(String[] args) throws Exception { // 创建环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 设置时间特性为 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 水印策略 WatermarkStrategy watermarkStrategy = WatermarkStrategy .forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner(new SerializableTimestampAssigner() { @Override public long extractTimestamp(WC wc, long l) { return wc.getEventTime() * 1000; } }); // Kafka 配置 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "Kafka地址:9092"); properties.setProperty("group.id", "test"); env.addSource(new FlinkKafkaConsumer<>("flinktest1", new JSONKeyValueDeserializationSchema(true), properties).setStartFromLatest()) // map 构建 WC 对象 .map(new MapFunction() { @Override public WC map(ObjectNode jsonNode) throws Exception { JsonNode valueNode = jsonNode.get("value"); WC wc = new WC(valueNode.get("word").asText(),valueNode.get("count").asInt(),valueNode.get("time").asLong()); return wc; } }) // 设定水印策略 .assignTimestampsAndWatermarks(watermarkStrategy) .keyBy(WC::getWord) // 窗口设置,这里设置为滚动窗口 .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 设置窗口延迟 .allowedLateness(Time.seconds(2)) .reduce(new ReduceFunction() { @Override public WC reduce(WC wc, WC t1) throws Exception { return new WC(wc.getWord(), wc.getCount() + t1.getCount()); } }) .print(); env.execute(); } static class WC { public String word; public int count; public long eventTime; public long getEventTime() { return eventTime; } public void setEventTime(long eventTime) { this.eventTime = eventTime; } public String getWord() { return word; } public void setWord(String word) { this.word = word; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } public WC(String word, int count) { this.word = word; this.count = count; } public WC(String word, int count,long eventTime) { this.word = word; this.count = count; this.eventTime = eventTime; } @Override public String toString() { return "WC{" + "word='" + word + '\'' + ", count=" + count + '}'; } } } -- Sent from: http://apache-flink.147419.n8.nabble.com/
关于 Flink on K8S Deploy Job Cluster 部署问题
我们打算采用 Flink on K8S Job Cluster(perjob)的部署方式。我们使用taskmanager-job-deployment.yaml 在K8S启动taskmananger,副本数为2,每个taskMananger的solt为8。我们把TaskMananger理解为资源池,当有一个Job启动时,会根据任务情况自动分配一定数量的TaskMananger给它,当它用完时把TaskMananger归还。 当我们使用 jobmanager-job.yaml 启动Job(Job只需要一个solt)时候,发现该Job会占用这两个TaskMananger,即使其并不需要那么多solt。这导致第二个Job启动时没有可用的TaskMananger,导致资源浪费。 问题: 是否pre-job模式每次启动都是需要创建 taskmanager-job-deployment.yaml 和 jobmanager-job.yaml,然后这部分taskmananger归属于这个job,当运行完需要销毁掉 taskmanager?但这样就会导致每次都要创建和销毁taskmanager Thanks, Bin -- Sent from: http://apache-flink.147419.n8.nabble.com/
类型转换问题 String 类型如何转 decimal类型
source 端接收到的数据类型为 String, sink 端 MySQL 数据库字段类型定义为 decimal(12, 2) , 在编写 insert 语句时该如何把 String 类型转换为 decimal, 尝试了 flink build-in 的 cast 并不行,请问各位有什么好的方法? -- Sent from: http://apache-flink.147419.n8.nabble.com/
关于 flinksql 维表的问题
我想实现将MySQL中的 A 表数据预先查询出来进行缓存,用于给流表 B 进行 join关联。接下来定时查询并更新 A 表内的缓存数据,请问目前 FlinkSQL 可以实现吗? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于 flinksql 维表的问题
感谢各位的建议 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink SQL 1.11.3问题请教
不知道下面场景是否与你描述的场景相同 ,假设采集到单条json消息格式为 {"name":"test"},将多条json消息合并为一条的格式为 [{"name":"test"},{"name":"test2"}]。 我的 Flink 任务是采用 FlinkSQL 编写,处理这种情况我的方式是通过编写一个 UDF (TableFunction), 之后在这个 UDF 中进行数据拆解,变为多行 (row),再输出到 sink。 Row row = new Row(arity); collect(row); 具体使用可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/ Best,Weixubin -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于 flinksql 维表的问题
你好,可以麻烦详细描述一下吗? 谢谢 -- Sent from: http://apache-flink.147419.n8.nabble.com/
FlinkSQL cannot update pk column UID to expr
基础场景: 从 KafkaSource 输入数据,输出到 sinktable, 期间 Left join 关联 DimTable 维表。 Flink 版本 1.12.2 场景1:当把 sinktable 设置为 'connector' = 'print' ,不设置任何主键,可正常关联输出 场景2:当把 sinktable 设置为 'connector' = 'mysql' 则会要求加上 primary key 场景3:在 sinktable 加上 PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED 则报错,主要报错信息: java.sql.BatchUpdateException: [3, 2021060816420117201616500303151172567] cannot update pk column UID to expr 注:此处使用的MySQL 是阿里的 ADB,建表SQL如下 Create Table `v2_dwd_root_game_uid_reg_log` ( `uid` bigint NOT NULL DEFAULT '0' COMMENT '注册uid', `user_name` varchar NOT NULL DEFAULT '', // 此处省略其他字段 primary key (`uid`,`platform`,`root_game_id`) ) DISTRIBUTE BY HASH(`uid`) INDEX_ALL='Y' STORAGE_POLICY='HOT' COMMENT='按根游戏账号注册日志'; 下面是场景3的SQL语句: // Kafka Source CREATE TABLE KafkaTable ( message STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'xxx', 'properties.bootstrap.servers' = 'xxx', 'properties.group.id' = 'x', 'scan.startup.mode' = 'group-offsets', 'format' = 'json' ); // 维表 CREATE TABLE DimTable ( game_id BIGINT, root_game_id BIGINT, main_game_id BIGINT, platform VARCHAR ) WITH ( 'connector' = 'jdbc', 'url' = '', 'table-name' = 'v2_dim_game_id', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'x', 'password' = '', 'lookup.cache.max-rows'='5000', 'lookup.cache.ttl' = '60s', 'lookup.max-retries'='3' ); // MySQL输出 CREATE TABLE sinktable ( uid BIGINT, root_game_id BIGINT, game_id BIGINT, platform VARCHAR, //省略其它字段 PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'xx', 'table-name' = 'v2_dwd_root_game_uid_reg_log', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = 'xxx', 'password' = '', 'sink.buffer-flush.interval'='5s', 'sink.buffer-flush.max-rows' = '10' ); // 插入(关联维表) INSERT INTO sinktable select IF(IsInvalidValue(k.uid), 0 , CAST(k.uid AS BIGINT)) as uid, IF((k.game_id IS NULL), 0 , k.game_id) as game_id, d.platform as platform, d.root_game_id as root_game_id, // 省略其它字段 from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message, 'uid,game_id(BIGINT),platform' )) as k LEFT JOIN DimTable as d ON k.game_id = d.game_id and k.platform = d.platform; -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: FlinkSQL cannot update pk column UID to expr
详细的异常打印信息如下: java.sql.BatchUpdateException: [3, 2021060816420017201616500303151172306] cannot update pk column UID to expr at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at com.mysql.cj.util.Util.handleNewInstance(Util.java:192) at com.mysql.cj.util.Util.getInstance(Util.java:167) at com.mysql.cj.util.Util.getInstance(Util.java:174) at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224) at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755) at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426) at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796) at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216) at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184) at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:128) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: FlinkSQL cannot update pk column UID to expr
我把表移动到了普通 MYSQL,可以正常运行。 经过排查应该是 ADB 建表 SQL 中的 DISTRIBUTE BY HASH(`uid`) 所导致,该语法用于处理数据倾斜问题。 看起来似乎是联表 join 的时候要求定义主键,但是定义主键后会转换为 upsert 流,而 ADB 中定义了 DISTRIBUTE BY 与 upsert 冲突了,不知道是否这么理解 -- Sent from: http://apache-flink.147419.n8.nabble.com/
FlinkSQL join 维表后一定会变成 upsert流吗?
请教各位一下,我使用 FlinkSQL 编写任务时,kafka source -> MySQL sink 不设置主键,查看了一下 request mode 是 [INSERT] ,也就是普通的 append 流,这很正常。 但是当我关联上维表后,发现 request mode 变成了 [INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE],这时异常报错会要求我给 sink 表设置主键,当我设置上主键后就会变成了 upsert 流。 upsert流底层实现原理是 INSERT INTO ... DUPLICATE KEY UPDATE,由于我采用的是阿里云的ADB数据库,该语法在 ADB 中主键是不支持update的,这会导致报错。且业务上我只想以 append 流的形式插入表。 请问各位有什么好的解决方案吗,关联上维表但是还是保持 append流? // 维表 CREATE TABLE DimTable ( //省略字段 ) WITH ( 'connector' = 'jdbc', 'url' = '***', 'table-name' = 'v2_dim_game_id', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = '***', 'password' = '***', 'lookup.cache.max-rows'='5000', 'lookup.cache.ttl' = '60s', 'lookup.max-retries'='3' ); -- Sent from: http://apache-flink.147419.n8.nabble.com/
关于 LEFT JOIN DimTable FOR SYSTEM_TIME AS OF 的异常求教!
异常:column 'record_time' not found in table 'k' 异常描述:KafkaTable k 表在与维表进行 look up join 时定义了别名,之后报在 k 表中没有定义 record_time 字段。 Flink 版本: 1.12.2 // Source 表 CREATE TABLE KafkaTable ( message STRING, record_time TIMESTAMP(3) METADATA FROM 'timestamp' ) WITH ( 'connector' = 'kafka', ); // 维表 CREATE TEMPORARY TABLE DimTable ( game_id BIGINT, game_name VARCHAR, root_game_id BIGINT, main_game_id BIGINT, platform VARCHAR ) WITH ( 'connector' = 'jdbc', ); // 处理语句 INSERT INTO sinktable select // 省略字段 from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message, 'uid,game_id(BIGINT),platform' )) *as k* LEFT JOIN DimTable FOR SYSTEM_TIME AS OF k.record_time as d ON k.game_id = d.game_id and k.platform = d.platform; -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于 LEFT JOIN DimTable FOR SYSTEM_TIME AS OF 的异常求教!
已解决,问题在于 Kafka 不是直接 join 维表,而是先和 UDTF join,之后整体才与维表 Join。 所以之前起别名的位置有误。导致找不到字段、改造如下: 先前写法: INSERT INTO sinktable select // 省略字段 from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message, 'uid,game_id(BIGINT),platform' )) *as k* LEFT JOIN DimTable FOR SYSTEM_TIME AS OF k.record_time as d ON k.game_id = d.game_id and k.platform = d.platform; 改造写法: INSERT INTO sinktable select // 省略字段 from ( select * from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message, 'uid,game_id(BIGINT),platform' ))) as k JOIN DimTable FOR SYSTEM_TIME AS OF k.procTime as d ON k.game_id = d.game_id and k.platform = d.platform; -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re:FlinkSQL join 维表后一定会变成 upsert流吗?
感谢你的回答,我这边看了官网目前 join 一共可以分为 Regular Joins 、 Interval Joins 以及 Temporal Joins 三大类。 我上面问题所述的确是采用了 Regular Joins 的方式。 之后我也尝试使用了 Lookup Join 但发现其最后也是转为 INSERT INTO ON DUPLICATE KEY UPDATE 的执行语句, 并不是我所期望的纯 append 模式 -- Sent from: http://apache-flink.147419.n8.nabble.com/
standalone K8S 如何查看 TaskMananger 的 gc.log ?
请问 *standalone K8S* 部署模式为 *Deploy Application Cluster* 在哪获取查看/怎么配置 TaskMananger 的 *gc.log* 日志? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re:回复: 关于拓展 Tuple元组的问题
感谢你的回答,请问可否举一个参照例子? 在 2020-06-19 15:31:53,"wangweigu...@stevegame.cn" 写道: > > 多个值组合在一起,当一个复合值使用! > > > > >发件人: 魏旭斌 >发送时间: 2020-06-19 15:01 >收件人: user-zh >主题: 关于拓展 Tuple元组的问题 >目前Flink 提供了Tuple1 ~ Tuple25,在实际开发中不能满足我们的需求,我们希望能有更大的元组,比如构建一个Tuple50。 >请问有什么解决的方案? 谢谢
Re:Re: 回复: 关于拓展 Tuple元组的问题
感谢,这边做了一下简单测试,已解决~ 真香 //Demo import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.types.Row; public class ParseUriRow extends TableFunction { public void eval(String data) { //逻辑处理 } @Override public TypeInformation getResultType() { return Types.ROW( Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING ); } } 在 2020-06-19 15:46:42,"Jark Wu" 写道: >用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么? > > >On Fri, 19 Jun 2020 at 15:42, Weixubin <18925434...@163.com> wrote: > >> 感谢你的回答,请问可否举一个参照例子? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-06-19 15:31:53,"wangweigu...@stevegame.cn" < >> wangweigu...@stevegame.cn> 写道: >> > >> > 多个值组合在一起,当一个复合值使用! >> > >> > >> > >> > >> >发件人: 魏旭斌 >> >发送时间: 2020-06-19 15:01 >> >收件人: user-zh >> >主题: 关于拓展 Tuple元组的问题 >> >目前Flink 提供了Tuple1 ~ Tuple25,在实际开发中不能满足我们的需求,我们希望能有更大的元组,比如构建一个Tuple50。 >> 请问有什么解决的方案? 谢谢 >>
Re:flink 通过 --classpath 传入https-xxx.xxx.jar 在1.8上正常运行 在flink1.10 就会报传入的jar包找不到
和版本应该没什么关系。如果是多节点部署的情况下,-C 所指定的URL 需要各个节点都能访问得到。 确认下该URL能被所有节点访问到吗 Best, Bin At 2020-06-22 11:43:11, "程龙" <13162790...@163.com> wrote: >2020-06-22 10:16:34,379 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed >(6/6) (5dd98ec92e0d5e53597cb7520643c7f5) switched from SCHEDULED to DEPLOYING. >2020-06-22 10:16:34,379 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Deploying >Sink: Unnamed (6/6) (attempt #0) to container_1590655249980_0081_01_02 @ >al-bj-bigdata-inf-research-flink04 (dataPort=34781) >2020-06-22 10:16:34,456 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom >Source (5/6) (aca298438b9eb6fcf295cb8af6eebcd8) switched from DEPLOYING to >RUNNING. >2020-06-22 10:16:34,481 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map >(3/8) (0daed15d107c3031891f0c9e84093068) switched from DEPLOYING to RUNNING. >2020-06-22 10:16:34,492 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed >(1/6) (44ca248aba351026452ba4abdb5f33a6) switched from DEPLOYING to RUNNING. >2020-06-22 10:16:34,497 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map >(1/8) (70612735eb755269fe9590e8ab51d3e2) switched from DEPLOYING to RUNNING. >2020-06-22 10:16:34,512 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map >(2/8) (baba331cd6bcde1f5a6024eac0b953b4) switched from DEPLOYING to RUNNING. >2020-06-22 10:16:34,524 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom >Source (4/6) (a34992362c2cf3634a29bd5a9c188754) switched from DEPLOYING to >RUNNING. >2020-06-22 10:16:34,531 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom >Source (2/6) (7683c15e3ebb3e718c2439c6e32f0d7d) switched from DEPLOYING to >RUNNING. >2020-06-22 10:16:34,564 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom >Source (6/6) (6ab1aaa5e1811c79c702197f984e9bb6) switched from DEPLOYING to >RUNNING. >2020-06-22 10:16:34,609 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom >Source (1/6) (6245b508b8e0494f06ef71c6ad4954b6) switched from DEPLOYING to >RUNNING. >2020-06-22 10:16:34,616 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map >(4/8) (2ea049476f0b48fcd85dcd9084091e9f) switched from DEPLOYING to RUNNING. >2020-06-22 10:16:34,650 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed >(2/6) (e785e44e4212dcc5279bcde761b28292) switched from DEPLOYING to RUNNING. >2020-06-22 10:16:34,656 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed >(5/6) (97a5e72f731d14aa97d93253b71b6aeb) switched from DEPLOYING to RUNNING. >2020-06-22 10:16:34,662 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map >(7/8) (59babdbc2d7bea362a2794a966fe59ef) switched from DEPLOYING to RUNNING. >2020-06-22 10:16:34,664 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom >Source (3/6) (aed33b78cc847561908ea43164e0311a) switched from DEPLOYING to >RUNNING. >2020-06-22 10:16:34,669 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map >(6/8) (fd87d0111b10e9f9027068a72e9ce209) switched from DEPLOYING to RUNNING. >2020-06-22 10:16:34,726 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed >(6/6) (5dd98ec92e0d5e53597cb7520643c7f5) switched from DEPLOYING to RUNNING. >2020-06-22 10:16:34,729 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map >(8/8) (22b73fa7f7435a405e1102a3480c09c1) switched from DEPLOYING to RUNNING. >2020-06-22 10:16:34,760 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed >(4/6) (4c677579ef44cf394618af38a75497da) switched from DEPLOYING to RUNNING. >2020-06-22 10:16:37,081 INFO >org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering >checkpoint 1 @ 1592792197072 for job 1797e2f64b7b1caeb6356608290263cc. >2020-06-22 10:16:45,065 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed >(3/6) (d9a9c913dcbf782bd933b0adae157b38) switched from DEPLOYING to RUNNING. >2020-06-22 10:16:45,066 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map >(5/8) (39dbdd04e2066d1d93be1641c0ab7add) switched from DEPLOYING to RUNNING. >2020-06-22 10:16:48,512 INFO >org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map >(3/8) (0daed15d107c3031891f0c9e84093068) switched from RUNNING to FAILED on >org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@25d7d2c5. >java.lang.NoClassDefFoundError: core > > > >看日志是在部分task里面报错
FlinkSQL 是否支持类似临时中间表的概念
Hi, 我希望通过FlinkSQL的方式在一个Job中完成两步的操作,但似乎办不到,情况大致如下: eg.有一个ETL过程,需要从Source获取数据--将每条数据拆分为一条多列数据--对拆分完的数据开窗聚合--输出到sink。 //从Source获取数据 CREATE TABLE sourceTable ( request_uri STRING ) WITH ( .. ); //这个时候我希望能够创建一张临时中间表 tempTable用来存放 对Source表中数据拆分为多列后的结果,类似于下面这样(Flink 并不支持这么做) CREATE TABLE tempTable ( row1 STRING, row2 STRING, ) Insert into tempTable select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( ) //最后从 tempTable 表中获取数据并进行开窗做聚合操作 CREATE TABLE sinkTable ( row1 STRING, ) INSERT INTO sinkTable SELECT .., SUM(unit) AS unitSum from tempTable GROUP BY TUMBLE(rowtime,INTERVAL '30' SECOND), ... 以上是大致情况描述,我希望能在一个Job中一次性完成以上数据处理,而不分成两个Job,不知是否有好的解决方案? Thank!
Re:Re: FlinkSQL 是否支持类似临时中间表的概念
感谢,我查阅了下资料后发现CREATE VIEW这个语法是在Flink.1.12有提及而1.10版本没有 ,1.12版本暂未发行, 而我目前使用的版本是1.10版本。 而且很奇怪,我并没有找到1.11版本的文档 在 2020-06-23 10:58:25,"Leonard Xu" 写道: >Hi, > >> 在 2020年6月23日,10:49,Weixubin <18925434...@163.com> 写道: >> >> //这个时候我希望能够创建一张临时中间表 tempTable用来存放 对Source表中数据拆分为多列后的结果,类似于下面这样(Flink >> 并不支持这么做) > > >看着描述应该是源数据中的一行拆成多行。这个需求是不是用 VIEW 就可以了[1]?Flink SQL 支持 VIEW 语法的[1]。 > >Best, >Leonard Xu > >[1] >https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/create.html#create-view > ><https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/create.html#create-view>
Re:Re: FlinkSQL 是否支持类似临时中间表的概念
Hi, 关于这句 “把 ` select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` 这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了” 我不太明白所说的中间结果表是什么意思, 我所理解为在数据库创建多一张middleSinkTable表,作为中间结果表。请问这样理解是否有误? 可否简单举个例子。 Thanks, Bin 在 2020-06-23 11:57:28,"Leonard Xu" 写道: >Hi, >是的,这个是在 FLINK 1.11 的版本里支持的,当前 FLINK 1.11代码已经冻结,正在测试中,最近应该快发布了,所以master >分支上的版本号为1.12-SNAPSHOT >,等1.11版本发布了就可以看到对应的文档。 > >回到这个问题,flink sql(blink planner) 支持 multiple-sinks 的, 在 1.10 版本也可以一个作业搞定。 把 ` >select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` > 这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了。效果和使用 VIEW 应该类似,因为 planner 会做分段优化。 >另外建议使用1.10.1版本,1.10.1在1.10的基础上修复了不少bug,也可以等1.11发布了试用下1.11。 > > >祝好, >Leonard Xu
Re:Re: FlinkSQL 是否支持类似临时中间表的概念
感谢你提供了子查询的思路,不过经过试验后有点可惜,这似乎还是满足不了我们的需求。 我们的场景是从阿里云SLS读取消息。每条消息有一个字段是request_uri。 第一个数据处理过程就是将 request_uri 解析为多个属性(多列),存成一行,作为一条记录。 第二个数据处理过程就是将每条记录里的 声明heart_time为事件时间属性并使用5秒延迟水印策略,进行开窗聚合处理。 //如果应用到子查询的话,Flink是不支持这样做的。WATERMARK FOR 水印声明只能在DDL里应用。如下: select ..ts as TO_TIMESTAMP(heart_time,'-MM-ddHH:mm:ss') , WATERMARK FOR ts AS ts - INTERVAL '1' SECOND from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….· //如果应用到source,则一开始并不知道heart_time 的值 CREATE TABLE sourceTable ( request_uri STRING ..ts as TO_TIMESTAMP(heart_time,'-MM-ddHH:mm:ss') , WATERMARK FOR ts AS ts - INTERVAL '1' SECOND ) WITH ( ... ); 只能等待Flink 1.11 尝试是否可以用View作为中间临时表,并对View进行 WATERMARK水印声明 Thanks Bin 在 2020-06-23 15:28:50,"Leonard Xu" 写道: >Hi >我的意思是你如果中间结果表如果要输出,那你就一个sink写到中间结果表(什么表根据你的需要),一个sink写到你的最终结果表,在这两个sink之前的`select > * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` >的这段sql是可以复用的,就和 VIEW的作用类似。 > >如果你不需要中间结果表,只是要最终的结果表,那你写个嵌套的sql就行了,里层是`select * from sourceTable , LATERAL >TABLE(ParseUriRow(request_uri)) as T( )….·,外层是 group by, 插入最终的结果表就能满足需求了吧。 > >祝好, >Leonard Xu > > >> 在 2020年6月23日,15:21,Weixubin <18925434...@163.com> 写道: >> >> >> >> >> Hi, >> 关于这句 “把 ` select * from sourceTable , LATERAL >> TABLE(ParseUriRow(request_uri)) as T( )….` 这段 sql insert 到中间结果表 和 >> group后再写入最终结果表就可以了” >> 我不太明白所说的中间结果表是什么意思, 我所理解为在数据库创建多一张middleSinkTable表,作为中间结果表。请问这样理解是否有误? >> 可否简单举个例子。 >> Thanks, >> Bin >> >> >> >> >> >> >> >> >> >> >> 在 2020-06-23 11:57:28,"Leonard Xu" 写道: >>> Hi, >>> 是的,这个是在 FLINK 1.11 的版本里支持的,当前 FLINK 1.11代码已经冻结,正在测试中,最近应该快发布了,所以master >>> 分支上的版本号为1.12-SNAPSHOT >>> ,等1.11版本发布了就可以看到对应的文档。 >>> >>> 回到这个问题,flink sql(blink planner) 支持 multiple-sinks 的, 在 1.10 版本也可以一个作业搞定。 把 >>> ` select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as >>> T( )….` 这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了。效果和使用 VIEW 应该类似,因为 >>> planner 会做分段优化。 >>> 另外建议使用1.10.1版本,1.10.1在1.10的基础上修复了不少bug,也可以等1.11发布了试用下1.11。 >>> >>> >>> 祝好, >>> Leonard Xu >
Re:Re: FlinkSQL 是否支持类似临时中间表的概念
感谢 Leonard Xu 与 Jark 两位,已成功解决满足需求! 对于 chenxuying 所提出的问题,我也很感兴趣。 由于使用UDF重复解析两遍,不知是否有更好的替代方法 Thanks Bin 在 2020-06-24 12:32:27,"Leonard Xu" 写道: >Hello, > >你的需求其实是要 抽取记录的字段定义watermark, 这个只能放到source 表的DDL中,view上也不支持的。 >1.10里的计算列 + udf 应该就可以满足你的需求, 大概长这样: > >CREATE TABLE sourceTable ( > request_uri STRING, > ts as extractTsUdf(request_uri), > WATERMARK FOR ts AS ts - INTERVAL '5' SECOND > >) WITH ( > .. >); > >select ... from ( >select ts, T.* from >sourceTable sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T(...) >) t >group by TUMBLE(ts, INTERVAL '30' SECOND) > >祝好, >Leonard > > >> 在 2020年6月24日,12:09,Weixubin <18925434...@163.com> 写道: >> >> 第一个数据处理过程就是将 request_uri 解析为多个属性(多列),存成一行,作为一条记录。 >