my code and log is as below.
val getExecuteEnv: StreamExecutionEnvironment = {
val env =
StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
env.getCheckpointConfig.setCheckpointTimeout(60000)
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
env
}
def transform(target: DataStream[(String, String, String, String, Long)]):
DataStream[WinLossBase] =
target.keyBy(_._3).flatMap(new StateOperator)
def main(args: Array[String]) {
val env = getExecuteEnv
val source: DataStream[String] =
extractFromKafka(env).name("KafkaSource")
val json = deserializeToJsonObj(source).name("ConvertToJson")
val target: DataStream[(String, String, String, String, Long)] =
preTransform(json)
val result: DataStream[WinLossBase] =
transform(target).name("ToKeyedStream”)
…
}
class StateOperator extends RichFlatMapFunction[(String, String, String,
String, Long), WinLossBase] {
var playerState: ValueState[util.Map[String, PotPlayer]] = _
var handState: ValueState[HandHistoryInfo] = _
override def open(param: Configuration): Unit = {
val playerValueStateDescriptor = new
ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String,
PotPlayer]())
playerState = getRuntimeContext.getState(playerValueStateDescriptor)
handState = getRuntimeContext.getState(new
ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
}
override def flatMap(in: (String, String, String, String, Long), out:
Collector[WinLossBase]): Unit = {
in._2 match {
case "GameStartHistory" =>
val players = playerState.value()
val obj = _convertJsonToRecord(in._4,
classOf[GameStartHistoryRecord])
val record = obj.asInstanceOf[GameStartHistoryRecord]
val handHistoryInfo: HandHistoryInfo =
_setUpHandHistoryInfo(record)
if (LOG.isInfoEnabled())
LOG.info("hand start {}", if (handHistoryInfo != null)
handHistoryInfo.handHistoryId else "NULL”)
….
playerState.update(players)
handState.update(handHistoryInfo)
case "HoleCardHistory" =>
val players = playerState.value()
if (players != null) {
...
playerState.update(players)
} else LOG.warn("there is no player[hole card]. {}", in._4)
case "PlayerStateHistory" =>
val players = playerState.value()
if (players != null) {
….
playerState.update(players)
} else LOG.warn("there is no player[player state]. {}",
in._4)
case "CommCardHistory" =>
val handHistoryInfo = handState.value()
val commCardHistory: CommCardHistory = commCardState.value()
if (handHistoryInfo != null) {
...
handState.update(handHistoryInfo)
commCardState.update(commCardHistory)
} else LOG.warn("there is no handhistory info[comm card].
{}", in._4)
case "PlayerActionHistory" =>
val handHistoryInfo = handState.value()
val players = playerState.value()
if (handHistoryInfo != null) {
...
} else LOG.warn("there is no handhistory info[player
action]. {}", in._4)
case "PotHistory" =>
val players = playerState.value()
val handHistoryInfo = handState.value()
val commCardHistory: CommCardHistory = commCardState.value()
if (handHistoryInfo != null && handHistoryInfo.playType ==
PlayType.Cash && players != null && players.size > 1) {
...
} else LOG.warn("there is no handhistory info[pot]. {}",
in._4)
case "GameEndHistory" =>
val players = playerState.value()
val handHistoryInfo = handState.value()
...
if (LOG.isTraceEnabled()) LOG.trace("end {}",
record.getHandHistoryId)
playerState.clear()
handState.clear()
case _ =>
}
}
—— log ——
2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map
-> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)]
INFO com.nsuslab.denma.stream.winloss.flow.Main$ - hand start
5769392597641628595
2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map
-> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)]
WARN com.nsuslab.denma.stream.winloss.flow.Main$ - there is no handhistory
info[pot].
> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <[email protected]> wrote:
>
> What do you mean with lost exactly?
>
> You call value() and it returns a value (!= null/defaultValue) and you
> call it again and it returns null/defaultValue for the same key with
> no update in between?
>
> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
> <[email protected]> wrote:
>> Hello,
>>
>> Could you share the code of the job you are running?
>> With only this information I am afraid we cannot help much.
>>
>> Thanks,
>> Kostas
>>
>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <[email protected]> wrote:
>>>
>>> Hi.
>>> I’m using flink 1.0.3 on aws EMR.
>>> sporadically value of ValueState is lost.
>>> what is starting point for solving this problem.
>>> Thank you.
>>