大佬好,我在1.11版本测试flink sql时发现一个问题,用streaming api
消费kafka,使用eventtime,再把stream转table,进行sql聚合,发现当kafka topic是多个分区时,flink webui 
watermarks 显示No Watermark,聚合计算也迟迟不触发计算,但当kafka
topic只有一个分区时却能这个正常触发计算,watermarks也显示正常下面是测试代码:package com.test.opsimport
java.util.Propertiesimport com.fasterxml.jackson.databind.{JsonNode,
ObjectMapper}import
org.apache.flink.api.common.restartstrategy.RestartStrategiesimport
org.apache.flink.api.common.serialization.SimpleStringSchemaimport
org.apache.flink.configuration.Configurationimport
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanupimport
org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport
org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerimport
org.apache.flink.table.api.EnvironmentSettingsimport
org.apache.flink.types.Rowimport scala.collection.JavaConversions._import
org.apache.flink.api.scala._import org.apache.flink.table.api._import
org.apache.flink.table.api.bridge.scala._object FlinkSqlTest { 
//源json解析结构化case类,以及基于此case类,分拆出来的RhoLog和PpoLog case类 分别计算  case class
SrcLog(userid: String, guid: String, rho: JsonNode, ts: Long)  case class
RhoLog(userid: String, guid: String, ts: Long)  def main(args:
Array[String]): Unit = {    val parallelism = 3    val kafkaBrokers =
"172.x.x.x:9092"    val jobName = "flinksql-test"    val topicNames =
List("ops_nginx_logs")    val groupName = "test-ops-100000"    val
properties = new Properties()    //流处理的环境构造    val conf: Configuration = new
Configuration()    import org.apache.flink.configuration.ConfigConstants   
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)    val
streamEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)   
streamEnv.enableCheckpointing(10000)   
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)   
streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   
streamEnv.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
10000))   
streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
   
streamEnv.setParallelism(parallelism)    // table环境构造    val
blinkTableEnvSettings = EnvironmentSettings.newInstance()     
.useBlinkPlanner()      .inStreamingMode()      .build()    val tableEnv =
StreamTableEnvironment.create(streamEnv, blinkTableEnvSettings)   
properties.setProperty("bootstrap.servers", kafkaBrokers)   
properties.setProperty("group.id", groupName)    val myConsumer = new
FlinkKafkaConsumer[String](topicNames, new SimpleStringSchema(), properties)   
//从kafka最新offset开始消费    myConsumer.setStartFromLatest()    val mapper = new
ObjectMapper    val srcStream = streamEnv.addSource(myConsumer)     
.filter(_.nonEmpty).filter(_.contains("\"message\":{")).map(line => {     
val rootNode = mapper.readTree(line)      val rowTime =
rootNode.get("@timestamp").asText()      //      val timeStamp =
dateTimeToTimestampJdk8(rowTime)      val timeStamp = rowTime.toLong     
val messageNode = mapper.readTree(rootNode.get("message").toString)      val
rho = messageNode.get("rho")      val userid = if (messageNode.has("u"))
messageNode.get("u").asText else "nodata"      val guid = if
(messageNode.has("g")) messageNode.get("g").asText else "nodata"     
SrcLog(userid, guid, rho, timeStamp)    })    val rhoStream =
srcStream.map(src => {      RhoLog(src.userid, src.guid, src.ts)   
}).assignAscendingTimestamps(row => {        println(row.ts)        row.ts     
}) //水位线    //流转表    val table = tableEnv.fromDataStream(rhoStream, 'userid,
'guid, 'ts as 'ts1, 'ts.rowtime() as 'ts)    //源表转窗口聚合表    val
resTableEventtime = table.window(Tumble over 10.seconds on 'ts as 'window)     
.groupBy('userid, 'window, 'ts, 'ts1)      .select('userid, 'guid.count,
'window.start(), 'window.end(), 'ts, 'ts1)    // 窗口聚合表输出   
resTableEventtime.toAppendStream[Row].print("test")    streamEnv.execute() 
}}症状:当topic为单分区时:kafka-topics --zookeeper 172.x.x.x:2181 --describe --topic
ops_nginx_logs1Topic:ops_nginx_logs1    PartitionCount:1        
ReplicationFactor:3
Configs:        Topic: ops_nginx_logs1  Partition: 0    Leader: 104     
Replicas:
104,107,105     Isr: 104,107,105flinksql 聚合结果:test:1>
aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:17.932,1596979157932test:2>
aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:19.932,1596979159932test:3>
aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:11.930,1596979151930test:3>
aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:15.931,1596979155931test:3>
aaaaa,1,2020-08-09T13:19:10,2020-08-09T13:19:20,2020-08-09T13:19:13.931,1596979153931
<http://apache-flink.147419.n8.nabble.com/file/t837/3.png> web ui
watermarks显示正常 <http://apache-flink.147419.n8.nabble.com/file/t837/4.png>
当kafka topic为多分区时:kafka-topics --zookeeper 172.x.x.x:2181 --describe --topic
ops_nginx_logsTopic:ops_nginx_logs      PartitionCount:3        
ReplicationFactor:3
Configs:        Topic: ops_nginx_logs   Partition: 0    Leader: 104     
Replicas:
104,106,107     Isr: 104,106,107        Topic: ops_nginx_logs   Partition: 1    
Leader: 105
Replicas: 105,107,104   Isr: 105,107,104        Topic: ops_nginx_logs   
Partition: 2
Leader: 106     Replicas: 106,104,105   Isr: 106,104,105flinksql 
聚合计算迟迟不计算,没有结果输出:
<http://apache-flink.147419.n8.nabble.com/file/t837/2.png> web ui watermarks
显示 No Watermark <http://apache-flink.147419.n8.nabble.com/file/t837/1.png> 



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复