Custom partitioner: problem when using it, code attached

2021-02-19 Posted by Ye Chen
Hi all, I want to implement a custom partitioner. After extending FlinkKafkaPartitioner, using it gives a compile error. Simplified code below.
// custom partitioner
public class customPartitioner extends FlinkKafkaPartitioner<String> {
    @Override
    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        return 0;
    }
}


DataStream<String> stream = ...
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
        "test_topic",
        new SimpleStringSchema(),
        properties,
        new customPartitioner()
);
stream.addSink(myProducer);


// The code above fails to compile in the IDE at the FlinkKafkaProducer call: [Error:(55, 49) java:
// cannot infer type arguments for org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>]
// If I remove new customPartitioner() and don't use a custom partitioner, FlinkKafkaProducer compiles fine.
// It feels like no constructor matches, yet looking at the source there is such a constructor.




The FlinkKafkaProducer source is shown below. Is there anything wrong with my usage above?
public FlinkKafkaProducer(
        String topicId,
        SerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
    this(
            topicId,
            serializationSchema,
            producerConfig,
            customPartitioner.orElse(null),
            Semantic.AT_LEAST_ONCE,
            DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}
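
A minimal sketch of how the call can be written against this Optional-based overload (assuming the String-typed customPartitioner above and the same stream/properties objects; java.util.Optional needs to be imported):

// The 4-argument overload takes Optional<FlinkKafkaPartitioner<IN>>, not the
// partitioner itself, so the bare new customPartitioner() matches no constructor
// and type inference fails. Wrapping it in Optional.of(...) should let it compile.
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
        "test_topic",
        new SimpleStringSchema(),
        properties,
        Optional.of(new customPartitioner()));
stream.addSink(myProducer);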





Flink SQL, Kafka source to MySQL sink: append behavior on primary-key conflict

2021-02-19 Posted by Yu Wang
Kafka data format:
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613802148092,"end_time":1613803144969,"process_time":"2021-02-20 06:39:05.088"}
{"building_id":"trRlI9PL","sofa_id":"dJI53xLp","floor_num":10,"occupy_status":1,"start_time":1613803609813,"end_time":1613803654280,"process_time":"2021-02-20 06:47:34.609"}


*Kafka DDL:*
CREATE TABLE washroom_detail (
    building_id STRING,
    sofa_id STRING,
    floor_num INT,
    occupy_status INT,
    start_time BIGINT,
    end_time BIGINT,
    process_time TIMESTAMP,
    occupy_times as concat(
        date_format(TIMESTAMPADD(hour, 8, cast(start_time / 1000 as timestamp)), 'HH:mm'),
        '-',
        date_format(TIMESTAMPADD(hour, 8, cast(end_time / 1000 as timestamp)), 'HH:mm')),
    local_date as date_format(cast(start_time / 1000 as timestamp), '-MM-dd'),
    day_hour as cast(date_format(cast(start_time / 1000 as timestamp), 'HH') as INT) + 8
) WITH (
    'connector' = 'kafka',
    'topic' = '',
    'properties.bootstrap.servers' = 'xxx',
    'properties.group.id' = '',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);


*MySQL DDL:*
create table hour_ddl (
    building_id STRING,
    sofa_id STRING,
    local_date STRING,
    `hour` INT,
    floor_num INT,
    occupy_frequency INT,
    occupy_times STRING,
    update_time TIMESTAMP,
    process_time TIMESTAMP,
    primary key (building_id, sofa_id, local_date, `hour`) NOT ENFORCED
) with (
    'connector' = 'jdbc',
    'url' = '',
    'table-name' = '',
    'username' = 'x',
    'password' = 'xx'
)


*Flink SQL DML:*
INSERT INTO hour_ddl (building_id, sofa_id, local_date, `hour`, floor_num,
                      occupy_frequency, occupy_times, update_time, process_time)
SELECT a.building_id,
       a.sofa_id,
       a.local_date,
       a.day_hour,
       a.floor_num,
       CAST(a.frequency + IF(b.occupy_frequency IS NULL, 0, b.occupy_frequency) AS INT),
       concat(if(b.occupy_times IS NULL, '', b.occupy_times),
              if(b.occupy_times IS NULL, a.times, concat(',', a.times))),
       NOW(),
       a.process_time
FROM
  (SELECT building_id,
          sofa_id,
          local_date,
          day_hour,
          floor_num,
          count(1) AS frequency,
          LISTAGG(occupy_times) AS times,
          MAX(process_time) AS process_time,
          PROCTIME() AS compute_time
   FROM washroom_detail
   GROUP BY building_id,
            sofa_id,
            local_date,
            day_hour,
            floor_num) a
LEFT JOIN hour_ddl FOR SYSTEM_TIME AS OF a.compute_time AS b
  ON a.building_id = b.building_id
 AND a.sofa_id = b.sofa_id
 AND a.local_date = b.local_date
 AND a.day_hour = b.`hour`
WHERE a.process_time > b.process_time
   OR b.process_time IS NULL

Observed behavior:
When MySQL is empty, one record is inserted:
  occupy_frequency | occupy_times
  1                | 15:01-15:03
On a primary-key conflict:
  occupy_frequency | occupy_times
  3                | 15:01-15:03,15:01-15:03,15:03-15:04
Expected result:
  occupy_frequency | occupy_times
  2                | 15:01-15:03,15:03-15:04


SqlValidatorException: No match found for function signature prod()

2021-02-19 Posted by xiaoyue
I'm hitting SqlValidatorException: No match found for function signature prod() when using a UDAF with Flink SQL 1.11. Could someone please help take a look? _(:з」∠)_

The code is below:
-
...
  stableEnv.createTemporarySystemFunction("prod", ProductAggregateFunction.class);
  Table resultTable = stableEnv.sqlQuery("select pf_id,prod(yldrate+1)-1 as yldrate from queryData group by pf_id");
...
-
@FunctionHint(
        input = @DataTypeHint("Double"),
        output = @DataTypeHint("Double")
)
public class ProductAggregateFunction extends AggregateFunction<Double, Product> {

    @Override
    public Double getValue(Product acc) {
        return acc.prod;
    }

    @Override
    public Product createAccumulator() {
        return new Product();
    }

    public void accumulate(Product acc, Double iValue) {
        acc.prod *= iValue;
    }

    public void retract(Product acc, Double iValue) {
        acc.prod /= iValue;
    }

    public void merge(Product acc, Iterable<Product> it) {
        for (Product p : it) {
            accumulate(acc, p.prod);
        }
    }

    public void resetAccumulator(Product acc) {
        acc.prod = 1D;
    }
}
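
The Product accumulator class is not shown in the post; a minimal guess at its shape, inferred from getValue() and resetAccumulator() above (the field name prod and the initial value 1D are what the code implies, the rest is an assumption):

// Hypothetical accumulator POJO matching the acc.prod accesses above;
// the product starts at 1 so accumulate() can multiply into it.
public static class Product {
    public Double prod = 1D;
}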

Re: How to make sure no data is lost in a Flink dual-stream join

2021-02-19 文章 Smile@LETTers
用 left join 或者 full join?这样的话关联不上的数据在区间结束的时候也会被输出,对侧表的字段用 null 填充。目前 
DataStream API 里面 Interval Join 还不支持 outer join,不过 Table API/SQL 
是支持的,参考[1]。[1]. 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#joins
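
A minimal Flink SQL sketch of such an interval LEFT JOIN (table names, columns and the one-hour bound are made up for illustration; both sides need declared time attributes):

-- Orders with no matching payment inside the interval are still emitted
-- when the interval closes, with the payment-side columns set to NULL.
SELECT o.order_id, o.order_time, p.pay_time
FROM Orders AS o
LEFT JOIN Payments AS p
  ON o.order_id = p.order_id
 AND p.pay_time BETWEEN o.order_time - INTERVAL '1' HOUR
                    AND o.order_time + INTERVAL '1' HOUR;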
On 2021-02-08 19:05:56, "lxk7...@163.com" wrote:
>
>We are currently doing dual-stream joins in Flink, mostly interval joins, with the interval chosen from experience. How can we make sure no data is lost?
>If data arrives later than that interval it is simply dropped, and since this is order data, losing it is not acceptable.
>
>
>lxk7...@163.com


Re: Flink SQL temporal table join with Hive error

2021-02-19 Posted by Leonard Xu

> 
>  Case 2: the dimension table is partitioned, each partition only contains that day's data, and there is no primary key.
> 
>   In this case, because the join needs all of the data, 'streaming-source.partition.include' = 'all' still has to be set, but since there is still no primary key, it cannot run.
> 
> It is this second case I am stuck on. The Hive dimension table is not maintained by me and many people use it, so I cannot modify it to add a primary key, and the join cannot be done.

For the second case, the Hive table is not read in streaming mode; it behaves as a static table and the latest full snapshot is loaded each time, so the following options are enough:
  'streaming-source.enable' = 'false',            -- option with default value, can be ignored.
  'streaming-source.partition.include' = 'all',   -- option with default value, can be ignored.
  'lookup.join.cache.ttl' = '12 h'
'streaming-source.partition.include' = 'all' is the default value, so it can also be left out; see [1].
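
A rough end-to-end sketch, modeled on the example in [1] (table and column names are placeholders, not taken from this thread; partition columns omitted for brevity):

-- Hive-dialect DDL for the dimension table, with the options above as table properties:
SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING
) TBLPROPERTIES (
  'streaming-source.enable' = 'false',
  'streaming-source.partition.include' = 'all',
  'lookup.join.cache.ttl' = '12 h'
);

-- Lookup join against the latest full snapshot of the Hive table:
SELECT o.order_id, dim.product_name
FROM orders AS o
JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
ON o.product_id = dim.product_id;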
> 
> 
> Also, I see in the docs that event time joins are not supported yet. The exchange-rate example on the website joins on processing time, which becomes a problem when backfilling yesterday's data.
> 
> I see that FLIP-132
> 
>  mentions Event Time semantics. Is this something that will be supported later?

The Kafka connector already supports event time joins, but declaring a watermark on a Hive table is not supported yet, so this is not possible there for now.


Best regards,
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/hive/hive_read_write/#temporal-join-the-latest-table

> 
> 
macia kk <pre...@gmail.com> wrote on Monday, 8 Feb 2021 at 18:53:
> Hi. Leonard
> 
>   Could you please take a look at this question on the Flink mailing list? It has been blocking me for a long time. Thanks.



Re: Timezone of log timestamps for Flink on k8s

2021-02-19 Posted by Michael Ran
It is set at the k8s level.
On 2021-02-19 09:37:28, "casel.chen" wrote:
>The log timestamps are currently in UTC. How can I set them to the local UTC+8 timezone? Thanks!
>
>
>2021-02-19 01:34:21,259 INFO  akka.event.slf4j.Slf4jLogger [] - Slf4jLogger started
>2021-02-19 01:34:22,155 INFO  akka.remote.Remoting [] - Starting remoting


Re: Can Flink talk to two Kafka clusters with independent Kerberos authentication at the same time?

2021-02-19 Posted by 占英华
I don't think this works. Kerberos itself does support cross-realm trust, but since your environment cannot set that up, there is no way around it.


> On 19 Feb 2021, at 18:33, liwei li wrote:
> 
> Can Flink consume from a Kerberos-enabled Kafka cluster and write to another Kerberos-authenticated Kafka cluster, where the two Kerberos setups are completely independent and cannot establish cross-realm trust?
> Many thanks! liwei li  Email: hilili...@gmail.com


Can Flink talk to two Kafka clusters with independent Kerberos authentication at the same time?

2021-02-19 Posted by liwei li
Can Flink consume from a Kerberos-enabled Kafka cluster and write to another Kerberos-authenticated Kafka cluster, where the two Kerberos setups are completely independent and cannot establish cross-realm trust?
Many thanks! liwei li  Email: hilili...@gmail.com

Flink SQL timestamp serialization problem

2021-02-19 Posted by guaishushu1...@163.com
Flink 1.12.0: a TIMESTAMP(3) column defined in SQL runs into a time parsing problem.

CREATE TABLE user_log1 (
    user_id string,
    ts TIMESTAMP(3),
    proc_time as PROCTIME()
) WITH (


Caused by: java.io.IOException: Failed to deserialize JSON '{"user_id":"1188","ts":"2021-02-19T17:52:20.921Z"}'.
at org.apache.flink.formats.json.JsonRowDataDeserializationSchema


guaishushu1...@163.com