Nope. 
I added log in End.
but there is same log.
is there any fault in my code?

thank you.


> On Aug 12, 2016, at 6:42 PM, Maximilian Michels <m...@apache.org> wrote:
> 
> You're clearing the "handState" on "GameEndHistory". I'm assuming this
> event comes in before "CommCardHistory" where you check the state.
> 
> On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>> in my code, is the config of ExecutionEnv alright?
>> 
>> 
>>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>> 
>>> 
>>> my code and log is as below.
>>> 
>>> 
>>>   val getExecuteEnv: StreamExecutionEnvironment = {
>>>       val env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
>>>       env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>>       
>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>>       env.getCheckpointConfig.setCheckpointTimeout(60000)
>>>       env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
>>>       env
>>>   }
>>> 
>>> def transform(target: DataStream[(String, String, String, String, Long)]): 
>>> DataStream[WinLossBase] =
>>>       target.keyBy(_._3).flatMap(new StateOperator)
>>> 
>>> def main(args: Array[String]) {
>>>       val env = getExecuteEnv
>>>       val source: DataStream[String] = 
>>> extractFromKafka(env).name("KafkaSource")
>>>       val json = deserializeToJsonObj(source).name("ConvertToJson")
>>>       val target: DataStream[(String, String, String, String, Long)] = 
>>> preTransform(json)
>>>       val result: DataStream[WinLossBase] = 
>>> transform(target).name("ToKeyedStream”)
>>> …
>>> }
>>> 
>>> class StateOperator extends RichFlatMapFunction[(String, String, String, 
>>> String, Long), WinLossBase] {
>>>       var playerState: ValueState[util.Map[String, PotPlayer]] = _
>>>       var handState: ValueState[HandHistoryInfo] = _
>>> 
>>>       override def open(param: Configuration): Unit = {
>>>           val playerValueStateDescriptor = new 
>>> ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
>>>               classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, 
>>> PotPlayer]())
>>>           playerState = 
>>> getRuntimeContext.getState(playerValueStateDescriptor)
>>>           handState = getRuntimeContext.getState(new 
>>> ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>>>       }
>>> 
>>>       override def flatMap(in: (String, String, String, String, Long), out: 
>>> Collector[WinLossBase]): Unit = {
>>>           in._2 match {
>>>               case "GameStartHistory" =>
>>>                   val players = playerState.value()
>>>                   val obj = _convertJsonToRecord(in._4, 
>>> classOf[GameStartHistoryRecord])
>>>                   val record = obj.asInstanceOf[GameStartHistoryRecord]
>>>                   val handHistoryInfo: HandHistoryInfo = 
>>> _setUpHandHistoryInfo(record)
>>>                   if (LOG.isInfoEnabled())
>>>                       LOG.info("hand start {}", if (handHistoryInfo != 
>>> null) handHistoryInfo.handHistoryId else "NULL”)
>>>                     ….
>>>                   playerState.update(players)
>>>                   handState.update(handHistoryInfo)
>>>               case "HoleCardHistory" =>
>>>                   val players = playerState.value()
>>>                   if (players != null) {
>>>                      ...
>>>                        playerState.update(players)
>>>                   } else LOG.warn("there is no player[hole card]. {}", 
>>> in._4)
>>>               case "PlayerStateHistory" =>
>>>                   val players = playerState.value()
>>>                   if (players != null) {
>>>                      ….
>>>                       playerState.update(players)
>>>                   } else LOG.warn("there is no player[player state]. {}", 
>>> in._4)
>>>               case "CommCardHistory" =>
>>>                   val handHistoryInfo = handState.value()
>>>                   val commCardHistory: CommCardHistory = 
>>> commCardState.value()
>>>                   if (handHistoryInfo != null) {
>>>                      ...
>>>                       handState.update(handHistoryInfo)
>>>                       commCardState.update(commCardHistory)
>>>                   } else LOG.warn("there is no handhistory info[comm card]. 
>>> {}", in._4)
>>>               case "PlayerActionHistory" =>
>>>                   val handHistoryInfo = handState.value()
>>>                   val players = playerState.value()
>>> 
>>>                   if (handHistoryInfo != null) {
>>>                      ...
>>>                   } else LOG.warn("there is no handhistory info[player 
>>> action]. {}", in._4)
>>>               case "PotHistory" =>
>>>                   val players = playerState.value()
>>>                   val handHistoryInfo = handState.value()
>>>                   val commCardHistory: CommCardHistory = 
>>> commCardState.value()
>>>                   if (handHistoryInfo != null && handHistoryInfo.playType 
>>> == PlayType.Cash && players != null && players.size > 1) {
>>>                       ...
>>>                   } else LOG.warn("there is no handhistory info[pot]. {}", 
>>> in._4)
>>>               case "GameEndHistory" =>
>>>                   val players = playerState.value()
>>>                   val handHistoryInfo = handState.value()
>>>                      ...
>>>                   if (LOG.isTraceEnabled()) LOG.trace("end {}", 
>>> record.getHandHistoryId)
>>>                   playerState.clear()
>>>                   handState.clear()
>>>               case _ =>
>>>           }
>>>       }
>>> 
>>> —— log ——
>>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, 
>>> Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) 
>>> (3/4)] INFO  com.nsuslab.denma.stream.winloss.flow.Main$  - hand start 
>>> 5769392597641628595
>>> 
>>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, 
>>> Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) 
>>> (3/4)] WARN  com.nsuslab.denma.stream.winloss.flow.Main$  - there is no 
>>> handhistory info[pot].
>>> 
>>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>> 
>>>> What do you mean with lost exactly?
>>>> 
>>>> You call value() and it returns a value (!= null/defaultValue) and you
>>>> call it again and it returns null/defaultValue for the same key with
>>>> no update in between?
>>>> 
>>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
>>>> <k.klou...@data-artisans.com> wrote:
>>>>> Hello,
>>>>> 
>>>>> Could you share the code of the job you are running?
>>>>> With only this information I am afraid we cannot help much.
>>>>> 
>>>>> Thanks,
>>>>> Kostas
>>>>> 
>>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>>>>> 
>>>>>> Hi.
>>>>>> I’m using flink 1.0.3 on aws EMR.
>>>>>> sporadically value of ValueState is lost.
>>>>>> what is starting point for solving this problem.
>>>>>> Thank you.
>>>>> 
>>> 
>> 

Reply via email to