Re: Bootstrapping multiple state within same operator

2023-03-24 Thread David Artiga
That sounds like a great hack :D
I'll give it a try for sure. Thank you!
/David

On Fri, Mar 24, 2023 at 5:25 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> …

RE: Bootstrapping multiple state within same operator

2023-03-24 Thread Schwalbe Matthias
Hi David,
… coming in late to this discussion

We had a very similar problem and I found a simple way to implement priming 
savepoints with mixed keyed/operator state.
The trick is this:

  *   In your KeyedStateBootstrapFunction, also implement CheckpointedFunction
  *   In initializeState() you can initialize the broadcast state primitive
(the code skeleton below uses getUnionListState; same principle)
  *   Then, in processElement(), I process a tuple of state collections, one
per state primitive, i.e. one event object per key
  *   For the union list state I forge a special key, “broadcast”, and only
the 3rd tuple vector contains anything
 *   (the upstream operator feeding into this bootstrap function makes sure
only one event with the “broadcast” key is generated)

Peruse the code skeleton (Scala) if you like (I removed some stuff I’m not
supposed to disclose):


/** Event type for state updates for savepoint generation. A 4-tuple of
  * 1) a vector of valid [[CSC]] entries
  * 2) a vector of valid [[DAS]] entries
  * 3) a vector of valid broadcast [[FFR]] entries
  * 4) a vector of timers for state cleanup */
type DAFFS = (Vector[CSC], Vector[DAS], Vector[FFR], Vector[Long])

/** [[StateUpdate]] type for [[DAFFS]] state along with the [[String]] key context. */
type DAFFSHU = StateUpdate[String, DAFFS]

class DAFFOperatorStateBootstrapFunction
  extends KeyedStateBootstrapFunction[String, DAFFSHU]
    with CheckpointedFunction {

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val rtc: RuntimeContext = getRuntimeContext
    // keyed state setup:
    // cSC = rtc.getListState(new ListStateDescriptor[CSC](...
    // dAS = rtc.getListState(new ListStateDescriptor[DAS](...
  }

  override def processElement(value: DAFFSHU,
      ctx: KeyedStateBootstrapFunction[String, DAFFSHU]#Context): Unit = {
    val daffs = value.state
    val ts = ctx.timerService()

    // keyed state: written under the current key context
    for (csc <- daffs._1) {
      cSC.add(csc)
    }
    for (das <- daffs._2) {
      dAS.add(das)
    }
    // operator (union list) state: only the event with the forged
    // "broadcast" key carries entries here
    for (ffr <- daffs._3) {
      fFRState.add(ffr)
    }
    // re-register the cleanup timers the streaming job expects
    for (timer <- daffs._4) {
      ts.registerEventTimeTimer(timer)
    }
  }

  @transient var fFRState: ListState[FFR] = _

  // nothing to snapshot here; the savepoint writer persists the state
  // primitives populated in processElement()
  override def snapshotState(context: FunctionSnapshotContext): Unit = {}

  override def initializeState(context: FunctionInitializationContext): Unit = {
    // ffrTI: the TypeInformation[FFR], defined in the omitted parts
    val fFRStateDescriptor = new ListStateDescriptor[FFR]("ffr", ffrTI)
    fFRState = context.getOperatorStateStore.getUnionListState(fFRStateDescriptor)
  }
}
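
A minimal Java sketch of how such a dual-purpose bootstrap function might be
wired into a new savepoint (assuming the Flink 1.16 SavepointWriter API; the
event type, key selector, uid, and paths below are hypothetical placeholders):

// Hedged sketch: one bootstrap transformation populates both keyed and
// operator state via a single withOperator() registration. "KeyedUpdate",
// "DualStateBootstrapFunction", the uid and the paths are placeholders.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<KeyedUpdate> primingEvents =
    env.fromCollection(updates);        // one event per key, plus exactly one
                                        // event carrying the forged "broadcast" key

StateBootstrapTransformation<KeyedUpdate> transformation =
    OperatorTransformation
        .bootstrapWith(primingEvents)
        .keyBy(update -> update.key)    // "broadcast" is just another key here
        .transform(new DualStateBootstrapFunction()); // keyed + CheckpointedFunction,
                                                      // as in the Scala skeleton above

SavepointWriter
    .newSavepoint(env, new HashMapStateBackend(), 128)   // max parallelism of target job
    .withOperator("target-operator-uid", transformation) // production job's uid
    .write("s3://bucket/savepoints/primed");

env.execute("prime savepoint");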

Hope this helps …

Sincere greetings

Thias



Re: Bootstrapping multiple state within same operator

2023-03-22 Thread David Artiga
Not familiar with the implementation, but thinking of some options:

- composable transformations
- underlying MultiMap
- ...

On Wed, Mar 22, 2023 at 10:24 AM Hang Ruan  wrote:

> …


Re: Bootstrapping multiple state within same operator

2023-03-22 Thread Hang Ruan
Hi, David,
I also read the code around `SavepointWriter#withOperator`. The
transformations are stored in a `Map` whose key is `OperatorID`, so I can't
see a way to register multiple transformations for one operator with the
provided API.

Maybe we need a new type of `XXXStateBootstrapFunction` to change multiple
states at once.

Best,
Hang
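
A short Java sketch of the limitation described here (hedged: the uid and the
two transformations are hypothetical; only the one-transformation-per-OperatorID
point is taken from the mail):

// Hedged sketch: SavepointWriter keeps one transformation per OperatorID,
// so two registrations under the same uid cannot both take effect.
SavepointWriter writer =
    SavepointWriter.newSavepoint(env, new HashMapStateBackend(), 128);

writer.withOperator("enricher", keyedStateTransformation);     // stored under OperatorID("enricher")
writer.withOperator("enricher", broadcastStateTransformation); // same OperatorID: cannot
                                                               // coexist with the first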

David Artiga wrote on Wednesday, March 22, 2023 at 15:22:

> We are using the state processor API
> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/libs/state_processor_api/>
> to bootstrap the state of some operators. It has been working fine until
> now, when we tried to bootstrap an operator that has both a keyed state and
> a broadcast state. Seems the API does not provide a convenient method to
> apply multiple transformations on the same uid...
>
> Is there a way to do that and we just missed it? Any insights appreciated.
>
> Cheers,
> /David


Re: Re: Bootstrapping the state

2018-07-23 Thread Fabian Hueske
Hi Henkka,

You might want to consider implementing a dedicated job for state
bootstrapping that uses the same operator UID and state names. That might
be easier than integrating the logic into your regular job.
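
A hedged Java sketch of that idea (the field layout and ProfileInitMapper
follow Henri's skeleton further down this thread; the uid and state wiring
are placeholders, not Fabian's actual code):

// Hedged sketch: a bootstrap-only job whose stateful operator reuses the
// production job's uid and state names, so a savepoint taken from this job
// can later be restored by the production job.
DataStream<Tuple4<String, String, String, String>> initData =
    env.readTextFile(initFile)
       .map(line -> {
           String[] data = line.split(",");
           return Tuple4.of(data[0], data[1], data[2], data[3]);
       })
       .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.STRING));

initData
    .keyBy(t -> t.f0)
    .map(new ProfileInitMapper()) // initializes the same keyed state names
    .uid("profile-enricher");     // MUST match the production operator's uid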

I think you have to use the monitoring file source because AFAIK it won't
be possible to take a savepoint once a task is finished: Flink is not able
to inject a checkpoint / savepoint barrier into finished tasks. Detecting
that all data was read is of course tricky, but you could monitor the
processed-records count metrics and take a savepoint once they stop
changing.
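
A hedged Java sketch of that monitoring loop (the endpoints follow Flink's
monitoring REST API; the host, job/vertex IDs, metric name, and poll interval
are assumptions):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hedged sketch (not Fabian's actual code): poll a record-count metric via
// Flink's REST API and trigger a savepoint once the count stops changing.
// Host, IDs and directories below are placeholders.
public class SavepointWhenIdle {
    public static void main(String[] args) throws Exception {
        String base = "http://jobmanager:8081";
        String job = "JOB_ID";
        String vertex = "SOURCE_VERTEX_ID";
        HttpClient http = HttpClient.newHttpClient();

        String last = null;
        while (true) {
            // GET /jobs/:jobid/vertices/:vertexid/metrics?get=numRecordsOut
            HttpRequest poll = HttpRequest.newBuilder(URI.create(
                    base + "/jobs/" + job + "/vertices/" + vertex
                         + "/metrics?get=numRecordsOut")).build();
            String count = http.send(poll, HttpResponse.BodyHandlers.ofString()).body();
            if (count.equals(last)) {
                break; // count unchanged over one interval: assume input consumed
            }
            last = count;
            Thread.sleep(30_000); // poll interval, tune to your source
        }

        // POST /jobs/:jobid/savepoints -- trigger a savepoint and cancel the job
        HttpRequest trigger = HttpRequest.newBuilder(
                URI.create(base + "/jobs/" + job + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"target-directory\": \"s3://bucket/savepoints\", \"cancel-job\": true}"))
                .build();
        System.out.println(http.send(trigger, HttpResponse.BodyHandlers.ofString()).body());
    }
}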

Best, Fabian

2018-07-23 8:24 GMT+02:00 Henri Heiskanen :

> …

Re: Re: Bootstrapping the state

2018-07-23 Thread Henri Heiskanen
Hi,

With state bootstrapping I mean loading the state with initial data before
starting the actual job. For example, in our case I would like to load
information like registration date of our users (>5 years of data) so that
I can enrich our event data in streaming (5 days retention).

Before watching the presentation by Lyft, I was loading this data per key
from Cassandra DB in the mapper if the state was not found.

Br,
Henkka

Br,
Henkka
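
A minimal Java sketch of that per-key fallback pattern (hedged: Event,
EnrichedEvent, and CassandraClient are hypothetical stand-ins, and the state
name is illustrative); it would be applied after keyBy(event -> event.userId)
so the ValueState is scoped per user:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

// Hedged sketch (not from the original mail): lazily bootstrap keyed state
// from an external store on first access. Event, EnrichedEvent and
// CassandraClient are hypothetical stand-ins.
public class EnrichWithRegistrationDate
        extends RichMapFunction<Event, EnrichedEvent> {

    private transient ValueState<Long> registrationDate;
    private transient CassandraClient cassandra; // hypothetical client

    @Override
    public void open(Configuration parameters) {
        registrationDate = getRuntimeContext().getState(
                new ValueStateDescriptor<>("registrationDate", Long.class));
        cassandra = CassandraClient.connect("cassandra-host"); // hypothetical
    }

    @Override
    public EnrichedEvent map(Event event) throws Exception {
        Long date = registrationDate.value();
        if (date == null) {
            // state miss: fall back to the external store, then cache in state
            date = cassandra.lookupRegistrationDate(event.userId); // hypothetical
            registrationDate.update(date);
        }
        return new EnrichedEvent(event, date);
    }
}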

On Fri, Jul 20, 2018 at 7:03 PM Vino yang  wrote:

> …


Re: Re: Bootstrapping the state

2018-07-20 Thread Vino yang
Hi Henkka,

If you want to customize the DataStream text source for your purpose, you
can use a read counter: if the value of the counter does not change within
some interval, you can assume all the data has been read. That is just one
idea; you can choose another solution.

About creating a savepoint automatically on job exit: it sounds like a good
idea. I don't know of any plan for this; I will try to submit the idea to
the community.

And about "how to bootstrap a state", what does that mean? Can you explain
this?

Thanks, vino

On 2018-07-20 20:00, Henri Heiskanen wrote:

> …

Re: Bootstrapping the state

2018-07-20 Thread Henri Heiskanen
Hi,

Thanks. Just to clarify: where would you then invoke the savepoint
creation? I basically need to know when all data is read, create a
savepoint and then exit. I think I could just as well use
PROCESS_CONTINUOUSLY, monitor the stream outside, and then use the REST API
to cancel with a savepoint.

Any plans for a feature where I could have Flink make a savepoint on job
exit? I am also keen on hearing other ideas on how to bootstrap a state. I
was initially thinking of just reading data from Cassandra if no state is
available.

Br,
Henkka

On Thu, Jul 19, 2018 at 3:15 PM vino yang  wrote:

> …


Re: Bootstrapping the state

2018-07-19 Thread vino yang
Hi Henkka,

The behavior of the text file source meets expectations: Flink will not
keep your source task thread alive once it exits from its invoke method.
That means you should keep your source task alive yourself. To implement
this, you should customize a text file source (implement the SourceFunction
interface).

For your requirement, you can check for a no-more-data idle timeout; when
it expires, exit, and the job will finally stop.

You can also refer to the implementations of other source connectors.

Thanks, vino.
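
A hedged Java sketch of such a source (written against the classic
SourceFunction interface; the class name and the hold-open behavior at the
end are illustrative, not vino's actual code):

import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hedged sketch (not from the original mail): a text-file source that keeps
// running after emitting all records, so the job stays RUNNING and a
// savepoint can still be taken.
public class HoldOpenTextFileSource implements SourceFunction<String> {

    private final String path;
    private volatile boolean running = true;

    public HoldOpenTextFileSource(String path) {
        this.path = path;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(path))) {
            String line;
            while (running && (line = reader.readLine()) != null) {
                // emit under the checkpoint lock, as the SourceFunction
                // contract requires
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(line);
                }
            }
        }
        // all input consumed: idle instead of returning, so the task never
        // finishes and a savepoint barrier can still flow through the job
        while (running) {
            Thread.sleep(1_000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Once the counts settle, the job can then be cancelled with a savepoint
(e.g. `flink cancel -s <targetDirectory> <jobId>`), which persists the
bootstrapped state.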

2018-07-19 19:52 GMT+08:00 Henri Heiskanen :

> Hi,
>
> I've been looking into how to initialise large state and especially
> checked this presentation by Lyft referenced in this group as well:
> https://www.youtube.com/watch?v=WdMcyN5QZZQ
>
> In our use case we would like to load roughly 4 billion entries into this
> state and I believe loading this data from s3, creating a savepoint and
> then restarting in streaming mode from a savepoint would work very well. In
> the presentation I get an impression that I could read from s3 and when all
> done (without any custom termination detector etc) I could just make a
> savepoint by calling the REST API from the app. However, I've noticed that
> if I read data from files, the job will auto-terminate when all data is
> read, and the job appears not to be running even if I add a sleep in the main
> program (very simple app attached below).
>
> I could use FileProcessingMode.PROCESS_CONTINUOUSLY to prevent the job
> from terminating and create the savepoint from outside the app, but that
> would require termination detection etc and would make the solution less
> clean.
>
> Has anyone more details how I could accomplish this?
>
> Br,
> Henkka
>
> public class StreamingJob {
>
>    public static void main(String[] args) throws Exception {
>       if (args.length == 0) {
>          args = "--initFile init.csv".split(" ");
>       }
>
>       // set up the streaming execution environment
>       final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>
>       ParameterTool params = ParameterTool.fromArgs(args);
>
>       String initFile = params.get("initFile");
>       if (initFile != null) {
>          env.readTextFile(initFile).map(new MapFunction<String,
>                Tuple4<String, String, String, String>>() {
>             @Override
>             public Tuple4<String, String, String, String> map(String s)
>                   throws Exception {
>                String[] data = s.split(",");
>                return new Tuple4<>(data[0], data[1], data[2], data[3]);
>             }
>          }).keyBy(0, 1).map(new ProfileInitMapper());
>       }
>
>       // execute program
>       env.execute("Flink Streaming Java API Skeleton");
>
>       // when all data read, save the state
>       Thread.sleep(1);
>    }
> }