Hi.
I've tested the program with a window function (keyBy -> window -> collect), and it has no problem.
My old program uses keyBy -> state processing. Can the events for a single key be processed by multiple threads?
Thank you.

> On Aug 12, 2016, at 8:27 PM, Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> So far we are not aware of a state loss bug in Flink. My guess is that it is
> some subtlety in the program.
>
> The check that logs also has other checks, like "handHistoryInfo.playType ==
> PlayType.Cash" and "players.size > 1". Is one of them maybe the problem?
>
> To debug this, you can try and do the following:
>
> Rather than using Flink's key/value state, simply use your own java/scala map
> in the RichFlatMapFunction.
> That is not by default fault-tolerant, but you can use that to see if the
> error occurs in the same way or not.
>
> Greetings,
> Stephan
>
> On Fri, Aug 12, 2016 at 12:46 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
> Hi.
> I checked the order of the data, and it is all right.
> Are there any other possibilities?
> Thank you.
>
>> On Aug 12, 2016, at 7:09 PM, Stephan Ewen <se...@apache.org> wrote:
>>
>> Hi!
>>
>> It's not that easy to say at first glance.
>>
>> One thing that is important to bear in mind is what ordering guarantees
>> Flink gives, and where the ordering guarantees are not given.
>> When you use keyBy() or redistribute(), order is preserved per parallel
>> source/target pair only.
>>
>> Have a look here:
>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows
>>
>> Could it be that the events simply arrive in a different order in the
>> functions, so that a later event that looks for state comes before an
>> earlier event that creates the state?
>>
>> Greetings,
>> Stephan
>>
>> On Fri, Aug 12, 2016 at 12:04 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>> Nope.
>> I added a log in End,
>> but the same log is still there.
>> Is there any fault in my code?
>>
>> Thank you.
>>
>> > On Aug 12, 2016, at 6:42 PM, Maximilian Michels <m...@apache.org> wrote:
>> >
>> > You're clearing the "handState" on "GameEndHistory". I'm assuming this
>> > event comes in before "CommCardHistory" where you check the state.
>> >
>> > On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>> >> In my code, is the config of ExecutionEnv all right?
>> >>
>> >>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>> >>>
>> >>> My code and log are as below.
>> >>>
>> >>> val getExecuteEnv: StreamExecutionEnvironment = {
>> >>>   val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
>> >>>   env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>> >>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>> >>>   env.getCheckpointConfig.setCheckpointTimeout(60000)
>> >>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>> >>>   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
>> >>>   env
>> >>> }
>> >>>
>> >>> def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] =
>> >>>   target.keyBy(_._3).flatMap(new StateOperator)
>> >>>
>> >>> def main(args: Array[String]) {
>> >>>   val env = getExecuteEnv
>> >>>   val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
>> >>>   val json = deserializeToJsonObj(source).name("ConvertToJson")
>> >>>   val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
>> >>>   val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream")
>> >>>   …
>> >>> }
>> >>>
>> >>> class StateOperator extends RichFlatMapFunction[(String, String, String, String, Long), WinLossBase] {
>> >>>   var playerState: ValueState[util.Map[String, PotPlayer]] = _
>> >>>   var handState: ValueState[HandHistoryInfo] = _
>> >>>
>> >>>   override def open(param: Configuration): Unit = {
>> >>>     val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
>> >>>       classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]())
>> >>>     playerState = getRuntimeContext.getState(playerValueStateDescriptor)
>> >>>     handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>> >>>   }
>> >>>
>> >>>   override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]): Unit = {
>> >>>     in._2 match {
>> >>>       case "GameStartHistory" =>
>> >>>         val players = playerState.value()
>> >>>         val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
>> >>>         val record = obj.asInstanceOf[GameStartHistoryRecord]
>> >>>         val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
>> >>>         if (LOG.isInfoEnabled())
>> >>>           LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL")
>> >>>         ….
>> >>>         playerState.update(players)
>> >>>         handState.update(handHistoryInfo)
>> >>>       case "HoleCardHistory" =>
>> >>>         val players = playerState.value()
>> >>>         if (players != null) {
>> >>>           ...
>> >>>           playerState.update(players)
>> >>>         } else LOG.warn("there is no player[hole card]. {}", in._4)
>> >>>       case "PlayerStateHistory" =>
>> >>>         val players = playerState.value()
>> >>>         if (players != null) {
>> >>>           ….
>> >>>           playerState.update(players)
>> >>>         } else LOG.warn("there is no player[player state]. {}", in._4)
>> >>>       case "CommCardHistory" =>
>> >>>         val handHistoryInfo = handState.value()
>> >>>         val commCardHistory: CommCardHistory = commCardState.value()
>> >>>         if (handHistoryInfo != null) {
>> >>>           ...
>> >>>           handState.update(handHistoryInfo)
>> >>>           commCardState.update(commCardHistory)
>> >>>         } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
>> >>>       case "PlayerActionHistory" =>
>> >>>         val handHistoryInfo = handState.value()
>> >>>         val players = playerState.value()
>> >>>
>> >>>         if (handHistoryInfo != null) {
>> >>>           ...
>> >>>         } else LOG.warn("there is no handhistory info[player action]. {}", in._4)
>> >>>       case "PotHistory" =>
>> >>>         val players = playerState.value()
>> >>>         val handHistoryInfo = handState.value()
>> >>>         val commCardHistory: CommCardHistory = commCardState.value()
>> >>>         if (handHistoryInfo != null && handHistoryInfo.playType == PlayType.Cash &&
>> >>>             players != null && players.size > 1) {
>> >>>           ...
>> >>>         } else LOG.warn("there is no handhistory info[pot]. {}", in._4)
>> >>>       case "GameEndHistory" =>
>> >>>         val players = playerState.value()
>> >>>         val handHistoryInfo = handState.value()
>> >>>         ...
>> >>>         if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
>> >>>         playerState.clear()
>> >>>         handState.clear()
>> >>>       case _ =>
>> >>>     }
>> >>>   }
>> >>>
>> >>> ---- log ----
>> >>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO com.nsuslab.denma.stream.winloss.flow.Main$ - hand start 5769392597641628595
>> >>>
>> >>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN com.nsuslab.denma.stream.winloss.flow.Main$ - there is no handhistory info[pot].
>> >>>
>> >>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org> wrote:
>> >>>>
>> >>>> What do you mean with lost exactly?
>> >>>>
>> >>>> You call value() and it returns a value (!= null/defaultValue) and you
>> >>>> call it again and it returns null/defaultValue for the same key with
>> >>>> no update in between?
>> >>>>
>> >>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>> >>>>> Hello,
>> >>>>>
>> >>>>> Could you share the code of the job you are running?
>> >>>>> With only this information I am afraid we cannot help much.
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Kostas
>> >>>>>
>> >>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>> >>>>>>
>> >>>>>> Hi.
>> >>>>>> I'm using Flink 1.0.3 on AWS EMR.
>> >>>>>> Sporadically, the value of a ValueState is lost.
>> >>>>>> What is a good starting point for solving this problem?
>> >>>>>> Thank you.
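
A note on the ordering hypothesis raised in this thread: the sketch below is a plain-Scala stand-in, not Flink code, that combines Stephan's suggestion (keep the state in your own map) with Maximilian's observation (state is cleared on "GameEndHistory"). It only illustrates that the "there is no handhistory info" warning can appear without any state being lost, simply because an end event is processed before a later lookup for the same key. The names Event, HandTracker and OrderingDemo are hypothetical, and the state is reduced to a string marker; nothing here claims to reproduce the actual job.

```scala
import scala.collection.mutable

// Hypothetical, simplified event: (hand id used as the key, event type).
final case class Event(handId: String, eventType: String)

// Plain-Scala stand-in for the keyed state in StateOperator: one map entry per key.
// Not fault-tolerant and not Flink API; only meant for reproducing the warning locally.
class HandTracker {
  private val handState = mutable.Map.empty[String, String]

  // Returns Some(warning) when an event finds no state for its key, otherwise None.
  def process(e: Event): Option[String] = e.eventType match {
    case "GameStartHistory" =>
      handState.update(e.handId, "started") // state is created here
      None
    case "GameEndHistory" =>
      handState.remove(e.handId) // state is cleared here
      None
    case other =>
      if (handState.contains(e.handId)) None
      else Some(s"there is no handhistory info[$other]. ${e.handId}")
  }
}

object OrderingDemo {
  // Feeds the events through a fresh tracker and collects the warnings.
  def run(events: Seq[Event]): Seq[String] = {
    val tracker = new HandTracker
    events.flatMap(e => tracker.process(e))
  }

  val inOrder: Seq[Event] = Seq(
    Event("h1", "GameStartHistory"),
    Event("h1", "PotHistory"),
    Event("h1", "GameEndHistory"))

  // Same events, but the end event overtakes the pot event.
  val reordered: Seq[Event] = Seq(
    Event("h1", "GameStartHistory"),
    Event("h1", "GameEndHistory"),
    Event("h1", "PotHistory"))

  def main(args: Array[String]): Unit = {
    println(run(inOrder))
    println(run(reordered))
  }
}
```

With the in-order sequence no warning is produced; with the reordered sequence the pot event finds no state and triggers the warning. If the real job shows the same pattern with a plain map inside the RichFlatMapFunction, event order rather than state loss would be the more likely cause.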