Hi all,
Thanks very much. I want to debug a checkpoint from code. My code is below.
Also, I am sorry, but I don't understand what the UT class is.
def demo(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)

  // Checkpoint every 10 s, exactly-once, and keep checkpoints after cancellation.
  env.enableCheckpointing(10000)
  val checkpointConfig = env.getCheckpointConfig
  checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  checkpointConfig.setMinPauseBetweenCheckpoints(5000)
  checkpointConfig.setCheckpointTimeout(5000)
  checkpointConfig.setMaxConcurrentCheckpoints(1)
  checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

  val fsStateBackend: StateBackend = new FsStateBackend(STATE_BACKEND)
  env.setStateBackend(fsStateBackend)
  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 30000))

  // TODO: recover my checkpoint here, or run this job from my checkpoint.
  // How do I run this job with the checkpoint metadata? Use CheckpointCoordinator?
  val dataStream: DataStream[String] =
    env.addSource(streamSource).name("mysource")
  dataStream.addSink(new MySQLSink).uid("tesCheckpoint").name("mysink")
  env.execute()
}
MySQLSink:
class MySQLSink extends RichSinkFunction[String] with CheckpointedFunction {
  // Flush once the record counter reaches this value.
  private val bufferSize = 50
  private var count: AtomicInteger = _
  // Records buffered since the last flush.
  private var cacheData: ListBuffer[String] = ListBuffer[String]()
  // Operator state that carries the buffered records across checkpoints.
  private var checkpointedState: ListState[(String, ListBuffer[String])] = _

  override def open(parameters: Configuration): Unit = {
    count = new AtomicInteger(0)
  }

  override def invoke(jsonData: String, context: SinkFunction.Context[_]): Unit = {
    val flag = count.getAndIncrement()
    val end: Long = System.currentTimeMillis()
    // Drop the last character of the input and append the timestamp as a "fend" field.
    val result = jsonData.substring(0, jsonData.length - 1) + ",\"fend\":" + end + "}"
    if (flag >= bufferSize) {
      cacheData += result
      saveDataList()
      cacheData.clear()
      count.set(1)
    } else {
      cacheData += result
    }
  }

  def saveDataList(): Unit = {
  }

  override def close(): Unit = {
    super.close()
  }

  // Replace the checkpointed state with the current buffer contents.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    val buffer = ListBuffer[(String, ListBuffer[String])](("nlcpTestData", cacheData))
    checkpointedState.addAll(buffer.toList.asJava)
  }

  // Register the list state and, on restore, refill the buffer from it.
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val listStateDesc = new ListStateDescriptor[(String, ListBuffer[String])](
      "nlcpTestData",
      TypeInformation.of(new TypeHint[(String, ListBuffer[String])]() {}))
    val stateStore: OperatorStateStore = context.getOperatorStateStore
    checkpointedState = stateStore.getListState(listStateDesc)
    if (context.isRestored) {
      val data = checkpointedState.get().iterator()
      while (data.hasNext) {
        cacheData ++= data.next()._2
      }
    }
  }
}
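
For reference, here is a minimal sketch of how the sink's operator state might be read back with the State Processor API discussed in the thread below. It is only a sketch under several assumptions: the flink-state-processor-api dependency (1.9.x) is on the classpath, the retained checkpoint directory (the one containing the _metadata file) can be loaded the same way as a savepoint, and the state was written with the same TypeInformation as in MySQLSink above. The object name InspectCheckpoint, the helper bufferedRecords and the command-line path argument are hypothetical. Note that in 1.9 Savepoint.load takes a state backend as a third argument; MemoryStateBackend is used here only for illustration.

```scala
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.state.api.Savepoint

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

// Hypothetical helper object, not part of Flink.
object InspectCheckpoint {

  // Flatten the restored (name, buffer) entries into the buffered records.
  def bufferedRecords(entries: Seq[(String, ListBuffer[String])]): Seq[String] =
    entries.flatMap(_._2)

  def main(args: Array[String]): Unit = {
    // Assumption: args(0) is the checkpoint directory that holds _metadata.
    val checkpointPath = args(0)

    // The State Processor API in 1.9 works on the batch (DataSet) environment.
    val bEnv = ExecutionEnvironment.getExecutionEnvironment
    val savepoint = Savepoint.load(bEnv, checkpointPath, new MemoryStateBackend())

    // Assumption: same uid, state name and type information as in MySQLSink.
    val typeInfo = TypeInformation.of(new TypeHint[(String, ListBuffer[String])]() {})
    val state = savepoint.readListState("tesCheckpoint", "nlcpTestData", typeInfo)

    // collect() runs the batch job and returns the state entries to the client.
    val entries = state.collect().asScala.toList
    bufferedRecords(entries).foreach(println)
  }
}
```

This only inspects the state. To resume the job itself from a retained checkpoint, the Flink CLI accepts the checkpoint metadata path through the -s option of flink run.
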
> On Nov 7, 2019, at 12:03, Congxian Qiu <[email protected]> wrote:
>
> Hi,
> If you just want to debug, maybe you can do this in a unit test (UT) class in
> the flink-runtime module :) That way you do not need to deal with dependency
> or access problems.
>
> Best,
> Congxian
>
>
> On Wed, Nov 6, 2019 at 3:39 PM, Jark Wu <[email protected]> wrote:
>
>> Btw, user questions should be asked in [email protected] or [email protected]. The dev
>> ML is mainly used to discuss development.
>>
>> Best,
>> Jark
>>
>> On Wed, 6 Nov 2019 at 15:36, Jark Wu <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> Savepoint.load(env, path) is in the State Processor API library, so you
>>> should add the following dependency to your project.
>>>
>>> <dependency>
>>>   <groupId>org.apache.flink</groupId>
>>>   <artifactId>flink-state-processor-api_2.11</artifactId>
>>>   <version>1.9.1</version>
>>> </dependency>
>>>
>>>
>>> You can see the documentation for more detailed instructions [1].
>>>
>>> Best,
>>> Jark
>>>
>>> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html
>>>
>>> On Wed, 6 Nov 2019 at 09:21, qq <[email protected]> wrote:
>>>
>>>> Hi all,
>>>> I want to load checkpoint or savepoint metadata in my development
>>>> environment so that I can debug the saved checkpoint metadata. I know Flink
>>>> provides an API for this, Savepoint.load(env, path), but I can't find it
>>>> and can't use it. Does anyone know about this? Could you help me? Thanks
>>>> very much.
>>>>
>>>>
>>
>