Hi Dylan,

My feeling is that it is going to be challenging to add a layer of indirection to the Kafka indexing stuff while maintaining its exactly-onceness. The exactly-onceness is based around tracking the specific Kafka offsets that are read by each task, and is tightly linked to which partitions are assigned to which task. I think what you describe would be doable, but it would add a lot of complexity to a code base that is already pretty complex.

If I were in your position I'd try repartitioning the original Kafka topic into a second Kafka topic using a good key. It doubles the traffic at Kafka but at least it's simple! It follows a general rule of making Kafka do as much of the work as possible.
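
As a rough illustration of the repartitioning idea, here is a minimal, self-contained Python sketch. It does not use a Kafka client: the field names (`country`, `device`), the partition count, and the CRC32 hash are all assumptions made for illustration (Kafka's default partitioner uses a different hash). It only shows that keying on the dimensions Druid ingests sends rows that can roll up together to the same partition.

```python
import json
import zlib
from collections import defaultdict

# Hypothetical dimension fields that the Druid spec would ingest.
KEY_FIELDS = ("country", "device")
# Assumed partition count for the second (re-keyed) topic.
NUM_PARTITIONS = 6


def partition_key(record, key_fields=KEY_FIELDS):
    """Build a deterministic key from the ingested dimensions only."""
    return json.dumps([record.get(f) for f in key_fields]).encode("utf-8")


def choose_partition(key, num_partitions=NUM_PARTITIONS):
    """Map a key to a partition with a stable hash (CRC32 as a stand-in)."""
    return zlib.crc32(key) % num_partitions


def repartition(records):
    """Group records by the partition they would get in the re-keyed topic."""
    partitions = defaultdict(list)
    for record in records:
        partitions[choose_partition(partition_key(record))].append(record)
    return dict(partitions)


if __name__ == "__main__":
    sample = [
        {"country": "GB", "device": "ios", "value": 1},
        {"country": "GB", "device": "ios", "value": 2},
        {"country": "US", "device": "web", "value": 3},
    ]
    for partition, rows in sorted(repartition(sample).items()):
        print(partition, rows)
```

In a real deployment the same key would be set on the producer record written to the second topic, so that Kafka's own partitioner does the grouping.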
What do you think?

On Thu, May 3, 2018 at 1:53 PM, Dylan Wylie <dylanwy...@gmail.com> wrote:

> Hey Gian,
>
> Thanks for your response!
>
> Automatic compaction looks great and it'll definitely help out with general efficiency. I'm also excited for the second PR, we've some use cases it'll be helpful for.
>
> Even with those I still think there may be value in something along the lines of my suggestion.
>
> To present a real-life example, we have a Kafka topic which emits ~200k messages/second, each of these messages has around 300 fields (in a nested Avro structure) and it's partitioned on some key which is not ingested into Druid. We produce a datasource from that message with around 40-50 fields.
>
> Using the Kafka Indexing Service, we measured that a single indexing task extracting the fields for that datasource can consume 7k messages/second, meaning that in order to keep up we have to run around 30 indexing tasks. This results in around 30 segments at around 300 MB each for each hour of data compared to 6 segments at around 650 MB each when batch ingested, so the cluster's holding up to 3x as much data as it might do if the data was ingested into a smaller number of segments.
> (I realise that Tranquility & Batch Ingestion partition data by a hash of all the fields, which the KIS can't, so it will always have less optimal segments unless the Kafka topic is partitioned using a good key.)
>
> Profiling the indexing peon shows that the largest chunk of time is being spent deserialising the large Avro message. So we think that if we could split the current ingestion process into the two job types as described we could scale one up to handle consuming and parsing the message and another could manage appending the rows to a (smaller) set of segments.
>
> From some initial playing around we've noticed that https://github.com/druid-io/druid/pull/5261 introduced a stand-alone realtime task using the newer appenderator API that could potentially be built on top of.
>
> It might be simpler for us to introduce a stream processor which parses and extracts the parts of the messages that are needed and emits only those fields to a topic partitioned on something that aids roll-up. However, separating parsing data from indexing it feels like it might be more generally useful and would avoid the extra work of maintaining a stream processor. (And it seems like a fun way to get hacking with the Druid codebase!)
>
> Sorry for the long email, thoughts or comments appreciated!
>
> Best regards,
> Dylan
>
> On 3 May 2018 at 02:32, Gian Merlino <g...@apache.org> wrote:
>
> > Hey Dylan,
> >
> > Great to hear that your experience has generally been positive!
> >
> > What do you think about using compaction for this? (The feature added in https://github.com/druid-io/druid/pull/5102.) The idea with compaction was that it would enable a background process that goes through freshly inserted segments and re-partitions them optimally.
> >
> > For creating multiple datasources out of one topic, there is a PR wending its way through review right now that is relevant: https://github.com/druid-io/druid/pull/5556.
> >
> > On Wed, May 2, 2018 at 12:46 PM, Dylan Wylie <dylanwy...@gmail.com> wrote:
> >
> > > Hey there,
> > >
> > > With the recent improvements to the Kafka Indexing Service we've been migrating over from Tranquility and have had a very positive experience.
> > >
> > > However, one of the downsides to using the KIS is that the number of segments generated for each period can't be smaller than the number of tasks required to consume the queue.
> > > So if you have a use case involving ingesting from a topic with a high rate of large messages but your spec only extracts a small proportion of fields, you may be forced to run a large number of tasks that generate very small segments.
> > >
> > > This email is to check in for people's thoughts on separating consuming and parsing messages from indexing and segment management, in a similar fashion to how Tranquility operates.
> > >
> > > Potentially, we could have the supervisor spawn two types of task that can be configured independently, a consumer and an appender. The consumer would parse the message based on the spec and then pass the results to the appropriate appender task which builds the segment. Another advantage to this approach is that it would allow creating multiple datasources from a single consumer group rather than ingesting the same topic multiple times.
> > >
> > > I'm quite new to the codebase so all thoughts and comments are welcome!
> > >
> > > Best regards,
> > > Dylan
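
The consumer/appender split proposed in the original message can be pictured with a toy Python sketch. Nothing here is Druid code: the names `consume`, `route`, `Appender`, and `run`, the field names, and the appender count are all hypothetical. It deliberately ignores Kafka offset tracking and exactly-once delivery, which the reply at the top of the thread identifies as the hard part.

```python
import zlib
from collections import defaultdict

# Hypothetical subset of fields that the ingestion spec extracts.
WANTED_FIELDS = ("ts_hour", "country", "clicks")
# Assumed number of appenders; the point is that it can be smaller
# than the number of consumers needed to keep up with parsing.
NUM_APPENDERS = 2


def consume(message):
    """Consumer step: parse a message and keep only the needed fields."""
    return {field: message[field] for field in WANTED_FIELDS}


def route(row, num_appenders=NUM_APPENDERS):
    """Choose an appender from the row's dimensions so equal rows meet."""
    key = "{}|{}".format(row["ts_hour"], row["country"]).encode("utf-8")
    return zlib.crc32(key) % num_appenders


class Appender:
    """Appender step: roll rows up by (ts_hour, country)."""

    def __init__(self):
        self.rollup = defaultdict(int)

    def append(self, row):
        self.rollup[(row["ts_hour"], row["country"])] += row["clicks"]


def run(messages, num_appenders=NUM_APPENDERS):
    """Push every message through a consumer and on to one appender."""
    appenders = [Appender() for _ in range(num_appenders)]
    for message in messages:
        row = consume(message)
        appenders[route(row, num_appenders)].append(row)
    return appenders
```

Because routing depends only on the ingested dimensions, each roll-up key ends up in exactly one appender, which is what would let a small number of appenders produce fewer, larger segments.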