In my code, is the configuration of the ExecutionEnvironment all right?
> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>
> My code and log are below.
>
> val getExecuteEnv: StreamExecutionEnvironment = {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
>   env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>   env.getCheckpointConfig.setCheckpointTimeout(60000)
>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
>   env
> }
>
> def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] =
>   target.keyBy(_._3).flatMap(new StateOperator)
>
> def main(args: Array[String]) {
>   val env = getExecuteEnv
>   val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
>   val json = deserializeToJsonObj(source).name("ConvertToJson")
>   val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
>   val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream")
>   …
> }
>
> class StateOperator extends RichFlatMapFunction[(String, String, String, String, Long), WinLossBase] {
>   var playerState: ValueState[util.Map[String, PotPlayer]] = _
>   var handState: ValueState[HandHistoryInfo] = _
>
>   override def open(param: Configuration): Unit = {
>     val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
>       classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]())
>     playerState = getRuntimeContext.getState(playerValueStateDescriptor)
>     handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>   }
>
>   override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]): Unit = {
>     in._2 match {
>       case "GameStartHistory" =>
>         val players = playerState.value()
>         val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
>         val record = obj.asInstanceOf[GameStartHistoryRecord]
>         val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
>         if (LOG.isInfoEnabled())
>           LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL")
>         ….
>         playerState.update(players)
>         handState.update(handHistoryInfo)
>       case "HoleCardHistory" =>
>         val players = playerState.value()
>         if (players != null) {
>           ...
>           playerState.update(players)
>         } else LOG.warn("there is no player[hole card]. {}", in._4)
>       case "PlayerStateHistory" =>
>         val players = playerState.value()
>         if (players != null) {
>           ….
>           playerState.update(players)
>         } else LOG.warn("there is no player[player state]. {}", in._4)
>       case "CommCardHistory" =>
>         val handHistoryInfo = handState.value()
>         val commCardHistory: CommCardHistory = commCardState.value()
>         if (handHistoryInfo != null) {
>           ...
>           handState.update(handHistoryInfo)
>           commCardState.update(commCardHistory)
>         } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
>       case "PlayerActionHistory" =>
>         val handHistoryInfo = handState.value()
>         val players = playerState.value()
>
>         if (handHistoryInfo != null) {
>           ...
>         } else LOG.warn("there is no handhistory info[player action]. {}", in._4)
>       case "PotHistory" =>
>         val players = playerState.value()
>         val handHistoryInfo = handState.value()
>         val commCardHistory: CommCardHistory = commCardState.value()
>         if (handHistoryInfo != null && handHistoryInfo.playType == PlayType.Cash && players != null && players.size > 1) {
>           ...
>         } else LOG.warn("there is no handhistory info[pot]. {}", in._4)
>       case "GameEndHistory" =>
>         val players = playerState.value()
>         val handHistoryInfo = handState.value()
>         ...
>         if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
>         playerState.clear()
>         handState.clear()
>       case _ =>
>     }
>   }
>
> ---- log ----
> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO com.nsuslab.denma.stream.winloss.flow.Main$ - hand start 5769392597641628595
>
> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN com.nsuslab.denma.stream.winloss.flow.Main$ - there is no handhistory info[pot].
>
>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org> wrote:
>>
>> What exactly do you mean by lost?
>>
>> You call value() and it returns a value (!= null/defaultValue) and you
>> call it again and it returns null/defaultValue for the same key with
>> no update in between?
>>
>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
>> <k.klou...@data-artisans.com> wrote:
>>> Hello,
>>>
>>> Could you share the code of the job you are running?
>>> With only this information I am afraid we cannot help much.
>>>
>>> Thanks,
>>> Kostas
>>>
>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>>>
>>>> Hi.
>>>> I’m using Flink 1.0.3 on AWS EMR.
>>>> Sporadically, the value of a ValueState is lost.
>>>> What is a good starting point for solving this problem?
>>>> Thank you.