Hi!

It's not that easy to say at first glance.

One important thing to bear in mind is which ordering guarantees Flink
gives, and where it gives none.
When you use keyBy() or another redistributing operation such as
rebalance(), order is preserved only within each pair of sending and
receiving parallel subtasks, not across different senders.

Have a look here:
https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows
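
To make the per-pair guarantee a bit more concrete, here is a small
plain-Scala sketch. It does not use Flink at all; Event, merge and
orderedPerSender are names I made up for illustration. It only models two
sender subtasks with FIFO channels feeding one receiver, which may read
from the channels in any interleaving.

```scala
object ChannelOrdering {
  // An event tagged with the (hypothetical) sender subtask and a per-sender sequence number.
  final case class Event(sender: Int, seq: Int)

  // Merge several FIFO channels into one receiver-side sequence.
  // `picks` lists which channel the receiver reads from at each step;
  // picks that hit an already drained channel are ignored.
  def merge(channels: Vector[Vector[Event]], picks: Seq[Int]): Vector[Event] = {
    val (_, received) = picks.foldLeft((channels, Vector.empty[Event])) {
      case ((remaining, out), i) =>
        remaining(i) match {
          case head +: tail => (remaining.updated(i, tail), out :+ head)
          case _            => (remaining, out)
        }
    }
    received
  }

  // True if every sender's events arrive in increasing sequence order.
  def orderedPerSender(received: Seq[Event]): Boolean =
    received.groupBy(_.sender).values.forall { events =>
      val seqs = events.map(_.seq)
      seqs == seqs.sorted
    }
}
```

Two different read orders give two different overall sequences at the
receiver, yet each sender's own events stay in order in both. That is all
the guarantee covers.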


Could it be that the events simply arrive in a different order in the
functions, so that a later event that looks for state comes before an
earlier event that creates the state?
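
A rough way to see the effect, again in plain Scala without Flink: the
sketch below is a heavily simplified stand-in for the per-key state in
your StateOperator (the object and method names are mine, and the real
"PotHistory" branch checks more conditions than this). It folds over the
event types for one key and collects the warnings that would be logged.

```scala
object OutOfOrderState {
  // State is modelled as Option[String]: None means "no hand info for this key".
  // Returns the warnings produced for the given arrival order of event types.
  def warnings(eventTypes: Seq[String]): Vector[String] = {
    val (_, warns) =
      eventTypes.foldLeft((Option.empty[String], Vector.empty[String])) {
        // GameStartHistory creates the state.
        case ((_, w), "GameStartHistory") => (Some("hand"), w)
        // PotHistory only works if the state exists; otherwise it warns.
        case ((s, w), "PotHistory") =>
          if (s.isDefined) (s, w) else (s, w :+ "there is no handhistory info[pot]")
        // GameEndHistory clears the state.
        case ((_, w), "GameEndHistory") => (None, w)
        // All other event types are ignored in this sketch.
        case (acc, _) => acc
      }
    warns
  }
}
```

With the arrival order start, pot, end there is no warning. If the pot
event arrives after the end event, or before the start event, the same
code reports missing state although nothing was actually lost.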

Greetings,
Stephan

On Fri, Aug 12, 2016 at 12:04 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:

> Nope.
> I added a log statement to the end ("GameEndHistory") case,
> but I still get the same log output.
> Is there any fault in my code?
>
> Thank you.
>
>
> > On Aug 12, 2016, at 6:42 PM, Maximilian Michels <m...@apache.org> wrote:
> >
> > You're clearing the "handState" on "GameEndHistory". I'm assuming this
> > event comes in before "CommCardHistory" where you check the state.
> >
> > On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
> >> in my code, is the config of ExecutionEnv alright?
> >>
> >>
> >>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
> >>>
> >>>
> >>> My code and log are below.
> >>>
> >>>
> >>>   val getExecuteEnv: StreamExecutionEnvironment = {
> >>>       val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
> >>>       env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
> >>>       env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
> >>>       env.getCheckpointConfig.setCheckpointTimeout(60000)
> >>>       env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> >>>       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
> >>>       env
> >>>   }
> >>>
> >>> def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] =
> >>>       target.keyBy(_._3).flatMap(new StateOperator)
> >>>
> >>> def main(args: Array[String]) {
> >>>       val env = getExecuteEnv
> >>>       val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
> >>>       val json = deserializeToJsonObj(source).name("ConvertToJson")
> >>>       val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
> >>>       val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream")
> >>> …
> >>> }
> >>>
> >>> class StateOperator extends RichFlatMapFunction[(String, String, String, String, Long), WinLossBase] {
> >>>       var playerState: ValueState[util.Map[String, PotPlayer]] = _
> >>>       var handState: ValueState[HandHistoryInfo] = _
> >>>
> >>>       override def open(param: Configuration): Unit = {
> >>>           val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
> >>>               classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]())
> >>>           playerState = getRuntimeContext.getState(playerValueStateDescriptor)
> >>>           handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
> >>>       }
> >>>
> >>>       override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]): Unit = {
> >>>           in._2 match {
> >>>               case "GameStartHistory" =>
> >>>                   val players = playerState.value()
> >>>                   val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
> >>>                   val record = obj.asInstanceOf[GameStartHistoryRecord]
> >>>                   val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
> >>>                   if (LOG.isInfoEnabled())
> >>>                       LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL")
> >>>                   ...
> >>>                   playerState.update(players)
> >>>                   handState.update(handHistoryInfo)
> >>>               case "HoleCardHistory" =>
> >>>                   val players = playerState.value()
> >>>                   if (players != null) {
> >>>                       ...
> >>>                       playerState.update(players)
> >>>                   } else LOG.warn("there is no player[hole card]. {}", in._4)
> >>>               case "PlayerStateHistory" =>
> >>>                   val players = playerState.value()
> >>>                   if (players != null) {
> >>>                       ...
> >>>                       playerState.update(players)
> >>>                   } else LOG.warn("there is no player[player state]. {}", in._4)
> >>>               case "CommCardHistory" =>
> >>>                   val handHistoryInfo = handState.value()
> >>>                   val commCardHistory: CommCardHistory = commCardState.value()
> >>>                   if (handHistoryInfo != null) {
> >>>                       ...
> >>>                       handState.update(handHistoryInfo)
> >>>                       commCardState.update(commCardHistory)
> >>>                   } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
> >>>               case "PlayerActionHistory" =>
> >>>                   val handHistoryInfo = handState.value()
> >>>                   val players = playerState.value()
> >>>
> >>>                   if (handHistoryInfo != null) {
> >>>                       ...
> >>>                   } else LOG.warn("there is no handhistory info[player action]. {}", in._4)
> >>>               case "PotHistory" =>
> >>>                   val players = playerState.value()
> >>>                   val handHistoryInfo = handState.value()
> >>>                   val commCardHistory: CommCardHistory = commCardState.value()
> >>>                   if (handHistoryInfo != null && handHistoryInfo.playType == PlayType.Cash &&
> >>>                       players != null && players.size > 1) {
> >>>                       ...
> >>>                   } else LOG.warn("there is no handhistory info[pot]. {}", in._4)
> >>>               case "GameEndHistory" =>
> >>>                   val players = playerState.value()
> >>>                   val handHistoryInfo = handState.value()
> >>>                   ...
> >>>                   if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
> >>>                   playerState.clear()
> >>>                   handState.clear()
> >>>               case _ =>
> >>>           }
> >>>       }
> >>> }
> >>>
> >>> —— log ——
> >>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO  com.nsuslab.denma.stream.winloss.flow.Main$  - hand start 5769392597641628595
> >>>
> >>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN  com.nsuslab.denma.stream.winloss.flow.Main$  - there is no handhistory info[pot].
> >>>
> >>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org> wrote:
> >>>>
> >>>> What exactly do you mean by "lost"?
> >>>>
> >>>> You call value() and it returns a value (!= null/defaultValue) and you
> >>>> call it again and it returns null/defaultValue for the same key with
> >>>> no update in between?
> >>>>
> >>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
> >>>> <k.klou...@data-artisans.com> wrote:
> >>>>> Hello,
> >>>>>
> >>>>> Could you share the code of the job you are running?
> >>>>> With only this information I am afraid we cannot help much.
> >>>>>
> >>>>> Thanks,
> >>>>> Kostas
> >>>>>
> >>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
> >>>>>>
> >>>>>> Hi.
> >>>>>> I'm using Flink 1.0.3 on AWS EMR.
> >>>>>> Sporadically, the value of a ValueState is lost.
> >>>>>> What would be a good starting point for solving this problem?
> >>>>>> Thank you.
> >>>>>
> >>>
> >>
>
>
