> How large is the record buffer? Is it configurable?

I seem to have just discovered the answer to this: buffered.records.per.partition
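For reference, a minimal sketch of how that setting might be supplied (the application id and broker address here are placeholders, and plain `Properties` stand in for `StreamsConfig`, which also exposes the key as the constant `BUFFERED_RECORDS_PER_PARTITION_CONFIG`; the default is 1000 records per partition):

```java
import java.util.Properties;

public class StreamsBufferConfig {
    // Sketch of raising the per-partition record buffer. In a real app these
    // Properties would be passed to the KafkaStreams constructor.
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");    // hypothetical app id
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        // Max records buffered per partition per task before the consumer
        // pauses fetching for that partition (default: 1000).
        props.put("buffered.records.per.partition", "5000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(
            streamsProps().getProperty("buffered.records.per.partition"));
    }
}
```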
On Sat, Dec 9, 2017 at 2:48 PM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:

> Hi Matthias, yes that definitely helps. A few thoughts inline below.
>
> Thank you!
>
> On Fri, Dec 8, 2017 at 4:21 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Hard to give a generic answer.
>>
>> 1. We recommend over-partitioning your input topics to start with (to
>> avoid needing to add new partitions later on); problem avoidance is the
>> best strategy. There will obviously be some overhead for this on the
>> broker side, but it's not too big.
>
> Yes, I will definitely be doing this.
>
>> 2. Not sure why you would need a new cluster? You can just create a new
>> topic in the same cluster and let Kafka Streams read from there.
>
> Motivated by fear of disturbing/manipulating a production cluster and the
> relative ease of putting up a new cluster. Perhaps that fear is
> irrational. I could alternatively just prefix topics.
>
>> 3. Depending on your state requirements, you could also run two
>> applications in parallel -- the new one reads from the new input topic
>> with more partitions, and you configure your producer to write to the
>> new topic (or maybe even dual-write to both). Once your new application
>> is ramped up, you can stop the old one.
>
> Yes, this is my plan for migrations. If I could run it past you:
>
> (i) Write input topics from the old prefix to the new prefix.
> (ii) Start the new Kafka Streams application against the new prefix.
> (iii) When the two applications are in sync, stop writing to the old
> topics.
>
> Since I will be copying from an old prefix to a new prefix, it seems
> essential here to have timestamps embedded in the data records, along
> with a custom timestamp extractor.
>
> I really wish I could get some more flavor on "Flow Control With
> Timestamps
> <https://docs.confluent.io/current/streams/architecture.html#flow-control-with-timestamps>"
> in this regard.
> Assuming my timestamps are monotonically increasing within each input
> topic, from my reading of that section it still appears that the result
> of reprocessing input topics is non-deterministic beyond the "records in
> its stream record buffer". Some seemingly crucial sentences:
>
> *This flow control is best-effort because it is not always possible to
> strictly enforce execution order across streams by record timestamp; in
> fact, in order to enforce strict execution ordering, one must either wait
> until the system has received all the records from all streams (which may
> be quite infeasible in practice) or inject additional information about
> timestamp boundaries or heuristic estimates such as MillWheel's
> watermarks.*
>
> Practically, how am I to understand this? How large is the record buffer?
> Is it configurable?
>
> For example, suppose I am re-processing an inner join on partitions P1
> (left) and P2 (right). In the original processing, record K1V1T1 was
> recorded onto P1, then some time later record K1V2T2 was recorded onto
> P2. As a result, K1V2T2 was joined with K1V1T1. Now, during
> re-processing, P1 and P2 contain historical data and the Kafka Streams
> consumers can read P2 before P1. If the consumer reads P2 before P1, will
> the task still properly align these two records given their timestamps
> for the correct inner join, assuming both records are within the record
> buffer? I've experimented with this, but unfortunately I didn't have time
> to really set up good experiments to satisfy myself.
>
>> 4. If you really need to add new partitions, you need to fix up all
>> topics manually -- including all topics Kafka Streams created for you.
>> Adding partitions messes up all your state, as key-based partitioning
>> changes. This implies that your application must be stopped! Thus, if
>> you have zero-downtime requirements, you can't do this at all.
>>
>> 5.
>> If you have a stateless application, though, all those issues go away,
>> and you can even add new partitions at runtime.
>
> Stateless in what sense? Kafka Streams seems to be all about aligning and
> manipulating state to create more state. Are you referring to internal
> state specifically?
>
>> Hope this helps.
>>
>> -Matthias
>>
>> On 12/8/17 11:02 AM, Dmitry Minkovsky wrote:
>> > I am about to put a topology into production and I am concerned that
>> > I don't know how to repartition/rebalance the topics in the event
>> > that I need to add more partitions.
>> >
>> > My inclination is that I should spin up a new cluster and run some
>> > kind of consumer/producer combination that takes data from the
>> > previous cluster and writes it to the new cluster. A new instance of
>> > the Kafka Streams application then works against this new cluster.
>> > But I'm not sure how best to execute this, or whether this approach
>> > is sound at all. I am imagining many things may go wrong. Without
>> > going into further speculation, what is the best way to do this?
>> >
>> > Thank you,
>> > Dmitry
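A minimal sketch of the embedded-timestamp idea discussed above. The convention here is hypothetical: assume producers prepend an 8-byte big-endian epoch-millis timestamp to every record value. In a real application this parsing would live inside a class implementing Kafka Streams' `TimestampExtractor` and be registered via the timestamp-extractor config; only the parsing logic is shown so the snippet stands alone without the kafka-streams dependency:

```java
import java.nio.ByteBuffer;

public class EmbeddedTimestamp {
    // Hypothetical wire convention: the first 8 bytes of each record value
    // are a big-endian epoch-millis event timestamp; the payload follows.
    static long extractTimestamp(byte[] value) {
        return ByteBuffer.wrap(value).getLong();
    }

    public static void main(String[] args) {
        // Build a fake record value: timestamp header + 8 bytes of payload.
        byte[] recordValue = ByteBuffer.allocate(16)
                .putLong(1512846000000L) // embedded event time
                .putLong(42L)            // rest of the payload
                .array();
        System.out.println(extractTimestamp(recordValue)); // prints 1512846000000
    }
}
```

With such an extractor in place, re-reading copied topics would order records by the embedded event time rather than by broker append time, which is what makes the old-prefix-to-new-prefix copy step viable.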