Re: Statefun embedded functions - parallel per partition, sequential per key

2021-11-03 Thread Igal Shilman
Glad to hear it worked out for you :-)

Cheers,
Igal

Re: Statefun embedded functions - parallel per partition, sequential per key

2021-11-02 Thread Filip Karnicki
Hi All

Just an update for future reference, it turned out that the machine we were
using for this test didn't have enough memory for what we were asking it to
do. It was that simple. The upside is that not even with the world's most
unstable cluster did we manage to lose a single message.

Just as an aside, we got the best results by switching back to undertow,
but we ended up using it slightly differently than the current example in
the docs suggests. We needed to hand the work off to a worker thread because
we had a blocking call in our function:

class Handler extends HttpHandler {
  (...)

  def handleRequest(exchange: HttpServerExchange): Unit = {
    // Undertow invokes handlers on an I/O thread first; re-dispatch to a
    // worker thread so the blocking call below doesn't stall the event loop.
    if (exchange.isInIoThread) {
      exchange.dispatch(this)
      return
    }
    exchange.getRequestReceiver.receiveFullBytes((exchange, bytes) => {
      flinkHandler
        .handle(Slices.wrap(bytes))
        .whenComplete((response: Slice, exception: Throwable) =>
          onComplete(exchange, response, exception))
    })
  }

  def onComplete(exchange: HttpServerExchange, slice: Slice, throwable: Throwable) =
    (... as per the example)
}
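For reference, a handler like this gets wired into an Undertow server the same way as the docs example does; the listener port and the way the handler instance is constructed below are assumptions for illustration, not details from this thread:

```scala
import io.undertow.Undertow
import io.undertow.server.HttpHandler

// Sketch only: in the real service, `handler` is the Handler shown above,
// built around a RequestReplyHandler obtained from StatefulFunctions.
def startServer(handler: HttpHandler): Unit = {
  val server = Undertow.builder()
    .addHttpListener(5000, "0.0.0.0") // port is arbitrary here
    .setHandler(handler)              // dispatches itself off the I/O thread
    .build()
  server.start()
}
```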

Many thanks again for your help, Igal


Re: Statefun embedded functions - parallel per partition, sequential per key

2021-10-27 Thread Filip Karnicki
Thanks for your reply Igal

The reason why I'm using data stream integration is that the messages on
kafka are in .json, and I need to convert them to protobufs for embedded
functions. If I was using remote functions I wouldn't need to do that.

With regard to performance, in order to exclude the possibility that it's
the remote service that's causing the slowdown, I replaced the undertow
example from the docs with 5 instances of webflux services that hand the
work off from an event loop to a worker, which then sleeps for 1 second. I
then launched an nginx instance to forward requests in round-robin fashion
to the 5 webflux instances.

When I push 10_000 messages onto the ingress Kafka topic, it takes upwards
of 100 seconds to process them all. The Flink cluster first works pretty
hard for about 30 seconds (at ~100% CPU utilisation), then everything slows
down and I get tens of messages trickling through, until the flink-side
statefun job (not the remote job) crashes and gets restarted, at which
point the last few stragglers get sent to the egress, 120+ seconds after
the launch of the test.

I can try to replicate this outside of my work environment if you'd like to
run it yourself, but in the meantime, is there a way to achieve this
'sequential-per-key' behaviour with embedded functions? Those seem to be
rock-solid. Maybe there are some internal classes that would at least
provide a template for how to do it? I have a naive implementation ready
(the one I described in the previous email), but I'm sure there are some
edge cases I haven't thought of.

Thanks again,
Fil





Re: Statefun embedded functions - parallel per partition, sequential per key

2021-10-27 Thread Igal Shilman
Hello Fil,

Indeed what you are describing is exactly what a remote function does.

I am curious to learn more about the current performance limitations that
you encounter with the remote functions.

One thing to try, in combination with the async transport, is to increase
the total number of in-flight async operations by setting the following
property in flink-conf.yaml:

statefun.async.max-per-task

Set it to a much higher value than the default of 1024; try experimenting
with 16k, 32k, 64k, and even higher.
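For example, in flink-conf.yaml (16384 shown here purely as a starting point for experiments, not a recommended value):

```yaml
# Upper bound on in-flight async operations per task (default: 1024).
statefun.async.max-per-task: 16384
```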

Let me know if that improves the situation, and we can continue from there.

P.S.

You've mentioned that you are using the data stream integration; was there
any particular reason you chose that? It has some limitations at the moment
with respect to remote functions.


Cheers,
Igal

-- 

---

about.me/igalshilman


Statefun embedded functions - parallel per partition, sequential per key

2021-10-26 Thread Filip Karnicki
Hi

I have a Kafka topic with JSON messages that I map to protobufs within a
data stream, and then send those to embedded stateful functions using the
DataStream integration API (DataStream[RoutableMessage]). From there I need
to make an idempotent, long-running, blocking IO call.
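For reference, a sketch of that mapping step; the function type, the key accessor, and the JSON-to-protobuf helper are placeholders rather than the actual ones from my job:

```scala
import org.apache.flink.statefun.flink.datastream.{RoutableMessage, RoutableMessageBuilder}
import org.apache.flink.statefun.sdk.FunctionType
import org.apache.flink.streaming.api.scala._

// Hypothetical names: MY_FN, parseAndConvert, and .getKey stand in for the
// real embedded function type and conversion logic.
val MY_FN = new FunctionType("example", "my-embedded-fn")

def toRoutable(jsonStream: DataStream[String]): DataStream[RoutableMessage] =
  jsonStream.map { json =>
    val proto = parseAndConvert(json) // JSON -> protobuf
    RoutableMessageBuilder.builder()
      .withTargetAddress(MY_FN, proto.getKey) // the key picks the function instance
      .withMessageBody(proto)
      .build()
  }
```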

I noticed that I was processing messages sequentially per Kafka partition.
Is there a way to process them sequentially by key only (but in parallel
per partition)?

I created some code that uses the embedded functions'
registerAsyncOperation capabilities to make my long-running IO calls
effectively asynchronous, but I had to add all this custom code to enqueue
and persist any messages for a key that came in while there was an
in-flight IO call happening for that key. I'm fairly confident that I can
figure out all the fault tolerance cases _eventually_ (including re-sending
the in-flight message upon getting the UNKNOWN status back from the async
operation).
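The shape of that code is roughly the following; MyRequest/MyResponse and the blocking call are placeholders, and the custom queue-and-replay state handling is omitted:

```scala
import java.util.concurrent.CompletableFuture
import org.apache.flink.statefun.sdk.{AsyncOperationResult, Context, StatefulFunction}

class BlockingIoFunction extends StatefulFunction {
  override def invoke(context: Context, input: Any): Unit = input match {
    case req: MyRequest => // placeholder request type
      // Run the blocking call on another thread; StateFun persists the metadata
      // and re-invokes this function with an AsyncOperationResult on completion.
      val future = CompletableFuture.supplyAsync(() => blockingIoCall(req))
      context.registerAsyncOperation(req, future)

    case result: AsyncOperationResult[MyRequest @unchecked, MyResponse @unchecked] =>
      result.status() match {
        case AsyncOperationResult.Status.SUCCESS =>
          // use result.value(), then replay any messages queued for this key
        case AsyncOperationResult.Status.UNKNOWN =>
          // restored from a checkpoint while in flight: re-send result.metadata()
        case AsyncOperationResult.Status.FAILURE =>
          // the future completed exceptionally
      }
  }
}
```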

That said, am I missing a trick that would allow Flink/statefun to take
care of this "parallel per partition, sequential-per-key" behaviour? Remote
functions don't seem to have the performance we need, even with the async
HTTP transport.

Many thanks!
Fil