?????? ????kafka????????????

2020-09-04 文章 ????????
??100,0100??100
0,100??(??100)???






--  --
??: 
   "user-zh"



回复:关于flink sql并行度问题的请教

2020-09-04 文章 faaron zheng
Hi, HiveTableSource默认会根据数据大小自行分配并发,所以和你设置的最大并发冲突了,你可以设置table. exec. hive. 
infer-source-parallelism: false来关闭这个功能 Best, Faaron Zheng 在2020年09月04日 15:29,me 
写道: val tableConfig = tableEnv.getConfig.getConfiguration 
tableConfig.setString("table.exec.resource.default-parallelism","4") 
已经加了table的并行度设置,但是提示小于104并行度不让执行 Vertex Source: HiveTableSource()'s 
parallelism (104) is higher than the max parallelism (4). Please lower the 
parallelism or increase the max parallelism 原始邮件 发件人: me 收件人: 
user-zh 发送时间: 2020年9月4日(周五) 15:18 主题: 关于flink 
sql并行度问题的请教 
我的代码中使用flinksql,本机idea中测试没有问题,放到集群上跑之后,占用了全部40个slots,并且并行度为114,程序也一直卡着没有输出,是一个加入了时间窗口的flink
 sql 我再代码中全局设置了,dataStreamEnv.setParallelism(4) 
dataStreamEnv.setMaxParallelism(4) 但是感觉完全不起作用,请问怎么去限制flink sql的并行度?

回复:flink sql client 如何同时执行多条 sql 语句

2020-09-04 文章 faaron zheng
Hi, sql-client目前应该是没有这个能力的,它是交互式执行的,我们之前在sql-client的基础上改过一个类似beeline 
-e/-f的脚本,主要修改的提交任务的地方。 Best, Faaron Zheng 在2020年09月04日 17:04,LittleFall 写道: 
我有一个 sql 文件,它里面有不少 flink sql 的创建表的语句和查询语句,现在我想要通过 sql client 
提交这些任务,却只能一句一句的复制粘贴。如果同时复制粘贴多条语句就会报错,在 flink sql client 中使用 source xxx.sql 
也会报错。 请问用什么样的方法可以一次性执行多条语句呢? -- Sent from: 
http://apache-flink.147419.n8.nabble.com/

回复:消费kafka数据乱序问题

2020-09-04 文章 smq
换句话说,写进kafka的数据是同一个用户的两条,余额分别是0和100,就是我之前那个计算过程,这个计算操作在Oracle完成,我只负责把余额,也就是0或者100更新到kudu对应的余额字段,因为消费数据可能会乱序,先更新100,再更新0,这样导致存在kudu的数据是0,正确的数据最终应该是100

---原始邮件---
发件人: "wwj"

flink ???? StreamingFileSink ??catalog??????????????

2020-09-04 文章 MuChen
hi, all??




DataStream APIkafka??DataStream ds1??

tableEnvhive catalog??
tableEnv.registerCatalog(catalogName, catalog); 
tableEnv.useCatalog(catalogName); 
??ds1??table
Table sourcetable = tableEnv.fromDataStream(ds1); String souceTableName = 
"music_source"; tableEnv.createTemporaryView(souceTableName, sourcetable); 
hive
CREATE TABLE `dwd_music_copyright_test`(   `url` string COMMENT 'url',   `md5` 
string COMMENT 'md5',   `utime` bigint COMMENT '',   `title` string COMMENT 
'??',   `singer` string COMMENT '??',   `company` string COMMENT 
'',   `level` int COMMENT 
'??.0??,1??acrcloud??,3??') PARTITIONED BY (   `dt` 
string,   `hour` string) ROW FORMAT SERDE   
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS 
INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
LOCATION   
'hdfs://Ucluster/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test' 
TBLPROPERTIES (   'connector'='HiveCatalog',   
'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',   
'sink.partition-commit.delay'='1 min',   
'sink.partition-commit.policy.kind'='metastore,success-file',   
'sink.partition-commit.trigger'='partition-time',   
'sink.rolling-policy.check-interval'='30s',   
'sink.rolling-policy.rollover-interval'='1min',   
'sink.rolling-policy.file-size'='1MB'); 
??step3??dwd_music_copyright_test


flink:1.11 kafka:1.1.1 hadoop:2.6.0 hive:1.2.0 


hive 
catalog??hour=02??hour=03??
show partitions rt_dwd.dwd_music_copyright_test; | dt=2020-08-29/hour=00  | | 
dt=2020-08-29/hour=01  | | dt=2020-08-29/hour=04  | | dt=2020-08-29/hour=05  | 
hdfs??
$ hadoop fs -du -h 
/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/ 4.5 K   
13.4 K  
/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=00 
2.0 K   6.1 K   
/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=01 
1.7 K   5.1 K   
/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=02 
1.3 K   3.8 K   
/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=03 
3.1 K   9.2 K   
/user/hive/warehouse/rt_dwd.db/dwd_music_copyright_test/dt=2020-08-29/hour=04 
??add partition

flink 
WebUI??checkpoint??StreamingFileCommitter??





??

exactly-once??sink??catalog



EXACTLY_ONCE??kafkaisolation.level=read_committed??enable.auto.commit=false??EXACTLY_ONCE??
streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
 CheckpointingMode.EXACTLY_ONCE);

1.11????????????????????????????????????????????????????????????????

2020-09-04 文章 Asahi Lee
??
??StreamTableEnvironment.from("")
??package kafka;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Test3 {

public static void main(String[] args) {
// 
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = 
StreamTableEnvironment.create(bsEnv, bsSettings);

// ??
String inTablePath = "CREATE TABLE datagen (  " +
" id INT,  " +
" total string,  " +
" ts AS localtimestamp,  " +
" WATERMARK FOR ts AS ts  " +
") WITH (  " +
" 'connector' = 'datagen',  " +
" 'rows-per-second'='5',  " +
" 'fields.id.min'='1',  " +
" 'fields.id.max'='10',  " +
" 'fields.total.length'='10'  " +
")";
// ??
bsTableEnv.executeSql(inTablePath);

Table table = bsTableEnv.sqlQuery("select id, total, 12 as col_1 from 
datagen");
bsTableEnv.createTemporaryView("table1", table);

Table table1 = bsTableEnv.from("table1");
System.out.println(table1);
// ??table1

Table queryT = bsTableEnv.sqlQuery("select table1.id, 1 as b from 
table1");
System.out.println(queryT.getSchema());


bsTableEnv.sqlQuery("select table1.id from " + 
bsTableEnv.from("table1"));

}

}

Re: 消费kafka数据乱序问题

2020-09-04 文章 Xiao Xu
两个方法
1. kafka 里面可以 keyby, partition 里面都是有序的, 所以每个用户处理都是有序的
2. 就是你说的在 flink 里面做乱序处理

宁吉浩  于2020年9月4日周五 下午5:56写道:

> 我也遇到了和你一样的问题,也是两条数据有因果关系,必须有严格的先后顺序,我这边的业务不像银行那么严格;
> 我的解决办法是把迟到数据丢弃,然后进行业务计算;
> 另起一个程序把数据缓存在内存里,对数据排序,然后再度修正计算;
> 之前的实时+离线数仓用的办法,代码开发一次,但还要跑两次;
>
>
> --
> 发件人:smq <374060...@qq.com>
> 发送时间:2020年9月4日(星期五) 17:35
> 收件人:wwj ; user-zh 
> 主 题:回复:消费kafka数据乱序问题
>
>
> 换句话说,写进kafka的数据是同一个用户的两条,余额分别是0和100,就是我之前那个计算过程,这个计算操作在Oracle完成,我只负责把余额,也就是0或者100更新到kudu对应的余额字段,因为消费数据可能会乱序,先更新100,再更新0,这样导致存在kudu的数据是0,正确的数据最终应该是100
>
> ---原始邮件---
> 发件人: "wwj" 发送时间: 2020年9月4日(周五) 下午5:10
> 收件人: "smq"<374060...@qq.com>;
> 主题: 回复:消费kafka数据乱序问题
>
>
>
> “假如说先处理了存钱,存上之后余额是100-100+100=100”  这句话没看懂,存上之后余额不应该是 100+100=200 吗?
>
>
>
>
>
>
>
>
> 原始邮件
>
>
> 发件人:"smq"< 374060...@qq.com >;
>
> 发件时间:2020/9/4 16:40
>
> 收件人:"user-zh"< user-zh@flink.apache.org >;
>
> 主题:消费kafka数据乱序问题
>
>
>
> 大家好
>  
>  现在碰到一个关于处理乱序的问题,业务场景是银行余额的更新,数据源是kafka,有一个账户余额字段,sink到kudu,更新客户余额.
>
> 如果对于同一个账户的多笔操作出现乱序,可能会导致客户余额不对。比如一个客户账户有100块,先消费100,然后存100,按照正常的数据处理顺序,帐户余额应该还是100的,假如说先处理了存钱,存上之后余额是100-100+100=100,然后处理消费100的那条数据,由于消费在前,这时这条数据对应的余额是100-100=0。
>   这样的话就出现了统计余额错误,请问想要按照事件时间处理数据是不是可以先keyby,然后用watermark.


Re: flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-04 文章 Peihui He
Hi, all

当指定partition的时候这个问题通过path 也没法解决了

CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,  dt string,) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',   -- required: specify the connector
  'path' = 'file:///path/to/whatever',  -- required: path to a directory
  'format' = 'json', -- required: file system connector)


select  * from  MyUserTable  limit 10;

job 会一直卡在一个地方
[image: image.png]

这种改怎么解决呢?

Peihui He  于2020年9月4日周五 下午6:02写道:

> hi, all
> 我这边用flink sql client 创建表的时候
>
> CREATE TABLE MyUserTable (
>   column_name1 INT,
>   column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH (
>   'connector' = 'filesystem',   -- required: specify the connector
>   'path' = 'file:///path/to/whatever',  -- required: path to a directory
>   'format' = 'json', -- required: file system connector)
>
> 当path后面多一个"/"时, 比如: 'path' = 'file:///path/to/whatever/'
> sql client 提交job会很慢,最后会报错
>
> Caused by: org.apache.flink.runtime.rest.util.RestClientException:
> [Internal server error.,  org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
> already been submitted. at
> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:280)
> at sun.reflect.GeneratedMethodAccessor127.invoke(Unknown Source) at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498) at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
> akka.actor.Actor$class.aroundReceive(Actor.scala:517) at
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at
> akka.actor.ActorCell.invoke(ActorCell.scala:561) at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at
> akka.dispatch.Mailbox.run(Mailbox.scala:225) at
> akka.dispatch.Mailbox.exec(Mailbox.scala:235) at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> End of exception on server side>] at
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
> at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
> at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>
>
> flink session cluster job 页面基本上都打不开,要过好久才可以。最后看到job 确实提交成功了。
>
> 这种情况不知道有没有遇到过?
>
> Best Wishes.
>
>
>


sql-client checkpoint sql-client

2020-09-04 文章 引领


想尝试采用flink-cdc 来清洗数据,但是尝试下来,困于几点:
① 使用sql-client 开启checkpoint ,如果程序挂掉,该如何接着checkpoint,继续执行相应程序。尤其是在执行group 
by或者是count等操作时该如何办?
② 如果以上方式不行,是否可以采用写代码的形式,重启时指定checkpoint,但还是采用flink-cdc的方式去消费
| |
引领
|
|
yrx73...@163.com
|
签名由网易邮箱大师定制



flink sql 1.11.1 FileSystem SQL Connector path directory slow

2020-09-04 文章 Peihui He
hi, all
我这边用flink sql client 创建表的时候

CREATE TABLE MyUserTable (
  column_name1 INT,
  column_name2 STRING,) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',   -- required: specify the connector
  'path' = 'file:///path/to/whatever',  -- required: path to a directory
  'format' = 'json', -- required: file system connector)

当path后面多一个"/"时, 比如: 'path' = 'file:///path/to/whatever/'
sql client 提交job会很慢,最后会报错

Caused by: org.apache.flink.runtime.rest.util.RestClientException:
[Internal server error., ] at
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
at
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)


flink session cluster job 页面基本上都打不开,要过好久才可以。最后看到job 确实提交成功了。

这种情况不知道有没有遇到过?

Best Wishes.


回复:消费kafka数据乱序问题

2020-09-04 文章 宁吉浩
我也遇到了和你一样的问题,也是两条数据有因果关系,必须有严格的先后顺序,我这边的业务不像银行那么严格;
我的解决办法是把迟到数据丢弃,然后进行业务计算;
另起一个程序把数据缓存在内存里,对数据排序,然后再度修正计算;
之前的实时+离线数仓用的办法,代码开发一次,但还要跑两次;


--
发件人:smq <374060...@qq.com>
发送时间:2020年9月4日(星期五) 17:35
收件人:wwj ; user-zh 
主 题:回复:消费kafka数据乱序问题

换句话说,写进kafka的数据是同一个用户的两条,余额分别是0和100,就是我之前那个计算过程,这个计算操作在Oracle完成,我只负责把余额,也就是0或者100更新到kudu对应的余额字段,因为消费数据可能会乱序,先更新100,再更新0,这样导致存在kudu的数据是0,正确的数据最终应该是100

---原始邮件---
发件人: "wwj"

??????????????flink??????????????????

2020-09-04 文章 ????
??


   Flink+drools drools
2020-9-4
| |

|
|
hold_li...@163.com
|
??
??2020??8??6?? 10:26??samuel@ubtrobot.com ??
flink 
,??


??mysql??json
{"times":5}  ---5??
{"temperature": 80} ---80

1)kafka
2)flinkkafka??


??
1. 
2.??flink CEP??
3.??







Re: 请指教一个关于时间窗的问题,非常感谢!

2020-09-04 文章 zilong xiao
可否发下是哪个配置,有相关的文档吗?

superainbower  于2020年9月4日周五 下午5:24写道:

> 1.11的版本已经加入了 新的配置项,避免了数据倾斜导致某个分区没数据 而不触发计算的问题
>
>
> | |
> superainbower
> |
> |
> superainbo...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年09月4日 15:11,taochanglian 写道:
> 确实是这样,比如有你多个partition,但是只有1个partition里面有数据,wm就不会执行计算。需要保证多个parititon数据都有数据。
>
> 举个例子,我这里在做测试的时候,1个topic10个partition,由于测试环境,按照key
> hash,数据只进入1个partition,就不会触发计算。后来让这个key轮询到10个parition都有数据,就可以触发计算了。
>
> 在 2020/9/4 13:14, Benchao Li 写道:
> 如果你有多个partition,并且有一些partition没有数据,这个时候的确是会存在这个问题。
> 要处理这种情况,可以了解下idle source[1]
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
>
> samuel@ubtrobot.com  于2020年9月3日周四 下午3:41写道:
>
> 补充一下环境信息:
>
> 有点类似以下问题:
> 在1.11版本测试flink sql时发现一个问题,用streaming api
> 消费kafka,使用eventtime,再把stream转table,进行sql聚合,发现当kafka topic是多个分区时,flink webui
> watermarks 显示No Watermark,聚合计算也迟迟不触发计算,但当kafka
> topic只有一个分区时却能这个正常触发计算,watermarks也显示正常。
>
> 不确定是否是因为kafka多分区引起的?
>
>
>
> 发件人: samuel@ubtrobot.com
> 发送时间: 2020-09-03 09:23
> 收件人: user-zh
> 主题: 回复: 回复:请指教一个关于时间窗的问题,非常感谢!
> 谢谢回复!
>
> 是Flink1.11.1的版本
>
> 以下是代码:
> package com.ubtechinc.dataplatform.flink.etl.exception.monitor;/*
> * Licensed to the Apache Software Foundation (ASF) under one
> * or more contributor license agreements.  See the NOTICE file
> * distributed with this work for additional information
> * regarding copyright ownership.  The ASF licenses this file
> * to you under the Apache License, Version 2.0 (the
> * "License"); you may not use this file except in compliance
> * with the License.  You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
>
> import org.apache.commons.collections.map.HashedMap;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.serialization.SimpleStringEncoder;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.state.BroadcastState;
> import org.apache.flink.api.common.state.MapStateDescriptor;
> import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.fs.Path;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.BroadcastStream;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import
>
> org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
> import org.apache.flink.streaming.api.functions.co
> .BroadcastProcessFunction;
> import
> org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
> import
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
> import
>
> org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> import
> org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.util.Collector;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import com.alibaba.fastjson.JSON;
> import com.alibaba.fastjson.JSONArray;
> import com.alibaba.fastjson.JSONObject;
> import com.alibaba.fastjson.parser.Feature;
> import com.ubtechinc.dataplatform.flink.util.AES256;
> import com.ubtechinc.dataplatform.flink.util.ConstantStr;
> import com.ubtechinc.dataplatform.flink.util.MailUtils;
> import com.ubtechinc.dataplatform.flink.util.SmsUtil;
> import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner;
>
> import java.sql.DriverManager;
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
> import com.mysql.jdbc.Connection;

回复: 请指教一个关于时间窗的问题,非常感谢!

2020-09-04 文章 superainbower
1.11的版本已经加入了 新的配置项,避免了数据倾斜导致某个分区没数据 而不触发计算的问题


| |
superainbower
|
|
superainbo...@163.com
|
签名由网易邮箱大师定制


在2020年09月4日 15:11,taochanglian 写道:
确实是这样,比如有你多个partition,但是只有1个partition里面有数据,wm就不会执行计算。需要保证多个parititon数据都有数据。

举个例子,我这里在做测试的时候,1个topic10个partition,由于测试环境,按照key
hash,数据只进入1个partition,就不会触发计算。后来让这个key轮询到10个parition都有数据,就可以触发计算了。

在 2020/9/4 13:14, Benchao Li 写道:
如果你有多个partition,并且有一些partition没有数据,这个时候的确是会存在这个问题。
要处理这种情况,可以了解下idle source[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

samuel@ubtrobot.com  于2020年9月3日周四 下午3:41写道:

补充一下环境信息:

有点类似以下问题:
在1.11版本测试flink sql时发现一个问题,用streaming api
消费kafka,使用eventtime,再把stream转table,进行sql聚合,发现当kafka topic是多个分区时,flink webui
watermarks 显示No Watermark,聚合计算也迟迟不触发计算,但当kafka
topic只有一个分区时却能这个正常触发计算,watermarks也显示正常。

不确定是否是因为kafka多分区引起的?



发件人: samuel@ubtrobot.com
发送时间: 2020-09-03 09:23
收件人: user-zh
主题: 回复: 回复:请指教一个关于时间窗的问题,非常感谢!
谢谢回复!

是Flink1.11.1的版本

以下是代码:
package com.ubtechinc.dataplatform.flink.etl.exception.monitor;/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import org.apache.commons.collections.map.HashedMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.functions.co
.BroadcastProcessFunction;
import
org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.ubtechinc.dataplatform.flink.util.AES256;
import com.ubtechinc.dataplatform.flink.util.ConstantStr;
import com.ubtechinc.dataplatform.flink.util.MailUtils;
import com.ubtechinc.dataplatform.flink.util.SmsUtil;
import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner;

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import com.mysql.jdbc.Connection;

import java.sql.Timestamp;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
* 使用广播实现动态的配置更新
*/
public class ExceptionAlertHour4{

private static final L

flink sql client 如何同时执行多条 sql 语句

2020-09-04 文章 LittleFall
我有一个 sql 文件,它里面有不少 flink sql 的创建表的语句和查询语句,现在我想要通过 sql client
提交这些任务,却只能一句一句的复制粘贴。如果同时复制粘贴多条语句就会报错,在 flink sql client 中使用 source xxx.sql
也会报错。

请问用什么样的方法可以一次性执行多条语句呢?




--
Sent from: http://apache-flink.147419.n8.nabble.com/

消费kafka数据乱序问题

2020-09-04 文章 smq
大家好
   现在碰到一个关于处理乱序的问题,业务场景是银行余额的更新,数据源是kafka,有一个账户余额字段,sink到kudu,更新客户余额.
如果对于同一个账户的多笔操作出现乱序,可能会导致客户余额不对。比如一个客户账户有100块,先消费100,然后存100,按照正常的数据处理顺序,帐户余额应该还是100的,假如说先处理了存钱,存上之后余额是100-100+100=100,然后处理消费100的那条数据,由于消费在前,这时这条数据对应的余额是100-100=0。
  这样的话就出现了统计余额错误,请问想要按照事件时间处理数据是不是可以先keyby,然后用watermark.

Re: Re: Re: Re: pyflink-udf 问题反馈

2020-09-04 文章 Xingbo Huang
Hi,

推荐你使用ddl来声明你上下游用的connector

```
table_env.execute_sql("""
CREATE TABLE output (
data STRING ARRAY
) WITH (
 'connector' = 'filesystem',   -- required: specify the connector
 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
 'format' = 'json',
 'json.fail-on-missing-field' = 'false',
 'json.ignore-parse-errors' = 'true'
)
""")

table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
```

Best,
Xingbo



whh_960101  于2020年9月4日周五 下午3:46写道:

> 您好,我是想让输出insert_into到目标表中,具体如下:
> st_env=StreamExecutionEnvironment.get_execution_environment()
> st_env.connect了一个source table(table包含a字段),
> 然后
> | st_env.connect(FileSystem().path('tmp')) \ |
> | | .with_format(OldCsv() |
> | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
> | | .with_schema(Schema() |
> | | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
> | | .create_temporary_table('sink') |
> connect了一个sink表,format、schema都是DataTypes.ARRAY()
> 然后我定义了一个udf
>
> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
> def func(a):
> rec_list = a.split(',')
> res_arr = np.arrary(rec_list,dtype=str)
> return res_arr
> st_env.register_function("func", func)
> st_env.from_path("source").select("func(a)").insert_into("sink")
> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串,不是我res_arr里面的内容,如果我单独返回一个值,比如return
> res_arr[0],tmp文件里面的字符串就是正确。
> 我想要得到array,该怎么解决?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-09-04 15:17:38,"Xingbo Huang"  写道:
> >Hi,
> >
> >你是调试的时候想看结果吗?
> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
> >
> >个人觉得to_pandas最简单,比如你可以试试下面的例子
> >
> >```
> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
> >
> >@udf(input_types=DataTypes.STRING(),
> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
> >def func(a):
> > return np.array([a, a, a], dtype=str)
> >
> >table_env.register_function("func", func)
> >
> >table.select("func(b)").to_pandas()
> >```
> >然后,你可以看看官方文档[1],让你快速上手PyFlink
> >
> >Best,
> >Xingbo
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
> >
> >whh_960101  于2020年9月4日周五 下午2:50写道:
> >
> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
> >> 我的udf输出了一个numpy.array(dtype = str),
> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
> >>
> >>
> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
> >> 请问这个问题该怎么解决?
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-04 10:35:03,"Xingbo Huang"  写道:
> >> >Hi,
> >> >
> >>
> >>
> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
> >> >
> >> >Best,
> >> >Xingbo
> >> >
> >> >whh_960101  于2020年9月4日周五 上午9:26写道:
> >> >
> >> >>
> >> >>
> >>
> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
> >> >> udf定义如下:
> >> >>
> >> >>
> >>
> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
> >> >> def fun(data):
> >> >>  b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
> >> >>
> >> >>
> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
> >> >> 希望您能给我提供好的解决办法,万分感谢!
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang"  写道:
> >> >> >Hi,
> >> >> >
> >> >> >我觉得你从头详细描述一下你的表结构。
> >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
> >> >>
> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
> >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
> >> >> >
> >> >> >[1]
> >> >> >
> >> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
> >> >> >
> >> >> >Best,
> >> >> >Xingbo
> >> >> >
> >> >> > 于2020年9月3日周四 下午9:45写道:
> >> >> >
> >> >> >>
> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> DataTypes.STRING()]
> >> >> >>
> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> >> >> >> 或者正确写法是什么样的,感谢解答!
> >> >> >>
> >> >> >>
> >> >> >> | |
> >> >> >> whh_960101
> >> >> >> |
> >> >> >> |
> >> >> >> 邮箱:whh_960...@163.com
> >> >> >> |
> >> >> >>
> >> >> >> 签名由 网易邮箱大师 定制
> >> >> >>
> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
> >> >> >> Hi,
> >> >> >> input_types定义的是每一个列的具体类型。
> >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> >> >> >> 正确的写法是
> >> >> >>
> >> >> >>input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> >> DataTypes.STRING()]
> >> >> >>
> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
> >> >> >>input_types=DataTypes.Row([DataTypes.FIELD("a",
> >> DataTypes.STRING

Re:Re: Re: Re: pyflink-udf 问题反馈

2020-09-04 文章 whh_960101
您好,我是想让输出insert_into到目标表中,具体如下:
st_env=StreamExecutionEnvironment.get_execution_environment()
st_env.connect了一个source table(table包含a字段),
然后
| st_env.connect(FileSystem().path('tmp')) \ |
| | .with_format(OldCsv() |
| | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
| | .with_schema(Schema() |
| | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
| | .create_temporary_table('sink') |
connect了一个sink表,format、schema都是DataTypes.ARRAY()
然后我定义了一个udf
@udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
def func(a):
rec_list = a.split(',')
res_arr = np.arrary(rec_list,dtype=str)
return res_arr
st_env.register_function("func", func)
st_env.from_path("source").select("func(a)").insert_into("sink")
最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串,不是我res_arr里面的内容(假如res_arr=['1','2','3']),如果我单独返回一个值,比如return
 res_arr[0],tmp文件就显示'1'。
我想要得到array,该怎么解决?



















在 2020-09-04 15:17:38,"Xingbo Huang"  写道:
>Hi,
>
>你是调试的时候想看结果吗?
>你可以直接table.to_pandas()来看结果,或者用print connector来看。
>
>个人觉得to_pandas最简单,比如你可以试试下面的例子
>
>```
>table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
>
>@udf(input_types=DataTypes.STRING(),
>result_type=DataTypes.ARRAY(DataTypes.STRING()))
>def func(a):
> return np.array([a, a, a], dtype=str)
>
>table_env.register_function("func", func)
>
>table.select("func(b)").to_pandas()
>```
>然后,你可以看看官方文档[1],让你快速上手PyFlink
>
>Best,
>Xingbo
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>
>whh_960101  于2020年9月4日周五 下午2:50写道:
>
>> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
>> 我的udf输出了一个numpy.array(dtype = str),
>> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>>
>> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
>> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
>> 请问这个问题该怎么解决?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-09-04 10:35:03,"Xingbo Huang"  写道:
>> >Hi,
>> >
>>
>> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
>> >
>> >Best,
>> >Xingbo
>> >
>> >whh_960101  于2020年9月4日周五 上午9:26写道:
>> >
>> >>
>> >>
>> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
>> >> udf定义如下:
>> >>
>> >>
>> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
>> >> def fun(data):
>> >>  b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
>> >>
>> >>
>> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
>> >> 希望您能给我提供好的解决办法,万分感谢!
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-03 22:23:28,"Xingbo Huang"  写道:
>> >> >Hi,
>> >> >
>> >> >我觉得你从头详细描述一下你的表结构。
>> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
>> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
>> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
>> >> >
>> >> >[1]
>> >> >
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
>> >> >
>> >> >Best,
>> >> >Xingbo
>> >> >
>> >> > 于2020年9月3日周四 下午9:45写道:
>> >> >
>> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> DataTypes.STRING()]
>> >> >>
>> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
>> >> >> 或者正确写法是什么样的,感谢解答!
>> >> >>
>> >> >>
>> >> >> | |
>> >> >> whh_960101
>> >> >> |
>> >> >> |
>> >> >> 邮箱:whh_960...@163.com
>> >> >> |
>> >> >>
>> >> >> 签名由 网易邮箱大师 定制
>> >> >>
>> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
>> >> >> Hi,
>> >> >> input_types定义的是每一个列的具体类型。
>> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
>> >> >> 正确的写法是
>> >> >>
>> >> >>input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> >> DataTypes.STRING()]
>> >> >>
>> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>> >> >>input_types=DataTypes.Row([DataTypes.FIELD("a",
>> DataTypes.STRING()),
>> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
>> >> >> DataTypes.STRING())])
>> >> >>
>> >> >> Best,
>> >> >> Xingbo
>> >> >>
>> >> >> whh_960101  于2020年9月3日周四 下午9:03写道:
>> >> >>
>> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
>> >> >> > input_type:input_type should be DataType but contain
>> RowField(RECID,
>> >> >> > VARCHAR)
>> >> >> > 我的pyflink版本:1.11.1
>> >> >>
>> >>
>>





 

Re:Re: Re: Re: pyflink-udf 问题反馈

2020-09-04 文章 whh_960101
您好,我是想让输出insert_into到目标表中,具体如下:
st_env=StreamExecutionEnvironment.get_execution_environment()
st_env.connect了一个source table(table包含a字段),
然后
| st_env.connect(FileSystem().path('tmp')) \ |
| | .with_format(OldCsv() |
| | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
| | .with_schema(Schema() |
| | .field('res', DataTypes.ARRAY(DataTypes.STRING( \ |
| | .create_temporary_table('sink') |
connect了一个sink表,format、schema都是DataTypes.ARRAY()
然后我定义了一个udf
@udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
def func(a):
rec_list = a.split(',')
res_arr = np.arrary(rec_list,dtype=str)
return res_arr
st_env.register_function("func", func)
st_env.from_path("source").select("func(a)").insert_into("sink")
最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串,不是我res_arr里面的内容,如果我单独返回一个值,比如return
 res_arr[0],tmp文件里面的字符串就是正确。
我想要得到array,该怎么解决?



















在 2020-09-04 15:17:38,"Xingbo Huang"  写道:
>Hi,
>
>你是调试的时候想看结果吗?
>你可以直接table.to_pandas()来看结果,或者用print connector来看。
>
>个人觉得to_pandas最简单,比如你可以试试下面的例子
>
>```
>table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
>
>@udf(input_types=DataTypes.STRING(),
>result_type=DataTypes.ARRAY(DataTypes.STRING()))
>def func(a):
> return np.array([a, a, a], dtype=str)
>
>table_env.register_function("func", func)
>
>table.select("func(b)").to_pandas()
>```
>然后,你可以看看官方文档[1],让你快速上手PyFlink
>
>Best,
>Xingbo
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>
>whh_960101  于2020年9月4日周五 下午2:50写道:
>
>> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
>> 我的udf输出了一个numpy.array(dtype = str),
>> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>>
>> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
>> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
>> 请问这个问题该怎么解决?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-09-04 10:35:03,"Xingbo Huang"  写道:
>> >Hi,
>> >
>>
>> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
>> >
>> >Best,
>> >Xingbo
>> >
>> >whh_960101  于2020年9月4日周五 上午9:26写道:
>> >
>> >>
>> >>
>> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
>> >> udf定义如下:
>> >>
>> >>
>> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
>> >> def fun(data):
>> >>  b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
>> >>
>> >>
>> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
>> >> 希望您能给我提供好的解决办法,万分感谢!
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-03 22:23:28,"Xingbo Huang"  写道:
>> >> >Hi,
>> >> >
>> >> >我觉得你从头详细描述一下你的表结构。
>> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
>> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
>> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
>> >> >
>> >> >[1]
>> >> >
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
>> >> >
>> >> >Best,
>> >> >Xingbo
>> >> >
>> >> > 于2020年9月3日周四 下午9:45写道:
>> >> >
>> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> DataTypes.STRING()]
>> >> >>
>> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
>> >> >> 或者正确写法是什么样的,感谢解答!
>> >> >>
>> >> >>
>> >> >> | |
>> >> >> whh_960101
>> >> >> |
>> >> >> |
>> >> >> 邮箱:whh_960...@163.com
>> >> >> |
>> >> >>
>> >> >> 签名由 网易邮箱大师 定制
>> >> >>
>> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
>> >> >> Hi,
>> >> >> input_types定义的是每一个列的具体类型。
>> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
>> >> >> 正确的写法是
>> >> >>
>> >> >>input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> >> DataTypes.STRING()]
>> >> >>
>> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>> >> >>input_types=DataTypes.Row([DataTypes.FIELD("a",
>> DataTypes.STRING()),
>> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
>> >> >> DataTypes.STRING())])
>> >> >>
>> >> >> Best,
>> >> >> Xingbo
>> >> >>
>> >> >> whh_960101  于2020年9月3日周四 下午9:03写道:
>> >> >>
>> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
>> >> >> > input_type:input_type should be DataType but contain
>> RowField(RECID,
>> >> >> > VARCHAR)
>> >> >> > 我的pyflink版本:1.11.1
>> >> >>
>> >>
>>


回复:关于flink sql并行度问题的请教

2020-09-04 文章 me
val tableConfig = tableEnv.getConfig.getConfiguration
tableConfig.setString("table.exec.resource.default-parallelism","4")
已经加了table的并行度设置,但是提示小于104并行度不让执行
 Vertex Source: HiveTableSource()'s parallelism (104) is higher than the max 
parallelism (4). Please lower the parallelism or increase the max parallelism


 原始邮件 
发件人: me
收件人: user-zh
发送时间: 2020年9月4日(周五) 15:18
主题: 关于flink sql并行度问题的请教


我的代码中使用flinksql,本机idea中测试没有问题,放到集群上跑之后,占用了全部40个slots,并且并行度为114,程序也一直卡着没有输出,是一个加入了时间窗口的flink
 sql 我再代码中全局设置了,dataStreamEnv.setParallelism(4) 
dataStreamEnv.setMaxParallelism(4) 但是感觉完全不起作用,请问怎么去限制flink sql的并行度?

关于flink sql并行度问题的请教

2020-09-04 文章 me
我的代码中使用flinksql,本机idea中测试没有问题,放到集群上跑之后,占用了全部40个slots,并且并行度为114,程序也一直卡着没有输出,是一个加入了时间窗口的flink
 sql 
我再代码中全局设置了,dataStreamEnv.setParallelism(4)
dataStreamEnv.setMaxParallelism(4)
但是感觉完全不起作用,请问怎么去限制flink sql的并行度?

Re: Re: Re: pyflink-udf 问题反馈

2020-09-04 文章 Xingbo Huang
Hi,

你是调试的时候想看结果吗?
你可以直接table.to_pandas()来看结果,或者用print connector来看。

个人觉得to_pandas最简单,比如你可以试试下面的例子

```
table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])

@udf(input_types=DataTypes.STRING(),
result_type=DataTypes.ARRAY(DataTypes.STRING()))
def func(a):
 return np.array([a, a, a], dtype=str)

table_env.register_function("func", func)

table.select("func(b)").to_pandas()
```
然后,你可以看看官方文档[1],让你快速上手PyFlink

Best,
Xingbo

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html

whh_960101  于2020年9月4日周五 下午2:50写道:

> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
> 我的udf输出了一个numpy.array(dtype = str),
> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>
> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
> 请问这个问题该怎么解决?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-09-04 10:35:03,"Xingbo Huang"  写道:
> >Hi,
> >
>
> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
> >
> >Best,
> >Xingbo
> >
> >whh_960101  于2020年9月4日周五 上午9:26写道:
> >
> >>
> >>
> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
> >> udf定义如下:
> >>
> >>
> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
> >> def fun(data):
> >>  b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
> >>
> >>
> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
> >> 希望您能给我提供好的解决办法,万分感谢!
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-03 22:23:28,"Xingbo Huang"  写道:
> >> >Hi,
> >> >
> >> >我觉得你从头详细描述一下你的表结构。
> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
> >> >
> >> >[1]
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
> >> >
> >> >Best,
> >> >Xingbo
> >> >
> >> > 于2020年9月3日周四 下午9:45写道:
> >> >
> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
> DataTypes.STRING()]
> >> >>
> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> >> >> 或者正确写法是什么样的,感谢解答!
> >> >>
> >> >>
> >> >> | |
> >> >> whh_960101
> >> >> |
> >> >> |
> >> >> 邮箱:whh_960...@163.com
> >> >> |
> >> >>
> >> >> 签名由 网易邮箱大师 定制
> >> >>
> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
> >> >> Hi,
> >> >> input_types定义的是每一个列的具体类型。
> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> >> >> 正确的写法是
> >> >>
> >> >>input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> DataTypes.STRING()]
> >> >>
> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
> >> >>input_types=DataTypes.Row([DataTypes.FIELD("a",
> DataTypes.STRING()),
> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
> >> >> DataTypes.STRING())])
> >> >>
> >> >> Best,
> >> >> Xingbo
> >> >>
> >> >> whh_960101  于2020年9月3日周四 下午9:03写道:
> >> >>
> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> >> >> > input_type:input_type should be DataType but contain
> RowField(RECID,
> >> >> > VARCHAR)
> >> >> > 我的pyflink版本:1.11.1
> >> >>
> >>
>


Re: 回复:请指教一个关于时间窗的问题,非常感谢!

2020-09-04 文章 taochanglian

确实是这样,比如有你多个partition,但是只有1个partition里面有数据,wm就不会执行计算。需要保证多个parititon数据都有数据。

举个例子,我这里在做测试的时候,1个topic10个partition,由于测试环境,按照key 
hash,数据只进入1个partition,就不会触发计算。后来让这个key轮询到10个parition都有数据,就可以触发计算了。


在 2020/9/4 13:14, Benchao Li 写道:

如果你有多个partition,并且有一些partition没有数据,这个时候的确是会存在这个问题。
要处理这种情况,可以了解下idle source[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

samuel@ubtrobot.com  于2020年9月3日周四 下午3:41写道:


补充一下环境信息:

有点类似以下问题:
在1.11版本测试flink sql时发现一个问题,用streaming api
消费kafka,使用eventtime,再把stream转table,进行sql聚合,发现当kafka topic是多个分区时,flink webui
watermarks 显示No Watermark,聚合计算也迟迟不触发计算,但当kafka
topic只有一个分区时却能这个正常触发计算,watermarks也显示正常。

不确定是否是因为kafka多分区引起的?



发件人: samuel@ubtrobot.com
发送时间: 2020-09-03 09:23
收件人: user-zh
主题: 回复: 回复:请指教一个关于时间窗的问题,非常感谢!
谢谢回复!

是Flink1.11.1的版本

以下是代码:
package com.ubtechinc.dataplatform.flink.etl.exception.monitor;/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

import org.apache.commons.collections.map.HashedMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.functions.co
.BroadcastProcessFunction;
import
org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.ubtechinc.dataplatform.flink.util.AES256;
import com.ubtechinc.dataplatform.flink.util.ConstantStr;
import com.ubtechinc.dataplatform.flink.util.MailUtils;
import com.ubtechinc.dataplatform.flink.util.SmsUtil;
import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner;

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import com.mysql.jdbc.Connection;

import java.sql.Timestamp;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
  * 使用广播实现动态的配置更新
  */
public class ExceptionAlertHour4{

private static final Logger LOG =
LoggerFactory.getLogger(ExceptionAlertHour4.class);

public static void main(String[] args