Re: assignTimestamp after keyBy

2016-09-08 Thread Dong-iL, Kim
I want to assign timestamps after keyBy,
because the stream is not aligned before keyBy.
I've already tested code like yours;
it produced many warnings that timestamp monotonicity was violated.
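
For reference, a minimal sketch of an assigner that tolerates an unaligned stream instead of warning about non-monotonic timestamps. It assumes Flink 1.1+ (where BoundedOutOfOrdernessTimestampExtractor is available); MyEvent and its fields are hypothetical stand-ins for the poster's types.

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    case class MyEvent(group: String, eventTime: Long)

    object AssignAfterKeyBy {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val stream: DataStream[MyEvent] =
          env.fromElements(MyEvent("a", 1L), MyEvent("a", 3L), MyEvent("b", 2L))

        stream
          // tolerate up to 10 s of out-of-orderness rather than requiring
          // strictly ascending timestamps, so no monotonicity warnings
          .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10)) {
              override def extractTimestamp(e: MyEvent): Long = e.eventTime
            })
          .keyBy(_.group) // re-keying restores a KeyedStream for windowing
          .timeWindow(Time.seconds(10))
          .reduce((a, b) => a) // placeholder aggregation
          .print()

        env.execute("assign-after-keyby-sketch")
      }
    }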

> On Sep 8, 2016, at 4:32 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
> 
> Thanks for replying, Pushpendra.
> The assignTimestampsAndWatermarks method returns a DataStream, not a KeyedStream,
> so I cannot use windowing afterwards.
> Is it possible to cast it to a KeyedStream?
> Regards
> 
>> On Sep 8, 2016, at 3:12 PM, pushpendra.jaiswal 
>> <pushpendra.jaiswa...@gmail.com> wrote:
>> 
>> Please refer
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
>> for assigning timestamps.
>> 
>> You can do a map after keyBy to assign timestamps.
>> 
>> e.g:
>> 
>> val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
>>   .filter( _.severity == WARNING )
>>   .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
>> 
>> withTimestampsAndWatermarks
>>   .keyBy( _.getGroup )
>>   .timeWindow(Time.seconds(10))
>>   .reduce( (a, b) => a.add(b) )
>>   .addSink(...
>> 
>> ~Pushpendra
>> 
>> 
>> 
> 



Re: assignTimestamp after keyBy

2016-09-08 Thread Dong-iL, Kim
Thanks for replying, Pushpendra.
The assignTimestampsAndWatermarks method returns a DataStream, not a KeyedStream,
so I cannot use windowing afterwards.
Is it possible to cast it to a KeyedStream?
Regards
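
No cast is needed: keyBy is just a partitioning step, so it can simply be re-applied after the assigner has turned the KeyedStream back into a DataStream. A sketch, reusing the hypothetical MyEvent type from the earlier sketch (assumes Flink 1.1+):

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    def rekeyAndWindow(stream: DataStream[MyEvent],
                       assigner: AssignerWithPeriodicWatermarks[MyEvent]): DataStream[MyEvent] =
      stream
        .keyBy(_.group)                          // partition first, as the question asks
        .assignTimestampsAndWatermarks(assigner) // returns a plain DataStream[MyEvent]
        .keyBy(_.group)                          // key again instead of casting
        .timeWindow(Time.seconds(10))
        .reduce((a, b) => a)                     // placeholder aggregation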

> On Sep 8, 2016, at 3:12 PM, pushpendra.jaiswal 
>  wrote:
> 
> Please refer
> https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html
> for assigning timestamps.
> 
> You can do a map after keyBy to assign timestamps.
> 
> e.g:
> 
> val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
>   .filter( _.severity == WARNING )
>   .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
> 
> withTimestampsAndWatermarks
>   .keyBy( _.getGroup )
>   .timeWindow(Time.seconds(10))
>   .reduce( (a, b) => a.add(b) )
>   .addSink(...
> 
> ~Pushpendra
> 
> 
> 



checkpoints not removed on hdfs.

2016-09-02 Thread Dong-iL, Kim
Hi,

I’m using HDFS as state backend.
The checkpoints folder keeps growing.
What should I do?

Regards.
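
For reference, a minimal filesystem-backend configuration in flink-conf.yaml (a sketch; the HDFS path is an assumption). Flink normally discards superseded checkpoints on its own, so a directory that keeps growing often points at leftovers from failed or cancelled jobs rather than at normal operation:

    state.backend: filesystem
    state.backend.fs.checkpointdir: hdfs://namenode:8020/flink/checkpoints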

Re: ValueState is missing

2016-08-15 Thread Dong-iL, Kim
Hi.
I use ingestion time.
I didn't use a time window;
I used a GlobalWindow with a custom Trigger, as below.
My apply() logic is the same as before, and there are no complaints.
Thanks.

class HandTrigger extends Trigger[(String, String, String, String, Long), GlobalWindow] {
    override def onElement(t: (String, String, String, String, Long), timestamp: Long,
                           w: GlobalWindow, triggerContext: TriggerContext): TriggerResult = {
        if (t._2 == "GameEndHistory") TriggerResult.FIRE_AND_PURGE
        else TriggerResult.CONTINUE
    }

    override def onProcessingTime(timestamp: Long, w: GlobalWindow,
                                  triggerContext: TriggerContext): TriggerResult = {
        TriggerResult.CONTINUE
    }

    override def onEventTime(timestamp: Long, w: GlobalWindow,
                             triggerContext: TriggerContext): TriggerResult = {
        TriggerResult.CONTINUE
    }
}
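
For reference, a wiring sketch for the trigger above (the stream shape and field meanings are assumptions): key by the id in field _3, collect events for that key in a GlobalWindow, and let HandTrigger fire-and-purge when the "GameEndHistory" event arrives.

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
    import org.apache.flink.util.Collector

    def perHand(events: DataStream[(String, String, String, String, Long)]): DataStream[String] =
      events
        .keyBy(_._3)
        .window(GlobalWindows.create())
        .trigger(new HandTrigger)
        .apply((key: String, w: GlobalWindow,
                in: Iterable[(String, String, String, String, Long)],
                out: Collector[String]) => out.collect(s"$key: ${in.size} events"))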

> On Aug 16, 2016, at 12:48 AM, Stephan Ewen <se...@apache.org> wrote:
> 
> Hi!
> 
> Concerning your latest questions
> 
>   - There should not be multiple threads accessing the same state.
>   - With "using a regular Java Map" I mean keeping everything as it is, 
> except instead of using "ValueState" in the RichFlatMapFunction, you use a 
> java.util.HashMap
>   - If the program works within windows, it could be that events arrive out 
> of order (are you using Event Time here?)
> 
> Greetings,
> Stephan
> 
> 
> 
> On Mon, Aug 15, 2016 at 9:56 AM, Dong-iL, Kim <kim.s...@gmail.com 
> <mailto:kim.s...@gmail.com>> wrote:
> Hi.
> I've tested the program with a window function (keyBy -> window -> collect); it has
> no problem.
> 
> My old program does keyBy -> state processing. Can it be processed by multiple
> threads within a key?
> 
> Thank you.
> 
>> On Aug 12, 2016, at 8:27 PM, Stephan Ewen <se...@apache.org 
>> <mailto:se...@apache.org>> wrote:
>> 
>> Hi!
>> 
>> So far we are not aware of a state loss bug in Flink. My guess is that it is 
>> some subtlety in the program.
>> 
>> The check that logs also has other checks, like "handHistoryInfo.playType == 
>> PlayType.Cash" and "players.size > 1". Is one of them maybe the problem?
>> 
>> 
>> To debug this, you can try and do the following:
>> 
>> Rather than using Flink's key/value state, simply use your own java/scala 
>> map in the RichFlatMapFunction.
>> That is not by default fault-tolerant, but you can use that to see if the 
>> error occurs in the same way or not.
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> 
>> 
>> On Fri, Aug 12, 2016 at 12:46 PM, Dong-iL, Kim <kim.s...@gmail.com 
>> <mailto:kim.s...@gmail.com>> wrote:
>> Hi.
>> I checked order of data. but it is alright.
>> Is there any other possibilities?
>> Thank you.
>> 
>>> On Aug 12, 2016, at 7:09 PM, Stephan Ewen <se...@apache.org 
>>> <mailto:se...@apache.org>> wrote:
>>> 
>>> Hi!
>>> 
>>> Its not that easy to say at a first glance.
>>> 
>>> One thing that is important to bear in mind is what ordering guarantees 
>>> Flink gives, and where the ordering guarantees are not given.
>>> When you use keyBy() or redistribute(), order is preserved per parallel 
>>> source/target pair only.
>>> 
>>> Have a look here:
>>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows
>>> 
>>> 
>>> Could it be that the events simply arrive in a different order in the 
>>> functions, so that a later event that looks for state comes before an 
>>> earlier event that creates the state?
>>> 
>>> Greetings,
>>> Stephan
>>> 
>>> On Fri, Aug 12, 2016 at 12:04 PM, Dong-iL, Kim <kim.s...@gmail.com 
>>> <mailto:kim.s...@gmail.com>> wrote:
>>> Nope.
>>> I added log in End.
>>> but there is same log.
>>> is there any fault in my code?
>>> 
>>> thank you.
>>> 
>>> 
>>> > On Aug 12, 2016, at 6:42 PM, Maximilian Michels <m...@apache.org 
>>> > <mailto:m...@apache.org>> wrote:
>>> >
>>> > You're clearing the "handState" on "GameEndHistory". I'm assuming this
>>> > event comes in before "CommCardHistory" where you check the state.
>>> >
>>> &

Re: ValueState is missing

2016-08-15 Thread Dong-iL, Kim
Hi. Stephan.

Do you mean using a plain map in local execution?
I've tested it, but it does not work at all.
Thanks.
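
For clarity, a sketch of the debugging variant Stephan suggests below: the operator stays exactly as it is, but the state lives in a plain java.util.HashMap keyed by the stream key instead of a ValueState. This is not fault-tolerant; it only shows whether the "missing state" also occurs without Flink's state backend. HandHistoryInfo is the type from this thread; everything else is hypothetical.

    import java.util
    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector

    class DebugStateOperator
        extends RichFlatMapFunction[(String, String, String, String, Long), String] {
      @transient private var handState: util.HashMap[String, HandHistoryInfo] = _

      override def open(parameters: Configuration): Unit =
        handState = new util.HashMap[String, HandHistoryInfo]()

      override def flatMap(in: (String, String, String, String, Long),
                           out: Collector[String]): Unit = {
        val info = handState.get(in._3) // plays the role of handState.value()
        if (info == null) out.collect(s"no hand info for key ${in._3} on event ${in._2}")
        // ... run the same event handling as before, then:
        // handState.put(in._3, updatedInfo)
      }
    }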

> On Aug 15, 2016, at 4:56 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
> 
> Hi.
> I've tested the program with a window function (keyBy -> window -> collect); it has
> no problem.
> 
> My old program does keyBy -> state processing. Can it be processed by multiple
> threads within a key?
> 
> Thank you.
> 
>> On Aug 12, 2016, at 8:27 PM, Stephan Ewen <se...@apache.org 
>> <mailto:se...@apache.org>> wrote:
>> 
>> Hi!
>> 
>> So far we are not aware of a state loss bug in Flink. My guess is that it is 
>> some subtlety in the program.
>> 
>> The check that logs also has other checks, like "handHistoryInfo.playType == 
>> PlayType.Cash" and "players.size > 1". Is one of them maybe the problem?
>> 
>> 
>> To debug this, you can try and do the following:
>> 
>> Rather than using Flink's key/value state, simply use your own java/scala 
>> map in the RichFlatMapFunction.
>> That is not by default fault-tolerant, but you can use that to see if the 
>> error occurs in the same way or not.
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> 
>> 
>> On Fri, Aug 12, 2016 at 12:46 PM, Dong-iL, Kim <kim.s...@gmail.com 
>> <mailto:kim.s...@gmail.com>> wrote:
>> Hi.
>> I checked order of data. but it is alright.
>> Is there any other possibilities?
>> Thank you.
>> 
>>> On Aug 12, 2016, at 7:09 PM, Stephan Ewen <se...@apache.org 
>>> <mailto:se...@apache.org>> wrote:
>>> 
>>> Hi!
>>> 
>>> Its not that easy to say at a first glance.
>>> 
>>> One thing that is important to bear in mind is what ordering guarantees 
>>> Flink gives, and where the ordering guarantees are not given.
>>> When you use keyBy() or redistribute(), order is preserved per parallel 
>>> source/target pair only.
>>> 
>>> Have a look here:
>>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows
>>> 
>>> 
>>> Could it be that the events simply arrive in a different order in the 
>>> functions, so that a later event that looks for state comes before an 
>>> earlier event that creates the state?
>>> 
>>> Greetings,
>>> Stephan
>>> 
>>> On Fri, Aug 12, 2016 at 12:04 PM, Dong-iL, Kim <kim.s...@gmail.com 
>>> <mailto:kim.s...@gmail.com>> wrote:
>>> Nope.
>>> I added log in End.
>>> but there is same log.
>>> is there any fault in my code?
>>> 
>>> thank you.
>>> 
>>> 
>>> > On Aug 12, 2016, at 6:42 PM, Maximilian Michels <m...@apache.org 
>>> > <mailto:m...@apache.org>> wrote:
>>> >
>>> > You're clearing the "handState" on "GameEndHistory". I'm assuming this
>>> > event comes in before "CommCardHistory" where you check the state.
>>> >
>>> > On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com 
>>> > <mailto:kim.s...@gmail.com>> wrote:
>>> >> in my code, is the config of ExecutionEnv alright?
>>> >>
>>> >>
>>> >>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com 
>>> >>> <mailto:kim.s...@gmail.com>> wrote:
>>> >>>
>>> >>>
>>> >>> my code and log is as below.
>>> >>>
>>> >>>
>>> >>>   val getExecuteEnv: StreamExecutionEnvironment = {
>>> >>>   val env = 
>>> >>> StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(1)
>>> >>>   env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>> >>>   
>>> >>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>> >>>   env.getCheckpointConfig.setCheckpointTimeout(6)
>>> >>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>> >>>   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 
>>> >>> 3))
>>> >>>   env
>>> >>>   }
>>> >>>
>>> >>> def 

Re: ValueState is missing

2016-08-15 Thread Dong-iL, Kim
Hi.
I've tested the program with a window function (keyBy -> window -> collect); it has no
problem.

My old program does keyBy -> state processing. Can it be processed by multiple
threads within a key?

Thank you.

> On Aug 12, 2016, at 8:27 PM, Stephan Ewen <se...@apache.org> wrote:
> 
> Hi!
> 
> So far we are not aware of a state loss bug in Flink. My guess is that it is 
> some subtlety in the program.
> 
> The check that logs also has other checks, like "handHistoryInfo.playType == 
> PlayType.Cash" and "players.size > 1". Is one of them maybe the problem?
> 
> 
> To debug this, you can try and do the following:
> 
> Rather than using Flink's key/value state, simply use your own java/scala map 
> in the RichFlatMapFunction.
> That is not by default fault-tolerant, but you can use that to see if the 
> error occurs in the same way or not.
> 
> Greetings,
> Stephan
> 
> 
> 
> 
> On Fri, Aug 12, 2016 at 12:46 PM, Dong-iL, Kim <kim.s...@gmail.com 
> <mailto:kim.s...@gmail.com>> wrote:
> Hi.
> I checked the order of the data, and it is all right.
> Are there any other possibilities?
> Thank you.
> 
>> On Aug 12, 2016, at 7:09 PM, Stephan Ewen <se...@apache.org 
>> <mailto:se...@apache.org>> wrote:
>> 
>> Hi!
>> 
>> Its not that easy to say at a first glance.
>> 
>> One thing that is important to bear in mind is what ordering guarantees 
>> Flink gives, and where the ordering guarantees are not given.
>> When you use keyBy() or redistribute(), order is preserved per parallel 
>> source/target pair only.
>> 
>> Have a look here:
>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows
>> 
>> 
>> Could it be that the events simply arrive in a different order in the 
>> functions, so that a later event that looks for state comes before an 
>> earlier event that creates the state?
>> 
>> Greetings,
>> Stephan
>> 
>> On Fri, Aug 12, 2016 at 12:04 PM, Dong-iL, Kim <kim.s...@gmail.com 
>> <mailto:kim.s...@gmail.com>> wrote:
>> Nope.
>> I added log in End.
>> but there is same log.
>> is there any fault in my code?
>> 
>> thank you.
>> 
>> 
>> > On Aug 12, 2016, at 6:42 PM, Maximilian Michels <m...@apache.org 
>> > <mailto:m...@apache.org>> wrote:
>> >
>> > You're clearing the "handState" on "GameEndHistory". I'm assuming this
>> > event comes in before "CommCardHistory" where you check the state.
>> >
>> > On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com 
>> > <mailto:kim.s...@gmail.com>> wrote:
>> >> in my code, is the config of ExecutionEnv alright?
>> >>
>> >>
>> >>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com 
>> >>> <mailto:kim.s...@gmail.com>> wrote:
>> >>>
>> >>>
>> >>> my code and log is as below.
>> >>>
>> >>>
>> >>>   val getExecuteEnv: StreamExecutionEnvironment = {
>> >>>   val env = 
>> >>> StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(1)
>> >>>   env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>> >>>   
>> >>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>> >>>   env.getCheckpointConfig.setCheckpointTimeout(6)
>> >>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>> >>>   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 
>> >>> 3))
>> >>>   env
>> >>>   }
>> >>>
>> >>> def transform(target: DataStream[(String, String, String, String, 
>> >>> Long)]): DataStream[WinLossBase] =
>> >>>   target.keyBy(_._3).flatMap(new StateOperator)
>> >>>
>> >>> def main(args: Array[String]) {
>> >>>   val env = getExecuteEnv
>> >>>   val source: DataStream[String] = 
>> >>> extractFromKafka(env).name("KafkaSource")
>> >>>   val json = deserializeToJsonObj(source).name("ConvertToJson")
>> >>>   val target: DataStream[(String, String, String, String, Long)] = 
>> >>> p

Re: ValueState is missing

2016-08-12 Thread Dong-iL, Kim
Hi.
I checked the order of the data, and it is all right.
Are there any other possibilities?
Thank you.
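
One way to verify the order question empirically (a sketch; the (key, sequence) element type and all names are assumptions): run a checker after keyBy that remembers the last sequence number per key and logs regressions. Because keyBy routes each key to a single subtask, a per-instance map is sufficient here.

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector

    class OrderCheck extends RichFlatMapFunction[(String, Long), (String, Long)] {
      @transient private var lastSeen: java.util.HashMap[String, Long] = _

      override def open(parameters: Configuration): Unit =
        lastSeen = new java.util.HashMap[String, Long]()

      override def flatMap(in: (String, Long), out: Collector[(String, Long)]): Unit = {
        val prev = lastSeen.getOrDefault(in._1, Long.MinValue)
        if (in._2 < prev)
          println(s"out-of-order for key ${in._1}: ${in._2} arrived after $prev")
        lastSeen.put(in._1, in._2)
        out.collect(in)
      }
    }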

> On Aug 12, 2016, at 7:09 PM, Stephan Ewen <se...@apache.org> wrote:
> 
> Hi!
> 
> It's not that easy to say at a first glance.
> 
> One thing that is important to bear in mind is what ordering guarantees Flink 
> gives, and where the ordering guarantees are not given.
> When you use keyBy() or redistribute(), order is preserved per parallel 
> source/target pair only.
> 
> Have a look here:
> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows
> 
> 
> Could it be that the events simply arrive in a different order in the 
> functions, so that a later event that looks for state comes before an earlier 
> event that creates the state?
> 
> Greetings,
> Stephan
> 
> On Fri, Aug 12, 2016 at 12:04 PM, Dong-iL, Kim <kim.s...@gmail.com 
> <mailto:kim.s...@gmail.com>> wrote:
> Nope.
> I added a log in the End case,
> but the same log message appears.
> Is there any fault in my code?
> 
> thank you.
> 
> 
> > On Aug 12, 2016, at 6:42 PM, Maximilian Michels <m...@apache.org 
> > <mailto:m...@apache.org>> wrote:
> >
> > You're clearing the "handState" on "GameEndHistory". I'm assuming this
> > event comes in before "CommCardHistory" where you check the state.
> >
> > On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com 
> > <mailto:kim.s...@gmail.com>> wrote:
> >> in my code, is the config of ExecutionEnv alright?
> >>
> >>
> >>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com 
> >>> <mailto:kim.s...@gmail.com>> wrote:
> >>>
> >>>
> >>> my code and log is as below.
> >>>
> >>>
> >>>   val getExecuteEnv: StreamExecutionEnvironment = {
> >>>   val env = 
> >>> StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(1)
> >>>   env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
> >>>   
> >>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
> >>>   env.getCheckpointConfig.setCheckpointTimeout(6)
> >>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> >>>   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 
> >>> 3))
> >>>   env
> >>>   }
> >>>
> >>> def transform(target: DataStream[(String, String, String, String, 
> >>> Long)]): DataStream[WinLossBase] =
> >>>   target.keyBy(_._3).flatMap(new StateOperator)
> >>>
> >>> def main(args: Array[String]) {
> >>>   val env = getExecuteEnv
> >>>   val source: DataStream[String] = 
> >>> extractFromKafka(env).name("KafkaSource")
> >>>   val json = deserializeToJsonObj(source).name("ConvertToJson")
> >>>   val target: DataStream[(String, String, String, String, Long)] = 
> >>> preTransform(json)
> >>>   val result: DataStream[WinLossBase] = 
> >>> transform(target).name("ToKeyedStream")
> >>> …
> >>> }
> >>>
> >>> class StateOperator extends RichFlatMapFunction[(String, String, String, 
> >>> String, Long), WinLossBase] {
> >>>   var playerState: ValueState[util.Map[String, PotPlayer]] = _
> >>>   var handState: ValueState[HandHistoryInfo] = _
> >>>
> >>>   override def open(param: Configuration): Unit = {
> >>>   val playerValueStateDescriptor = new 
> >>> ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
> >>>   classOf[util.Map[String, PotPlayer]], 
> >>> Maps.newHashMap[String, PotPlayer]())
> >>>   playerState = 
> >>> getRuntimeContext.getState(playerValueStateDescriptor)
> >>>   handState = getRuntimeContext.getState(new 
> >>> ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
> >>>   }
> >>>
> >>>   override def flatMap(in: (String, String, String, String, Long), 
> >>> out: Collector[WinLossBase]): Unit = {
> >>>   in._2 match {
> >>>   case "GameStartHistory" =>
> >>>   

Re: ValueState is missing

2016-08-12 Thread Dong-iL, Kim
Nope.
I added a log in the End case,
but the same log message appears.
Is there any fault in my code?

thank you.


> On Aug 12, 2016, at 6:42 PM, Maximilian Michels <m...@apache.org> wrote:
> 
> You're clearing the "handState" on "GameEndHistory". I'm assuming this
> event comes in before "CommCardHistory" where you check the state.
> 
> On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>> in my code, is the config of ExecutionEnv alright?
>> 
>> 
>>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>> 
>>> 
>>> my code and log is as below.
>>> 
>>> 
>>>   val getExecuteEnv: StreamExecutionEnvironment = {
>>>   val env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(1)
>>>   env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>>   
>>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>>>   env.getCheckpointConfig.setCheckpointTimeout(6)
>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 3))
>>>   env
>>>   }
>>> 
>>> def transform(target: DataStream[(String, String, String, String, Long)]): 
>>> DataStream[WinLossBase] =
>>>   target.keyBy(_._3).flatMap(new StateOperator)
>>> 
>>> def main(args: Array[String]) {
>>>   val env = getExecuteEnv
>>>   val source: DataStream[String] = 
>>> extractFromKafka(env).name("KafkaSource")
>>>   val json = deserializeToJsonObj(source).name("ConvertToJson")
>>>   val target: DataStream[(String, String, String, String, Long)] = 
>>> preTransform(json)
>>>   val result: DataStream[WinLossBase] = 
>>> transform(target).name("ToKeyedStream")
>>> …
>>> }
>>> 
>>> class StateOperator extends RichFlatMapFunction[(String, String, String, 
>>> String, Long), WinLossBase] {
>>>   var playerState: ValueState[util.Map[String, PotPlayer]] = _
>>>   var handState: ValueState[HandHistoryInfo] = _
>>> 
>>>   override def open(param: Configuration): Unit = {
>>>   val playerValueStateDescriptor = new 
>>> ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
>>>   classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, 
>>> PotPlayer]())
>>>   playerState = 
>>> getRuntimeContext.getState(playerValueStateDescriptor)
>>>   handState = getRuntimeContext.getState(new 
>>> ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>>>   }
>>> 
>>>   override def flatMap(in: (String, String, String, String, Long), out: 
>>> Collector[WinLossBase]): Unit = {
>>>   in._2 match {
>>>   case "GameStartHistory" =>
>>>   val players = playerState.value()
>>>   val obj = _convertJsonToRecord(in._4, 
>>> classOf[GameStartHistoryRecord])
>>>   val record = obj.asInstanceOf[GameStartHistoryRecord]
>>>   val handHistoryInfo: HandHistoryInfo = 
>>> _setUpHandHistoryInfo(record)
>>>   if (LOG.isInfoEnabled())
>>>   LOG.info("hand start {}", if (handHistoryInfo != 
>>> null) handHistoryInfo.handHistoryId else "NULL")
>>> ….
>>>   playerState.update(players)
>>>   handState.update(handHistoryInfo)
>>>   case "HoleCardHistory" =>
>>>   val players = playerState.value()
>>>   if (players != null) {
>>>  ...
>>>playerState.update(players)
>>>   } else LOG.warn("there is no player[hole card]. {}", 
>>> in._4)
>>>   case "PlayerStateHistory" =>
>>>   val players = playerState.value()
>>>   if (players != null) {
>>>  ….
>>>   playerState.update(players)
>>>   } else LOG.warn("there is no player[player state]. {}", 
>>> in._4)
>>>   case "CommCardHistory" =>
>>>

Re: ValueState is missing

2016-08-11 Thread Dong-iL, Kim
In my code, is the configuration of the execution environment all right?


> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
> 
> 
> my code and log is as below.
> 
> 
> val getExecuteEnv: StreamExecutionEnvironment = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(1)
>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>     env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
>     env.getCheckpointConfig.setCheckpointTimeout(6)
>     env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>     env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 3))
>     env
> }
> 
> def transform(target: DataStream[(String, String, String, String, Long)]): DataStream[WinLossBase] =
>     target.keyBy(_._3).flatMap(new StateOperator)
> 
> def main(args: Array[String]) {
>     val env = getExecuteEnv
>     val source: DataStream[String] = extractFromKafka(env).name("KafkaSource")
>     val json = deserializeToJsonObj(source).name("ConvertToJson")
>     val target: DataStream[(String, String, String, String, Long)] = preTransform(json)
>     val result: DataStream[WinLossBase] = transform(target).name("ToKeyedStream")
>     …
> }
> 
> class StateOperator extends RichFlatMapFunction[(String, String, String, String, Long), WinLossBase] {
>     var playerState: ValueState[util.Map[String, PotPlayer]] = _
>     var handState: ValueState[HandHistoryInfo] = _
> 
>     override def open(param: Configuration): Unit = {
>         val playerValueStateDescriptor = new ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
>             classOf[util.Map[String, PotPlayer]], Maps.newHashMap[String, PotPlayer]())
>         playerState = getRuntimeContext.getState(playerValueStateDescriptor)
>         handState = getRuntimeContext.getState(new ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
>     }
> 
>     override def flatMap(in: (String, String, String, String, Long), out: Collector[WinLossBase]): Unit = {
>         in._2 match {
>             case "GameStartHistory" =>
>                 val players = playerState.value()
>                 val obj = _convertJsonToRecord(in._4, classOf[GameStartHistoryRecord])
>                 val record = obj.asInstanceOf[GameStartHistoryRecord]
>                 val handHistoryInfo: HandHistoryInfo = _setUpHandHistoryInfo(record)
>                 if (LOG.isInfoEnabled())
>                     LOG.info("hand start {}", if (handHistoryInfo != null) handHistoryInfo.handHistoryId else "NULL")
>                 ….
>                 playerState.update(players)
>                 handState.update(handHistoryInfo)
>             case "HoleCardHistory" =>
>                 val players = playerState.value()
>                 if (players != null) {
>                     ...
>                     playerState.update(players)
>                 } else LOG.warn("there is no player[hole card]. {}", in._4)
>             case "PlayerStateHistory" =>
>                 val players = playerState.value()
>                 if (players != null) {
>                     ….
>                     playerState.update(players)
>                 } else LOG.warn("there is no player[player state]. {}", in._4)
>             case "CommCardHistory" =>
>                 val handHistoryInfo = handState.value()
>                 val commCardHistory: CommCardHistory = commCardState.value()
>                 if (handHistoryInfo != null) {
>                     ...
>                     handState.update(handHistoryInfo)
>                     commCardState.update(commCardHistory)
>                 } else LOG.warn("there is no handhistory info[comm card]. {}", in._4)
>             case "PlayerActionHistory" =>
>                 val handHistoryInfo = handState.value()
>                 val players = playerState.value()
> 
>                 if (handHistoryInfo != null) {
>                     ...
>                 } else LOG.warn("there is no handhistory info[player action]. {}", in._4)
>             case "PotHistory" =>
>                 val players = playerState.value()
>                 val handHistoryInfo = handState.value()
> 

Re: specify user name when connecting to hdfs

2016-08-11 Thread Dong-iL, Kim
Hi.
In this case, I use a standalone cluster (AWS EC2) and I want to connect to a remote
HDFS machine (AWS EMR).
I registered the location of core-site.xml as below.
Does it need any other properties?



<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://…:8020</value>
    </property>
    <property>
        <name>hadoop.security.authentication</name>
        <value>simple</value>
    </property>
    <property>
        <name>hadoop.security.key.provider.path</name>
        <value>kms://:9700/kms</value>
    </property>
    <property>
        <name>hadoop.job.ugi</name>
        <value>hadoop</value>
    </property>
</configuration>

Thanks.
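
One workaround sketch, assuming simple authentication on the EMR cluster: perform the HDFS access as the "hadoop" user through Hadoop's UserGroupInformation, instead of relying on HADOOP_USER_NAME being visible to the Flink processes. The path is hypothetical and only meant to test access.

    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.security.UserGroupInformation

    val ugi = UserGroupInformation.createRemoteUser("hadoop")
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = {
        val conf = new Configuration() // picks up core-site.xml from the classpath
        val fs = FileSystem.get(conf)
        fs.mkdirs(new Path("/flink/checkpoints")) // hypothetical test path
      }
    })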

> On Aug 11, 2016, at 9:31 PM, Stephan Ewen <se...@apache.org> wrote:
> 
> Hi!
> 
> Do you register the Hadoop Config at the Flink Configuration?
> Also, do you use Flink standalone or on Yarn?
> 
> Stephan
> 
> On Tue, Aug 9, 2016 at 11:00 AM, Dong-iL, Kim <kim.s...@gmail.com 
> <mailto:kim.s...@gmail.com>> wrote:
> Hi.
> I'm trying to use an external HDFS as the state backend.
> My OS user name is ec2-user and the HDFS user is hadoop,
> so I get a permission-denied exception.
> I want to specify the HDFS user name.
> I set hadoop.job.ugi in core-site.xml and HADOOP_USER_NAME on the command line,
> but neither works.
> What shall I do?
> thanks.
> 



Re: ValueState is missing

2016-08-11 Thread Dong-iL, Kim
…, record.getHandHistoryId)
                playerState.clear()
                handState.clear()
            case _ =>
        }
    }

—— log ——
2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] INFO  com.nsuslab.denma.stream.winloss.flow.Main$  - hand start 5769392597641628595

2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to HBase) (3/4)] WARN  com.nsuslab.denma.stream.winloss.flow.Main$  - there is no handhistory info[pot].

> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org> wrote:
> 
> What exactly do you mean by "lost"?
> 
> You call value() and it returns a value (!= null/defaultValue) and you
> call it again and it returns null/defaultValue for the same key with
> no update in between?
> 
> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
> <k.klou...@data-artisans.com> wrote:
>> Hello,
>> 
>> Could you share the code of the job you are running?
>> With only this information I am afraid we cannot help much.
>> 
>> Thanks,
>> Kostas
>> 
>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>> 
>>> Hi.
>>> I’m using flink 1.0.3 on aws EMR.
>>> sporadically value of ValueState is lost.
>>> what is starting point for solving this problem.
>>> Thank you.
>> 



ValueState is missing

2016-08-11 Thread Dong-iL, Kim
Hi.
I'm using Flink 1.0.3 on AWS EMR.
Sporadically, the value of a ValueState is lost.
What is a good starting point for solving this problem?
Thank you.

specify user name when connecting to hdfs

2016-08-09 Thread Dong-iL, Kim
Hi.
I'm trying to use an external HDFS as the state backend.
My OS user name is ec2-user and the HDFS user is hadoop,
so I get a permission-denied exception.
I want to specify the HDFS user name.
I set hadoop.job.ugi in core-site.xml and HADOOP_USER_NAME on the command line,
but neither works.
What shall I do?
thanks.

Re: Scala Table API with Java POJO

2016-08-01 Thread Dong-iL, Kim
In org.apache.flink.api.table.plan.PlanTranslator:

val inputType = set.getType().asInstanceOf[CompositeType[A]]

if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) {
  throw new ExpressionException(s"You cannot rename fields upon Table creation: " +
    s"Field order of input type $inputType is not deterministic.")
}

When A is a PojoType, hasDeterministicFieldOrder always returns false.
What shall I do when using a POJO?
Thanks.
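
A workaround sketch for Flink 1.0.x (TestPojo and its getId/getAmount accessors are assumptions): map the POJO to a Scala tuple first (tuples do have a deterministic field order) and name the fields on that conversion.

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.Table

    def toAmountTable(pojoDataSet: DataSet[TestPojo]): Table =
      pojoDataSet
        .map(p => (p.getId, p.getAmount)) // tuple: deterministic field order
        .as('id, 'amount)                 // hypothetical field names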

> On Aug 1, 2016, at 6:11 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
> 
> I've tried the following, but none of them work:
> 
> dataSet.as('id as 'id, 'amount as 'amount)
> 
> dataSet.as('id, 'amount)
> 
> dataSet.as("id, amount")
> 
> thanks.
> 
>> On Aug 1, 2016, at 6:03 PM, Timo Walther <twal...@apache.org> wrote:
>> 
>> I think you need to use ".as()" instead of "toTable()" to supply the field 
>> order.
>> 
>> On 01/08/16 at 10:56, Dong-iL, Kim wrote:
>>> Hi Timo.
>>> I’m using scala API.
>>> There is no error with java API.
>>> my code snippet is this.
>>> 
>>> dataSet.toTable
>>>.groupBy(“id")
>>>.select(‘id, ‘amount.sum as ‘amount)
>>>.where(‘amount > 0)
>>>.toDataSet[TestPojo]
>>>.print()
>>> 
>>> Thanks.
>>> 
>>>> On Aug 1, 2016, at 5:50 PM, Timo Walther <twal...@apache.org> wrote:
>>>> 
>>>> Hi Kim,
>>>> 
>>>> as the exception says: POJOs have no deterministic field order. You have 
>>>> to specify the order during the DataSet to Table conversion:
>>>> 
>>>> Table table = tableEnv.fromDataSet(pojoDataSet, "pojoField as a, 
>>>> pojoField2 as b");
>>>> 
>>>> I hope that helps. Otherwise it would help if you could supply a code 
>>>> snippet of your program.
>>>> 
>>>> Timo
>>>> 
>>>> On 01/08/16 at 10:19, Dong-iL, Kim wrote:
>>>>> my flink ver is 1.0.3.
>>>>> thanks.
>>>>> 
>>>>>> On Aug 1, 2016, at 5:18 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>>>>> 
>>>>>> I’ve create a program using table API and get an exception like this.
>>>>>> org.apache.flink.api.table.ExpressionException: You cannot rename fields 
>>>>>> upon Table creation: Field order of input type PojoType<….> is not 
>>>>>> deterministic.
>>>>>> There is an error not in java program, but in scala program.
>>>>>> how can I use java POJO with scala Table API.
>>>>>> 
>>>> 
>>>> -- 
>>>> Freundliche Grüße / Kind Regards
>>>> 
>>>> Timo Walther
>>>> 
>>>> Follow me: @twalthr
>>>> https://www.linkedin.com/in/twalthr
>>>> 
>> 
>> 
>> -- 
>> Freundliche Grüße / Kind Regards
>> 
>> Timo Walther
>> 
>> Follow me: @twalthr
>> https://www.linkedin.com/in/twalthr
>> 
> 



Re: Scala Table API with Java POJO

2016-08-01 Thread Dong-iL, Kim
I've tried the following, but none of them work:

dataSet.as('id as 'id, 'amount as 'amount)

dataSet.as('id, 'amount)

dataSet.as("id, amount")

thanks.

> On Aug 1, 2016, at 6:03 PM, Timo Walther <twal...@apache.org> wrote:
> 
> I think you need to use ".as()" instead of "toTable()" to supply the field 
> order.
> 
> On 01/08/16 at 10:56, Dong-iL, Kim wrote:
>> Hi Timo.
>> I’m using scala API.
>> There is no error with java API.
>> my code snippet is this.
>> 
>> dataSet.toTable
>> .groupBy(“id")
>> .select(‘id, ‘amount.sum as ‘amount)
>> .where(‘amount > 0)
>> .toDataSet[TestPojo]
>> .print()
>> 
>> Thanks.
>> 
>>> On Aug 1, 2016, at 5:50 PM, Timo Walther <twal...@apache.org> wrote:
>>> 
>>> Hi Kim,
>>> 
>>> as the exception says: POJOs have no deterministic field order. You have to 
>>> specify the order during the DataSet to Table conversion:
>>> 
>>> Table table = tableEnv.fromDataSet(pojoDataSet, "pojoField as a, pojoField2 
>>> as b");
>>> 
>>> I hope that helps. Otherwise it would help if you could supply a code 
>>> snippet of your program.
>>> 
>>> Timo
>>> 
>>> On 01/08/16 at 10:19, Dong-iL, Kim wrote:
>>>> my flink ver is 1.0.3.
>>>> thanks.
>>>> 
>>>>> On Aug 1, 2016, at 5:18 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>>>> 
>>>>> I’ve create a program using table API and get an exception like this.
>>>>> org.apache.flink.api.table.ExpressionException: You cannot rename fields 
>>>>> upon Table creation: Field order of input type PojoType<….> is not 
>>>>> deterministic.
>>>>> There is an error not in java program, but in scala program.
>>>>> how can I use java POJO with scala Table API.
>>>>> 
>>> 
>>> -- 
>>> Freundliche Grüße / Kind Regards
>>> 
>>> Timo Walther
>>> 
>>> Follow me: @twalthr
>>> https://www.linkedin.com/in/twalthr
>>> 
> 
> 
> -- 
> Freundliche Grüße / Kind Regards
> 
> Timo Walther
> 
> Follow me: @twalthr
> https://www.linkedin.com/in/twalthr
> 



Re: Scala Table API with Java POJO

2016-08-01 Thread Dong-iL, Kim
Hi Timo.
I'm using the Scala API.
There is no error with the Java API.
My code snippet is this:

dataSet.toTable
    .groupBy("id")
    .select('id, 'amount.sum as 'amount)
    .where('amount > 0)
    .toDataSet[TestPojo]
    .print()

Thanks.

> On Aug 1, 2016, at 5:50 PM, Timo Walther <twal...@apache.org> wrote:
> 
> Hi Kim,
> 
> as the exception says: POJOs have no deterministic field order. You have to 
> specify the order during the DataSet to Table conversion:
> 
> Table table = tableEnv.fromDataSet(pojoDataSet, "pojoField as a, pojoField2 
> as b");
> 
> I hope that helps. Otherwise it would help if you could supply a code snippet 
> of your program.
> 
> Timo
> 
> On 01/08/16 at 10:19, Dong-iL, Kim wrote:
>> my flink ver is 1.0.3.
>> thanks.
>> 
>>> On Aug 1, 2016, at 5:18 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
>>> 
>>> I’ve create a program using table API and get an exception like this.
>>> org.apache.flink.api.table.ExpressionException: You cannot rename fields 
>>> upon Table creation: Field order of input type PojoType<….> is not 
>>> deterministic.
>>> There is an error not in java program, but in scala program.
>>> how can I use java POJO with scala Table API.
>>> 
> 
> 
> -- 
> Freundliche Grüße / Kind Regards
> 
> Timo Walther
> 
> Follow me: @twalthr
> https://www.linkedin.com/in/twalthr
> 



Re: Scala Table API with Java POJO

2016-08-01 Thread Dong-iL, Kim
My Flink version is 1.0.3.
Thanks.

> On Aug 1, 2016, at 5:18 PM, Dong-iL, Kim <kim.s...@gmail.com> wrote:
> 
> I've created a program using the Table API and I get an exception like this:
> org.apache.flink.api.table.ExpressionException: You cannot rename fields upon
> Table creation: Field order of input type PojoType<….> is not deterministic.
> The error occurs in the Scala program, not in the Java program.
> How can I use a Java POJO with the Scala Table API?
> 



Scala Table API with Java POJO

2016-08-01 Thread Dong-iL, Kim
I've created a program using the Table API and I get an exception like this:
org.apache.flink.api.table.ExpressionException: You cannot rename fields upon
Table creation: Field order of input type PojoType<….> is not deterministic.
The error occurs in the Scala program, not in the Java program.
How can I use a Java POJO with the Scala Table API?



customize class path using on yarn

2016-07-24 Thread Dong-iL, Kim
I’m sorry. I’ve asked but there is no reply.
I wanna add config file to class path.(eg. hibernate.cfg.xml)
I’ve put it in FLINK_LIB_DIR. It was shipped but not in class path.
I’ve used the option(-C, —class path) of flink common but not work.
what shall I do?
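
A starting point, not a verified recipe: yarn-session.sh has a -t/--ship option that transfers a directory to the YARN containers. Whether a shipped plain file ends up on the container classpath depends on the Flink version, so this is only a sketch; ./extra-conf is a hypothetical directory holding hibernate.cfg.xml.

    ./bin/yarn-session.sh -n 4 -t ./extra-conf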





Re: Variable not initialized in the open() method of RichMapFunction

2016-07-22 Thread Dong iL, Kim
Declare the objectMapper outside the map function:

final ObjectMapper objectMapper = new ObjectMapper();

source.map(str -> objectMapper.readValue(str, Request.class));
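
An alternative sketch in Scala: keep the RichMapFunction, but give open() the signature the runtime actually calls, open(Configuration), so the mapper is initialized once per parallel instance before any map() call. Request is the POJO from this thread.

    import com.fasterxml.jackson.databind.ObjectMapper
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.configuration.Configuration

    class JsonToRequest extends RichMapFunction[String, Request] {
      @transient private var objectMapper: ObjectMapper = _

      override def open(parameters: Configuration): Unit = {
        objectMapper = new ObjectMapper() // runs once, before any map() call
      }

      override def map(value: String): Request =
        objectMapper.readValue(value, classOf[Request])
    }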

On Sat, Jul 23, 2016 at 12:28 AM, Yassin Marzouki <yassmar...@gmail.com>
wrote:

> Thank you Stephan and Kim, that solved the problem.
> Just to make sure, is using a MapFunction as in the following code any
> different? i.e. does it initialize the objectMapper for every element in
> the stream?
>
> .map(new MapFunction<String, Request>() {
>
> private ObjectMapper objectMapper = new ObjectMapper();
>
> @Override
>  public Request map(String value) throws Exception {
>  return objectMapper.readValue(value, Request.class);
>         }
> })
>
> On Fri, Jul 22, 2016 at 5:20 PM, Dong iL, Kim <kim.s...@gmail.com> wrote:
>
>> oops. stephan already answered.
>> sorry. T^T
>>
>> On Sat, Jul 23, 2016 at 12:16 AM, Dong iL, Kim <kim.s...@gmail.com>
>> wrote:
>>
>>> is open method signature right? or typo?
>>>
>>> void open(Configuration parameters) throws Exception;
>>>
>>> On Sat, Jul 23, 2016 at 12:09 AM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> I think you overrode the open method with the wrong signature. The
>>>> right signature would be "open(Configuration cfg) {...}". You probably
>>>> overlooked this because you missed the "@Override" annotation.
>>>>
>>>> On Fri, Jul 22, 2016 at 4:49 PM, Yassin Marzouki <yassmar...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> I want to convert a stream of json strings to POJOs using Jackson, so
>>>>> I did the following:
>>>>>
>>>>> .map(new RichMapFunction<String, Request>() {
>>>>>
>>>>> private ObjectMapper objectMapper;
>>>>>
>>>>> public void open() {
>>>>> objectMapper = new ObjectMapper();
>>>>> }
>>>>>
>>>>> @Override
>>>>>  public Request map(String value) throws Exception {
>>>>>  return objectMapper.readValue(value, Request.class);
>>>>> }
>>>>> })
>>>>>
>>>>> But this code gave me a NullPointerException because the objectMapper
>>>>> was not initialized successfully.
>>>>>
>>>>> 1. Isn't the open() method supposed to be called before map() and
>>>>> initialize objectMapper?
>>>>> 2. I figured out that initializing objectMapper before the open()
>>>>> method resolves the problem, and that it works also with a simple
>>>>> MapFunction. In that case, is there an advantage for using a
>>>>> RichMapFunction?
>>>>>
>>>>> Best,
>>>>> Yassine
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>>
>>
>
>




Re: Variable not initialized in the open() method of RichMapFunction

2016-07-22 Thread Dong iL, Kim
Oops, Stephan already answered.
Sorry. T^T

On Sat, Jul 23, 2016 at 12:16 AM, Dong iL, Kim <kim.s...@gmail.com> wrote:

> Is the open method signature right, or is it a typo? The correct signature is:
>
> void open(Configuration parameters) throws Exception;
>
> On Sat, Jul 23, 2016 at 12:09 AM, Stephan Ewen <se...@apache.org> wrote:
>
>> I think you overrode the open method with the wrong signature. The right
>> signature would be "open(Configuration cfg) {...}". You probably overlooked
>> this because you missed the "@Override" annotation.
>>
>> On Fri, Jul 22, 2016 at 4:49 PM, Yassin Marzouki <yassmar...@gmail.com>
>> wrote:
>>
>>> Hi everyone,
>>>
>>> I want to convert a stream of json strings to POJOs using Jackson, so I
>>> did the following:
>>>
>>> .map(new RichMapFunction<String, Request>() {
>>>
>>> private ObjectMapper objectMapper;
>>>
>>> public void open() {
>>> objectMapper = new ObjectMapper();
>>> }
>>>
>>> @Override
>>>  public Request map(String value) throws Exception {
>>>  return objectMapper.readValue(value, Request.class);
>>> }
>>> })
>>>
>>> But this code gave me a NullPointerException because the objectMapper
>>> was not initialized successfully.
>>>
>>> 1. Isn't the open() method supposed to be called before map() and
>>> initialize objectMapper?
>>> 2. I figured out that initializing objectMapper before the open() method
>>> resolves the problem, and that it works also with a simple MapFunction. In
>>> that case, is there an advantage for using a RichMapFunction?
>>>
>>> Best,
>>> Yassine
>>>
>>
>>
>
>
>





Re: Variable not initialized in the open() method of RichMapFunction

2016-07-22 Thread Dong iL, Kim
Is the open method signature right, or is it a typo? The correct signature is:

void open(Configuration parameters) throws Exception;

On Sat, Jul 23, 2016 at 12:09 AM, Stephan Ewen  wrote:

> I think you overrode the open method with the wrong signature. The right
> signature would be "open(Configuration cfg) {...}". You probably overlooked
> this because you missed the "@Override" annotation.
>
> On Fri, Jul 22, 2016 at 4:49 PM, Yassin Marzouki 
> wrote:
>
>> Hi everyone,
>>
>> I want to convert a stream of json strings to POJOs using Jackson, so I
>> did the following:
>>
>> .map(new RichMapFunction<String, Request>() {
>>
>> private ObjectMapper objectMapper;
>>
>> public void open() {
>> objectMapper = new ObjectMapper();
>> }
>>
>> @Override
>>  public Request map(String value) throws Exception {
>>  return objectMapper.readValue(value, Request.class);
>> }
>> })
>>
>> But this code gave me a NullPointerException because the objectMapper was
>> not initialized successfully.
>>
>> 1. Isn't the open() method supposed to be called before map() and
>> initialize objectMapper?
>> 2. I figured out that initializing objectMapper before the open() method
>> resolves the problem, and that it works also with a simple MapFunction. In
>> that case, is there an advantage for using a RichMapFunction?
>>
>> Best,
>> Yassine
>>
>
>




add FLINK_LIB_DIR to classpath on yarn

2016-07-21 Thread Dong iL, Kim
Hello.
I have a Flink cluster on YARN.
I want to add FLINK_LIB_DIR to the classpath,
because hibernate.cfg.xml needs to be on the classpath.
With a standalone cluster, I just add FLINK_LIB_DIR to
FLINK_CLASSPATH.
But on YARN, modifying config.sh, yarn-session.sh and flink-daemon.sh does not
work.

Best Regards,
Dong-iL, Kim.