I'm looping in @Thomas Weise <t...@apache.org> since he has expertise on the
HybridSource.

On Tue, 3 May 2022 at 12:04, Arthur Li <lianyou1...@126.com> wrote:

> Hi Mason,
>
> I uploaded the code and resource files to AwesomeArthurLi/quickstart
> <https://github.com/AwesomeArthurLi/quickstart>,
> which may help you reproduce the issue.
>
> BR.
> Arthur Li
>
>
> On 3 May 2022, at 15:48, Mason Chen <mas.chen6...@gmail.com> wrote:
>
> Hi Arthur,
>
> Coincidentally, I also encountered a similar issue recently. In my case,
> I noticed that the source implementation always marks itself as having data
> available, causing the Flink runtime to loop repeatedly without pausing
> and driving up CPU utilization. More details here:
> https://issues.apache.org/jira/browse/FLINK-27479
>
> Can you provide a minimal working example to reproduce this issue? I
> presume you notice high CPU utilization before switching from FileSource
> and also after switching to KafkaSource?
>
> Best,
> Mason
>
> On Sun, May 1, 2022 at 6:24 AM Arthur Li <lianyou1...@126.com> wrote:
>
>> The following snapshot is the Java process's flame graph.
>>
>> <Pasted Graphic-1.png>
>>
>>
>> On 1 May 2022, at 09:14, Arthur Li <lianyou1...@126.com> wrote:
>>
>> Hi all,
>>
>> The Hybrid Source
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/#hybrid-source>
>> is one of the new features of Flink 1.14.x, but one problem is that it takes
>> over *700% CPU*, which is almost 5 times what the two underlying sources
>> use on their own.
>>
>>
>> My Environment:
>> JDK:  11.0.12 (x86_64) "Oracle Corporation" - "Java SE 11.0.12"
>> Scala: Scala code runner version 2.12.14
>> OS: macOS Monterey
>>
>>
>> Hybrid Source Code:
>>
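>> import java.io.File
>> import java.time.Duration
>>
>> import org.apache.flink.api.common.eventtime.WatermarkStrategy
>> import org.apache.flink.api.common.serialization.SimpleStringSchema
>> import org.apache.flink.connector.base.source.hybrid.HybridSource
>> import org.apache.flink.connector.file.src.FileSource
>> import org.apache.flink.connector.file.src.reader.TextLineFormat
>> import org.apache.flink.connector.kafka.source.KafkaSource
>> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
>> import org.apache.flink.core.fs.Path
>> import org.apache.flink.streaming.api.scala._
>>
>> // SensorReading is a case class defined elsewhere in the project (not shown here).
>> object HelloHybrid {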
>>
>>   def main(args: Array[String]): Unit = {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     val kafka =
>>       KafkaSource.builder[String]()
>>         .setBootstrapServers("localhost:9092")
>>         .setTopics("lab-flink-sensor-iot")
>>         .setGroupId("sensor-iot-group")
>>         .setStartingOffsets(OffsetsInitializer.earliest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build()
>>
>>     val sensorDataFile =
>>       "/Users/arthur/Workspace/flink-summer/src/main/resources/sensor.csv"
>>     val fileData = FileSource.forRecordStreamFormat(
>>       new TextLineFormat(),
>>       Path.fromLocalFile(new File(sensorDataFile)))
>>       .build()
>>
>>     val hybridSrc = HybridSource.builder(fileData).addSource(kafka).build()
>>
>>     env.fromSource(hybridSrc,
>>       WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
>>       "kafka & file hybrid source")
>>       .map(data => {
>>         val arr = data.split(",").map(_.trim)
>>         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
>>       })
>>       .print("hybrid")
>>
>>     env.execute("Hello kafka & file hybrid source")
>>   }
>> }
>>
>>
>>
>>
>>
>
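
The busy-loop behaviour Mason describes above (a source that always reports
data as available) can be sketched without Flink. The snippet below is only an
illustration, loosely modelled on the `isAvailable()` future of Flink's
`SourceReader` interface; `SketchReader`, `AlwaysAvailableReader`,
`WellBehavedReader` and `countPolls` are hypothetical names, and the loop is a
simplified stand-in rather than the actual Flink runtime code.

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}

// Hypothetical, simplified stand-in for a source reader: `poll` returns a
// record if one is ready, and `isAvailable` returns a future that completes
// once polling is worthwhile.
trait SketchReader {
  def poll(): Option[String]
  def isAvailable: CompletableFuture[Void]
}

// Problematic pattern: claims to have data even though it never has any.
final class AlwaysAvailableReader extends SketchReader {
  def poll(): Option[String] = None
  def isAvailable: CompletableFuture[Void] =
    CompletableFuture.completedFuture[Void](null)
}

// Expected pattern: hands out an incomplete future while there is no data,
// so the caller can block instead of spinning.
final class WellBehavedReader extends SketchReader {
  private val pending = new CompletableFuture[Void]()
  def poll(): Option[String] = None
  def isAvailable: CompletableFuture[Void] = pending
}

object AvailabilitySketch {
  // Runs a simplified polling loop for `millis` milliseconds and returns how
  // many times the reader was polled.
  def countPolls(reader: SketchReader, millis: Long): Long = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis)
    var polls = 0L
    while (System.nanoTime() < deadline) {
      val available = reader.isAvailable
      if (available.isDone) {
        // The reader says it has data, so poll it right away.
        reader.poll()
        polls += 1
      } else {
        // No data yet: wait on the future (or until the deadline) without
        // burning CPU.
        val remaining = math.max(deadline - System.nanoTime(), 1L)
        try { available.get(remaining, TimeUnit.NANOSECONDS); () }
        catch { case _: TimeoutException => () }
      }
    }
    polls
  }

  def main(args: Array[String]): Unit = {
    println(s"always-available reader polls: ${countPolls(new AlwaysAvailableReader, 200)}")
    println(s"well-behaved reader polls:     ${countPolls(new WellBehavedReader, 200)}")
  }
}
```

In this sketch the always-available reader is polled continuously for the
whole interval, while the well-behaved reader is never polled because the loop
waits on its future. Whether the same pattern explains the CPU usage of the
HybridSource job reported in this thread is not established here.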
