更乱了哦...可以尝试加个附件,或者推到 GitHub 上再贴个链接。 On Fri, Feb 24, 2023 at 4:59 PM wei_yuze <wei_y...@qq.com.invalid> wrote:
> > 刚才的邮件正文代码出现乱码,现在重新发送。<br/>-------------------------------------------------<br/><br/>您好!<br/><br/>我在运行Flink程序时遇到了一个问题,特来向各位大佬请教。<br/><br/>程序目标:<br/>用Flink SQL求窗口&nbsp;Top-5,开一小时的窗口。数据源为Kafka,我分批向Kafka里传入数据。计算出的&nbsp;Top-5结果,写入MySQL。<br/><br/>问题:<br/>一小时窗口设置完全没生效,事件时间和处理时间两种时间语义都测试了。我每向Kafka里传入一批数据,MySQL都会看到五条新增的 Top-5数据,可两批源数据之间的时间间隔并没有到一小时。<br/><br/>问题代码初步定位:<br/>TUMBLE( TABLE watermarkedTable, DESCRIPTOR(ts), INTERVAL '1' HOUR )<br/><br/>完整源代码:<br/><br/>&nbsp; &nbsp; &nbsp; &nbsp; final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();<br/><br/>&nbsp; &nbsp; &nbsp; &nbsp; // Create table environment<br/>&nbsp; &nbsp; &nbsp; &nbsp; StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment);<br/><br/>&nbsp; &nbsp; &nbsp; &nbsp; // 接入Kafka数据源<br/>&nbsp; &nbsp; &nbsp; &nbsp; KafkaSource&lt;String&gt; kafkaSource = KafkaSource<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .&lt;String&gt;builder()<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .setBootstrapServers(Config.KAFKA_BROKERS)<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .setTopics(Config.KAFKA_TOPIC_EVENT)<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .setGroupId("flink-consumer")<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .setStartingOffsets(OffsetsInitializer.earliest())<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .setValueOnlyDeserializer(new SimpleStringSchema())<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .build();<br/><br/>&nbsp; &nbsp; &nbsp; &nbsp; DataStreamSource&lt;String&gt; stringStream = streamExecutionEnvironment<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .fromSource(<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; kafkaSource,<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
WatermarkStrategy.noWatermarks(),<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "Kafka string source without watermark"<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; );<br/><br/>&nbsp; &nbsp; &nbsp; &nbsp; // Deserialize string stream<br/>&nbsp; &nbsp; &nbsp; &nbsp; SingleOutputStreamOperator&lt;Event&gt; deserializedStream = stringStream<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .map(<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new MapFunction&lt;String, Event&gt;() {<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public Event map(String jsonString) throws Exception {<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ObjectMapper objectMapper = new ObjectMapper();<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; objectMapper.setDateFormat(simpleDateFormat);<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Event deserializedObject = objectMapper.readValue(jsonString, Event.class);<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return 
deserializedObject;<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; });<br/><br/>&nbsp; &nbsp; &nbsp; &nbsp; SingleOutputStreamOperator&lt;Event&gt; watermarkedStream = deserializedStream.assignTimestampsAndWatermarks(<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; WatermarkStrategy<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .&lt;Event&gt;forBoundedOutOfOrderness(Duration.ofSeconds(0L))<br/>// &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;.withTimestampAssigner((event, l) -&gt; event.getTs().getTime())<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .withTimestampAssigner(<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new SerializableTimestampAssigner&lt;Event&gt;() {<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public long extractTimestamp(Event event, long kafkaTimestamp) {<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Timestamp timestamp = event.getOccurrentTime();<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; long time = timestamp.getTime();<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return time;<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; }<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; )<br/>&nbsp; &nbsp; &nbsp; &nbsp; );<br/><br/>&nbsp; &nbsp; &nbsp; &nbsp; // 将流数据转换成表<br/>&nbsp; &nbsp; &nbsp; &nbsp; Table watermarkedTable = streamTableEnvironment.fromDataStream(<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; watermarkedStream,<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; $("uid"),<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; $("eventId"),<br/>// &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;$("eventName"),<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; $("serviceId"),<br/>// &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;$("serviceName"),<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; $("occurrentTime").rowtime().as("ts")<br/>&nbsp; &nbsp; &nbsp; &nbsp; );<br/><br/>&nbsp; &nbsp; &nbsp; &nbsp; // 注册table用于SQL API<br/>&nbsp; &nbsp; &nbsp; &nbsp; streamTableEnvironment.createTemporaryView("watermarkedTable", watermarkedTable);<br/>&nbsp; &nbsp; &nbsp; &nbsp; // watermarkedTable.printSchema();<br/><br/>&nbsp; &nbsp; &nbsp; &nbsp; String countQuery =<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "SELECT eventId, " +<br/>// &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;"eventName, " +<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "serviceId, " +<br/>// &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;"serviceName, " +<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "COUNT(*) as eventCount, " +<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "COUNT(DISTINCT uid) as userCount " +<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; "FROM TABLE ( " +<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "TUMBLE( TABLE watermarkedTable, DESCRIPTOR(ts), INTERVAL '1' HOUR ) " +<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ") " +<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "GROUP BY eventId, serviceId ";<br/><br/>&nbsp; &nbsp; &nbsp; &nbsp; String sortEventQuery =<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "SELECT *, " +<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "ROW_NUMBER() OVER ( " +<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "ORDER BY eventCount desc " +<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ") AS eventRank " +<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "FROM (" + countQuery + ") ";<br/><br/>&nbsp; &nbsp; &nbsp; &nbsp; String top5Query =<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "SELECT * " +<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "FROM (" + sortEventQuery + ") " +<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "WHERE eventRank &lt;= 5 ";<br/><br/>&nbsp; &nbsp; &nbsp; &nbsp; // 执行SQL得到结果表<br/>&nbsp; &nbsp; &nbsp; &nbsp; Table queryResultTable = streamTableEnvironment.sqlQuery(top5Query);<br/><br/>&nbsp; &nbsp; &nbsp; &nbsp; // Create sink table<br/>&nbsp; &nbsp; &nbsp; &nbsp; String top5EventTableDDL =<br/>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; "CREATE TABLE top5EventTable ( ......因为涉及账号密码,暂且省略......)";<br/>&nbsp; &nbsp; &nbsp; &nbsp; streamTableEnvironment.executeSql(top5EventTableDDL);<br/><br/>&nbsp; &nbsp; &nbsp; &nbsp;// &nbsp;持久化<br/>&nbsp; &nbsp; &nbsp; &nbsp; TableResult sinkResult = 
queryResultTable.executeInsert("top5EventTable");<br/><br/><br/>感谢您花时间查看这个问题!<br/>Lucas