Thank you for reporting the issue. Mason has already identified the root
cause, and the JIRA ticket is now assigned to him:


On Tue, May 3, 2022 at 4:02 AM Martijn Visser <> wrote:

> I'm looping in @Thomas Weise <> since he has expertise in
> the HybridSource.
> On Tue, 3 May 2022 at 12:04, Arthur Li <> wrote:
>> Hi Mason,
>> I uploaded the code and resource files to AwesomeArthurLi/quickstart:
>> quickstart ( <>).
>> Hopefully it will help you reproduce the issue.
>> BR.
>> Arthur Li
>> On May 3, 2022, at 15:48, Mason Chen <> wrote:
>> Hi Arthur,
>> Coincidentally, I also encountered a similar issue recently. In my
>> case, I noticed that the source implementation always marks itself as
>> having data available, which causes the Flink runtime to loop repeatedly
>> and drives up CPU utilization. More details here:
>> Can you provide a minimal working example to reproduce this issue? I
>> presume you see high CPU utilization both before and after switching
>> from FileSource to KafkaSource?
>> Best,
>> Mason
>> On Sun, May 1, 2022 at 6:24 AM Arthur Li <> wrote:
>>> The following snapshot is the Java process's flame graph.
>>> <Pasted Graphic-1.png>
>>> On May 1, 2022, at 09:14, Arthur Li <> wrote:
>>> Hi all,
>>> The Hybrid Source | Apache Flink
>>> <>
>>> is one of the new features of Flink 1.14.x, but one problem is that it
>>> takes over *700% CPU*, which is almost 5 times as much as the two
>>> underlying sources use on their own.
>>> My Environment:
>>> JDK:  11.0.12 (x86_64) "Oracle Corporation" - "Java SE 11.0.12"
>>> Scala: Scala code runner version 2.12.14
>>> OS: macOS Monterey
>>> Hybrid Source Code:
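>>> import java.io.File
>>> import java.time.Duration
>>>
>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy
>>> import org.apache.flink.api.common.serialization.SimpleStringSchema
>>> import org.apache.flink.connector.base.source.hybrid.HybridSource
>>> import org.apache.flink.connector.file.src.FileSource
>>> import org.apache.flink.connector.file.src.reader.TextLineFormat
>>> import org.apache.flink.connector.kafka.source.KafkaSource
>>> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
>>> import org.apache.flink.core.fs.Path
>>> import org.apache.flink.streaming.api.scala._
>>>
>>> // Assumed shape of the record type (its definition is not shown in the original message).
>>> case class SensorReading(id: String, timestamp: Long, temperature: Double)
>>>
>>> object HelloHybrid {
>>>   def main(args: Array[String]): Unit = {
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     // Unbounded second source: reads the Kafka topic from the earliest offset.
>>>     val kafka =
>>>       KafkaSource.builder[String]()
>>>         .setBootstrapServers("localhost:9092")
>>>         .setTopics("lab-flink-sensor-iot")
>>>         .setGroupId("sensor-iot-group")
>>>         .setStartingOffsets(OffsetsInitializer.earliest())
>>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>>         .build()
>>>     val sensorDataFile = "/Users/arthur/Workspace/flink-summer/src/main/resources/sensor.csv"
>>>     // Bounded first source: reads the CSV file line by line.
>>>     val fileData = FileSource.forRecordStreamFormat(
>>>       new TextLineFormat(),
>>>       Path.fromLocalFile(new File(sensorDataFile)))
>>>       .build()
>>>     // Read the file first, then switch over to Kafka.
>>>     val hybridSrc = HybridSource.builder(fileData).addSource(kafka).build()
>>>     env.fromSource(hybridSrc,
>>>       WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
>>>       "kafka & file hybrid source")
>>>       // Parse "id, timestamp, temperature" lines into SensorReading records.
>>>       .map(data => {
>>>         val arr = data.split(",").map(_.trim)
>>>         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
>>>       })
>>>       .print("hybrid")
>>>     env.execute("Hello kafka & file hybrid source")
>>>   }
>>> }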
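Mason's diagnosis in the thread above (a source that always reports data as available keeps the task thread spinning) can be illustrated with a small, Flink-free Scala sketch. It is loosely modeled on the idea behind the `CompletableFuture<Void> isAvailable()` method of Flink's `SourceReader`, but it does not use any Flink API. `ToyReader`, `AlwaysAvailableReader`, `SignalingReader`, `AvailabilitySketch`, and `countWakeups` are hypothetical names, and the loop is a simplified single-threaded stand-in for the runtime's processing loop, assuming the runtime polls again whenever the availability future is already complete.

```scala
import java.util.concurrent.CompletableFuture

// Hypothetical reader abstraction (NOT Flink's SourceReader interface).
trait ToyReader {
  def poll(): Option[String]               // None means nothing to emit right now
  def isAvailable: CompletableFuture[Void] // completes when poll() may return data
}

// Problematic variant: always reports "data available", even while idle.
final class AlwaysAvailableReader extends ToyReader {
  def poll(): Option[String] = None
  def isAvailable: CompletableFuture[Void] = CompletableFuture.completedFuture(null)
}

// Well-behaved variant: hands out a pending future while idle and completes it
// only when a new record is pushed in.
final class SignalingReader extends ToyReader {
  private var pending = List.empty[String]
  private var available = new CompletableFuture[Void]()

  def push(record: String): Unit = synchronized {
    pending = pending :+ record
    available.complete(null)
  }

  def poll(): Option[String] = synchronized {
    pending match {
      case head :: tail =>
        pending = tail
        // Drained: replace the completed future with a fresh pending one.
        if (tail.isEmpty) available = new CompletableFuture[Void]()
        Some(head)
      case Nil => None
    }
  }

  def isAvailable: CompletableFuture[Void] = synchronized(available)
}

object AvailabilitySketch {
  // Counts how often the loop wakes the reader within a fixed iteration budget.
  // It stops as soon as the reader reports it is not available; a real runtime
  // would park the thread at that point instead of spinning.
  def countWakeups(reader: ToyReader, budget: Int): Int = {
    var wakeups = 0
    while (wakeups < budget && reader.isAvailable.isDone) {
      reader.poll()
      wakeups += 1
    }
    wakeups
  }

  def main(args: Array[String]): Unit = {
    println(countWakeups(new AlwaysAvailableReader, 1000000)) // prints 1000000
    val reader = new SignalingReader
    reader.push("a")
    reader.push("b")
    println(countWakeups(reader, 1000000)) // prints 2
  }
}
```

With the always-available reader, the loop burns through its whole iteration budget without emitting a single record. The signaling reader wakes up once per record and then lets the thread idle. That difference is the shape of the CPU problem discussed in this thread, not a measurement of it.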
