Hi.
I use ingestion time.
I didn't use a time window.
I used a GlobalWindow with a custom Trigger, shown below.
My apply() logic is the same as before, and it runs without complaint.
Thanks.
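
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// Fires and purges the global window when the final event of a hand arrives.
class HandTrigger extends Trigger[(String, String, String, String, Long), GlobalWindow] {

  override def onElement(t: (String, String, String, String, Long),
                         timestamp: Long, w: GlobalWindow,
                         triggerContext: TriggerContext): TriggerResult = {
    // _2 carries the event type; "GameEndHistory" closes the hand.
    if (t._2 == "GameEndHistory") TriggerResult.FIRE_AND_PURGE
    else TriggerResult.CONTINUE
  }

  // The time callbacks never fire the window; they only continue.
  override def onProcessingTime(timestamp: Long, w: GlobalWindow,
                                triggerContext: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onEventTime(timestamp: Long, w: GlobalWindow,
                           triggerContext: TriggerContext): TriggerResult =
    TriggerResult.CONTINUE
}
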
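
To make the behaviour of this trigger easier to follow, here is a minimal sketch in plain Scala with no Flink dependency. It only simulates the idea: events are buffered per key and handed over as one batch when a "GameEndHistory" event arrives, after which the buffer is dropped. The names HandWindowSketch and simulate are made up for this illustration, and the tuple layout (_2 = event type, _3 = key, _4 = payload) is taken from the quoted job; _1 and _5 are placeholders here.

```scala
import scala.collection.mutable

// Hypothetical, Flink-free simulation of a keyed global window whose
// trigger returns FIRE_AND_PURGE on "GameEndHistory" and CONTINUE otherwise.
object HandWindowSketch {
  // Same tuple shape as in the job: (_1, eventType, key, payload, _5).
  type Event = (String, String, String, String, Long)

  // Feeds the events one by one and returns, in firing order, the batches
  // that a window function would receive for each key.
  def simulate(events: Seq[Event]): Vector[(String, Vector[Event])] = {
    val buffers = mutable.LinkedHashMap.empty[String, Vector[Event]]
    var fired = Vector.empty[(String, Vector[Event])]
    for (e <- events) {
      val key = e._3
      val buffered = buffers.getOrElse(key, Vector.empty[Event]) :+ e
      if (e._2 == "GameEndHistory") {
        fired = fired :+ (key -> buffered) // FIRE: emit the whole hand
        buffers.remove(key)                // PURGE: drop the buffer
      } else {
        buffers.update(key, buffered)      // CONTINUE: keep collecting
      }
    }
    fired
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      ("x", "PotHistory", "hand1", "{}", 1L),       // arrives before the start event
      ("x", "GameStartHistory", "hand1", "{}", 2L),
      ("x", "GameEndHistory", "hand1", "{}", 3L)
    )
    simulate(events).foreach { case (key, batch) =>
      println(s"$key -> ${batch.map(_._2).mkString(", ")}")
    }
  }
}
```

In this sketch the function that receives the batch sees every buffered event of the hand at once, so it does not depend on the start event arriving first. An event that shows up after "GameEndHistory" for the same key would stay buffered until another end event arrives, which is worth keeping in mind.
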
> On Aug 16, 2016, at 12:48 AM, Stephan Ewen <[email protected]> wrote:
>
> Hi!
>
> Concerning your latest questions
>
> - There should not be multiple threads accessing the same state.
> - With "using a regular Java Map" I mean keeping everything as it is,
> except instead of using "ValueState" in the RichFlatMapFunction, you use a
> java.util.HashMap
> - If the program works within windows, it could be that events arrive out
> of order (are you using Event Time here?)
>
> Greetings,
> Stephan
>
>
>
> On Mon, Aug 15, 2016 at 9:56 AM, Dong-iL, Kim <[email protected]> wrote:
> Hi.
> I've tested the program with a window function (keyBy -> window -> collect).
> It has no problem.
>
> My old program uses keyBy -> state processing. Can it be processed by
> multiple threads within a key?
>
> Thank you.
>
>> On Aug 12, 2016, at 8:27 PM, Stephan Ewen <[email protected]> wrote:
>>
>> Hi!
>>
>> So far we are not aware of a state loss bug in Flink. My guess is that it is
>> some subtlety in the program.
>>
>> The check that logs also has other checks, like "handHistoryInfo.playType ==
>> PlayType.Cash" and "players.size > 1". Is one of them maybe the problem?
>>
>>
>> To debug this, you can try and do the following:
>>
>> Rather than using Flink's key/value state, simply use your own java/scala
>> map in the RichFlatMapFunction.
>> That is not by default fault-tolerant, but you can use that to see if the
>> error occurs in the same way or not.
>>
>> Greetings,
>> Stephan
>>
>>
>>
>>
>> On Fri, Aug 12, 2016 at 12:46 PM, Dong-iL, Kim <[email protected]> wrote:
>> Hi.
>> I checked the order of the data, and it is all right.
>> Are there any other possibilities?
>> Thank you.
>>
>>> On Aug 12, 2016, at 7:09 PM, Stephan Ewen <[email protected]> wrote:
>>>
>>> Hi!
>>>
>>> It's not that easy to say at first glance.
>>>
>>> One thing that is important to bear in mind is what ordering guarantees
>>> Flink gives, and where the ordering guarantees are not given.
>>> When you use keyBy() or redistribute(), order is preserved per parallel
>>> source/target pair only.
>>>
>>> Have a look here:
>>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows
>>>
>>>
>>> Could it be that the events simply arrive in a different order in the
>>> functions, so that a later event that looks for state comes before an
>>> earlier event that creates the state?
>>>
>>> Greetings,
>>> Stephan
>>>
>>> On Fri, Aug 12, 2016 at 12:04 PM, Dong-iL, Kim <[email protected]> wrote:
>>> Nope.
>>> I added logging to the End case,
>>> but the same log appears.
>>> Is there any fault in my code?
>>>
>>> thank you.
>>>
>>>
>>> > On Aug 12, 2016, at 6:42 PM, Maximilian Michels <[email protected]> wrote:
>>> >
>>> > You're clearing the "handState" on "GameEndHistory". I'm assuming this
>>> > event comes in before "CommCardHistory" where you check the state.
>>> >
>>> > On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <[email protected]> wrote:
>>> >> In my code, is the configuration of the execution environment all right?
>>> >>
>>> >>
>>> >>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <[email protected]> wrote:
>>> >>>
>>> >>>
>>> >>> My code and log are below.
>>> >>>
>>> >>>
>>> >>> val getExecuteEnv: StreamExecutionEnvironment = {
>>> >>>   val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
>>> >>>   env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>> >>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>> >>>   env.getCheckpointConfig.setCheckpointTimeout(60000)
>>> >>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>> >>>   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 30000))
>>> >>>   env
>>> >>> }
>>> >>>
>>> >>> def transform(
>>> >>>     target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] =
>>> >>>   target.keyBy(_._3).flatMap(new StateOperator)
>>> >>>
>>> >>> def main(args: Array[String]): Unit = {
>>> >>>   val env = getExecuteEnv
>>> >>>   val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
>>> >>>   val json = deserializeToJsonObj(source).name("ConvertToJson")
>>> >>>   val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
>>> >>>   val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream")
>>> >>>   …
>>> >>> }
>>> >>>
>>> >>> class StateOperator
>>> >>>     extends RichFlatMapFunction[(String, String, String, String, Long), WinLossBase] {
>>> >>>   var playerState: ValueState[util.Map[String, PotPlayer]] = _
>>> >>>   var handState: ValueState[HandHistoryInfo] = _
>>> >>>
>>> >>>   override def open(param: Configuration): Unit = {
>>> >>>     val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]](
>>> >>>       "winloss",
>>> >>>       classOf[util.Map[String, PotPlayer]],
>>> >>>       Maps.newHashMap[String, PotPlayer]())
>>> >>>     playerState = getRuntimeContext.getState(playerValueStateDescriptor)
>>> >>>     handState = getRuntimeContext.getState(
>>> >>>       new ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>>> >>>   }
>>> >>>
>>> >>>   override def flatMap(
>>> >>>       in: (String, String, String, String, Long),
>>> >>>       out: Collector[WinLossBase]): Unit = {
>>> >>>     in._2 match {
>>> >>>       case "GameStartHistory" =>
>>> >>>         val players = playerState.value()
>>> >>>         val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
>>> >>>         val record = obj.asInstanceOf[GameStartHistoryRecord]
>>> >>>         val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
>>> >>>         if (LOG.isInfoEnabled())
>>> >>>           LOG.info("hand start {}",
>>> >>>             if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL")
>>> >>>         ….
>>> >>>         playerState.update(players)
>>> >>>         handState.update(handHistoryInfo)
>>> >>>       case "HoleCardHistory" =>
>>> >>>         val players = playerState.value()
>>> >>>         if (players != null) {
>>> >>>           ...
>>> >>>           playerState.update(players)
>>> >>>         } else LOG.warn("there is no player[hole card]. {}", in._4)
>>> >>>       case "PlayerStateHistory" =>
>>> >>>         val players = playerState.value()
>>> >>>         if (players != null) {
>>> >>>           ….
>>> >>>           playerState.update(players)
>>> >>>         } else LOG.warn("there is no player[player state]. {}", in._4)
>>> >>>       case "CommCardHistory" =>
>>> >>>         val handHistoryInfo = handState.value()
>>> >>>         val commCardHistory: CommCardHistory = commCardState.value()
>>> >>>         if (handHistoryInfo != null) {
>>> >>>           ...
>>> >>>           handState.update(handHistoryInfo)
>>> >>>           commCardState.update(commCardHistory)
>>> >>>         } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
>>> >>>       case "PlayerActionHistory" =>
>>> >>>         val handHistoryInfo = handState.value()
>>> >>>         val players = playerState.value()
>>> >>>
>>> >>>         if (handHistoryInfo != null) {
>>> >>>           ...
>>> >>>         } else LOG.warn("there is no handhistory info[player action]. {}", in._4)
>>> >>>       case "PotHistory" =>
>>> >>>         val players = playerState.value()
>>> >>>         val handHistoryInfo = handState.value()
>>> >>>         val commCardHistory: CommCardHistory = commCardState.value()
>>> >>>         if (handHistoryInfo != null && handHistoryInfo.playType == PlayType.Cash &&
>>> >>>             players != null && players.size > 1) {
>>> >>>           ...
>>> >>>         } else LOG.warn("there is no handhistory info[pot]. {}", in._4)
>>> >>>       case "GameEndHistory" =>
>>> >>>         val players = playerState.value()
>>> >>>         val handHistoryInfo = handState.value()
>>> >>>         ...
>>> >>>         if (LOG.isTraceEnabled()) LOG.trace("end {}", record.getHandHistoryId)
>>> >>>         playerState.clear()
>>> >>>         handState.clear()
>>> >>>       case _ =>
>>> >>>     }
>>> >>>   }
>>> >>> }
>>> >>>
>>> >>> —— log ——
>>> >>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase,
>>> >>> Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent
>>> >>> to HBase) (3/4)] INFO com.nsuslab.denma.stream.winloss.flow.Main$ -
>>> >>> hand start 5769392597641628595
>>> >>>
>>> >>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase,
>>> >>> Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent
>>> >>> to HBase) (3/4)] WARN com.nsuslab.denma.stream.winloss.flow.Main$ -
>>> >>> there is no handhistory info[pot].
>>> >>>
>>> >>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <[email protected]> wrote:
>>> >>>>
>>> >>>> What exactly do you mean by "lost"?
>>> >>>>
>>> >>>> You call value() and it returns a value (!= null/defaultValue) and you
>>> >>>> call it again and it returns null/defaultValue for the same key with
>>> >>>> no update in between?
>>> >>>>
>>> >>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas <[email protected]> wrote:
>>> >>>>> Hello,
>>> >>>>>
>>> >>>>> Could you share the code of the job you are running?
>>> >>>>> With only this information I am afraid we cannot help much.
>>> >>>>>
>>> >>>>> Thanks,
>>> >>>>> Kostas
>>> >>>>>
>>> >>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <[email protected]> wrote:
>>> >>>>>>
>>> >>>>>> Hi.
>>> >>>>>> I'm using Flink 1.0.3 on AWS EMR.
>>> >>>>>> Sporadically, the value of a ValueState is lost.
>>> >>>>>> What is a good starting point for solving this problem?
>>> >>>>>> Thank you.