Re: Re: Kafka 数据源无法实现基于事件时间的窗口聚合

2023-02-08 Thread yidan zhao
实际使用你肯定不会是console producer吧。或者你换java代码写kafka,方便控制些。

wei_yuze  于2023年2月8日周三 13:30写道:
>
> 非常感谢各位的回答!
>
>
>
> Weihua和飞雨正确定位出了问题。问题出在Flink 并发数大于Kafka分区数,导致部分Flink task slot 
> 接收不到数据,进而导致watermark(取所有task slot的最小值)无法推进。
>
>
> 我尝试了Weihua提供的两个解决方案后都可以推进watermark求得窗口聚合结果。
>
>
> 后来我想,理想的解决方式应该是使Flink的并发数接近于或等于Kafka的分区数。我的Kafka分区数为3,于是Flink setParallelism 
> 为3。后来发现又无法推进watermark。检查Kafka后发现,kafka Console Producer把所有的数据都推送到了第0号分区。
>
>
>
> 请问哪位能指点一下,让Kafka topic的每个分区都能收到数据?
>
>
>
>
>
> Best,
>
> Lucas
>
>
>
> Original Email
>
>
>
> Sender:"Weihua Hu"< huweihua@gmail.com >;
>
> Sent Time:2023/2/7 18:48
>
> To:"user-zh"< user-zh@flink.apache.org >;
>
> Subject:Re: Kafka 数据源无法实现基于事件时间的窗口聚合
>
>
> Hi,
>
> 问题应该是 kafka source 配置了多并发运行,但数据量比较少(或者 topic 的 partition 数量小于 task
> 的并发数量),不是所有的 source task 都消费到了数据并产生 watermark,导致下游聚合算子无法对齐 watermark 触发计算。
> 可以尝试通过以下办法解决:
> 1. 将 source 并发控制为 1
> 2. 为 watermark 策略开始 idleness 处理,参考 [#1]
>
> fromElement 数据源会强制指定并发为 1
>
> [#1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>
>
> Best,
> Weihua
>
>
> On Tue, Feb 7, 2023 at 1:31 PM wei_yuze  wrote:
>
> > 您好!
> >
> >
> >
> >
> > 
> 我在进行基于事件时间的窗口聚合操作时,使用fromElement数据源可以实现,但替换为Kafka数据源就不行了,但程序并不报错。以下贴出代码。代码中给了两个数据源,分别命名为:streamSource
> > 和 kafkaSource
> > 。当使用streamSource生成watermarkedStream的时候,可以完成聚合计算并输出结果。但使用kafkaSource却不行。
> >
> >
> >
> >
> > public class WindowReduceTest2 {    public static void
> > main(String[] args) throws Exception {
> >         StreamExecutionEnvironment env =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> >
> >
> >         // 使用fromElement数据源
> >         DataStreamSource> env.fromElements(
> >                 new
> > Event2("Alice", "./home", "2023-02-04 17:10:11"),
> >                 new Event2("Bob",
> > "./cart", "2023-02-04 17:10:12"),
> >                 new
> > Event2("Alice", "./home", "2023-02-04 17:10:13"),
> >                 new
> > Event2("Alice", "./home", "2023-02-04 17:10:15"),
> >                 new 
> Event2("Cary",
> > "./home", "2023-02-04 17:10:16"),
> >                 new 
> Event2("Cary",
> > "./home", "2023-02-04 17:10:16")
> >         );
> >
> >
> >         // 使用Kafka数据源
> >         JsonDeserializationSchema > jsonFormat = new JsonDeserializationSchema<>(Event2.class);
> >         KafkaSource> 
> KafkaSource. >                
> > .setBootstrapServers(Config.KAFKA_BROKERS)
> >                
> > .setTopics(Config.KAFKA_TOPIC)
> >                
> > .setGroupId("my-group")
> >                
> > .setStartingOffsets(OffsetsInitializer.earliest())
> >                
> > .setValueOnlyDeserializer(jsonFormat)
> >                 .build();
> >         DataStreamSource> env.fromSource(source, 
> WatermarkStrategy.noWatermarks(), "Kafka Source");
> >         kafkaSource.print();
> >
> >
> >         // 生成watermark,从数据中提取时间作为事件时间
> >         SingleOutputStreamOperator > watermarkedStream =
> > 
> kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy. >                
> > .withTimestampAssigner(new SerializableTimestampAssigner>   
>                  
> > @Override
> >                    
> > public long extractTimestamp(Event2 element, long recordTimestamp) {
> >                    
> >     SimpleDateFormat simpleDateFormat = new
> > SimpleDateFormat("-MM-dd HH:mm:ss");
> >                    
> >     Date date = null;
> >                    
> >     try {
> >                    
> >         date =
> > simpleDateFormat.parse(element.getTime());
> >                    
> >     } catch (ParseException e) {
> >                    
> >         throw new RuntimeException(e);
> >                    
> >     }
> >                    
> >     long time = date.getTime();
> >                    
> >     System.out.println(time);
> >                    
> >     return time;
> >                     }
> >                 }));
> >
> >
> >         // 窗口聚合
> >         watermarkedStream.map(new MapFunction > Tuple2>                 
>    
> > @Override
> >                    
> > public Tuple2>               
>      
> >     // 将数据转换成二元组,方便计算
> >                    
> >     return Tuple2.of(value.getUser(), 1L);
> >                     }
> >                 })
> >                 .keyBy(r ->
> > r.f0)
> >                 // 设置滚动事件时间窗口
> >                
> > .window(TumblingEventTimeWindows.of(Time.seconds(5)))
> >                 .reduce(new
> > ReduceFunction        
> > @Override
> >                    
> > public Tuple2> Tuple2>             
>        
> >     // 定义累加规则,窗口闭合时,向下游发送累加结果
> >                    
> >     return Tuple2.of(value1.f0, value1.f1 + value2.f1);
> >                     }
> >                 })
> >                 
> .print("Aggregated
> > stream");
> >
> >
> >         env.execute();
> >     }
> > }
> >
> >
> >
> >
> >
> >
> > 值得注意的是,若将代码中的 TumblingEventTimeWindows 替换为 TumblingProcessingTimeWindows
> > ,即使使用 Kafka 数据源也是可以完成聚合计算并输出结果的。
> >
> >
> >
> > 感谢您花时间查看这个问题!
> > Lu

flink canal json格式忽略不识别的type

2023-02-08 Thread casel.chen
不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致
有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal json格式解析时直接忽略不识别的type,例如
例1:
{"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE
 TABLE `oms_parcels` (  `id` varchar(255) NOT NULL,  `createdby` varchar(255) 
DEFAULT NULL,  `createdat` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON 
UPDATE CURRENT_TIMESTAMP,  `updatedat` timestamp NOT NULL DEFAULT '-00-00 
00:00:00',  `updatedby` varchar(255) DEFAULT NULL,  `account` varchar(255) 
DEFAULT NULL,  `batch` varchar(255) DEFAULT NULL,  `client` varchar(255) 
DEFAULT NULL,  `command` varchar(255) DEFAULT NULL,  `container` varchar(255) 
DEFAULT NULL,  `items` mediumtext,  `trackingnumber` varchar(255) NOT NULL,  
`transporter` varchar(255) DEFAULT NULL,  `weight` decimal(19,2) NOT NULL,  
`zipcode` varchar(255) DEFAULT NULL,  `ld3` varchar(255) DEFAULT NULL,  
`destination_code` varchar(255) DEFAULT NULL,  PRIMARY KEY 
(`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT 
CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null}


例2:
{
"action":"ALTER",
"before":[],
"bid":0,
"data":[],
"db":"db_test",
"dbValType":{
"col1":"varchar(22)",
"col2":"varchar(22)",
"col_pk":"varchar(22)"
},
"ddl":true,
"entryType":"ROWDATA",
"execTs":1669789188000,
"jdbcType":{
"col1":12,
"col2":12,
"col_pk":12
},
"pks":[],
"schema":"db_test",
"sendTs":1669789189533,
"sql":"alter table table_test add col2 varchar(22) null",
"table":"table_test",
"tableChanges":{
"table":{
"columns":[
{
"jdbcType":12, // jdbc 类型。
"name":"col1",// 字段名称。
"position":0,  // 字段的顺序。
"typeExpression":"varchar(22)", // 类型描述。
"typeName":"varchar" // 类型名称。
},
{
"jdbcType":12,
"name":"col2",
"position":1, 
"typeExpression":"varchar(22)", 
"typeName":"varchar" 
},
{
"jdbcType":12, 
"name":"col_pk",   
"position":2,  
"typeExpression":"varchar(22)", 
"typeName":"varchar" 
}
],
"primaryKeyColumnNames":["col_pk"] // 主键名列表。
},
"type":"ALTER"
}
}