One more reason to have CopyCat as a separate project is to sidestep
the entire "Why CopyCat and not X" discussion :)

On Tue, Jun 23, 2015 at 6:26 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> Re: Flume vs. CopyCat
>
> I would love to have an automagically-parallelizing, schema-aware
> version of Flume with great reliability guarantees. Flume has good
> core architecture and I'm sure that if the Flume community is
> interested, it can be extended in that direction.
>
> However, the Apache way is not to stop new innovation just because
> some systems already exists. We develop the best systems we can, and
> users choose the ones they prefer - thats how ecosystems thrive.
> If we can have Flume and NiFi, Sentry and Argus, Flink and Storm,
> Parquet and ORC, I'm sure we can also have CopyCat in the zoo :)
>
> Gwen
>
>
>
> On Tue, Jun 23, 2015 at 6:11 PM, Ewen Cheslack-Postava
> <e...@confluent.io> wrote:
>> On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik <ros...@hortonworks.com> wrote:
>>
>>> Thanks Jay and Ewen for the response.
>>>
>>>
>>> >@Jay
>>> >
>>> > 3. This has a built in notion of parallelism throughout.
>>>
>>>
>>>
>>> It was not obvious how it will look like or differ from existing systemsÅ 
>>> since all of existing ones do parallelize data movement.
>>>
>>
>> I'm guessing some confusion here might also be because we want both
>> parallelization and distribution.
>>
>> Roughly speaking, I think of Copycat making the consumer group abstraction
>> available for any import task, and the idea is to make this automatic and
>> transparent to the user. This isn't interesting for systems that literally
>> only have a single input stream, but Copycat source connectors have a
>> built-in notion of parallel input streams. The connector's job is to inform
>> the the Copycat framework of what input streams there are and Copycat
>> handles running tasks, balancing the streams across them, handles failures
>> by rebalancing as necessary, provides offset commit and storage so tasks
>> can resume from the last known-good state, etc.
>>
>> On the sink side, the input is the Kafka consumer group, which obviously
>> already has this parallelism built in. Depending on the output, this may
>> manifest in different ways. For HDFS, the effect is just that your output
>> files are partitioned (one per topic-partition).
>>
>> As for other systems, can you be more specific? Some of them obviously do
>> (e.g. Camus), but others require you to handle this manually. I don't want
>> to pick on Flume specifically, but as an example, it requires either
>> configuring multiple (or multiplexed) flows in a single agent or manage
>> multiple agents independently. This isn't really the same as what I've
>> described above where you hand Copycat one config and it automatically
>> spreads the work across multiple, fault-tolerant tasks. But flume is also
>> targeting a much different general problem, trying to build potentially
>> large, multi-stage data flows with all sorts of transformations, filtering,
>> etc.
>>
>>
>>>
>>>
>>> @Ewen,
>>>
>>> >Import: Flume is just one of many similar systems designed around log
>>> >collection. See notes below, but one major point is that they generally
>>> >don't provide any sort of guaranteed delivery semantics.
>>>
>>>
>>> I think most of them do provide guarantees of some sort (Ex. Flume &
>>> FluentD).
>>>
>>
>> This part of the discussion gets a little bit tricky, not least because it
>> seems people can't agree on exactly what these terms mean.
>>
>> First, some systems that you didn't mention. Logstash definitely doesn't
>> have any guarantees as it uses a simple 20-event in-memory buffer between
>> stages. As far as I can tell, Heka doesn't provide these semantics either,
>> although I have not investigated it as deeply.
>>
>> fluentd has an article discussing the options for it (
>> http://docs.fluentd.org/articles/high-availability), but I actually think
>> the article on writing plugins is more informative
>> http://docs.fluentd.org/articles/plugin-development The most important
>> point is that input plugins have no way to track or discovery downstream
>> delivery (i.e. they cannot get acks, nor is there any sort of offset
>> tracked that it can lookup to discover where to restart upon failure, nor
>> is it guaranteed that after router.emit() returns that the data will have
>> already been delivered downstream). So if I have a replicated input data
>> store, e.g. a replicated database, and I am just reading off it's
>> changelog, does fluentd actually guarantee something like at least once
>> delivery to the sink? In fact, fluentd's own documentation (the high
>> availability doc) describes data loss scenarios that aren't inherent to
>> every system (e.g., if their log aggregator dies, which not every system is
>> susceptible to, vs. if an event is generated on a single host and that host
>> dies before reporting it anywhere, then of course the data is permanently
>> lost).
>>
>> Flume actually does have a (somewhat confusingly named) transaction concept
>> to help control this. The reliability actually depends on what type of
>> channel implementation you use. Gwen and Jeff from Cloudera integrated
>> Kafka and Flume, including a Kafka channel (see
>> http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/).
>> This does allow for better control over delivery semantics, and I think if
>> you use something like Kafka for every channel in your pipeline, you can
>> get something like what Copycat can provide. I'd argue flume's approach has
>> some other drawbacks though. In order to work correctly, every source and
>> sink has to handle the transaction semantics, which adds complexity
>> (although they do offer great skeleton examples in their docs!).
>>
>> Copycat tries to avoid that complexity for connector developers by changing
>> the framework to use streams, offsets, and commits, and pushing the
>> complexities of dealing with any sorts of errors/failures into the
>> framework. Ideally connector developers only need to a) check for offsets
>> at startup and rewind to the last known committed offset and b) load events
>> from the source system (with stream IDs and offsets) and pass them to the
>> framework.
>>
>>
>>>
>>> >YARN: My point isn't that YARN is bad, it's that tying to any particular
>>> >cluster manager severely limits the applicability of the tool. The goal is
>>> >to make Copycat agnostic to the cluster manager so it can run under Mesos,
>>> >YARN, etc.
>>>
>>> ok. Got it. Sounds like there is plan to do some work here to ensure
>>> out-of-the-box it works with more than one scheduler (as @Jay listed out).
>>> In that case, IMO it would be better to actually rephrase it in the KIP
>>> that it will support more than one scheduler.
>>>
>>>
>> Tried to add some wording to clarify that.
>>
>>
>>>
>>> >Exactly once: You accomplish this in any system by managing offsets in the
>>> >destination system atomically with the data or through some kind of
>>> >deduplication. Jiangjie actually just gave a great talk about this issue
>>> >at
>>> >a recent Kafka meetup, perhaps he can share some slides about it. When you
>>> >see all the details involved, you'll see why I think it might be nice to
>>> >have the framework help you manage the complexities of achieving different
>>> >delivery semantics ;)
>>>
>>>
>>> Deduplication as a post processing step is a common recommendation done
>>> today Å  but that is a workaround/fix for the inability to provide
>>> exactly-once by the delivery systems. IMO such post processing should not
>>> be considered part of the "exacty-once" guarantee of Copycat.
>>>
>>>
>>> Will be good to know how this guarantee will be possible when delivering
>>> to HDFS.
>>> Would be great if someone can share those slides if it is discussed there.
>>>
>>>
>> For HDFS, the gist of the solution is to write to temporary files and then
>> rename atomically to their final destination, including offset information
>> (e.g., it can just be in the filename). Readers only see files that have
>> been "committed". If there is a failure, any existing temp files get
>> cleaned up and reading is reset to the last committed offset. There are
>> some tricky details if you have zombie processes and depending on how you
>> organize the data across files, but this isn't really the point of this
>> KIP. If you're interested in HDFS specifically, I'd suggest looking at
>> Camus's implementation.
>>
>>
>>>
>>>
>>>
>>> Was looking for clarification on this ..
>>> - Export side - is this like a map reduce kind of job or something else ?
>>> If delivering to hdfs would this be running on the hadoop cluster or
>>> outside ?
>>>
>> - Import side - how does this look ? Is it a bunch of flume like processes
>>> ? maybe just some kind of a broker that translates the incoming protocol
>>> into outgoing Kafka producer api protocol ? If delivering to hdfs, will
>>> this run on the cluster or outside ?
>>>
>>
>> No mapreduce; in fact, no other frameworks required unless the connector
>> needs it for some reason. Both source and sink look structurally the same.
>> Probably the most common scenario is to run a set of workers that provide
>> the copycat service. You submit connector jobs to run on these workers. A
>> coordinator handles distributing the work across worker nodes. Coordinators
>> determine how to divide the tasks and generate configs for them, then the
>> framework handles distributing that work. Each individual task handles some
>> subset of the job. For source tasks, that subset is a set of input streams
>> (in the JDBC example in the KIP, each table would have a corresponding
>> stream). For sink tasks, the subset is determined automatically by the
>> framework via the underlying consumer group as a subset of topic-partitions
>> (since the input is from Kafka). Connectors are kept simple, just
>> processing streams of records (either generating them by reading from the
>> source system or recording them into the sink system). Source tasks also
>> include information about offsets, and sink tasks either need to manage
>> offsets themselves or implement flush() functionality. Given these
>> primitives, the framework can then handle other complexities like different
>> delivery semantics without any additional support from the connectors.
>>
>> The motivation for the additional modes of execution (agent, embedded) was
>> to support a couple of other common use cases. Agent mode is completely
>> standalone, which provides for a much simpler implementation and handles
>> use cases where there isn't an easy way to avoid running the job across
>> many machines (e.g., if you have to load logs directly from log files).
>> Embedded mode is actually a simple variant of the distributed mode, but
>> lets you setup and run the entire cluster alongside the rest of your
>> distributed app. This is useful if you want to get up and running with an
>> application where you need to, for example, import data from another
>> service into Kafka, then consume and process that data. You can setup the
>> worker and submit a job directly from your code, reducing the operational
>> complexity. It's probably not the right long term solution as your usage
>> expands, but it can significantly ease adoption.
>>
>>
>>>
>>>
>>> I still think adding one or two specific end-to-end use-cases in the KIP,
>>> showing how copycat will pan out for them for import/export will really
>>> clarify things.
>>>
>>
>> There were a couple of examples already in the KIP -- JDBC, HDFS, log
>> import, and now I've also added mirror maker. Were you looking for
>> something more specific? I could also explain a full source -> kafka ->
>> sink pipeline, but I don't know that there's much to add there beyond the
>> fact that we would like schemas to carry across the entire pipeline.
>> Otherwise it's just chaining connectors. Besides, I think most of the
>> interesting use cases actually have additional processing steps in between,
>> i.e. using stream processing frameworks or custom consumers + producers.
>>
>> --
>> Thanks,
>> Ewen

Reply via email to