And, one more piece of follow up. Some folks were wondering about more
specific details about what we had in mind for the framework. Along with a
prototype I had been writing up some documentation. This isn't meant in any
way to be finalized and I just wrote it up using the same tools we use
internally rather than integrating it directly with Kafka docs like we'd
want to eventually do, but I think the current version would help clarify
some of the details of what we think the framework should look like without
prematurely getting too far into the specifics of the API and
implementation.

You can find a draft of these docs here:
https://s3-us-west-2.amazonaws.com/confluent-files/copycat-docs-wip/intro.html

-Ewen

On Tue, Jun 23, 2015 at 6:11 PM, Ewen Cheslack-Postava <e...@confluent.io>
wrote:

>
>
> On Mon, Jun 22, 2015 at 8:32 PM, Roshan Naik <ros...@hortonworks.com>
> wrote:
>
>> Thanks Jay and Ewen for the response.
>>
>>
>> >@Jay
>> >
>> > 3. This has a built in notion of parallelism throughout.
>>
>>
>>
>> It was not obvious how it will look like or differ from existing systemsÅ 
>> since all of existing ones do parallelize data movement.
>>
>
> I'm guessing some confusion here might also be because we want both
> parallelization and distribution.
>
> Roughly speaking, I think of Copycat making the consumer group abstraction
> available for any import task, and the idea is to make this automatic and
> transparent to the user. This isn't interesting for systems that literally
> only have a single input stream, but Copycat source connectors have a
> built-in notion of parallel input streams. The connector's job is to inform
> the the Copycat framework of what input streams there are and Copycat
> handles running tasks, balancing the streams across them, handles failures
> by rebalancing as necessary, provides offset commit and storage so tasks
> can resume from the last known-good state, etc.
>
> On the sink side, the input is the Kafka consumer group, which obviously
> already has this parallelism built in. Depending on the output, this may
> manifest in different ways. For HDFS, the effect is just that your output
> files are partitioned (one per topic-partition).
>
> As for other systems, can you be more specific? Some of them obviously do
> (e.g. Camus), but others require you to handle this manually. I don't want
> to pick on Flume specifically, but as an example, it requires either
> configuring multiple (or multiplexed) flows in a single agent or manage
> multiple agents independently. This isn't really the same as what I've
> described above where you hand Copycat one config and it automatically
> spreads the work across multiple, fault-tolerant tasks. But flume is also
> targeting a much different general problem, trying to build potentially
> large, multi-stage data flows with all sorts of transformations, filtering,
> etc.
>
>
>>
>>
>> @Ewen,
>>
>> >Import: Flume is just one of many similar systems designed around log
>> >collection. See notes below, but one major point is that they generally
>> >don't provide any sort of guaranteed delivery semantics.
>>
>>
>> I think most of them do provide guarantees of some sort (Ex. Flume &
>> FluentD).
>>
>
> This part of the discussion gets a little bit tricky, not least because it
> seems people can't agree on exactly what these terms mean.
>
> First, some systems that you didn't mention. Logstash definitely doesn't
> have any guarantees as it uses a simple 20-event in-memory buffer between
> stages. As far as I can tell, Heka doesn't provide these semantics either,
> although I have not investigated it as deeply.
>
> fluentd has an article discussing the options for it (
> http://docs.fluentd.org/articles/high-availability), but I actually think
> the article on writing plugins is more informative
> http://docs.fluentd.org/articles/plugin-development The most important
> point is that input plugins have no way to track or discovery downstream
> delivery (i.e. they cannot get acks, nor is there any sort of offset
> tracked that it can lookup to discover where to restart upon failure, nor
> is it guaranteed that after router.emit() returns that the data will have
> already been delivered downstream). So if I have a replicated input data
> store, e.g. a replicated database, and I am just reading off it's
> changelog, does fluentd actually guarantee something like at least once
> delivery to the sink? In fact, fluentd's own documentation (the high
> availability doc) describes data loss scenarios that aren't inherent to
> every system (e.g., if their log aggregator dies, which not every system is
> susceptible to, vs. if an event is generated on a single host and that host
> dies before reporting it anywhere, then of course the data is permanently
> lost).
>
> Flume actually does have a (somewhat confusingly named) transaction
> concept to help control this. The reliability actually depends on what type
> of channel implementation you use. Gwen and Jeff from Cloudera integrated
> Kafka and Flume, including a Kafka channel (see
> http://blog.cloudera.com/blog/2014/11/flafka-apache-flume-meets-apache-kafka-for-event-processing/).
> This does allow for better control over delivery semantics, and I think if
> you use something like Kafka for every channel in your pipeline, you can
> get something like what Copycat can provide. I'd argue flume's approach has
> some other drawbacks though. In order to work correctly, every source and
> sink has to handle the transaction semantics, which adds complexity
> (although they do offer great skeleton examples in their docs!).
>
> Copycat tries to avoid that complexity for connector developers by
> changing the framework to use streams, offsets, and commits, and pushing
> the complexities of dealing with any sorts of errors/failures into the
> framework. Ideally connector developers only need to a) check for offsets
> at startup and rewind to the last known committed offset and b) load events
> from the source system (with stream IDs and offsets) and pass them to the
> framework.
>
>
>>
>> >YARN: My point isn't that YARN is bad, it's that tying to any particular
>> >cluster manager severely limits the applicability of the tool. The goal
>> is
>> >to make Copycat agnostic to the cluster manager so it can run under
>> Mesos,
>> >YARN, etc.
>>
>> ok. Got it. Sounds like there is plan to do some work here to ensure
>> out-of-the-box it works with more than one scheduler (as @Jay listed out).
>> In that case, IMO it would be better to actually rephrase it in the KIP
>> that it will support more than one scheduler.
>>
>>
> Tried to add some wording to clarify that.
>
>
>>
>> >Exactly once: You accomplish this in any system by managing offsets in
>> the
>> >destination system atomically with the data or through some kind of
>> >deduplication. Jiangjie actually just gave a great talk about this issue
>> >at
>> >a recent Kafka meetup, perhaps he can share some slides about it. When
>> you
>> >see all the details involved, you'll see why I think it might be nice to
>> >have the framework help you manage the complexities of achieving
>> different
>> >delivery semantics ;)
>>
>>
>> Deduplication as a post processing step is a common recommendation done
>> today Å  but that is a workaround/fix for the inability to provide
>> exactly-once by the delivery systems. IMO such post processing should not
>> be considered part of the "exacty-once" guarantee of Copycat.
>>
>>
>> Will be good to know how this guarantee will be possible when delivering
>> to HDFS.
>> Would be great if someone can share those slides if it is discussed there.
>>
>>
> For HDFS, the gist of the solution is to write to temporary files and then
> rename atomically to their final destination, including offset information
> (e.g., it can just be in the filename). Readers only see files that have
> been "committed". If there is a failure, any existing temp files get
> cleaned up and reading is reset to the last committed offset. There are
> some tricky details if you have zombie processes and depending on how you
> organize the data across files, but this isn't really the point of this
> KIP. If you're interested in HDFS specifically, I'd suggest looking at
> Camus's implementation.
>
>
>>
>>
>>
>> Was looking for clarification on this ..
>> - Export side - is this like a map reduce kind of job or something else ?
>> If delivering to hdfs would this be running on the hadoop cluster or
>> outside ?
>>
> - Import side - how does this look ? Is it a bunch of flume like processes
>> ? maybe just some kind of a broker that translates the incoming protocol
>> into outgoing Kafka producer api protocol ? If delivering to hdfs, will
>> this run on the cluster or outside ?
>>
>
> No mapreduce; in fact, no other frameworks required unless the connector
> needs it for some reason. Both source and sink look structurally the same.
> Probably the most common scenario is to run a set of workers that provide
> the copycat service. You submit connector jobs to run on these workers. A
> coordinator handles distributing the work across worker nodes. Coordinators
> determine how to divide the tasks and generate configs for them, then the
> framework handles distributing that work. Each individual task handles some
> subset of the job. For source tasks, that subset is a set of input streams
> (in the JDBC example in the KIP, each table would have a corresponding
> stream). For sink tasks, the subset is determined automatically by the
> framework via the underlying consumer group as a subset of topic-partitions
> (since the input is from Kafka). Connectors are kept simple, just
> processing streams of records (either generating them by reading from the
> source system or recording them into the sink system). Source tasks also
> include information about offsets, and sink tasks either need to manage
> offsets themselves or implement flush() functionality. Given these
> primitives, the framework can then handle other complexities like different
> delivery semantics without any additional support from the connectors.
>
> The motivation for the additional modes of execution (agent, embedded) was
> to support a couple of other common use cases. Agent mode is completely
> standalone, which provides for a much simpler implementation and handles
> use cases where there isn't an easy way to avoid running the job across
> many machines (e.g., if you have to load logs directly from log files).
> Embedded mode is actually a simple variant of the distributed mode, but
> lets you setup and run the entire cluster alongside the rest of your
> distributed app. This is useful if you want to get up and running with an
> application where you need to, for example, import data from another
> service into Kafka, then consume and process that data. You can setup the
> worker and submit a job directly from your code, reducing the operational
> complexity. It's probably not the right long term solution as your usage
> expands, but it can significantly ease adoption.
>
>
>>
>>
>> I still think adding one or two specific end-to-end use-cases in the KIP,
>> showing how copycat will pan out for them for import/export will really
>> clarify things.
>>
>
> There were a couple of examples already in the KIP -- JDBC, HDFS, log
> import, and now I've also added mirror maker. Were you looking for
> something more specific? I could also explain a full source -> kafka ->
> sink pipeline, but I don't know that there's much to add there beyond the
> fact that we would like schemas to carry across the entire pipeline.
> Otherwise it's just chaining connectors. Besides, I think most of the
> interesting use cases actually have additional processing steps in between,
> i.e. using stream processing frameworks or custom consumers + producers.
>
> --
> Thanks,
> Ewen
>



-- 
Thanks,
Ewen

Reply via email to