On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik <ros...@hortonworks.com> wrote:

> Thanks Jay and Ewen for the response.
>
>
> >@Jay
> >
> > 3. This has a built in notion of parallelism throughout.
>
>
>
> It was not obvious how it will look or differ from existing systems…
> since all of the existing ones do parallelize data movement.
>

I'm guessing some confusion here might also be because we want both
parallelization and distribution.

Roughly speaking, I think of Copycat making the consumer group abstraction
available for any import task, and the idea is to make this automatic and
transparent to the user. This isn't interesting for systems that literally
only have a single input stream, but Copycat source connectors have a
built-in notion of parallel input streams. The connector's job is to inform
the Copycat framework of what input streams there are; Copycat then
handles running the tasks, balancing the streams across them, rebalancing
as necessary on failures, and providing offset commit and storage so tasks
can resume from the last known-good state.
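
To make that concrete, here's a rough Java sketch of the division of labor
I have in mind. The class and method names are made up for illustration
(the actual Copycat connector APIs are still being worked out in the KIP);
the point is just that the connector only enumerates and groups its
streams, while the framework owns scheduling, rebalancing, and offset
storage.

// Hypothetical sketch only, not the real Copycat API: a JDBC-style source
// connector enumerates its input streams (one per table) and groups them
// for up to maxTasks tasks; running those tasks, rebalancing on failure,
// and committing offsets are all the framework's job.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExampleJdbcSourceConnector {

    // Each table is an independent, parallelizable input stream.
    public List<String> getInputStreams() {
        // Placeholder: a real connector would query the database metadata.
        return Arrays.asList("users", "orders", "payments");
    }

    // Split the streams into at most maxTasks groups; the framework decides
    // where those tasks actually run.
    public List<List<String>> groupStreamsForTasks(int maxTasks) {
        List<String> streams = getInputStreams();
        int numTasks = Math.min(maxTasks, streams.size());
        List<List<String>> groups = new ArrayList<List<String>>();
        for (int i = 0; i < numTasks; i++) {
            groups.add(new ArrayList<String>());
        }
        for (int i = 0; i < streams.size(); i++) {
            groups.get(i % numTasks).add(streams.get(i));
        }
        return groups;
    }
}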

On the sink side, the input is the Kafka consumer group, which obviously
already has this parallelism built in. Depending on the output, this may
manifest in different ways. For HDFS, the effect is just that your output
files are partitioned (one per topic-partition).

As for other systems, can you be more specific? Some of them obviously do
(e.g. Camus), but others require you to handle this manually. I don't want
to pick on Flume specifically, but as an example, it requires either
configuring multiple (or multiplexed) flows in a single agent or managing
multiple agents independently. This isn't really the same as what I've
described above, where you hand Copycat one config and it automatically
spreads the work across multiple, fault-tolerant tasks. But Flume is also
targeting a much different general problem, trying to build potentially
large, multi-stage data flows with all sorts of transformations, filtering,
etc.


>
>
> @Ewen,
>
> >Import: Flume is just one of many similar systems designed around log
> >collection. See notes below, but one major point is that they generally
> >don't provide any sort of guaranteed delivery semantics.
>
>
> I think most of them do provide guarantees of some sort (Ex. Flume &
> FluentD).
>

This part of the discussion gets a little bit tricky, not least because it
seems people can't agree on exactly what these terms mean.

First, some systems that you didn't mention. Logstash definitely doesn't
have any guarantees as it uses a simple 20-event in-memory buffer between
stages. As far as I can tell, Heka doesn't provide these semantics either,
although I have not investigated it as deeply.

fluentd has an article discussing the options for it (
http://docs.fluentd.org/articles/high-availability), but I actually think
the article on writing plugins (
http://docs.fluentd.org/articles/plugin-development) is more informative.
The most important point is that input plugins have no way to track or
discover downstream delivery (i.e. they cannot get acks, nor is there any
sort of tracked offset they can look up to discover where to restart upon
failure, nor is it guaranteed that after router.emit() returns the data
will have already been delivered downstream). So if I have a replicated
input data store, e.g. a replicated database, and I am just reading off
its changelog, does fluentd actually guarantee something like
at-least-once delivery to the sink? In fact, fluentd's own documentation
(the high
availability doc) describes data loss scenarios that aren't inherent to
every system (e.g., if their log aggregator dies, which not every system is
susceptible to, vs. if an event is generated on a single host and that host
dies before reporting it anywhere, then of course the data is permanently
lost).

Flume actually does have a (somewhat confusingly named) transaction concept
to help control this. The reliability depends on what type of
channel implementation you use. Gwen and Jeff from Cloudera integrated
Kafka and Flume, including a Kafka channel (see
http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/).
This does allow for better control over delivery semantics, and I think if
you use something like Kafka for every channel in your pipeline, you can
get something like what Copycat can provide. I'd argue Flume's approach has
some other drawbacks though. In order to work correctly, every source and
sink has to handle the transaction semantics, which adds complexity
(although they do offer great skeleton examples in their docs!).
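
For a sense of what that transaction handling looks like on the connector
author's side, here is a condensed version of the kind of custom-sink
skeleton the Flume developer guide shows (reproduced from memory, so treat
the details as approximate):

// Roughly the shape of a custom Flume sink: every batch has to be wrapped
// in the channel's transaction, with explicit commit/rollback handling.
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class ExampleSink extends AbstractSink implements Configurable {

    @Override
    public void configure(Context context) {
        // read sink-specific settings from the agent config
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            Event event = channel.take();
            if (event == null) {
                txn.commit();
                return Status.BACKOFF;
            }
            // deliver event.getBody() to the external system here
            txn.commit();
            return Status.READY;
        } catch (Throwable t) {
            // roll back so the channel can redeliver the event
            txn.rollback();
            if (t instanceof Error) {
                throw (Error) t;
            }
            throw new EventDeliveryException(t);
        } finally {
            txn.close();
        }
    }
}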

Copycat tries to avoid that complexity for connector developers by changing
the framework to use streams, offsets, and commits, and pushing the
complexities of dealing with any sorts of errors/failures into the
framework. Ideally connector developers only need to a) check for offsets
at startup and rewind to the last known committed offset and b) load events
from the source system (with stream IDs and offsets) and pass them to the
framework.
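
As a hypothetical illustration (the interface and type names below are
invented for the sketch, not the proposed Copycat interfaces), a source
task ends up looking roughly like this:

// Hypothetical sketch: the framework supplies offset storage and owns
// delivery/commit handling; the connector's task only resumes from the
// last committed offset and emits records tagged with stream IDs and
// offsets.
import java.util.List;

public class ExampleSourceTask {

    private final OffsetStorage offsets;   // assumed to be provided by the framework
    private final RecordQueue output;      // assumed framework-side queue of produced records
    private final ChangelogReader reader;  // connector-specific reader for the source system

    public ExampleSourceTask(OffsetStorage offsets, RecordQueue output, ChangelogReader reader) {
        this.offsets = offsets;
        this.output = output;
        this.reader = reader;
    }

    // (a) On startup, look up the last committed offset and rewind to it.
    public void start(String stream) {
        long lastCommitted = offsets.lastCommittedOffset(stream);
        reader.seek(stream, lastCommitted);
    }

    // (b) Load events from the source system and hand them to the framework,
    // tagged with their stream ID and offset; acking, retries, and offset
    // commits are not the connector's problem.
    public void poll(String stream) {
        List<SourceEvent> events = reader.read(stream);
        for (SourceEvent e : events) {
            output.emit(stream, e.offset(), e.value());
        }
    }

    // Placeholder types, purely for illustration.
    public interface OffsetStorage { long lastCommittedOffset(String stream); }
    public interface RecordQueue { void emit(String stream, long offset, byte[] value); }
    public interface ChangelogReader {
        void seek(String stream, long offset);
        List<SourceEvent> read(String stream);
    }
    public interface SourceEvent { long offset(); byte[] value(); }
}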


>
> >YARN: My point isn't that YARN is bad, it's that tying to any particular
> >cluster manager severely limits the applicability of the tool. The goal is
> >to make Copycat agnostic to the cluster manager so it can run under Mesos,
> >YARN, etc.
>
> ok. Got it. Sounds like there is plan to do some work here to ensure
> out-of-the-box it works with more than one scheduler (as @Jay listed out).
> In that case, IMO it would be better to actually rephrase it in the KIP
> that it will support more than one scheduler.
>
>
Tried to add some wording to clarify that.


>
> >Exactly once: You accomplish this in any system by managing offsets in the
> >destination system atomically with the data or through some kind of
> >deduplication. Jiangjie actually just gave a great talk about this issue
> >at
> >a recent Kafka meetup, perhaps he can share some slides about it. When you
> >see all the details involved, you'll see why I think it might be nice to
> >have the framework help you manage the complexities of achieving different
> >delivery semantics ;)
>
>
> Deduplication as a post-processing step is a common recommendation
> today… but that is a workaround for the inability of the delivery systems
> to provide exactly-once. IMO such post-processing should not
> be considered part of the "exactly-once" guarantee of Copycat.
>
>
> Will be good to know how this guarantee will be possible when delivering
> to HDFS.
> Would be great if someone can share those slides if it is discussed there.
>
>
For HDFS, the gist of the solution is to write to temporary files and then
rename atomically to their final destination, including offset information
(e.g., it can just be in the filename). Readers only see files that have
been "committed". If there is a failure, any existing temp files get
cleaned up and reading is reset to the last committed offset. There are
some tricky details if you have zombie processes and depending on how you
organize the data across files, but this isn't really the point of this
KIP. If you're interested in HDFS specifically, I'd suggest looking at
Camus's implementation.
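
For what it's worth, a minimal sketch of that write-then-rename pattern
with the Hadoop FileSystem API looks something like the following; the
directory layout and filename convention are made up here, and Camus uses
its own, more elaborate scheme:

// Sketch of "exactly-once"-style commits to HDFS: batches are staged in a
// temp directory and atomically renamed into place with the offset range
// encoded in the filename. Paths and naming are illustrative only.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsCommitSketch {

    private final FileSystem fs;
    private final Path dataDir = new Path("/data/mytopic-0");
    private final Path tmpDir = new Path("/data/mytopic-0/_tmp");

    public HdfsCommitSketch(Configuration conf) throws IOException {
        this.fs = FileSystem.get(conf);
    }

    // Write a batch to a temp file, then rename it into the data directory.
    // Readers never see the temp file; the rename is the atomic "commit".
    public void commitBatch(byte[] batch, long startOffset, long endOffset) throws IOException {
        Path tmp = new Path(tmpDir, "batch-" + startOffset);
        try (FSDataOutputStream out = fs.create(tmp, true)) {
            out.write(batch);
        }
        Path committed = new Path(dataDir, startOffset + "-" + endOffset);
        if (!fs.rename(tmp, committed)) {
            throw new IOException("Failed to commit " + committed);
        }
    }

    // On restart: clean up leftover temp files and resume from the highest
    // end offset encoded in the committed filenames.
    public long recover() throws IOException {
        fs.delete(tmpDir, true);
        long lastCommitted = -1L;
        if (fs.exists(dataDir)) {
            for (FileStatus status : fs.listStatus(dataDir)) {
                String name = status.getPath().getName();
                int dash = name.indexOf('-');
                if (dash > 0) {
                    lastCommitted = Math.max(lastCommitted, Long.parseLong(name.substring(dash + 1)));
                }
            }
        }
        return lastCommitted; // rewind the consumer to lastCommitted + 1
    }
}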


>
>
>
> Was looking for clarification on this ..
> - Export side - is this like a map reduce kind of job or something else ?
> If delivering to hdfs would this be running on the hadoop cluster or
> outside ?
>
> - Import side - how does this look ? Is it a bunch of flume like processes
> ? maybe just some kind of a broker that translates the incoming protocol
> into outgoing Kafka producer api protocol ? If delivering to hdfs, will
> this run on the cluster or outside ?
>

No MapReduce; in fact, no other framework is required unless the connector
needs one for some reason. Both source and sink look structurally the same.
Probably the most common scenario is to run a set of workers that provide
the Copycat service. You submit connector jobs to run on these workers. A
coordinator determines how to divide each job into tasks and generates
configs for them, and the framework handles distributing that work across
the worker nodes. Each individual task handles some
subset of the job. For source tasks, that subset is a set of input streams
(in the JDBC example in the KIP, each table would have a corresponding
stream). For sink tasks, the subset is determined automatically by the
framework via the underlying consumer group as a subset of topic-partitions
(since the input is from Kafka). Connectors are kept simple, just
processing streams of records (either generating them by reading from the
source system or recording them into the sink system). Source tasks also
include information about offsets, and sink tasks either need to manage
offsets themselves or implement flush() functionality. Given these
primitives, the framework can then handle other complexities like different
delivery semantics without any additional support from the connectors.
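
On the sink side, a task sketch might look like the following (again, the
names are invented for illustration rather than taken from the KIP); the
key point is that flush() gives the framework a safe point at which to
commit consumer offsets.

// Hypothetical sink task sketch: the framework feeds it records pulled via
// the consumer group; the task writes them and guarantees in flush() that
// everything handed over so far is durable, so offsets can be committed.
import java.io.IOException;
import java.util.Collection;

public class ExampleSinkTask {

    private final BatchWriter writer;  // connector-specific writer (e.g., a buffered HDFS writer)

    public ExampleSinkTask(BatchWriter writer) {
        this.writer = writer;
    }

    // Called by the framework with records from the assigned topic-partitions.
    public void put(Collection<SinkRecord> records) throws IOException {
        for (SinkRecord record : records) {
            writer.append(record.topic(), record.partition(), record.value());
        }
    }

    // Called before the framework commits offsets: after this returns, every
    // record previously passed to put() must be durable in the sink system.
    public void flush() throws IOException {
        writer.flushAndSync();
    }

    // Placeholder types, purely for illustration.
    public interface SinkRecord { String topic(); int partition(); byte[] value(); }
    public interface BatchWriter {
        void append(String topic, int partition, byte[] value) throws IOException;
        void flushAndSync() throws IOException;
    }
}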

The motivation for the additional modes of execution (agent, embedded) was
to support a couple of other common use cases. Agent mode is completely
standalone, which provides for a much simpler implementation and handles
use cases where there isn't an easy way to avoid running the job across
many machines (e.g., if you have to load logs directly from log files).
Embedded mode is actually a simple variant of the distributed mode, but
lets you set up and run the entire cluster alongside the rest of your
distributed app. This is useful if you want to get up and running with an
application where you need to, for example, import data from another
service into Kafka, then consume and process that data. You can set up the
worker and submit a job directly from your code, reducing the operational
complexity. It's probably not the right long term solution as your usage
expands, but it can significantly ease adoption.


>
>
> I still think adding one or two specific end-to-end use-cases in the KIP,
> showing how copycat will pan out for them for import/export will really
> clarify things.
>

There were a couple of examples already in the KIP -- JDBC, HDFS, log
import, and now I've also added mirror maker. Were you looking for
something more specific? I could also explain a full source -> Kafka ->
sink pipeline, but I don't know that there's much to add there beyond the
fact that we would like schemas to carry across the entire pipeline.
Otherwise it's just chaining connectors. Besides, I think most of the
interesting use cases actually have additional processing steps in between,
i.e. using stream processing frameworks or custom consumers + producers.

-- 
Thanks,
Ewen
