Hi all,

It definitely would be nice to reuse whatever state is still valid after a topology update, and KIP-307 is indeed likely what we have to do to solve that problem. The discuss thread for the KIP hasn't gotten a lot of traffic recently, so it might be nice if you reply to it with your thoughts to keep things moving.
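To make the naming problem concrete, here is a toy model in Python. It is not the Kafka Streams implementation; it only assumes that a generated name combines an operator type with a creation-order counter, as in KSTREAM-KEY-SELECT-0000000099. The operator labels (by_customer, by_region) and the explicit naming scheme are hypothetical, loosely in the spirit of KIP-307.

```python
# Toy model only: NOT the Kafka Streams implementation.
# Assumption: a generated name is "KSTREAM-<TYPE>-<10-digit creation index>".

def generated_names(operators):
    """Map index-based node names to the logic (label) each node performs."""
    return {
        f"KSTREAM-{kind}-{index:010d}": label
        for index, (kind, label) in enumerate(operators)
    }

def explicit_names(operators):
    """Hypothetical explicit naming: the name comes from what the node does."""
    return {f"{kind}-{label}": label for kind, label in operators}

def collisions(old, new):
    """Names present in both versions that now refer to different logic."""
    return sorted(name for name in old.keys() & new.keys() if old[name] != new[name])

# Version 1 re-keys by customer; version 2 re-keys by region instead.
v1 = [("SOURCE", "orders"), ("KEY-SELECT", "by_customer"), ("SINK", "out")]
v2 = [("SOURCE", "orders"), ("KEY-SELECT", "by_region"), ("SINK", "out")]

print(collisions(generated_names(v1), generated_names(v2)))
print(collisions(explicit_names(v1), explicit_names(v2)))
```

With counter-based names, the new key select silently inherits the old node's name (and so would inherit its repartition topic). With names tied to what the node does, the two versions share no name that means different things.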
I agree with Adam that you can try some acrobatics to keep the state around, but it's likely to get very messy very fast. Using "auto offset reset = earliest" would let you rebuild from scratch, but only if your input topics are all set to have long retention periods (otherwise, the data you previously processed will be gone). Probably, you'd use topic compaction as well.

It does seem suitable to skip Schema Registry for internal topics. I view the registry as being most important for your interfaces with other systems (topics others have produced for you or you have produced for others). For an internal topic, you are both producer and consumer, so you can embed the schema in your program and just publish the serialized data to the internal topic.

Alternatively, I think that the app reset tool should delete the changelog topics anyway, so writing a script to run the app reset tool, and then delete all the app's registry data should be perfectly fine.

-John

On Thu, Aug 9, 2018 at 9:59 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:

> Hi Cédric
>
> 1. You should be able to rebuild your internal state after doing a full
> reset (Deleting all internal topics). The application will just need to
> consume data from the beginning of the input streams and rebuild the state
> accordingly. If you don't want to lose the state, or if it is too expensive
> to rebuild the entire application, you may wish to look into external
> storage options instead. In my experience, this begins to get more
> complicated though, so I prefer to just rebuild the state.
>
> 2. I use Scala for my Kafka Streams work, in particular this very helpful
> library which converts case classes into Avro format and back:
> https://github.com/sksamuel/avro4s
>
> In my solution, I only send the encoded Avro data but not the schema. It is
> up to the developer of the application to know which data is being stored
> in which internal topic.
> This proves to be very easy since I'm only ever
> handling Plain-Ol-Java/Scala-Objects in the Kafka Streams DSL.
> I should add that you are not required to use any schema for internal
> topics, but I much prefer to anyway because I find it the simplest and
> least error-prone of all my options.
> In terms of performance, there is no difference from using the
> schema registry ones, since the process is the same.
>
> Hope this helps a bit.
>
> Adam
>
> On Thu, Aug 9, 2018 at 4:33 AM, Cedric BERTRAND <
> bertrandcedric....@gmail.com> wrote:
>
> > Thanks John and Adam for your answers,
> >
> > After investigation, I am exactly in the case you describe, John.
> > After a modification in my topology, a KEY-SELECT processor got the same
> > number as an old KEY-SELECT processor with its associated repartition
> > topic.
> > We use the app reset tool to clean all internal topics, but the tool
> > doesn't clean the schema registry.
> >
> > I see 2 solutions to solve this problem when it occurs.
> >
> > 1. Clean all internal topics and subjects in the schema registry
> > The problem with this solution is that I also clean the internal
> > changelog topics. Sometimes I don't want to lose this internal state.
> >
> > 2. Don't use the schema registry for internal topics (the solution
> > described by Adam)
> > Without the schema registry, do I send the whole object (data + Avro
> > schema) into Kafka?
> > What about performance with this solution?
> >
> > The solution of giving an explicit name to every operator seems
> > interesting for solving this problem.
> >
> > I found this KIP, which proposes to implement this solution.
> >
> > KIP-307: Allow to define custom processor names with KStreams DSL
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL>
> >
> > I know that the probability that a KEY-SELECT node gets the same number
> > as an old one is very low.
> > But when it occurs, it's extremely hard to understand.
> >
> > Thanks for your time,
> >
> > Cédric
> >
> > On Wed, Aug 8, 2018 at 22:34, John Roesler <j...@confluent.io> wrote:
> >
> > > Hi Cédric,
> > >
> > > The suffix is generated when we build the topology in such a way as to
> > > guarantee each node/internal-topic/state-store gets a unique name.
> > >
> > > Generally speaking, it is unsafe to modify the topology and restart it.
> > > We recommend using the app reset tool whenever you update your topology.
> > >
> > > That said, some changes to the topology might be safe, so your mileage
> > > may vary; just be aware that changing the topology in place will
> > > potentially produce corrupt data.
> > >
> > > The main example I'd give is if you were to restructure your topology
> > > and you wind up with some other node type, like a "KSTREAM-TRANSFORM-"
> > > getting number 99, then you won't have a problem. The new node will
> > > create whatever internal state/topics are needed with a non-colliding
> > > name. But if you restructured the topology and a *different* key select
> > > happened to get number 99, then you'd have a big problem. Streams would
> > > have no idea that the existing repartition topic was for a different key
> > > select; it would just start using the existing topic. But this means
> > > that the repartition topic would be half one set of data and half
> > > another. Clearly, this is not good.
> > >
> > > It sounds to me like this is maybe what happened to you.
> > >
> > > We have been discussing various mechanisms by which we could support
> > > modifying the topology in place. Typically, this would involve giving
> > > each operator a semantic name so that the internal names would be
> > > related to what the nodes are doing, not the order in which the nodes
> > > are created.
> > >
> > > At the very least, we'd like to have some way of detecting that the
> > > topology has changed during a restart and refusing to start up, to
> > > protect the integrity of your data.
> > >
> > > I hope this helps,
> > > -John
> > >
> > > On Wed, Aug 8, 2018 at 12:51 PM Adam Bellemare <adam.bellem...@gmail.com>
> > > wrote:
> > >
> > > > Hi Cédric
> > > >
> > > > I do not know how the topology names are chosen, but provided that
> > > > you didn't change any of the topology, new topics will not be created
> > > > or require alteration.
> > > >
> > > > If you modify the topology then the naming can indeed change, but it
> > > > would then create a new internal topic and there would be no
> > > > compatibility issue.
> > > > It could very well be that your topology was modified in such a way
> > > > that another, different internal topic is attempting to register an
> > > > incompatible schema. In this case though, I would expect the error
> > > > information returned from the schema registry registration process to
> > > > highlight exactly what the failure is. It has been a while since we
> > > > ran into one of these, so I could be wrong on that front though.
> > > >
> > > > My recommendation to you is to create a simple "InternalSerde" for
> > > > your Avro classes used in internal topics, such that you do *not*
> > > > register them to the schema registry. I have found that registering
> > > > internal topics to the schema registry turns it into a garbage dump
> > > > and prevents developers from making independent changes to their
> > > > internal schemas. The rule of thumb we use is that we only register
> > > > schemas to the schema registry when the events leave the application's
> > > > bounded context, i.e. final output events only.
> > > >
> > > > Hope this helps,
> > > >
> > > > Adam
> > > >
> > > > On Wed, Aug 8, 2018 at 11:14 AM, Cedric BERTRAND <
> > > > bertrandcedric....@gmail.com> wrote:
> > > >
> > > > > Within the Kafka Streams topology, internal topics are created.
> > > > > For these internal topics, Avro schemas for key and value are
> > > > > registered in the schema registry.
> > > > >
> > > > > For the topic
> > > > > internal-MYAPPS-KSTREAM-KEY-SELECT-0000000099-repartition, I
> > > > > have 2 subjects in the schema registry:
> > > > > - internal-MYAPPS-KSTREAM-KEY-SELECT-0000000099-repartition-key
> > > > > - internal-MYAPPS-KSTREAM-KEY-SELECT-0000000099-repartition-value
> > > > >
> > > > > My questions are:
> > > > >
> > > > > How does Kafka create the internal topology names (how is the
> > > > > suffix number changed)?
> > > > >
> > > > > What happens if I change the processing in the topology (a change
> > > > > in the DAG)?
> > > > > - If I have a name with 0000000099, do I get the same number after
> > > > > a modification of the topology?
> > > > > - If not, is Kafka Streams allowed to reuse an already used number?
> > > > >
> > > > > I ask this question because I have an incompatible schema on an
> > > > > internal topic and, from my point of view, no changes have been made
> > > > > to the schema.
> > > > > The only change is a modification of the topology, which changes the
> > > > > DAG and maybe the names of the internal topics.
> > > > >
> > > > > Thanks for your time,
> > > > >
> > > > > Cédric
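
For the cleanup script discussed in this thread (run the app reset tool, then delete the app's registry data), here is a minimal Python sketch of the selection step only. It picks out the registry subjects that belong to the app's internal topics and deliberately does not call the registry or the reset tool. Assumptions: the application id is internal-MYAPPS (inferred from the topic name in the original question), subjects follow the <topic>-key / <topic>-value pattern shown there, and the function name and its kinds parameter are made up for illustration.

```python
# Sketch under stated assumptions; selection logic only, no network calls.
# Assumption: internal topics end in "-repartition" or "-changelog", and
# their registry subjects are "<topic>-key" and "<topic>-value".

def subjects_to_delete(application_id, all_subjects, kinds=("repartition", "changelog")):
    """Return the subjects that belong to the app's internal topics.

    Pass kinds=("repartition",) to leave changelog subjects alone,
    for example when the changelog state should be kept.
    """
    prefix = application_id + "-"
    suffixes = tuple(
        f"-{kind}-{part}" for kind in kinds for part in ("key", "value")
    )
    return sorted(
        subject
        for subject in all_subjects
        if subject.startswith(prefix) and subject.endswith(suffixes)
    )

# Hypothetical registry contents; only the first two names come from the thread.
subjects = [
    "internal-MYAPPS-KSTREAM-KEY-SELECT-0000000099-repartition-key",
    "internal-MYAPPS-KSTREAM-KEY-SELECT-0000000099-repartition-value",
    "internal-MYAPPS-some-store-changelog-value",
    "orders-value",
]

for subject in subjects_to_delete("internal-MYAPPS", subjects, kinds=("repartition",)):
    print(subject)
```

Restricting kinds to repartition topics is one way to express the wish to keep changelog state, though whether that state is still valid after a topology change is a separate question.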