Hi All

Just an update for future reference: it turned out that the machine we were
using for this test didn't have enough memory for what we were asking it to
do. It was that simple. The upside is that not even with the world's most
unstable cluster did we manage to lose a single message.

Just as an aside, we got the best results by switching back to Undertow,
but we ended up using it slightly differently from the current example in
the docs. We needed to hand the work off to a worker thread because
we had a blocking call in our function:

import io.undertow.server.{HttpHandler, HttpServerExchange}
import org.apache.flink.statefun.sdk.java.slice.{Slice, Slices}

class Handler extends HttpHandler {
  // (...) flinkHandler is the Java SDK's RequestReplyHandler, built as per the docs example

  override def handleRequest(exchange: HttpServerExchange): Unit = {
    // Hand off to a worker thread: the blocking call must not run on the IO thread
    if (exchange.isInIoThread) {
      exchange.dispatch(this)
      return
    }
    exchange.getRequestReceiver.receiveFullBytes((exchange, bytes) => {
      flinkHandler
        .handle(Slices.wrap(bytes))
        .whenComplete((response: Slice, exception: Throwable) =>
          onComplete(exchange, response, exception))
    })
  }

  def onComplete(exchange: HttpServerExchange, slice: Slice, throwable: Throwable): Unit = {
    // (... as per the example)
  }

}
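
In case it helps anyone, here is roughly how we wire that handler into the
server (the port and bind address below are placeholders; the rest is as per
the docs example):

  import io.undertow.Undertow

  val server = Undertow.builder()
    .addHttpListener(5000, "0.0.0.0")
    .setHandler(new Handler())
    .build()
  server.start()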

Many thanks again for your help, Igal

On Wed, 27 Oct 2021 at 13:59, Filip Karnicki <filip.karni...@gmail.com>
wrote:

> Thanks for your reply Igal
>
> The reason why I'm using the data stream integration is that the messages on
> Kafka are in JSON, and I need to convert them to protobufs for embedded
> functions. If I were using remote functions I wouldn't need to do that.
>
> With regards to performance, in order to exclude the possibility that it's
> the remote service that's causing a slowdown, I replaced the Undertow
> example from the docs with 5 instances of WebFlux services that hand off
> the work from an event loop to a worker, which then sleeps for 1 second. I
> then launched an nginx instance to forward the requests in a round-robin
> fashion to the 5 WebFlux instances.
>
> When I push 10_000 messages onto the ingress Kafka topic, it takes upwards
> of 100 seconds to process all of them. The Flink cluster first works
> pretty hard for about 30 seconds (at ~100% CPU utilisation), then
> everything slows down and I get tens of messages trickling through, until
> the Flink-side Statefun job (not the remote job) crashes and gets
> restarted, which is when the last few stragglers get sent to the egress,
> 120+ seconds after the launch of the test.
>
> I can try to replicate this outside of my work environment if you'd like
> to run it yourself, but in the meantime, is there a way to achieve this
> 'sequential-per-key' behaviour with the use of embedded functions? Those
> seem to be rock-solid. Maybe there are some internal classes that would at
> least provide a template on how to do it? I have a naive implementation
> ready (the one I described in the previous email) but I'm sure there are
> some edge cases I haven't thought of.
>
> Thanks again,
> Fil
>
>
>
> On Wed, 27 Oct 2021 at 09:43, Igal Shilman <igal.shil...@gmail.com> wrote:
>
>> Hello Fil,
>>
>> Indeed what you are describing is exactly what a remote function does.
>>
>> I am curious to learn more about the current performance limitations that
>> you encounter with the remote functions.
>>
>> One thing to try, in combination with the async transport, is to increase
>> the total number of in-flight async operations by setting the following
>> property in flink-conf.yaml:
>>
>> statefun.async.max-per-task
>>
>> to a much higher value than 1024; try experimenting with 16k, 32k, 64k and
>> even higher.
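>>
>> For example (16k is just a starting point to experiment with), in
>> flink-conf.yaml:
>>
>>   statefun.async.max-per-task: 16384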
>>
>> Let me know if that improves the situation, and we can continue from
>> there.
>>
>> P.S.
>>
>> You've mentioned that you are using the data stream integration; was
>> there any particular reason you chose that? It has some limitations at the
>> moment with respect to remote functions.
>>
>>
>> Cheers,
>> Igal
>>
>> On Wed 27. Oct 2021 at 08:49, Filip Karnicki <filip.karni...@gmail.com>
>> wrote:
>>
>>> Hi
>>>
>>> I have a Kafka topic with JSON messages that I map to protobufs within a
>>> data stream, and then send those to embedded stateful functions using the
>>> DataStream integration API (DataStream[RoutableMessage]). From there I need
>>> to make an idempotent, long-running, blocking IO call.
>>>
>>> I noticed that I was processing messages sequentially per Kafka
>>> partition. Is there a way that I could process them sequentially by key
>>> only (but in parallel per partition)?
>>>
>>> I created some code that uses the embedded functions'
>>> registerAsyncOperation capabilities to make my long-running IO calls
>>> effectively asynchronous, but I had to add all this custom code to enqueue
>>> and persist any messages for a key that came in while there was an
>>> in-flight IO call happening for that key. I'm fairly confident that I can
>>> figure out all the fault tolerance cases _eventually_ (including re-sending
>>> the in-flight message upon getting the UNKNOWN status back from the async
>>> operation).
>>>
>>> That said, am I missing a trick that would allow Flink/statefun to take
>>> care of this "parallel per partition, sequential-per-key" behaviour? Remote
>>> functions don't seem to have the performance we need, even with async http
>>> transport.
>>>
>>> Many thanks!
>>> Fil
>>>
>> --
>>
>> ---
>>
>> about.me/igalshilman
>>
>>
