Re: Data is lost in the ListState

2022-07-12 Thread Arthur Li
Thanks David, it’s clear now.  
In my case, x2 will be recovered from the last checkpoint if, after it is 
processed, I wait a few seconds for it to be checkpointed. Alternatively, if my 
source can rewind, it will replay x2.


> On Jul 12, 2022, at 09:10, David Anderson wrote:
> 
> This is, in fact, the expected behavior. Let me explain why:
> 
> In order for Flink to provide exactly-once guarantees, the input sources must 
> be able to rewind and then replay any events since the last checkpoint.
> 
> In the scenario you shared, the last checkpoint was checkpoint 2, which 
> occurred before x2 was processed. The x3 input caused a failure, and the 
> state from checkpoint 2 was restored. This state contained only [a, b, c].
> 
> Since sockets don't rewind and replay their input, the x2 has been lost -- it 
> wasn't checkpointed, nor did you repeat it after the failure.
> 
> If the source had been something that supported rewind and replay, like 
> Kafka, the events since the offsets stored in the checkpoint would have been 
> automatically re-processed and nothing would have been lost.
> 
> Hope that was helpful,
> David
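
The recovery behavior David describes can be illustrated with a toy model. This is only a sketch in plain Scala, not Flink code: `ReplayDemo`, `Checkpoint`, and `run` are hypothetical names, the checkpoint is assumed to be taken after the third element, and "x3" is assumed to fail only once (a transient failure).

```scala
object ReplayDemo {
  // What a checkpoint captures in this toy model: the list state and the source offset.
  final case class Checkpoint(state: Vector[String], offset: Int)

  /** Processes `input` element by element, appending each element to the state.
    * A checkpoint is taken once `checkpointAfter` elements have been processed.
    * The element "x3" fails the first time it is seen (assumption: transient failure).
    * On failure the state is restored from the last checkpoint. A replayable source
    * rewinds to the checkpointed offset; a non-replayable one (like a socket) just
    * continues with whatever comes next.
    */
  def run(input: Vector[String], checkpointAfter: Int, replayable: Boolean): Vector[String] = {
    var state = Vector.empty[String]
    var last = Checkpoint(state, 0)
    var failedOnce = false
    var i = 0
    while (i < input.length) {
      val elem = input(i)
      if (elem == "x3" && !failedOnce) {
        failedOnce = true
        state = last.state                          // restore state from the checkpoint
        i = if (replayable) last.offset else i + 1  // rewind, or lose the in-flight data
      } else {
        state = state :+ elem
        i += 1
        if (i == checkpointAfter) last = Checkpoint(state, i)
      }
    }
    state
  }

  def main(args: Array[String]): Unit = {
    val input = Vector("a", "b", "c", "x2", "x3")
    println(run(input, checkpointAfter = 3, replayable = false)) // Vector(a, b, c)
    println(run(input, checkpointAfter = 3, replayable = true))  // Vector(a, b, c, x2, x3)
  }
}
```

With the non-replayable source, x2 was processed after the checkpoint and is never seen again, so it is missing from the restored state. With the replayable source, everything after the checkpointed offset is processed a second time.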



Re: Flink 1.14.4 HybridSource consumes lots of CPU resources

2022-05-03 Thread Arthur Li
Hi Mason,

I uploaded the code and resource files to AwesomeArthurLi/quickstart 
<https://github.com/AwesomeArthurLi/quickstart>; I hope it helps you reproduce 
the issue.

BR.
Arthur Li


> On May 3, 2022, at 15:48, Mason Chen wrote:
> 
> Hi Arthur,
> 
> Coincidentally, I also encountered a similar issue recently. For my issue, I 
> noticed that the source implementation always marks itself as having data 
> available causing the Flink runtime to repeatedly loop in succession and 
> causing high CPU utilization. More details here: 
> https://issues.apache.org/jira/browse/FLINK-27479 
> 
> Can you provide a minimal working example to reproduce this issue? I presume 
> you notice high CPU utilization before switching from FileSource and also 
> after switching to KafkaSource?
> 
> Best,
> Mason
> 
> On Sun, May 1, 2022 at 6:24 AM Arthur Li <lianyou1...@126.com> wrote:
> The following snapshot is the Java process’s flame graph.
> 
> <Pasted Graphic-1.png>
> 
> 
>> On May 1, 2022, at 09:14, Arthur Li <lianyou1...@126.com> wrote:
>> 
>> Hi all,
>> 
>> The Hybrid Source 
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/#hybrid-source>
>> is one of the new features of Flink 1.14.x, but one problem is that it takes 
>> over 700% CPU, which is almost 5 times as much as the two sources use on 
>> their own.
>> 
>> 
>> My Environment:
>> JDK:  11.0.12 (x86_64) "Oracle Corporation" - "Java SE 11.0.12"
>> Scala: Scala code runner version 2.12.14
>> OS: MacOS Monterey
>> 
>> 
>> Hybrid Source Code:
>> 
>> object HelloHybrid {
>>   def main(args: Array[String]): Unit = {
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>     val kafka =
>>       KafkaSource.builder[String]()
>>         .setBootstrapServers("localhost:9092")
>>         .setTopics("lab-flink-sensor-iot")
>>         .setGroupId("sensor-iot-group")
>>         .setStartingOffsets(OffsetsInitializer.earliest())
>>         .setValueOnlyDeserializer(new SimpleStringSchema())
>>         .build()
>> 
>>     val sensorDataFile =
>>       "/Users/arthur/Workspace/flink-summer/src/main/resources/sensor.csv"
>>     val fileData = FileSource.forRecordStreamFormat(
>>       new TextLineFormat(),
>>       Path.fromLocalFile(new File(sensorDataFile)))
>>       .build()
>> 
>>     val hybridSrc = HybridSource.builder(fileData).addSource(kafka).build()
>> 
>>     env.fromSource(hybridSrc,
>>       WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
>>       "kafka & file hybrid source")
>>       .map(data => {
>>         val arr = data.split(",").map(_.trim)
>>         SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
>>       })
>>       .print("hybrid")
>> 
>>     env.execute("Hello kafka & file hybrid source")
>>   }
>> }
>> 
>> 
> 
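
Mason's observation about a source that always marks itself as having data available can be illustrated with a toy model. This is only a sketch in plain Scala, not Flink's actual runtime loop: `ToyReader` and `emptyPolls` are hypothetical names, and time is simulated as a fixed number of loop iterations. It borrows one idea from Flink's `SourceReader` interface, namely that availability is signaled through a `CompletableFuture`.

```scala
import java.util.concurrent.CompletableFuture
import scala.collection.mutable

object AvailabilityDemo {
  /** Toy reader. With `alwaysAvailable = true` it claims to have data at all times;
    * otherwise its future completes only while records are actually queued.
    */
  final class ToyReader(alwaysAvailable: Boolean) {
    private val queue = mutable.Queue.empty[String]
    private var availability = new CompletableFuture[Void]()

    // Called when a record arrives from the outside world.
    def offer(record: String): Unit = {
      queue.enqueue(record)
      availability.complete(null)
    }

    // Returns the next record, if any, and resets the future once the queue drains.
    def poll(): Option[String] =
      if (queue.isEmpty) None
      else {
        val record = queue.dequeue()
        if (queue.isEmpty) availability = new CompletableFuture[Void]()
        Some(record)
      }

    def isAvailable: CompletableFuture[Void] =
      if (alwaysAvailable) CompletableFuture.completedFuture[Void](null)
      else availability
  }

  /** Simulates `ticks` iterations of a polling loop; one record arrives at `arrivalTick`.
    * The loop polls only when the reader signals availability. Returns the number of
    * polls that came back empty, i.e. wasted CPU work.
    */
  def emptyPolls(reader: ToyReader, ticks: Int, arrivalTick: Int): Int = {
    var wasted = 0
    for (t <- 0 until ticks) {
      if (t == arrivalTick) reader.offer("record")
      if (reader.isAvailable.isDone && reader.poll().isEmpty) wasted += 1
    }
    wasted
  }

  def main(args: Array[String]): Unit = {
    println(emptyPolls(new ToyReader(alwaysAvailable = false), ticks = 1000, arrivalTick = 500)) // 0
    println(emptyPolls(new ToyReader(alwaysAvailable = true), ticks = 1000, arrivalTick = 500))  // 999
  }
}
```

In this model the reader with an honest availability signal is polled exactly once, when the record arrives. The reader that is always "available" is polled on every iteration, which is the kind of busy loop that would show up as high CPU utilization.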



Flink 1.14.4 HybridSource consumes lots of CPU resources

2022-04-30 Thread Arthur Li
Hi all,

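The Hybrid Source 
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/#hybrid-source>
is one of the new features of Flink 1.14.x, but one problem is that it takes over 
700% CPU, which is almost 5 times as much as the two sources use on their own.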


My Environment:
JDK:  11.0.12 (x86_64) "Oracle Corporation" - "Java SE 11.0.12"
Scala: Scala code runner version 2.12.14
OS: MacOS Monterey


Hybrid Source Code:

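import java.io.File
import java.time.Duration

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.source.hybrid.HybridSource
import org.apache.flink.connector.file.src.FileSource
import org.apache.flink.connector.file.src.reader.TextLineFormat
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

// SensorReading is a case class from the project; its definition is not shown here.
object HelloHybrid {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Unbounded Kafka source, read from the earliest offset.
    val kafka =
      KafkaSource.builder[String]()
        .setBootstrapServers("localhost:9092")
        .setTopics("lab-flink-sensor-iot")
        .setGroupId("sensor-iot-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build()

    // Bounded file source, read line by line.
    val sensorDataFile =
      "/Users/arthur/Workspace/flink-summer/src/main/resources/sensor.csv"
    val fileData = FileSource.forRecordStreamFormat(
      new TextLineFormat(),
      Path.fromLocalFile(new File(sensorDataFile)))
      .build()

    // Read the file first, then switch to Kafka.
    val hybridSrc = HybridSource.builder(fileData).addSource(kafka).build()

    env.fromSource(hybridSrc,
      WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
      "kafka & file hybrid source")
      .map(data => {
        val arr = data.split(",").map(_.trim)
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })
      .print("hybrid")

    env.execute("Hello kafka & file hybrid source")
  }
}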