Hi Prasanna,

Here are some quick thoughts:

> 1) Batching is an aggregation operation. But what I have seen in the
> examples of windowing is that they get the count/max/min operation in the
> particular window.  So could the batching be implemented via a windowing
> mechanism ?
>

I guess a custom function with state and processing-time timers should give
you the flexibility you want. BTW, Apache Beam's Flink runner takes this
approach to the batching you're describing (they call these bundles) [1][2].
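
The count-or-timeout logic behind that suggestion can be sketched roughly as
follows. This is a plain-Java model, not Flink API: in a real job the buffer
would presumably live in keyed state inside a process function, and the
deadline would be a processing-time timer. The class name CountOrTimeBatcher,
its methods, and the explicit nowMillis arguments are hypothetical and exist
only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Buffers messages per key and flushes on size limit or deadline, whichever comes first. */
class CountOrTimeBatcher {
    private final int maxBatchSize;
    private final long maxDelayMillis;
    // Per-key buffer (assumption: this would be keyed list state in Flink).
    private final Map<String, List<String>> buffers = new HashMap<>();
    // Per-key flush deadline (assumption: this would be a processing-time timer in Flink).
    private final Map<String, Long> deadlines = new HashMap<>();

    CountOrTimeBatcher(int maxBatchSize, long maxDelayMillis) {
        this.maxBatchSize = maxBatchSize;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Adds one message; returns the full batch if the size limit was hit, else an empty list. */
    List<String> onElement(String key, String message, long nowMillis) {
        List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        if (buffer.isEmpty()) {
            // The first element of a new batch starts the clock.
            deadlines.put(key, nowMillis + maxDelayMillis);
        }
        buffer.add(message);
        if (buffer.size() >= maxBatchSize) {
            return flush(key);
        }
        return new ArrayList<>();
    }

    /** Called when time advances; returns every batch whose deadline has passed. */
    Map<String, List<String>> onTimer(long nowMillis) {
        Map<String, List<String>> due = new HashMap<>();
        // Iterate over a copy because flush() removes entries from deadlines.
        for (Map.Entry<String, Long> e : new HashMap<>(deadlines).entrySet()) {
            if (e.getValue() <= nowMillis) {
                due.put(e.getKey(), flush(e.getKey()));
            }
        }
        return due;
    }

    /** Removes and returns the buffered batch for a key, cancelling its deadline. */
    private List<String> flush(String key) {
        deadlines.remove(key);
        List<String> batch = buffers.remove(key);
        return batch == null ? new ArrayList<>() : batch;
    }
}
```

With new CountOrTimeBatcher(1000, 1000L) this corresponds to the "1000
messages or 1 sec" requirement: a batch goes out as soon as either limit is
reached, and the next message for that key starts a fresh batch and deadline.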

> 2) Is the performance of broadcasted state better than a LookUp Cache?
> (Personally i have implemented broadcasted state for other purpose and not
> sure about the performance of Querying DB+LookUpCache)
>

My intuition is that broadcasted state should be faster, because it's local
to the process. However, this depends on state size, memory requirements,
the garbage collector, and so on. I think that with smart batching, the
throughput of remote lookups may be as good as with broadcasted state (in
most clouds, the network is fast enough for this). You should also consider
whether you want some kind of (watermark) synchronization between the two
streams (this is basically a special type of stream-to-stream join), or
whether you don't care that the config may not be up to date.
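
To make the staleness trade-off concrete, here is a minimal plain-Java sketch
of the lookup-cache side: entries are loaded from a remote source on a miss
and reused until a time-to-live expires, which is where an out-of-date config
can creep in. TtlLookupCache, the loader function, and the explicit clock
argument are hypothetical names for illustration; this is not Flink's or any
other library's cache API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Caches remote lookups for a fixed time-to-live; values may be stale within that window. */
class TtlLookupCache<K, V> {
    private static final class Entry<V> {
        final V value;
        final long expiresAtMillis;

        Entry(V value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final Function<K, V> loader; // stands in for the remote DB query
    private final long ttlMillis;
    private int remoteLookups = 0;

    TtlLookupCache(Function<K, V> loader, long ttlMillis) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
    }

    /** Returns the cached value while it is fresh, otherwise reloads it from the remote source. */
    V get(K key, long nowMillis) {
        Entry<V> cached = entries.get(key);
        if (cached != null && nowMillis < cached.expiresAtMillis) {
            return cached.value; // served locally, possibly stale
        }
        V fresh = loader.apply(key);
        remoteLookups++;
        entries.put(key, new Entry<>(fresh, nowMillis + ttlMillis));
        return fresh;
    }

    /** Number of times the remote source was actually queried. */
    int remoteLookups() {
        return remoteLookups;
    }
}
```

The TTL bounds how long a changed subscription can go unnoticed; a shorter
TTL means fresher config but more remote calls.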


> Using Keyed Process BroadCast looks Better than using non keyed as the
> same data is not replicated against all the parallel operators.
> A caveat here is that the load across all subscriptions are not the same .
> So if we key the stream , then we might have unbalanced job running.
> Thoughts on this ?
>

If one side (the broadcasted one) is much smaller than the other (and fits
in memory), it should be better to broadcast, as you don't have to perform
an extra shuffle of the bigger side. This is also called a map-side join.
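
As a rough plain-Java illustration of a map-side join: every parallel worker
keeps its own full copy of the small side, so records of the big side can be
joined wherever they happen to land, without being repartitioned by key.
MapSideJoinWorker and the event-to-endpoint data are made up for the example
and do not correspond to any Flink class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** One parallel worker holding a private copy of the small (broadcast) side. */
class MapSideJoinWorker {
    private final Map<String, String> smallSide;

    MapSideJoinWorker(Map<String, String> broadcastCopy) {
        // Each worker gets its own full copy; this is the memory cost of broadcasting.
        this.smallSide = new HashMap<>(broadcastCopy);
    }

    /** Joins one big-side record locally; returns null when there is no subscription. */
    String join(String eventType, String payload) {
        String endpoint = smallSide.get(eventType);
        return endpoint == null ? null : endpoint + " <- " + payload;
    }

    /** Distributes big-side records round-robin; no shuffle by key is needed. */
    static List<String> run(Map<String, String> smallSide, List<String[]> bigSide, int parallelism) {
        List<MapSideJoinWorker> workers = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            workers.add(new MapSideJoinWorker(smallSide));
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < bigSide.size(); i++) {
            String[] record = bigSide.get(i); // record[0] = event type, record[1] = payload
            String joined = workers.get(i % parallelism).join(record[0], record[1]);
            if (joined != null) {
                out.add(joined);
            }
        }
        return out;
    }
}
```

Because any worker can handle any record, load stays balanced even when some
subscriptions are much hotter than others; the price is one copy of the small
side per worker.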

> 4) Latency must be minimal, so the first thought is to store the
> messages to be batched in HashMapStateBackend.
> But to store both the State of config and the data in HEAP might increase
> the memory usage a lot. Since there could be a huge spike in load.    Are
> there any other things that need to be factored in
>

There is a nice write-up on this topic in FLIP-50 [3]. GC is also a major
factor when it comes to low latency (how much depends on your requirements).
We had really great results with ZGC and Java 16, but it takes some effort
to get everything working (this setup is also not officially supported by
the Flink community).

> 5) Auto Scaling capability would save a lot of cost because of consistent
> load patterns with occasional spikes. Though reactive scaling is introduced
> in flink 1.13 , we don't know whether its battle hardened .
>

There is still a lot of work to be done in this direction. Reactive scaling
is just one piece of the puzzle. Right now, re-scaling time (savepoint +
restore) is probably the biggest concern. If you need very low latency, a
re-scale might hit you hard, especially with larger state.

> 6) After looking at the solutions, does flink seem to be a natural fit for
> this use case in comparison to Spring Reactor framework/vert.x ?
> One thing we see from the documentation is that spring reactive can auto
> scale very well but we need to work on fault tolerance/stability from the
> dev side which flink is good at.
>

These technologies are designed for a different purpose than Flink.

[1]
https://github.com/apache/beam/blob/v2.31.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L525
[2]
https://github.com/apache/beam/blob/v2.31.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L878
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-50%3A+Spill-able+Heap+Keyed+State+Backend

Best,
D.


On Sat, Aug 14, 2021 at 5:12 PM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Hi,
>
> Aim: Building an event delivery service
> Scale: Peak load 50k messages/sec, average load 5k messages/sec.
>        Expected to grow every passing month.
> Unique customer endpoints: 10k+
> Unique events (Kafka topics): 500+
> Unique tenants: 30k+
> Subscription level: Events are generated for tenants. Customers may
>        subscribe a) entirely to an event, b) at tenant level (5 tenants or
>        100 tenants), or c) even at sub-tenant level (Tenant 2, Dept 100,
>        200, 300).
> *Other Requirements *:
> 1) Batching events based on quantity or minimum threshold time whichever
> comes first . Example 1000 messages or 1 sec.
> 2) Message size < 1kb
>
> *Possible Solutions:*
>
> 1) Build an app using reactive programming say vert.x/spring reactive etc
> 2) Use apache flink
>
> *Flink Solution *
> RDS : Has the subscription connection details
>
> [image: Flink HTTP Publisher.png]
>
> 2a ) Use DB and Look UP Cache to retrieve Configurations
>
> (i)   Stream the data from kafka
> (ii)  For every message flowing in , query RDS(postgres) ,get the
> connection/subscription details, and apply filters. [Use lookup Cache to
> improve performance]
> (iii a)  if it's a streaming customer , form the message with appropriate
> authentication details.
> (iii b) if it's a batch customer, push the message to the state backend.
> Once maximum message or minimum threshold batch time is reached, retrieve
> the messages and form a single batch message with appropriate
> authentication details.
> (iv) Send message and endpoint info to async sink, which delivers to
> customers. In case of failure write to a dead letter queue where customers
> can poll later.
>
>
> 2b ) Load Configurations to BroadCastState and Update it in a regular
> interval
>
> (i) Stream the data from kafka
> (ii) Periodically query the PROXY API (on top of RDS) to get the latest
> added/updated subscription/connection details .
> (iii) For every message flowing in from kafka , Check against the
> broadcasted configuration to find the customers subscribed for the event,
> their filter requirement and connection details.
> (iv a) if it's a streaming customer, form the message with appropriate
> authentication details.
> (iv b) if it's a batch customer, push the message to the state backend.
> Once maximum message or minimum threshold batch time is reached, retrieve
> the messages and form a single batch message with appropriate
> authentication details.
> (v) Send message and endpoint info to async sink, which delivers to
> customers. In case of failure write to a dead letter queue where customers
> can poll later.
>
> *Questions : *
> 1) Batching is an aggregation operation. But what I have seen in the
> examples of windowing is that they get the count/max/min operation in the
> particular window.  So could the batching be implemented via a windowing
> mechanism ?
>
> 2) Is it a good design to have both batched delivery and per-event
> delivery in the same job or should it be different ?
>
> 2) Is the performance of broadcasted state better than a LookUp Cache?
> (Personally i have implemented broadcasted state for other purpose and not
> sure about the performance of Querying DB+LookUpCache)
>
> 3) I read this
> " The working copy of broadcast state is always on the heap; not in
> RocksDB. So, it has to be small enough to fit in memory. Furthermore, each
> instance will copy all of the broadcast state into its checkpoints, so all
> checkpoints and savepoints will have *n* copies of the broadcast state
> (where *n* is the parallelism).
> If you are able to key partition this data, then you might not need to
> broadcast it. It sounds like it might be per-employee data that could be
> keyed by the employeeId. But if not, then you'll have to keep it small
> enough to fit into memory. "
>
> Using Keyed Process BroadCast looks Better than using non keyed as the
> same data is not replicated against all the parallel operators.
> A caveat here is that the load across all subscriptions are not the same .
> So if we key the stream , then we might have unbalanced job running.
> Thoughts on this ?
>
> 4) Latency must be minimal , so the first thought is to store the
> messages to be batched in HashMapStateBackend.
> But to store both the State of config and the data in HEAP might increase
> the memory usage a lot. Since there could be a huge spike in load.    Are
> there any other things that need to be factored in ?
>
> 5) Auto Scaling capability would save a lot of cost because of consistent
> load patterns with occasional spikes. Though reactive scaling is introduced
> in flink 1.13 , we don't know whether its battle hardened .
>
> 6) After looking at the solutions , does flink seem to be a natural fit
> for this use case in comparison to Spring Reactor framework/vert.x ?
> One thing we see from the documentation is that spring reactive can auto
> scale very well but we need to work on fault tolerance/stability from the
> dev side which flink is good at.
>
> Spring reactor is new to us and we wanted to look at that after exploring
> flink for this use case.
>
> Thanks,
> Prasanna.
>
