standalone K8S 如何查看 TaskMananger 的 gc.log ?

2021-06-18 文章 WeiXubin
请问 *standalone K8S* 部署模式为 *Deploy Application Cluster* 在哪获取查看/怎么配置
TaskMananger 的 *gc.log* 日志? 



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re:FlinkSQL join 维表后一定会变成 upsert流吗?

2021-06-10 文章 WeiXubin
感谢你的回答,我这边看了官网目前 join 一共可以分为 Regular Joins 、 Interval Joins 以及 Temporal Joins
三大类。 我上面问题所述的确是采用了 Regular Joins 的方式。 之后我也尝试使用了 Lookup Join 但发现其最后也是转为
INSERT INTO ON DUPLICATE KEY UPDATE  的执行语句, 并不是我所期望的纯 append 模式



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 关于 LEFT JOIN DimTable FOR SYSTEM_TIME AS OF 的异常求教!

2021-06-10 文章 WeiXubin
已解决,问题在于 Kafka 不是直接 join 维表,而是先和 UDTF join,之后整体才与维表 Join。
所以之前起别名的位置有误。导致找不到字段、改造如下:

先前写法:
INSERT INTO sinktable select
// 省略字段
 from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message,
'uid,game_id(BIGINT),platform'
)) *as k* LEFT JOIN DimTable FOR SYSTEM_TIME AS OF k.record_time as d
 ON k.game_id = d.game_id and k.platform = d.platform;

改造写法:

INSERT INTO sinktable select
  // 省略字段
 from (
 select *
 from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message,
'uid,game_id(BIGINT),platform'
))) as k JOIN DimTable FOR SYSTEM_TIME AS OF k.procTime as d
 ON k.game_id = d.game_id and k.platform = d.platform;




--
Sent from: http://apache-flink.147419.n8.nabble.com/


关于 LEFT JOIN DimTable FOR SYSTEM_TIME AS OF 的异常求教!

2021-06-10 文章 WeiXubin
异常:column 'record_time' not found in table 'k'

异常描述:KafkaTable k 表在与维表进行 look up join 时定义了别名,之后报在 k 表中没有定义 record_time 字段。

Flink 版本: 1.12.2

// Source 表
CREATE TABLE KafkaTable (
message STRING,
record_time TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
);

// 维表
CREATE TEMPORARY TABLE DimTable (
game_id BIGINT,
game_name VARCHAR,
root_game_id BIGINT,
main_game_id BIGINT,
platform VARCHAR
) WITH (
'connector' = 'jdbc',
);

// 处理语句
INSERT INTO sinktable select
// 省略字段
 from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message,
'uid,game_id(BIGINT),platform'
)) *as k* LEFT JOIN DimTable FOR SYSTEM_TIME AS OF k.record_time as d
 ON k.game_id = d.game_id and k.platform = d.platform;



--
Sent from: http://apache-flink.147419.n8.nabble.com/


FlinkSQL join 维表后一定会变成 upsert流吗?

2021-06-09 文章 WeiXubin
请教各位一下,我使用 FlinkSQL 编写任务时,kafka source ->  MySQL sink  不设置主键,查看了一下 request
mode 是 [INSERT] ,也就是普通的 append 流,这很正常。

但是当我关联上维表后,发现 request mode 变成了 [INSERT, UPDATE_BEFORE, UPDATE_AFTER,
DELETE],这时异常报错会要求我给 sink 表设置主键,当我设置上主键后就会变成了 upsert 流。

upsert流底层实现原理是 INSERT INTO ... DUPLICATE KEY UPDATE,由于我采用的是阿里云的ADB数据库,该语法在
ADB 中主键是不支持update的,这会导致报错。且业务上我只想以 append 流的形式插入表。 

请问各位有什么好的解决方案吗,关联上维表但是还是保持 append流?

// 维表
CREATE TABLE DimTable (
   //省略字段
) WITH (
'connector' = 'jdbc',
'url' = '***',
'table-name' = 'v2_dim_game_id',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = '***',
'password' = '***',
'lookup.cache.max-rows'='5000',
'lookup.cache.ttl' = '60s',
'lookup.max-retries'='3'
);



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: FlinkSQL cannot update pk column UID to expr

2021-06-08 文章 WeiXubin
我把表移动到了普通 MYSQL,可以正常运行。 经过排查应该是 ADB 建表 SQL 中的  DISTRIBUTE BY HASH(`uid`)
所导致,该语法用于处理数据倾斜问题。 看起来似乎是联表 join 的时候要求定义主键,但是定义主键后会转换为 upsert 流,而 ADB 中定义了
DISTRIBUTE BY 与 upsert 冲突了,不知道是否这么理解



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: FlinkSQL cannot update pk column UID to expr

2021-06-08 文章 WeiXubin
详细的异常打印信息如下:

java.sql.BatchUpdateException: [3,
2021060816420017201616500303151172306] cannot update pk column UID to expr
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
at com.mysql.cj.util.Util.getInstance(Util.java:167)
at com.mysql.cj.util.Util.getInstance(Util.java:174)
at
com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
at
com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755)
at
com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426)
at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796)
at
org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
at
org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
at
org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101)
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:128)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at
java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:308)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)



--
Sent from: http://apache-flink.147419.n8.nabble.com/


FlinkSQL cannot update pk column UID to expr

2021-06-08 文章 WeiXubin
基础场景: 从 KafkaSource 输入数据,输出到 sinktable, 期间 Left join 关联 DimTable  维表。
Flink 版本 1.12.2

场景1:当把 sinktable 设置为 'connector' = 'print' ,不设置任何主键,可正常关联输出
场景2:当把 sinktable 设置为 'connector' = 'mysql' 则会要求加上 primary key
场景3:在 sinktable 加上 PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED
则报错,主要报错信息:
java.sql.BatchUpdateException: [3,
2021060816420117201616500303151172567] cannot update pk column UID to expr

注:此处使用的MySQL 是阿里的 ADB,建表SQL如下
Create Table `v2_dwd_root_game_uid_reg_log` (
 `uid` bigint NOT NULL DEFAULT '0' COMMENT '注册uid',
 `user_name` varchar NOT NULL DEFAULT '',
  // 此处省略其他字段
 primary key (`uid`,`platform`,`root_game_id`)
) DISTRIBUTE BY HASH(`uid`) INDEX_ALL='Y' STORAGE_POLICY='HOT'
COMMENT='按根游戏账号注册日志';


下面是场景3的SQL语句:
// Kafka Source
CREATE TABLE KafkaTable (
message STRING
) WITH (
'connector' = 'kafka',
'topic' = 'xxx',
'properties.bootstrap.servers' = 'xxx',
'properties.group.id' = 'x',
'scan.startup.mode' = 'group-offsets',
'format' = 'json'
);

// 维表
CREATE TABLE DimTable (
game_id BIGINT,
root_game_id BIGINT,
main_game_id BIGINT,
platform VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = '',
'table-name' = 'v2_dim_game_id',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'x',
'password' = '',
'lookup.cache.max-rows'='5000',
'lookup.cache.ttl' = '60s',
'lookup.max-retries'='3'
);

// MySQL输出
CREATE TABLE sinktable (
uid BIGINT,
root_game_id BIGINT,
game_id BIGINT,
platform VARCHAR,
//省略其它字段
PRIMARY KEY (uid, platform,root_game_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'xx',
'table-name' = 'v2_dwd_root_game_uid_reg_log',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'xxx',
'password' = '',
'sink.buffer-flush.interval'='5s',
'sink.buffer-flush.max-rows' = '10'
);

// 插入(关联维表)
INSERT INTO sinktable select
IF(IsInvalidValue(k.uid), 0 , CAST(k.uid AS BIGINT)) as uid,
IF((k.game_id IS NULL), 0 , k.game_id) as game_id,
d.platform as platform,
d.root_game_id as root_game_id,
// 省略其它字段
 from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message,
'uid,game_id(BIGINT),platform'
)) as k LEFT JOIN DimTable as d ON k.game_id = d.game_id and k.platform =
d.platform;




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于 flinksql 维表的问题

2021-06-02 文章 WeiXubin
你好,可以麻烦详细描述一下吗? 谢谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL 1.11.3问题请教

2021-06-02 文章 WeiXubin
不知道下面场景是否与你描述的场景相同 ,假设采集到单条json消息格式为 {"name":"test"},将多条json消息合并为一条的格式为 
[{"name":"test"},{"name":"test2"}]。 我的 Flink 任务是采用 FlinkSQL
编写,处理这种情况我的方式是通过编写一个 UDF (TableFunction), 之后在这个 UDF 中进行数据拆解,变为多行 (row),再输出到
sink。

Row row = new Row(arity);
collect(row);

具体使用可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/

Best,Weixubin



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于 flinksql 维表的问题

2021-05-28 文章 WeiXubin
感谢各位的建议



--
Sent from: http://apache-flink.147419.n8.nabble.com/


关于 flinksql 维表的问题

2021-05-22 文章 WeiXubin
我想实现将MySQL中的 A 表数据预先查询出来进行缓存,用于给流表 B 进行 join关联。接下来定时查询并更新 A 表内的缓存数据,请问目前
FlinkSQL 可以实现吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

类型转换问题 String 类型如何转 decimal类型

2021-05-13 文章 WeiXubin
source 端接收到的数据类型为 String,  sink 端 MySQL 数据库字段类型定义为 decimal(12, 2)  , 在编写
insert 语句时该如何把 String 类型转换为 decimal, 尝试了 flink build-in 的 cast
并不行,请问各位有什么好的方法?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: kafka table connector eventTime的问题

2020-11-04 文章 WeiXubin
Hi,我这有一个使用Datastream开发简单例子,接收Kafka(Event
Time)数据并进行开窗聚合。Kafka数据格式如:{"word":"a","count":1,"time":1604286564},可以看看该Demo对你是否有所帮助。

public class MyExample {

public static void main(String[] args) throws Exception {
// 创建环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// 设置时间特性为
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 水印策略
WatermarkStrategy watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new
SerializableTimestampAssigner() {
@Override
public long extractTimestamp(WC wc, long l) {
return wc.getEventTime() * 1000;
}
});

// Kafka 配置
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "Kafka地址:9092");
properties.setProperty("group.id", "test");

env.addSource(new FlinkKafkaConsumer<>("flinktest1", new
JSONKeyValueDeserializationSchema(true), properties).setStartFromLatest())
// map 构建 WC 对象
.map(new MapFunction() {
@Override
public WC map(ObjectNode jsonNode) throws Exception {
JsonNode valueNode = jsonNode.get("value");
WC wc = new
WC(valueNode.get("word").asText(),valueNode.get("count").asInt(),valueNode.get("time").asLong());
return wc;
}
})
// 设定水印策略
.assignTimestampsAndWatermarks(watermarkStrategy)
.keyBy(WC::getWord)
// 窗口设置,这里设置为滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// 设置窗口延迟
.allowedLateness(Time.seconds(2))
.reduce(new ReduceFunction() {
@Override
public WC reduce(WC wc, WC t1) throws Exception {
return new WC(wc.getWord(), wc.getCount() +
t1.getCount());
}
})
.print();

env.execute();
}


static class WC {
public String word;
public int count;
public long eventTime;

public long getEventTime() {
return eventTime;
}

public void setEventTime(long eventTime) {
this.eventTime = eventTime;
}

public String getWord() {
return word;
}

public void setWord(String word) {
this.word = word;
}

public int getCount() {
return count;
}

public void setCount(int count) {
this.count = count;
}

public WC(String word, int count) {
this.word = word;
this.count = count;
}

public WC(String word, int count,long eventTime) {
this.word = word;
this.count = count;
this.eventTime = eventTime;
}
  
@Override
public String toString() {
return "WC{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink1.10 sink to mysql SocketException

2020-10-09 文章 WeiXubin
Hi,
你可以看下这个 Jira , https://issues.apache.org/jira/browse/FLINK-16681,似乎与你问题相关。
由于长时间没有数据导致 connection 断开问题,该问题已经在1.11版本修复。

Best,
Weixubin



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于Flink1.11 CSV Format的一些疑问

2020-08-08 文章 WeiXubin
感谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/


关于Flink1.11 CSV Format的一些疑问

2020-08-07 文章 WeiXubin
Hi,我在Flink1.11版本,使用filesystem connector的时候,读取csv文件并输出到另外一个csv文件遇到了些问题,问题如下:
问题1:sink 的path指定具体输出文件名,但是输出的结果是 文件夹形式
问题2:在flink1.11的文档中没有找到csv的 ignore-first-line 忽略第一行这个配置

测试数据
11101322000220200517145507667060666706;9
11101412000220200515163257249700624970;9
11101412010220200514163709315410631541;9
11101712050220200516173624737150673715;9
11101312000220200516184127322880632288;9

CREATE TABLE source_table (
  face_id STRING,
  p_id STRING
) WITH (
 'connector' = 'filesystem',
 'path' = 'E:\label_file.csv',
 'format' = 'csv',
 'csv.field-delimiter'=';'
);

CREATE TABLE sink_table (
  face_id STRING,
  p_id STRING
) WITH (
 'connector' = 'filesystem',
 'path' = 'E:\label_file2.csv',
  'csv.disable-quote-character' ='true',
 'format' = 'csv',
 'csv.field-delimiter'=';'
);

INSERT INTO sink_table SELECT face_id,p_id FROM source_table;




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11查询结果每秒入库到mysql数量很少

2020-07-24 文章 WeiXubin
Hi,
你可以尝试改写url,加上rewritebatchedstatements=true,如下:
jdbc:mysql://198.2.2.71:3306/bda?useSSL=false=true

MySQL
Jdbc驱动在默认情况下会无视executeBatch()语句,把期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,直接造成较低的性能。把rewriteBatchedStatements参数置为true,
驱动才会帮你批量执行SQL。

祝好
weixubin



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-23 文章 WeiXubin
感谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-23 文章 Weixubin
Hi,
我想请教下,使用streamExecutionEnv.execute("from kafka sink hbase") 是可以指定Job的名称。
而当改用streamTableEnv.executeSql(sql)的方式时,似乎无法定义Job的名称。
请问有什么解决的方法吗?

















在 2020-07-08 16:07:17,"Jingsong Li"  写道:
>Hi,
>
>你的代码里:streamTableEnv.executeSql,它的意思就是已经提交到集群异步的去执行了。
>
>所以你后面 "streamExecutionEnv.execute("from kafka sink hbase")"
>并没有真正的物理节点。你不用再调用了。
>
>Best,
>Jingsong
>
>On Wed, Jul 8, 2020 at 3:56 PM Zhou Zach  wrote:
>
>>
>>
>>
>> 代码结构改成这样的了:
>>
>>
>>
>>
>> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> val blinkEnvSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>
>> val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
>> blinkEnvSettings)
>>
>>
>>
>>
>>
>> streamExecutionEnv.execute("from kafka sink hbase")
>>
>>
>>
>>
>> 还是报一样的错
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-08 15:40:41,"夏帅"  写道:
>> >你好,
>> >可以看看你的代码结构是不是以下这种
>> >val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> >val bsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build
>> >val tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
>> >  ..
>> >tableEnv.execute("")
>> >如果是的话,可以尝试使用bsEnv.execute("")
>> >1.11对于两者的execute代码实现有改动
>> >
>> >
>> >--
>> >发件人:Zhou Zach 
>> >发送时间:2020年7月8日(星期三) 15:30
>> >收件人:Flink user-zh mailing list 
>> >主 题:flink Sql 1.11 executeSql报No operators defined in streaming topology
>> >
>> >代码在flink
>> 1.10.1是可以正常运行的,升级到1.11.0时,提示streamTableEnv.sqlUpdate弃用,改成executeSql了,程序启动2秒后,报异常:
>> >Exception in thread "main" java.lang.IllegalStateException: No operators
>> defined in streaming topology. Cannot generate StreamGraph.
>> >at
>> org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
>> >at
>> org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
>> >at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
>> >at org.rabbit.sql.FromKafkaSinkHbase$.main(FromKafkaSinkHbase.scala:79)
>> >at org.rabbit.sql.FromKafkaSinkHbase.main(FromKafkaSinkHbase.scala)
>> >
>> >
>> >但是,数据是正常sink到了hbase,是不是executeSql误报了。。。
>> >
>> >
>> >
>> >
>> >query:
>> >streamTableEnv.executeSql(
>> >  """
>> >|
>> >|CREATE TABLE `user` (
>> >|uid BIGINT,
>> >|sex VARCHAR,
>> >|age INT,
>> >|created_time TIMESTAMP(3),
>> >|WATERMARK FOR created_time as created_time - INTERVAL '3'
>> SECOND
>> >|) WITH (
>> >|'connector.type' = 'kafka',
>> >|'connector.version' = 'universal',
>> >|-- 'connector.topic' = 'user',
>> >|'connector.topic' = 'user_long',
>> >|'connector.startup-mode' = 'latest-offset',
>> >|'connector.properties.group.id' = 'user_flink',
>> >|'format.type' = 'json',
>> >|'format.derive-schema' = 'true'
>> >|)
>> >|""".stripMargin)
>> >
>> >
>> >
>> >
>> >
>> >
>> >streamTableEnv.executeSql(
>> >  """
>> >|
>> >|CREATE TABLE user_hbase3(
>> >|rowkey BIGINT,
>> >|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
>> >|) WITH (
>> >|'connector.type' = 'hbase',
>> >|'connector.version' = '2.1.0',
>> >|'connector.table-name' = 'user_hbase2',
>> >|'connector.zookeeper.znode.parent' = '/hbase',
>> >|'connector.write.buffer-flush.max-size' = '10mb',
>> >|'connector.write.buffer-flush.max-rows' = '1000',
>> >|'connector.write.buffer-flush.interval' = '2s'
>> >|)
>> >|""".stripMargin)
>> >
>> >
>> >streamTableEnv.executeSql(
>> >  """
>> >|
>> >|insert into user_hbase3
>> >|SELECT uid,
>> >|
>> >|  ROW(sex, age, created_time ) as cf
>> >|  FROM  (select uid,sex,age, cast(created_time as VARCHAR) as
>> created_time from `user`)
>> >|
>> >|""".stripMargin)
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>>
>
>
>-- 
>Best, Jingsong Lee


Re: 回复:flink Sql 1.11 executeSql报No operators defined in streaming topology

2020-07-23 文章 WeiXubin
Hi,
我想请问下使用 streamExecutionEnv.execute("from kafka sink
hbase"),通过这种方式可以给Job指定名称。
而当使用streamTableEnv.executeSql(sql)之后似乎无法给Job定义名称。
请问有什么解决方案吗?谢谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: FlinkSQL 是否支持类似临时中间表的概念

2020-06-24 文章 Weixubin



感谢 Leonard Xu 与 Jark 两位,已成功解决满足需求!
对于 chenxuying 所提出的问题,我也很感兴趣。
由于使用UDF重复解析两遍,不知是否有更好的替代方法
Thanks
Bin














在 2020-06-24 12:32:27,"Leonard Xu"  写道:
>Hello,
>
>你的需求其实是要 抽取记录的字段定义watermark, 这个只能放到source 表的DDL中,view上也不支持的。
>1.10里的计算列 + udf 应该就可以满足你的需求, 大概长这样:
>
>CREATE TABLE sourceTable (
> request_uri STRING,
> ts as extractTsUdf(request_uri),
> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
>
>) WITH (
>  ..
>);
>
>select ... from (
>select ts, T.* from 
>sourceTable  sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T(...)
>) t
>group by TUMBLE(ts, INTERVAL '30' SECOND)
>
>祝好,
>Leonard
>
>
>> 在 2020年6月24日,12:09,Weixubin <18925434...@163.com> 写道:
>> 
>> 第一个数据处理过程就是将 request_uri 解析为多个属性(多列),存成一行,作为一条记录。
>


Re:Re: FlinkSQL 是否支持类似临时中间表的概念

2020-06-23 文章 Weixubin



感谢你提供了子查询的思路,不过经过试验后有点可惜,这似乎还是满足不了我们的需求。


我们的场景是从阿里云SLS读取消息。每条消息有一个字段是request_uri。
第一个数据处理过程就是将 request_uri 解析为多个属性(多列),存成一行,作为一条记录。
第二个数据处理过程就是将每条记录里的 声明heart_time为事件时间属性并使用5秒延迟水印策略,进行开窗聚合处理。


//如果应用到子查询的话,Flink是不支持这样做的。WATERMARK FOR 水印声明只能在DDL里应用。如下:
select 
..ts as TO_TIMESTAMP(heart_time,'-MM-ddHH:mm:ss') ,
WATERMARK FOR ts AS ts - INTERVAL '1' SECOND 
 from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….·


//如果应用到source,则一开始并不知道heart_time 的值
CREATE TABLE sourceTable (
  request_uri STRING
..ts as TO_TIMESTAMP(heart_time,'-MM-ddHH:mm:ss') ,
WATERMARK FOR ts AS ts - INTERVAL '1' SECOND 
) WITH ( ... );


只能等待Flink 1.11 尝试是否可以用View作为中间临时表,并对View进行 WATERMARK水印声明
Thanks
Bin

在 2020-06-23 15:28:50,"Leonard Xu"  写道:
>Hi
>我的意思是你如果中间结果表如果要输出,那你就一个sink写到中间结果表(什么表根据你的需要),一个sink写到你的最终结果表,在这两个sink之前的`select
> * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` 
>的这段sql是可以复用的,就和 VIEW的作用类似。
>
>如果你不需要中间结果表,只是要最终的结果表,那你写个嵌套的sql就行了,里层是`select * from sourceTable , LATERAL 
>TABLE(ParseUriRow(request_uri)) as T( )….·,外层是 group by, 插入最终的结果表就能满足需求了吧。
>
>祝好,
>Leonard Xu
>
>
>> 在 2020年6月23日,15:21,Weixubin <18925434...@163.com> 写道:
>> 
>> 
>> 
>> 
>> Hi,
>> 关于这句 “把 ` select * from sourceTable , LATERAL 
>> TABLE(ParseUriRow(request_uri)) as T( )….` 这段 sql insert 到中间结果表 和 
>> group后再写入最终结果表就可以了”
>> 我不太明白所说的中间结果表是什么意思, 我所理解为在数据库创建多一张middleSinkTable表,作为中间结果表。请问这样理解是否有误? 
>> 可否简单举个例子。
>> Thanks,
>> Bin
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-06-23 11:57:28,"Leonard Xu"  写道:
>>> Hi,
>>> 是的,这个是在 FLINK 1.11 的版本里支持的,当前 FLINK 1.11代码已经冻结,正在测试中,最近应该快发布了,所以master 
>>> 分支上的版本号为1.12-SNAPSHOT
>>> ,等1.11版本发布了就可以看到对应的文档。
>>> 
>>> 回到这个问题,flink sql(blink planner) 支持 multiple-sinks 的, 在 1.10 版本也可以一个作业搞定。 把 
>>> `  select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as 
>>> T( )….`  这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了。效果和使用 VIEW 应该类似,因为 
>>> planner 会做分段优化。
>>> 另外建议使用1.10.1版本,1.10.1在1.10的基础上修复了不少bug,也可以等1.11发布了试用下1.11。
>>> 
>>> 
>>> 祝好,
>>> Leonard Xu
>


Re:Re: FlinkSQL 是否支持类似临时中间表的概念

2020-06-23 文章 Weixubin



 Hi,
关于这句 “把 ` select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) 
as T( )….` 这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了”
我不太明白所说的中间结果表是什么意思, 我所理解为在数据库创建多一张middleSinkTable表,作为中间结果表。请问这样理解是否有误? 可否简单举个例子。
Thanks,
Bin










在 2020-06-23 11:57:28,"Leonard Xu"  写道:
>Hi,
>是的,这个是在 FLINK 1.11 的版本里支持的,当前 FLINK 1.11代码已经冻结,正在测试中,最近应该快发布了,所以master 
>分支上的版本号为1.12-SNAPSHOT
>,等1.11版本发布了就可以看到对应的文档。
>
>回到这个问题,flink sql(blink planner) 支持 multiple-sinks 的, 在 1.10 版本也可以一个作业搞定。 把 `  
>select * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….` 
> 这段 sql insert 到中间结果表 和 group后再写入最终结果表就可以了。效果和使用 VIEW 应该类似,因为 planner 会做分段优化。
>另外建议使用1.10.1版本,1.10.1在1.10的基础上修复了不少bug,也可以等1.11发布了试用下1.11。
>
>
>祝好,
>Leonard Xu


Re:Re: FlinkSQL 是否支持类似临时中间表的概念

2020-06-22 文章 Weixubin



感谢,我查阅了下资料后发现CREATE VIEW这个语法是在Flink.1.12有提及而1.10版本没有 ,1.12版本暂未发行, 
而我目前使用的版本是1.10版本。
而且很奇怪,我并没有找到1.11版本的文档














在 2020-06-23 10:58:25,"Leonard Xu"  写道:
>Hi,
>
>> 在 2020年6月23日,10:49,Weixubin <18925434...@163.com> 写道:
>> 
>> //这个时候我希望能够创建一张临时中间表  tempTable用来存放 对Source表中数据拆分为多列后的结果,类似于下面这样(Flink 
>> 并不支持这么做)
>
>
>看着描述应该是源数据中的一行拆成多行。这个需求是不是用 VIEW 就可以了[1]?Flink SQL 支持 VIEW 语法的[1]。
>
>Best,
>Leonard Xu 
>
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/create.html#create-view
> 
><https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/create.html#create-view>


FlinkSQL 是否支持类似临时中间表的概念

2020-06-22 文章 Weixubin
Hi,
我希望通过FlinkSQL的方式在一个Job中完成两步的操作,但似乎办不到,情况大致如下:


eg.有一个ETL过程,需要从Source获取数据--将每条数据拆分为一条多列数据--对拆分完的数据开窗聚合--输出到sink。
//从Source获取数据
CREATE TABLE sourceTable (
  request_uri STRING
) WITH (
   ..
);


//这个时候我希望能够创建一张临时中间表  tempTable用来存放 对Source表中数据拆分为多列后的结果,类似于下面这样(Flink 并不支持这么做)
CREATE TABLE tempTable (
  row1 STRING,
  row2 STRING,
) 
Insert into tempTable   select * from sourceTable , LATERAL 
TABLE(ParseUriRow(request_uri)) as T( )


//最后从 tempTable 表中获取数据并进行开窗做聚合操作
CREATE TABLE sinkTable (
  row1 STRING,
) 
INSERT INTO sinkTable SELECT .., SUM(unit) AS unitSum from tempTable GROUP BY 
TUMBLE(rowtime,INTERVAL '30' SECOND), ...


以上是大致情况描述,我希望能在一个Job中一次性完成以上数据处理,而不分成两个Job,不知是否有好的解决方案?
Thank!



Re:flink 通过 --classpath 传入https-xxx.xxx.jar 在1.8上正常运行 在flink1.10 就会报传入的jar包找不到

2020-06-22 文章 Weixubin
和版本应该没什么关系。如果是多节点部署的情况下,-C 所指定的URL 需要各个节点都能访问得到。 确认下该URL能被所有节点访问到吗 
 Best,
 Bin














At 2020-06-22 11:43:11, "程龙" <13162790...@163.com> wrote:
>2020-06-22 10:16:34,379 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
>(6/6) (5dd98ec92e0d5e53597cb7520643c7f5) switched from SCHEDULED to DEPLOYING.
>2020-06-22 10:16:34,379 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Deploying 
>Sink: Unnamed (6/6) (attempt #0) to container_1590655249980_0081_01_02 @ 
>al-bj-bigdata-inf-research-flink04 (dataPort=34781)
>2020-06-22 10:16:34,456 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
>Source (5/6) (aca298438b9eb6fcf295cb8af6eebcd8) switched from DEPLOYING to 
>RUNNING.
>2020-06-22 10:16:34,481 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(3/8) (0daed15d107c3031891f0c9e84093068) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,492 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
>(1/6) (44ca248aba351026452ba4abdb5f33a6) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,497 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(1/8) (70612735eb755269fe9590e8ab51d3e2) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,512 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(2/8) (baba331cd6bcde1f5a6024eac0b953b4) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,524 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
>Source (4/6) (a34992362c2cf3634a29bd5a9c188754) switched from DEPLOYING to 
>RUNNING.
>2020-06-22 10:16:34,531 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
>Source (2/6) (7683c15e3ebb3e718c2439c6e32f0d7d) switched from DEPLOYING to 
>RUNNING.
>2020-06-22 10:16:34,564 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
>Source (6/6) (6ab1aaa5e1811c79c702197f984e9bb6) switched from DEPLOYING to 
>RUNNING.
>2020-06-22 10:16:34,609 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
>Source (1/6) (6245b508b8e0494f06ef71c6ad4954b6) switched from DEPLOYING to 
>RUNNING.
>2020-06-22 10:16:34,616 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(4/8) (2ea049476f0b48fcd85dcd9084091e9f) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,650 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
>(2/6) (e785e44e4212dcc5279bcde761b28292) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,656 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
>(5/6) (97a5e72f731d14aa97d93253b71b6aeb) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,662 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(7/8) (59babdbc2d7bea362a2794a966fe59ef) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,664 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
>Source (3/6) (aed33b78cc847561908ea43164e0311a) switched from DEPLOYING to 
>RUNNING.
>2020-06-22 10:16:34,669 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(6/8) (fd87d0111b10e9f9027068a72e9ce209) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,726 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
>(6/6) (5dd98ec92e0d5e53597cb7520643c7f5) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,729 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(8/8) (22b73fa7f7435a405e1102a3480c09c1) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:34,760 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
>(4/6) (4c677579ef44cf394618af38a75497da) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:37,081 INFO  
>org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
>checkpoint 1 @ 1592792197072 for job 1797e2f64b7b1caeb6356608290263cc.
>2020-06-22 10:16:45,065 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
>(3/6) (d9a9c913dcbf782bd933b0adae157b38) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:45,066 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(5/8) (39dbdd04e2066d1d93be1641c0ab7add) switched from DEPLOYING to RUNNING.
>2020-06-22 10:16:48,512 INFO  
>org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter -> Map 
>(3/8) (0daed15d107c3031891f0c9e84093068) switched from RUNNING to FAILED on 
>org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@25d7d2c5.
>java.lang.NoClassDefFoundError: core
>
>
>
>看日志是在部分task里面报错


Re:Re: 回复: 关于拓展 Tuple元组的问题

2020-06-19 文章 Weixubin
感谢,这边做了一下简单测试,已解决~   真香




//Demo
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;


public class ParseUriRow extends TableFunction {

public void eval(String data) {
//逻辑处理
}

@Override
public TypeInformation getResultType() {
return Types.ROW(
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING,
Types.STRING
);
}
}
 


















在 2020-06-19 15:46:42,"Jark Wu"  写道:
>用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
>
>
>On Fri, 19 Jun 2020 at 15:42, Weixubin <18925434...@163.com> wrote:
>
>> 感谢你的回答,请问可否举一个参照例子?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-06-19 15:31:53,"wangweigu...@stevegame.cn" <
>> wangweigu...@stevegame.cn> 写道:
>> >
>> >   多个值组合在一起,当一个复合值使用!
>> >
>> >
>> >
>> >
>> >发件人: 魏旭斌
>> >发送时间: 2020-06-19 15:01
>> >收件人: user-zh
>> >主题: 关于拓展 Tuple元组的问题
>> >目前Flink 提供了Tuple1 ~ Tuple25,在实际开发中不能满足我们的需求,我们希望能有更大的元组,比如构建一个Tuple50。
>> 请问有什么解决的方案? 谢谢
>>


Re:回复: 关于拓展 Tuple元组的问题

2020-06-19 文章 Weixubin
感谢你的回答,请问可否举一个参照例子?

















在 2020-06-19 15:31:53,"wangweigu...@stevegame.cn"  
写道:
>
>   多个值组合在一起,当一个复合值使用!
>
>
>
> 
>发件人: 魏旭斌
>发送时间: 2020-06-19 15:01
>收件人: user-zh
>主题: 关于拓展 Tuple元组的问题
>目前Flink 提供了Tuple1 ~ Tuple25,在实际开发中不能满足我们的需求,我们希望能有更大的元组,比如构建一个Tuple50。 
>请问有什么解决的方案? 谢谢