Kafka Indexing Service - Decoupling segments from consumer tasks
Hey there,

With the recent improvements to the Kafka Indexing Service we've been migrating over from Tranquility and have had a very positive experience.

However, one of the downsides to using the KIS is that the number of segments generated for each period can't be smaller than the number of tasks required to consume the queue. So if you have a use case involving ingesting from a topic with a high rate of large messages, but your spec only extracts a small proportion of fields, you may be forced to run a large number of tasks that generate very small segments.

This email is to check in for people's thoughts on separating consuming and parsing messages from indexing and segment management, in a similar fashion to how Tranquility operates.

Potentially, we could have the supervisor spawn two types of task that can be configured independently: a consumer and an appender. The consumer would parse the message based on the spec and then pass the results to the appropriate appender task, which builds the segment. Another advantage to this approach is that it would allow creating multiple datasources from a single consumer group rather than ingesting the same topic multiple times.

I'm quite new to the codebase, so all thoughts and comments are welcome!

Best regards,
Dylan
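To make the proposed split concrete, here's a rough sketch of the two task types. This is purely illustrative: neither task type exists in Druid today, the field names are made up, and an in-process queue stands in for whatever hand-off the real tasks would use.

```python
import json
import queue
import threading

# Hypothetical sketch of the proposed two-task split: a "consumer" that
# parses raw messages per the ingestion spec, and an "appender" that
# receives only the extracted rows and builds segments. Names are
# illustrative; an in-memory list stands in for a real segment.

SPEC_DIMENSIONS = ["country", "device"]  # small subset of a wide message

def consumer_task(raw_messages, row_queue):
    """Parse each raw message and forward only the fields in the spec."""
    for raw in raw_messages:
        record = json.loads(raw)  # stand-in for Avro deserialisation
        row = {d: record.get(d) for d in SPEC_DIMENSIONS}
        row_queue.put(row)
    row_queue.put(None)  # end-of-stream marker

def appender_task(row_queue, segment):
    """Append parsed rows to an in-memory 'segment' (a plain list here)."""
    while (row := row_queue.get()) is not None:
        segment.append(row)

raw = [json.dumps({"country": "IE", "device": "mobile", "extra": i})
       for i in range(3)]
q, seg = queue.Queue(), []
t = threading.Thread(target=appender_task, args=(q, seg))
t.start()
consumer_task(raw, q)
t.join()
print(len(seg))  # 3 rows reach the appender, stripped of unused fields
```

The point is that the expensive parse happens in the consumer, so it can be scaled independently of however many appenders (and hence segments) are wanted.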
Re: Kafka Indexing Service - Decoupling segments from consumer tasks
Hey Dylan,

Great to hear that your experience has generally been positive!

What do you think about using compaction for this? (The feature added in https://github.com/druid-io/druid/pull/5102.) The idea with compaction was that it would enable a background process that goes through freshly inserted segments and re-partitions them optimally.

For creating multiple datasources out of one topic, there is a PR wending its way through review right now that is relevant: https://github.com/druid-io/druid/pull/5556.

On Wed, May 2, 2018 at 12:46 PM, Dylan Wylie wrote:

> [...]
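For reference, a compaction task of the kind added in that PR could be sketched like this. The datasource name and interval below are made-up examples; in practice the JSON spec would be POSTed to the Overlord's task endpoint.

```python
import json

# Sketch of a compaction task spec of the kind added in PR #5102.
# "events" and the interval are hypothetical; a real spec would be
# submitted to the Overlord, which runs the compaction as a task that
# rewrites the hour's many small segments into fewer, larger ones.
compaction_spec = {
    "type": "compact",
    "dataSource": "events",              # hypothetical datasource name
    "interval": "2018-05-01/2018-05-02", # interval whose segments to merge
}
body = json.dumps(compaction_spec)
print(body)
```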
Re: Kafka Indexing Service - Decoupling segments from consumer tasks
Hey Gian,

Thanks for your response!

Automatic compaction looks great and it'll definitely help out with general efficiency. I'm also excited for the second PR; we've some use cases it'll be helpful for.

Even with those, I still think there may be value in something along the lines of my suggestion.

To present a real-life example, we have a Kafka topic which emits ~200k messages/second; each of these messages has around 300 fields (in a nested Avro structure) and it's partitioned on some key which is not ingested into Druid. We produce a datasource from that message with around 40-50 fields.

Using the Kafka Indexing Service, we measured that a single indexing task extracting the fields for that datasource can consume 7k messages/second, meaning that in order to keep up we have to run around 30 indexing tasks. This results in around 30 segments of around 300 MB each for each hour of data, compared to 6 segments of around 650 MB each when batch ingested, so the cluster is holding up to 3x as much data as it would if the data were ingested into a smaller number of segments.

(I realise that Tranquility and batch ingestion partition data by a hash of all the fields, which the KIS can't, so it will always have less optimal segments unless the Kafka topic is partitioned using a good key.)

Profiling the indexing peon shows that the largest chunk of time is being spent deserialising the large Avro message. So we think that if we could split the current ingestion process into the two job types as described, we could scale one up to handle consuming and parsing the message while the other manages appending the rows to a (smaller) set of segments.

From some initial playing around, we've noticed that https://github.com/druid-io/druid/pull/5261 introduced a stand-alone realtime task using the newer appenderator API that could potentially be built on top of.
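For what it's worth, the arithmetic behind those numbers works out as follows (figures taken from above; the raw-bytes ratio comes to roughly 2.3x, with the "up to 3x" presumably also covering per-segment overheads):

```python
import math

# Figures from the example above.
msgs_per_sec = 200_000      # topic throughput
per_task_rate = 7_000       # measured single-task consumption rate

tasks_needed = math.ceil(msgs_per_sec / per_task_rate)
print(tasks_needed)         # 29, i.e. ~30 tasks to keep up

# Hourly segment footprint: ~30 x 300 MB streaming vs 6 x 650 MB batch.
streaming_mb = 30 * 300
batch_mb = 6 * 650
print(streaming_mb / batch_mb)  # ~2.3x more raw bytes for streaming
```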
It might be simpler for us to introduce a stream processor which parses and extracts the parts of the messages that are needed, and emits only those fields into a topic partitioned on something that aids roll-up. However, separating parsing from indexing feels like it might be more generally useful, and would avoid the extra work of maintaining a stream processor. (And it seems like a fun way to get hacking on the Druid codebase!)

Sorry for the long email; thoughts or comments appreciated!

Best regards,
Dylan

On 3 May 2018 at 02:32, Gian Merlino wrote:

> [...]
Re: Kafka Indexing Service - Decoupling segments from consumer tasks
Hi Dylan,

My feeling is that it is going to be challenging to add a layer of indirection to the Kafka indexing stuff while maintaining its exactly-onceness. The exactly-onceness is based around tracking the specific Kafka offsets that are read by each task, and is tightly linked to which partitions are assigned to which task. I think what you describe would be doable, but it would add a lot of complexity to a code base that is already pretty complex.

If I were in your position I'd try repartitioning the original Kafka topic into a second Kafka topic using a good key. It doubles the traffic at Kafka, but at least it's simple! It follows a general rule of making Kafka do as much of the work as possible.

What do you think?

On Thu, May 3, 2018 at 1:53 PM, Dylan Wylie wrote:

> [...]
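The key-choice part of that repartitioning step could look something like this. The field names are illustrative and the actual Kafka consume/produce calls are omitted, so the partitioning logic can be shown as a pure function: key the second topic on exactly the fields the Druid spec ingests, so rows that would roll up together land in the same partition.

```python
import hashlib
import json

# Sketch of choosing a "good key" when repartitioning the wide topic
# into a second topic: project each record down to the fields the Druid
# spec ingests, and partition on a hash of just those fields. Field
# names are hypothetical; consume/produce plumbing is left out.

INGESTED_FIELDS = ["country", "device"]  # hypothetical spec dimensions

def repartition_key(record, num_partitions):
    """Map a record to a partition based only on its ingested fields."""
    projected = {f: record.get(f) for f in INGESTED_FIELDS}
    digest = hashlib.md5(
        json.dumps(projected, sort_keys=True).encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

a = {"country": "IE", "device": "mobile", "ignored": 1}
b = {"country": "IE", "device": "mobile", "ignored": 2}
# Records that differ only in non-ingested fields share a partition,
# which is what lets downstream tasks build well-rolled-up segments.
print(repartition_key(a, 16) == repartition_key(b, 16))  # True
```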
Re: Kafka Indexing Service - Decoupling segments from consumer tasks
Hey Gian,

Thanks for the feedback and for taking the time to read through the proposal. Yeah, your suggestion is much less complex and I think makes more sense!

Cheers,
Dylan

On 8 May 2018 at 22:06, Gian Merlino wrote:

> [...]