Hi Divij,

I think that your understanding is correct. The rest of my replies are inline.

On 30. 05. 23 15:31, Divij Vaidya wrote:
Caution: This email originated from outside of the organization. Do not click 
links or open attachments unless you recognize the sender and know the content 
is safe.


Thank you for mentioning the use case.

I will try to summarize what you mentioned above in my own words to ensure
that I have understood this correctly.
There are practical use cases where the user application has a dependency
on Kafka producer and requires very quick cold starts. In such cases, CRaC
could be used to restore the user application. However, since Kafka clients
do not support CRaC, they become a bottleneck for snapshotting the user
application. With this change, we will provide the ability for user
applications to snapshot including a clean restore of the kafka clients.

Now, having understood the motivation, I have the following comments:

#1 I think we need to make it very clear that a snapshot of a kafka
producer will mean:
- Losing all records which are waiting to be sent to the broker (stored in
RecordAccumulator). Perhaps, we need to print logs (warn level) and emit
metrics on suspend wrt how many requests were waiting to be sent.


I don't think this is entirely true, unless there's an error in the implementation: CRaC will snapshot the waiting records as well. To the broker, this should look like a temporary network error, unless it decides to discard the records on arrival (e.g. due to a transaction timeout).


#2 We need to document all customer facing changes in the KIP. As an
example, the metrics that will be added, changes to any interface that is
public facing (I don't think any public interface change is required) and
changes to customer facing behaviour (we need to add documentation around
this).

How detailed should this planning be? My impression is that the KIP should decide whether to support CRaC at all (this early etc.) and outline the approach, but it might be impractical to sketch every detail before implementing it. Support for different use cases would then come on an as-needed basis, rather than in one big PR solving everything.


#3 We need to document what happens to existing uncommitted transactions. I
think the answer would probably be that the behaviour will be similar to a
client closing an existing connection but if we don't close things properly
(such as in transactionManager), there may be lingering effects.

Would the client be informed about the transactions by the broker, or should it decide about the transactions on its own?



Question:
#4 Does this JDK feature retain the state of the local data structures such
as a Map/List? If yes, do we want to add validation of recovered data as
part of the startup process?

All objects, both on the heap and in native memory, stay as they were unless changed as a result of the notifications. Some changes happen this way even in the JDK, e.g. for security reasons the SecureRandom instance might be reinitialized. Any anti-tampering protection should be part of the CRaC project; the application should assume the memory is restored correctly.


#5 How do we plan to test the correctness of the recovery/restore?

It would not be practical to test the C/R itself as part of the test suite, as this requires a non-standard build of the JDK and so far works only on Linux. However, issuing a fake notification and testing the correctness of the system afterwards is possible even without a CRaC build.
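
A fake-notification test of this kind could look roughly like the sketch below. It is only an illustration under assumptions: `BufferedSender` and `FakeNotificationCheck` are hypothetical names, not Kafka or org.crac classes, and the hooks are simplified (the real callbacks have different signatures). It fires the two notifications by hand and checks that the connection was dropped for the checkpoint while the buffered records were kept.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical component with the two lifecycle hooks; the real
// callbacks in org.crac have different signatures.
class BufferedSender {
    private final Deque<String> buffer = new ArrayDeque<>();
    private boolean connected = true;

    void enqueue(String record) { buffer.add(record); }

    // Called before the (simulated) snapshot: drop the connection only.
    void beforeCheckpoint() { connected = false; }

    // Called after the (simulated) restore: reconnect, keep the buffer.
    void afterRestore() { connected = true; }

    boolean isConnected() { return connected; }
    int buffered() { return buffer.size(); }
}

class FakeNotificationCheck {
    // Fires the two notifications back to back and reports whether the
    // component came back connected with its buffered records intact.
    static boolean survivesFakeCheckpoint(BufferedSender sender) {
        int before = sender.buffered();
        sender.beforeCheckpoint();
        boolean closedDuringCheckpoint = !sender.isConnected();
        sender.afterRestore();
        return closedDuringCheckpoint
                && sender.isConnected()
                && sender.buffered() == before;
    }
}
```

Such a check runs on any JDK, since nothing is actually checkpointed; it only exercises the notification handlers.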



Overall, I think that simply restoring the Sender might not work. We will
probably need to devise a clean "suspend" mechanism for the entire producer
including components such as TransactionManager, Partitioner, Record
Accumulator etc. Additionally, Justine (github: jolshan) is our expert on
producer side things, perhaps, they have a better idea to create a clean
producer suspension.

I understand that what I did might be limited, but suspending *everything* is a big effort. Even from the performance perspective we would like to do as little as possible. That's why the POC implementation shuts down only the connections and blocks any thread trying to access the suspended component.
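
The idea of blocking threads that touch a suspended component can be sketched roughly as follows. This is not the code from the POC: `SuspendGate` and its methods are hypothetical names chosen for illustration. The fast path costs a single volatile read when nothing is suspended; callers block only between `suspend()` and `resume()`.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical gate: threads call awaitOpen() before touching the guarded
// component; suspend() closes the gate and resume() reopens it.
class SuspendGate {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition opened = lock.newCondition();
    private volatile boolean suspended = false;

    // Would be called from the checkpoint notification.
    void suspend() {
        lock.lock();
        try {
            suspended = true;
        } finally {
            lock.unlock();
        }
    }

    // Would be called from the restore notification.
    void resume() {
        lock.lock();
        try {
            suspended = false;
            opened.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns immediately when open; otherwise parks until resume().
    void awaitOpen() {
        if (!suspended) {
            return; // fast path: one volatile read
        }
        lock.lock();
        try {
            while (suspended) {
                opened.awaitUninterruptibly();
            }
        } finally {
            lock.unlock();
        }
    }
}
```

A production version would probably want an interruptible or timed wait; the uninterruptible wait just keeps the sketch short.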

What I would rather see is verification that error handling works as expected when something is rejected (e.g. because of a TX timeout/rollback). The application should handle network errors and timeouts anyway. One case that might be of concern is identity duplication, when the snapshot holds a unique identifier of the client or transaction. It might be useful to ensure that those are removed as part of the checkpoint process (which in fact means cancelling those transactions).

Radim



--
Divij Vaidya



On Thu, May 25, 2023 at 11:15 AM Radim Vansa <rva...@azul.com.invalid>
wrote:

Hi Divij,

I have prototyped this using Quarkus Superheroes [1], a demo application
consisting of several microservices that communicate with each other
using both HTTP and Kafka. I wanted to add the ability to transparently
checkpoint and restore this application - while the regular startup
takes seconds, the restore could bring this application online in the
order of tens of milliseconds.

I agree that the change will not help Kafka itself to get any faster; it
will enable CRaC for the whole application that, amongst other
technologies, uses Kafka. You're saying that the clients are not
supposed to be re-created quickly; I hope that a use case where the app
is scaled down after being idle for e.g. 60 seconds and then needs to be
started on a request (to serve it ASAP) would make sense to you. It's
really not about Kafka per se - it's about the needs of those who
consume it. Of course, I'll be glad for any comments pointing out
difficulties, e.g. if the producer is replicated.

An alternative, less transparent approach would handle this in the
integration layer. However, from my experience this can be problematic
if the integration layer provides the Kafka API directly, losing control
over the instance - it's not possible to simply shut down the client and
reopen the instance, and some sort of proxy would be needed that
prevents access to this closed instance. And besides complexity, a proxy
means degraded performance.

Another motivation to push changes as far down the dependency tree as
possible is the fan-out of these changes: we don't want to target
Quarkus specifically, but other frameworks (Spring Boot, ...) and
stand-alone applications as well. By keeping it low level we can
concentrate the maintenance efforts in one place.

Thank you for spending time reviewing the proposal and let me know if I
can clarify this further.

Radim


[1] https://quarkus.io/quarkus-workshops/super-heroes/

On 24. 05. 23 17:13, Divij Vaidya wrote:

Hey Radim

After reading the KIP, I am still not sure about the motivation for this
change. The bottleneck in starting a producer on Kafka is setup of the
network connection with the broker (since it performs SSL + AuthN). From
what I understand, checkpoint and restore is not going to help with that.
Also, Kafka clients are supposed to be long running clients which aren't
supposed to be destroyed and re-created quickly in a short span. I fail
to understand the benefit of using CRaC for the producer. Perhaps I am
missing something very obvious here, but please help me understand more.

I was wondering if you are aware of any Kafka user use cases which could
be positively impacted by this change? Adding these use cases to the
motivation will greatly help in convincing the community about the impact
of this change.

A use case that I can think of is to aid debugging of a faulty cluster.
If we get the ability to snapshot a broker when it is misbehaving, then
we could re-use the snapshot to re-create the exact production setup in
a local environment where we can test things. But that would be a larger
change since capturing the state of a broker is much more than in-memory
Java objects. Perhaps, you can try approaching the usefulness of this new
technology from this lens?

--
Divij Vaidya



On Thu, May 11, 2023 at 2:33 PM Radim Vansa <rva...@azul.com.invalid>
wrote:
Thank you, Divij. I'll give it more time and remind the list in that
timeframe, then.

Radim

On 11. 05. 23 13:47, Divij Vaidya wrote:
Hey Radim

One of the reasons for the slowdown is preparation of upcoming releases
(the community is currently in code freeze/resolve release blockers
mode) and preparation for Kafka Summit next week. I would suggest giving
another 2-3 weeks for folks to chime in. I would personally visit this
KIP in the last week of May.

--
Divij Vaidya



On Thu, May 11, 2023 at 1:34 PM Radim Vansa <rva...@azul.com.invalid>
wrote:
Hello all,

it seems that this KIP did not spark much interest; I'm not sure if
people just don't care or whether there are any objections to the
proposal. What should be the next step? I don't think it has been
discussed enough to proceed with voting.

Cheers,

Radim

On 27. 04. 23 8:39, Radim Vansa wrote:


Thank you for those questions. As I've mentioned, my knowledge of Kafka
is quite limited, so these are the things that need careful thinking!
Comments inline.

On 26. 04. 23 16:28, Mickael Maison wrote:
Hi Radim,

Thanks for the KIP! CRaC is an interesting project and it could be a
useful feature in Kafka clients.

The KIP is pretty vague in terms of the expected behavior of clients
when checkpointing and restoring. For example:

1. A consumer may have pre-fetched records in memory. When it is
checkpointed, its group will rebalance and another consumer will
consume the same records. When the initial consumer is restored, will
it process its pre-fetched records and basically reprocess records
already handled by other consumers?

How would the broker (?) know what records were really consumed? I
think that there must be some form of Two Generals Problem.

The checkpoint should keep as much of the application untouched as it
could. Here, I would expect that the prefetched records would be
consumed after the restore operation as if nothing happened. I can
imagine this could cause some trouble if the data is dependent on the
'external' world, e.g. other members of the cluster. But I wouldn't
break the general guarantees Kafka provides if we can avoid it. We
certainly have an option to do the checkpoint more gracefully,
deregistering with the group (the checkpoint is effectively blocked by
the notification handler).

If we're talking about using CRaC for boot speedup this is not that
important - when the app is about to be checkpointed it will likely
stop processing data anyway. For other use-cases (e.g. live migration)
it might matter.


2. Producers may have records in-flight or in the producer buffer when
they are checkpointed. How do you propose to handle these cases?

If there's something in flight we can wait for the acks. Alternatively,
if the receiver guards against double receive using unique ids/sequence
numbers we could resend that after restore. As for the data in the
buffer, IMO that can wait until restore.


3. Clients may have loaded plugins such as serializers. These plugins
may establish network connections too. How are these expected to
automatically reconnect when the application is restored?

If there's an independent pool of connections, it's up to the plugin
author to support CRaC; I doubt there's anything that the generic code
could do. Also it's likely that the plugins won't need any extension to
the SPI; these would register their handlers independently (if ordering
matters there are ways to prioritize one resource over another).

Cheers!

Radim Vansa


Thanks,
Mickael


On Wed, Apr 26, 2023 at 8:27 AM Radim Vansa <rva...@azul.com.invalid>
wrote:
Hi all,

I haven't seen many reactions to this proposal. Is there any general
policy regarding dependencies, or a prior decision that would hint
at this?

Thanks!

Radim


On 21. 04. 23 10:10, Radim Vansa wrote:


Thank you,

now to be tracked as KIP-921:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-921%3A+OpenJDK+CRaC+support
Radim

On 20. 04. 23 15:26, Josep Prat wrote:
Hi Radim,
You should now have permissions to create a KIP.

Best,

On Thu, Apr 20, 2023 at 2:22 PM Radim Vansa <rva...@azul.com.invalid>
wrote:

Hello,

upon filing a PR [1] with some initial support for OpenJDK CRaC [2][3] I
was directed here to raise a KIP (I don't have the permissions in
wiki/JIRA to create the KIP page yet, though).

In a nutshell, CRaC intends to provide a way to checkpoint (snapshot)
and persist a running Java application and later restore it, possibly on
a different computer. This can be used to significantly speed up the
boot process (from seconds or minutes to tens of milliseconds), or for
live replication or migration of the warmed-up application. This is not
entirely transparent to the application; the application can register
for notification when this is happening, and sometimes has to assist
with that to prevent unexpected state after restore - e.g. close network
connections and files.
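
The register-and-assist pattern described above can be sketched as below. This is a simplified illustration, not the org.crac API: `Resource`, `SimpleContext` and `NetworkClient` are stand-ins defined here so the example runs on any JDK, and the real interfaces differ in their signatures and in notification ordering.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical mirror of a checkpoint/restore listener.
interface Resource {
    void beforeCheckpoint();
    void afterRestore();
}

// Hypothetical stand-in for the runtime's registry of listeners.
class SimpleContext {
    private final List<Resource> resources = new ArrayList<>();

    void register(Resource resource) { resources.add(resource); }

    // Simulates one checkpoint immediately followed by a restore.
    void simulateCheckpointRestore() {
        for (Resource r : resources) r.beforeCheckpoint();
        // The process image would be persisted and later restored here.
        for (Resource r : resources) r.afterRestore();
    }
}

// Hypothetical client that must not carry an open socket into the snapshot.
class NetworkClient implements Resource {
    private boolean open = true;
    private int reconnects = 0;

    @Override public void beforeCheckpoint() { open = false; }

    @Override public void afterRestore() {
        open = true;
        reconnects++;
    }

    boolean isOpen() { return open; }
    int reconnects() { return reconnects; }
}
```

After `register(client)` and one simulated cycle, the client is open again and has reconnected exactly once; everything else in its state is untouched.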

CRaC is not integrated yet into the mainline JDK; a JEP is being
prepared, and users are welcome to try out our builds. However, even
when this gets into the JDK we can't expect users to jump onto the
latest release immediately; therefore we provide a facade package
org.crac [4] that delegates to the implementation, if it is present in
the running JDK, or provides a no-op implementation.
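
The delegate-or-no-op idea behind such a facade can be illustrated with the sketch below. It does not reproduce how org.crac is implemented; `CheckpointBackend`, `NoOpBackend`, `DetectedBackend` and `Facade` are hypothetical names, and the marker class name is passed in by the caller rather than assumed.

```java
// Hypothetical facade: picks a real backend when a marker class can be
// loaded in the running JDK, otherwise falls back to a no-op.
interface CheckpointBackend {
    boolean isSupported();
    String describe();
}

class NoOpBackend implements CheckpointBackend {
    @Override public boolean isSupported() { return false; }
    @Override public String describe() { return "no-op"; }
}

class DetectedBackend implements CheckpointBackend {
    private final Class<?> impl;

    DetectedBackend(Class<?> impl) { this.impl = impl; }

    @Override public boolean isSupported() { return true; }

    // A real facade would delegate calls to the detected implementation.
    @Override public String describe() { return impl.getName(); }
}

class Facade {
    // markerClassName is supplied by the caller; nothing is hard-coded.
    static CheckpointBackend load(String markerClassName) {
        try {
            return new DetectedBackend(Class.forName(markerClassName));
        } catch (ClassNotFoundException | LinkageError e) {
            return new NoOpBackend();
        }
    }
}
```

On a JDK without the implementation the lookup fails once at load time and every later call goes to the no-op, so application code can use the facade unconditionally.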

With or without the implementation, the support for CRaC in the
application should be designed to have a minimal impact on performance
(few extra objects, some volatile reads...). On the other hand the
checkpoint operation itself can be non-trivial in this matter. Therefore
the main consideration should be about the maintenance costs - keeping a
small JAR in dependencies and some extra code in networking and
persistence.

The support for CRaC does not have to be all-in for all components -
maybe it does not make sense to snapshot a Broker. My PR was for Kafka
Clients because the open network connections need to be handled in a web
application (in my case I am enabling CRaC in Quarkus Superheroes [5]
demo). The PR does not handle all possible client-side uses; as I am not
familiar with Kafka I follow the whack-a-mole strategy.

It is possible that the C/R could be handled in a different layer, e.g.
in Quarkus integration code. However our intent is to push the changes
as low in the technology stack as possible, to provide the best fanout
to users without duplicating maintenance efforts. Also having the
support higher up can be fragile and break encapsulation.

Thank you for your consideration, I hope that you'll appreciate our
attempt to innovate the Java ecosystem.

Radim Vansa

PS: I'd appreciate if someone could give me the permissions on wiki to
create a proper KIP! Username: rvansa (both Confluence and JIRA).

[1] https://github.com/apache/kafka/pull/13619

[2] https://wiki.openjdk.org/display/crac

[3] https://github.com/openjdk/crac

[4] https://github.com/CRaC/org.crac

[5] https://quarkus.io/quarkus-workshops/super-heroes/


--

*Josep Prat*
Open Source Engineering Director, *Aiven*