Kafka Indexing Service - Decoupling segments from consumer tasks

2018-05-02 Thread Dylan Wylie
Hey there,

With the recent improvements to the Kafka Indexing Service we've been
migrating over from Tranquility and have had a very positive experience.

However, one of the downsides to using the KIS is that the number of
segments generated for each period can't be smaller than the number of
tasks required to consume the topic (e.g. 30 tasks with hourly
segmentGranularity means at least 30 segments per hour). So if you're
ingesting from a topic with a high rate of large messages, but your spec
only extracts a small proportion of the fields, you may be forced to run a
large number of tasks that each generate very small segments.

This email is to check in on people's thoughts about separating consuming
and parsing messages from indexing and segment management, in a similar
fashion to how Tranquility operates.

Potentially, we could have the supervisor spawn two types of task that can
be configured independently: a consumer and an appender. The consumer would
parse messages based on the spec and then pass the results to the
appropriate appender task, which builds the segments. Another advantage of
this approach is that it would allow creating multiple datasources from a
single consumer group rather than ingesting the same topic multiple times.
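
To make the division of labour concrete, here's a purely illustrative
sketch (neither interface exists in the codebase today; InputRow is Druid's
existing parsed-row type):

    import java.util.List;

    import io.druid.data.input.InputRow;

    // Hypothetical interfaces for the proposed split; names are made up.
    interface ConsumerTask
    {
      // Polls its assigned Kafka partitions, parses each message against
      // the ingestion spec, and routes the resulting rows to an appender.
      void pollParseAndRoute();
    }

    interface AppenderTask
    {
      // Receives already-parsed rows (possibly from many consumers) and is
      // solely responsible for building, persisting and publishing segments.
      void append(List<InputRow> rows);
    }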

I'm quite new to the codebase so all thoughts and comments are welcome!

Best regards,
Dylan


Re: Kafka Indexing Service - Decoupling segments from consumer tasks

2018-05-02 Thread Gian Merlino
Hey Dylan,

Great to hear that your experience has generally been positive!

What do you think about using compaction for this? (The feature added in
https://github.com/druid-io/druid/pull/5102.) The idea with compaction was
that it would enable a background process that goes through freshly
inserted segments and re-partitions them optimally.

For creating multiple datasources out of one topic, there is a PR wending
its way through review right now that is relevant:
https://github.com/druid-io/druid/pull/5556.



Re: Kafka Indexing Service - Decoupling segments from consumer tasks

2018-05-03 Thread Dylan Wylie
Hey Gian,

Thanks for your response!

Automatic compaction looks great and it'll definitely help with general
efficiency. I'm also excited about the second PR; we have some use cases
it'll be helpful for.

Even with those, I still think there may be value in something along the
lines of my suggestion.

To present a real-life example: we have a Kafka topic that emits ~200k
messages/second. Each message has around 300 fields (in a nested Avro
structure), and the topic is partitioned on a key that is not ingested into
Druid. We produce a datasource from those messages with around 40-50 fields.

Using the Kafka Indexing Service, we measured that a single indexing task
extracting the fields for that datasource can consume about 7k
messages/second, meaning that to keep up we have to run around 30 indexing
tasks (200k / 7k ≈ 29). This results in around 30 segments of roughly
300 MB each for every hour of data, compared to 6 segments of roughly
650 MB each when batch ingested, so the cluster is holding up to 3x as much
data as it would if the data were ingested into a smaller number of
segments.
(I realise that Tranquility and batch ingestion partition data by a hash of
all the fields, which the KIS can't do, so the KIS will always produce less
optimal segments unless the Kafka topic is partitioned using a good key.)

Profiling the indexing peon shows that the largest chunk of time is spent
deserialising the large Avro messages. So we think that if we could split
the current ingestion process into the two job types described above, we
could scale one up to handle consuming and parsing messages while the other
manages appending the rows to a (smaller) set of segments.

From some initial playing around, we've noticed that
https://github.com/druid-io/druid/pull/5261 introduced a stand-alone
realtime task using the newer appenderator API that could potentially be
built on top of.

It might be simpler for us to introduce a stream processor that parses out
just the fields we need and emits them to a second topic partitioned on
something that aids roll-up. However, separating parsing from indexing
within Druid feels like it might be more generally useful, and it would
avoid the extra work of maintaining a stream processor. (And it seems like
a fun way to get hacking on the Druid codebase!)

Sorry for the long email; thoughts or comments appreciated!

Best regards,
Dylan





Re: Kafka Indexing Service - Decoupling segments from consumer tasks

2018-05-08 Thread Gian Merlino
Hi Dylan,

My feeling is that it is going to be challenging to add a layer of
indirection to the Kafka indexing stuff while maintaining its
exactly-onceness. The exactly-onceness is based around tracking the
specific Kafka offsets that are read by each task, and is tightly linked to
which partitions are assigned to which task. I think what you describe
would be doable, but it would add a lot of complexity to a code base that
is already pretty complex. If I were in your position I'd try
repartitioning the original Kafka topic into a second Kafka topic using a
good key. It doubles the traffic at Kafka but at least it's simple! It
follows a general rule of making Kafka do as much of the work as possible.
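
For instance, a minimal Kafka Streams sketch, assuming String serdes for
brevity (you'd use your Avro serde in practice, and the topic names and
helper functions are placeholders):

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Produced;

    public class Repartitioner
    {
      public static void main(String[] args)
      {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "druid-repartitioner");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("wide-events")
               // Deserialise the big message once, keeping only the fields
               // the Druid spec actually uses.
               .mapValues(Repartitioner::projectFields)
               // Re-key so rows that should roll up together land in the
               // same partition.
               .selectKey((key, value) -> extractRollupKey(value))
               .to("druid-events", Produced.with(Serdes.String(), Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
      }

      // Placeholder: strip the record down to the ingested fields.
      private static String projectFields(String raw) { return raw; }

      // Placeholder: pick a dimension value to use as the new partition key.
      private static String extractRollupKey(String row) { return row; }
    }

The supervisor would then read the trimmed, well-keyed topic with far fewer
tasks, since the expensive parse happens once, upstream of the indexing
tasks.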

What do you think?


Re: Kafka Indexing Service - Decoupling segments from consumer tasks

2018-05-10 Thread Dylan Wylie
Hey Gian,

Thanks for the feedback and taking the time to read through the proposal.

Yeah, your suggestion is much less complex, and I think it makes more sense!

Cheers,
Dylan
