in my code, is the config of ExecutionEnv alright?

> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
> 
> 
> my code and log is as below.
> 
> 
>    val getExecuteEnv: StreamExecutionEnvironment = {
>        val env = 
> StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
>        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>        
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>        env.getCheckpointConfig.setCheckpointTimeout(60000)
>        env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
>        env
>    }
> 
> def transform(target: DataStream[(String, String, String, String, Long)]): 
> DataStream[WinLossBase] =
>        target.keyBy(_._3).flatMap(new StateOperator)
> 
> def main(args: Array[String]) {
>        val env = getExecuteEnv
>        val source: DataStream[String] = 
> extractFromKafka(env).name("KafkaSource")
>        val json = deserializeToJsonObj(source).name("ConvertToJson")
>        val target: DataStream[(String, String, String, String, Long)] = 
> preTransform(json)
>        val result: DataStream[WinLossBase] = 
> transform(target).name("ToKeyedStream”)
> …
> }
> 
> class StateOperator extends RichFlatMapFunction[(String, String, String, 
> String, Long), WinLossBase] {
>        var playerState: ValueState[util.Map[String, PotPlayer]] = _
>        var handState: ValueState[HandHistoryInfo] = _
> 
>        override def open(param: Configuration): Unit = {
>            val playerValueStateDescriptor = new 
> ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
>                classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, 
> PotPlayer]())
>            playerState = 
> getRuntimeContext.getState(playerValueStateDescriptor)
>            handState = getRuntimeContext.getState(new 
> ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>        }
> 
>        override def flatMap(in: (String, String, String, String, Long), out: 
> Collector[WinLossBase]): Unit = {
>            in._2 match {
>                case "GameStartHistory" =>
>                    val players = playerState.value()
>                    val obj = _convertJsonToRecord(in._4, 
> classOf[GameStartHistoryRecord])
>                    val record = obj.asInstanceOf[GameStartHistoryRecord]
>                    val handHistoryInfo: HandHistoryInfo = 
> _setUpHandHistoryInfo(record)
>                    if (LOG.isInfoEnabled())
>                        LOG.info("hand start {}", if (handHistoryInfo != null) 
> handHistoryInfo.handHistoryId else "NULL”)
>                      ….
>                    playerState.update(players)
>                    handState.update(handHistoryInfo)
>                case "HoleCardHistory" =>
>                    val players = playerState.value()
>                    if (players != null) {
>                       ...
>                         playerState.update(players)
>                    } else LOG.warn("there is no player[hole card]. {}", in._4)
>                case "PlayerStateHistory" =>
>                    val players = playerState.value()
>                    if (players != null) {
>                       ….
>                        playerState.update(players)
>                    } else LOG.warn("there is no player[player state]. {}", 
> in._4)
>                case "CommCardHistory" =>
>                    val handHistoryInfo = handState.value()
>                    val commCardHistory: CommCardHistory = 
> commCardState.value()
>                    if (handHistoryInfo != null) {
>                       ...
>                        handState.update(handHistoryInfo)
>                        commCardState.update(commCardHistory)
>                    } else LOG.warn("there is no handhistory info[comm card]. 
> {}", in._4)
>                case "PlayerActionHistory" =>
>                    val handHistoryInfo = handState.value()
>                    val players = playerState.value()
> 
>                    if (handHistoryInfo != null) {
>                       ...
>                    } else LOG.warn("there is no handhistory info[player 
> action]. {}", in._4)
>                case "PotHistory" =>
>                    val players = playerState.value()
>                    val handHistoryInfo = handState.value()
>                    val commCardHistory: CommCardHistory = 
> commCardState.value()
>                    if (handHistoryInfo != null && handHistoryInfo.playType == 
> PlayType.Cash && players != null && players.size > 1) {
>                        ...
>                    } else LOG.warn("there is no handhistory info[pot]. {}", 
> in._4)
>                case "GameEndHistory" =>
>                    val players = playerState.value()
>                    val handHistoryInfo = handState.value()
>                       ...
>                    if (LOG.isTraceEnabled()) LOG.trace("end {}", 
> record.getHandHistoryId)
>                    playerState.clear()
>                    handState.clear()
>                case _ =>
>            }
>        }
> 
> —— log —— 
> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, 
> Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) 
> (3/4)] INFO  com.nsuslab.denma.stream.winloss.flow.Main$  - hand start 
> 5769392597641628595
> 
> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, 
> Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) 
> (3/4)] WARN  com.nsuslab.denma.stream.winloss.flow.Main$  - there is no 
> handhistory info[pot]. 
> 
>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org> wrote:
>> 
>> What do you mean with lost exactly?
>> 
>> You call value() and it returns a value (!= null/defaultValue) and you
>> call it again and it returns null/defaultValue for the same key with
>> no update in between?
>> 
>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
>> <k.klou...@data-artisans.com> wrote:
>>> Hello,
>>> 
>>> Could you share the code of the job you are running?
>>> With only this information I am afraid we cannot help much.
>>> 
>>> Thanks,
>>> Kostas
>>> 
>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>>> 
>>>> Hi.
>>>> I’m using flink 1.0.3 on aws EMR.
>>>> sporadically value of ValueState is lost.
>>>> what is starting point for solving this problem.
>>>> Thank you.
>>> 
> 

Reply via email to