Long wall of text:

-------

Hi Ewen

> There are 2 aspects to this. The first is rebalancing. There's actually
nothing that fundamentally requires every rebalance operation to be
stop-the-world. I've had some thoughts about allowing partial and
incremental rebalancing floating around for awhile.

I really think rebalancing should be a separate KIP; people operating
Connect in cluster mode will really appreciate it. But it won't help with
a "container mode" for Connect.

>  If you want to use any of these solutions, you are going to have to have
separate Connect clusters. There are some pain points if you go this route,
but I think some you can just live with (e.g. 3 topics per connector isn't
actually that insane), some are fixable and just not there yet (e.g. the
upgrade example affecting the whole cluster is also fixable by just
supporting dynamically loading in new jars instead of requiring a restart).

I think separating Connect clusters was always the idea. I believe the 3
topics can really be made redundant, see below. Loading new JARs as a hot
swap is not a solution: think of the implications if the configuration
update is a breaking change.

>  However, I don't think that means we shouldn't consider exposing some
security configs if there are good use cases (such as here, where you might
want to
lock down different connectors from each other via ACLs).

I believe loading a Connect cluster with 40 different keytabs is a security
risk in itself. An audit team won't let this pass: security means a
connector can't "borrow" credentials from another connector.
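
For illustration, here is a hedged sketch of the kind of per-connector
separation an audit would expect, using the standard kafka-acls tool (the
principal, topic and group names below are made up, not from the KIP):

    # Connector A runs under its own principal and only sees its own topics
    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
      --add --allow-principal User:connector-a \
      --operation Read --operation Write --topic connector-a-offsets

    # ...and may only join its own consumer group
    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
      --add --allow-principal User:connector-a \
      --operation Read --group connect-connector-a

    # Connector B's principal gets no grants on A's resources, so it cannot
    # "borrow" A's credentials or read A's data.

This kind of separation is hard to get when 40 connectors share one worker
JVM and one set of worker credentials.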

>  Updates to the config complicate things -- what happens when you're
rolling bouncing and a worker crashes and fails over?

I believe this problem is fixed by the KIP. The config is attached to a
worker and that's it, no shared state.

>  we store and distribute configs/offsets/statuses, we
could instead try to address this specific concern with, e.g., connect
cluster IDs that are included in these topics so you can have one global
topic similar to _consumer_offsets

_connect_offsets needs to stay and have a "group ACL" similar to consumer
groups. A shared topic that supports multiple cluster IDs is then fine.
_connect_configs needs to go away for distributed mode.
_connect_status can also probably go away if you use coordinator capability
similar to consumer groups (you talk about reinventing the wheel, but
wouldn't this be code reuse anyway?)

----

So, my biggest pain point today is the "config" topic. I think it prevents
two things from happening:
1) rolling upgrades of JARs that have breaking config changes
2) security: if we consider source -> connector -> Kafka, only the
connector should have access to the source credentials. Today these are
stored in Kafka in plain text on disk, which in my opinion is a concern as
there is no separation of concerns.
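
To make point 2 concrete, this is roughly what happens in distributed mode
today (the connector class, host and credentials are purely illustrative):

    # The connector config is POSTed to the REST API...
    curl -X POST -H "Content-Type: application/json" \
      http://connect-host:8083/connectors -d '{
        "name": "my-jdbc-source",
        "config": {
          "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
          "connection.url": "jdbc:postgresql://db-host:5432/mydb",
          "connection.user": "app_user",
          "connection.password": "s3cr3t"
        }
      }'

    # ...and that JSON, password included, is written verbatim to the shared
    # config topic, readable by every worker in the cluster and by anyone
    # with access to that topic or its log segments on disk.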

If we look at standalone mode, you can load the config with the connector,
so the code for this already exists somewhere. Pushing standalone mode
further, you can launch several instances of a sink connector and
effectively achieve "container" mode: the status topic is not used because
coordination comes through the consumer group mechanism, the config topic
is not used because the config is embedded, and the offsets topic is not
used because sinks leverage consumer offsets out of the box. Additionally,
if you have a breaking JAR and want to do a rolling upgrade + config
change, you can do it separately for each running sink instance.
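
A minimal sketch of that setup, assuming a sink connector (class names and
property values are illustrative; the consumer group name follows the
connect-<connector-name> convention used for sink tasks):

    # worker.properties -- no config/status topics, no REST-driven config
    bootstrap.servers=kafka:9092
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    offset.storage.file.filename=/tmp/unused-for-sinks.offsets

    # my-sink.properties -- the connector config ships with the process
    name=my-sink
    connector.class=com.example.MySinkConnector
    tasks.max=2
    topics=input-topic

    # One process per container; every instance started with the same
    # connector name joins consumer group connect-my-sink, so partitions are
    # balanced across containers and offsets live in __consumer_offsets.
    bin/connect-standalone.sh worker.properties my-sink.properties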

The changes probably won't be easy and would require rethinking the
producer side that stores source offsets in Kafka so that it has an
architecture similar to a consumer's, but I think that's the ideal state
for management. That would remove the status topic and centralise the
_connect_offsets topic.

Config changes then need not be communicated across instances. With the
same cluster ID you can totally have Container(JARv1 + Configv1) +
Container(JARv2 + Configv2) running side by side while doing a rolling
upgrade.
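
As a sketch (names again made up), the roll of a sink connector could then
be as simple as draining old containers while new ones come up:

    # Container running v1: worker-v1.properties points plugin.path at the
    # old jars, my-sink-v1.properties carries the old connector config.
    bin/connect-standalone.sh worker-v1.properties my-sink-v1.properties

    # Container running v2: new jars, new (possibly incompatible) config.
    bin/connect-standalone.sh worker-v2.properties my-sink-v2.properties

    # Both declare name=my-sink, so their tasks share the consumer group
    # connect-my-sink: partitions simply migrate as v1 containers stop, and
    # there is no shared config topic the two versions must agree on.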





On 27 May 2018 at 05:40, Saulius Valatka <saulius...@gmail.com> wrote:

> Hi Ewen
>
> thanks for the response, I totally agree that this KIP touches upon
> multiple rather orthogonal issues, I had a hard time deciding on the name
> and scope for it :)
> Let me sum up the biggest pain points that we're facing with distributed
> mode:
>
> * By far the biggest issue we have is the lack of physical connector
> isolation. Since all of them live in the same JVM process we've run into
> issues multiple times where misbehaving connectors can either crash the
> whole process or simply over-utilize cpu/memory and effectively deprive
> others of resources. This is the main reason why we stopped using
> distributed mode in production and switched to running each connector in
> standalone mode per container. Currently we're running a patched version
> of Connect where standalone mode can store offsets in a Kafka topic (as
> currently proposed in the KIP). I don't really see how this could be
> easily addressed within the existing framework and given that increasingly
> more users have access to mesos/k8s it would seem natural to isolate at
> this level rather than solve this within Connect.
>
> * Stop-the-world rebalancing when deploying connectors is
> a-very-close-second-biggest issue we had with distributed mode. True, you
> could solve this by improving the rebalancing algorithm, but just having
> isolated Connect frameworks with single connectors inside sounds much safer
> (bugs do happen) and easier to implement.
>
> * The next biggest problem is the deployment workflow, which can be split
> into two major issues: deploying via REST and various JAR upgrades
> (connectors/converters/transforms). Again, true, you can introduce some
> kind of JAR dynamic-reloading mechanism which no longer requires a global
> cluster restart, but still requires some ad-hoc deployment procedure (e.g.
> how would I coordinate JAR updates with configuration updates?). Once
> again, all of this is trivially solved just by running statically
> configured connectors-per-container: just rebuild the image with updated
> jars and redeploy it to mesos/k8s.
>
> * The separate configuration per connector issues were not that painful,
> but nice to get rid of: mainly the lack of ability to use different
> credentials per connector. Sure, these are being gradually addressed by
> various KIPs, but by running separate Connect instances per connector we
> get full control out of the box.
>
> As for now, we are perfectly happy with running each connector in a
> separate container in standalone mode, the only required modification to
> Connect is the ability to store offsets in a Kafka topic. The REST API
> becomes useless in this scenario, it can of course be disabled at the
> container/k8s level or ignored, but nevertheless I've included a proposal
> to make it read-only (or disable it) in the KIP for completeness.
>
> I believe what Stephane calls Kafka Connect v2 is not that much of a "grand
> redesign", but rather a logical conclusion to this KIP: I wouldn't go as
> far as calling it v2, but maybe a new "distributed-isolated" runtime mode.
> If connectors are isolated and statically pre-configured, the distributed
> mode could in principle do away with the config/status topics and act more
> like an ordinary kafka consumer. The fact that these topics would no longer
> be needed would be a nice side-effect, I don't really think they are a
> major problem, rather an annoyance.
>
> To summarize, the core issue the KIP is addressing is physical/logical
> connector isolation, which can be solved by running isolated Connect
> frameworks, rather than a Connect cluster. The most reasonable method for
> doing this is running connectors on a platform like mesos/k8s, hence the
> configuration/deployment problems. In other words, it would be nice to run
> Connect more like an application and not like a cluster. In terms of
> approaching this incrementally, I believe the first step could be improving
> the standalone mode as described in the KIP currently. Improving the
> distributed mode is a separate story.
>
> Hope this makes sense
>
>
> 2018-05-22 3:00 GMT+03:00 Ewen Cheslack-Postava <e...@confluent.io>:
>
> > Hey all,
> >
> > I think this is a great discussion, and is helping to clarify the
> > real pain points as well as explore a few more options than just what was
> > initially proposed.
> >
> > Stephane, I think why you're ending up in "grand redesign" state is
> because
> > you're highlighting (and the KIP's motivation section examples of pain
> > points) are actually a set of a couple of different high level
> challenges:
> >
> >
> >    - Impact of shared resources, especially rebalancing but also just
> >    shared process space/memory/etc
> >    - Kafka security credentials / shared client configs
> >    - How configs are specified in the two modes
> >
> > I actually think there isn't much in the identified problems that are
> > unique to containers or k8s/mesos, that's just an environment you'll
> > commonly run into these challenges.
> >
> > I want to cover each of these issues to see if we can improve them in a
> > more incremental way.
> >
> > *Impact of shared resources, especially rebalancing but also just shared
> > process space/memory/etc*
> > There are 2 aspects to this. The first is rebalancing. There's actually
> > nothing that fundamentally requires every rebalance operation to be
> > stop-the-world. I've had some thoughts about allowing partial and
> > incremental rebalancing floating around for awhile. As Connect clusters
> are
> > growing larger, this is becoming increasingly important. I don't have a
> > full write up ready, but it's fixable by making a straightforward change
> > (and we planned for the ability to make changes like this, so it
> absolutely
> > should be possible to do and apply to existing clusters). The basic idea
> is
> > to change what starting a rebalance means for ownership of resources --
> > instead of assuming everything might be rearranged, you assume by default
> > that you will continue to own everything you currently do and continue
> > processing through the rebalance. The leader who handles assignment can
> > then decide what set of rebalances really are required and include that
> > info in the data it sends back. If any rebalancing is required, do a
> > subsequent round of rebalancing where you actually give up those
> resources
> > if they were assigned to you. This gives you a way to do partial
> rebalances
> > and only as needed. You can further extend this in a variety of ways,
> e.g.
> > only rebalancing one resource per rebalance, doing a bit of "scheduling"
> to
> > spread out the impact, etc. This is definitely not a trivial
> > change/addition to make and will require very thorough testing, but it's
> > definitely feasible for 1 person to implement themselves pretty quickly.
> >
> > The second aspect is shared resources with no control over different
> > connectors usage (and the fact that someone doing something bad might
> mean
> > all connectors crash with the workers). Containers are great for this
> that
> > give you more granularity than VMs or physical hosts, but either one
> works.
> > If you want to use any of these solutions, you are going to have to have
> > separate Connect clusters. There are some pain points if you go this
> route,
> > but I think some you can just live with (e.g. 3 topics per connector
> isn't
> > actually that insane), some are fixable and just not there yet (e.g. the
> > upgrade example affecting the whole cluster is also fixable by just
> > supporting dynamically loading in new jars instead of requiring a
> restart).
> >
> > *Kafka security credentials / shared client configs*
> > This is definitely one we hear from time to time. In general, there's no
> > reason we couldn't just expose security configs in the connector config.
> I
> > have pushed back on just exposing all configs because a) I don't think
> > users *should* have control over them, b) in a shared cluster, some
> > represent potentially serious stability threats (e.g. I override buffer
> > sizes and OOM everything), and c) it's actually a compatibility concern.
> > However, I don't think that means we shouldn't consider exposing some of
> > them if there are good use cases (such as here, where you might want to
> > lock down different connectors from each other via ACLs).
> >
> > For the purposes of this discussion, I don't really think this is the
> thing
> > to focus on -- given you're already committing to having separate
> security
> > configs for each connector, it seems like whether you configure multiple
> > clusters vs configure them per connector isn't that much different.
> >
> > *How configs are specified in the two modes*
> > This one is, I think, the most interesting because it has been requested
> a
> > few times before and I'm not sure anyone has actually worked through
> > semantics that are consistent and handle all the different requirements.
> Of
> > all the stuff mentioned so far in this discussion, this is the piece that
> > is really the biggest pain point that's not easy to work around today --
> > even if you split out to separate containers per connector, you still
> want
> > distributed mode because a single connector may need to be spread across
> > multiple containers. However, the default way for apps to be configured
> is
> > for them to get config files/parameters directly with each process, and
> so
> > it seems nice if you are only running 1 connector per cluster, to treat
> the
> > entire config, both worker and connector, as a single "app" config.
> >
> > But this breaks down with distributed mode and when you start thinking
> > about upgrades. With distributed mode as it is today, how do you decide
> if
> > you should write the config you were handed to the config topic? (For the
> > deployment approach we're talking about here, disabling changes to
> configs
> > via the REST API makes sense and avoids a whole set of confusing cases
> > where it's unclear what to do, so I'll avoid them here, but for the more
> > general question of just allowing connector config files for distributed
> > mode, these cases would also need to be addressed). Updates to the config
> > complicate things -- what happens when you're rolling bouncing and a
> worker
> > crashes and fails over? Can we always guarantee we won't restart a
> process
> > such that a node still with the old config doesn't restart and overwrite
> a
> > newer version of the config? Or maybe we don't write it at all and each
> > just relies on the config it was given, but a) we still need to
> distribute
> > task configs and b) what happens during an upgrade when the configs are
> > different across different nodes? I think we would need to think about
> and
> > define how all these edge cases actually behave, and ensure it would be
> > both correct and intuitive.
> >
> > On the topic of workers having a leader & having to use a topic because
> > there is no other storage: first, there is a leader and we do use that to
> > control who is currently allowed to write to the config topic. Kafka
> > effectively provides for this in the protocol because of the way we have
> a
> > single member of the group do assignment in a two-round protocol. The
> > reason statuses are also a topic is that we have this great pub/sub
> system,
> > so it makes sense to use it instead of reinventing the wheel. If the
> number
> > of topics is too painful, instead of possibly doing something like
> > replacing the way we store and distribute configs/offsets/statuses, we
> > could instead try to address this specific concern with, e.g., connect
> > cluster IDs that are included in these topics so you can have one global
> > topic similar to _consumer_offsets. That doesn't totally solve the
> problem
> > if you want the worker clusters to also be completely security isolated,
> > but I think it addresses most people's concern with this setup (while
> also
> > not introducing additional changes in how we have to have the cluster
> > achieve consensus). These are JSON data formats so should be easy to
> extend
> > a bit -- the major question I would have is around how we make it
> seamless
> > to upgrade given that, e.g., keys in the config topic would have to
> change
> > between v1 and v2 formats.
> >
> >
> > Anyway, sorry for the wall of text, but I think giving a bit more
> structure
> > to the problem space might help us focus on what really needs to be
> solved
> > so we can also do so incrementally. So given the above, what are the
> things
> > that most make it painful? Is it the config thing where fundamentally it
> > does not fit into your k8s/mesos/container + CI/CD workflow to do
> anything
> > other than pass configs in via command line flags? Or is it mostly the
> pain
> > of too many topics? Or something else?
> >
> > -Ewen
> >
> > On Fri, May 18, 2018 at 4:47 AM Saulius Valatka <saulius...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > > thanks for the feedback, you have some valid points that I did not
> > consider
> > > thoroughly.
> > >
> > > Now that I think about it, my current proposal is indeed a bit hacky,
> > > namely the dichotomy between running "light"/"heavy" connectors in
> > > standalone/distributed mode: indeed there should be no difference, one
> > > should be able to freely scale a connector by running more container
> > > instances. Whereas with the proposed approach going from 1 container to
> > > more would require switching the runtime mode, e.g. this is completely
> > not
> > > the case with kafka streams apps -- you just scale them at will and I
> > don't
> > > see why connectors should behave differently.
> > >
> > > Your points regarding the config/status topics are also true: the
> config
> > > topic seems irrelevant assuming connector configuration is defined
> > > statically inside the container and the status topic could _probably_
> > also
> > > be dropped and the functionality implemented via the consumer group
> > > protocol -- though I'm also just speculating here, I'll have to study
> the
> > > source code in more depth (maybe someone more familiar with the
> code-base
> > > could offer some insight?).
> > >
> > > Regarding Rahul's questions as to why these topics are a problem, I can
> > > also re-iterate what Stephane said: it's a needless burden to manage.
> > > Though I'm pretty sure different connectors can use the same topics,
> it's
> > > still not nice, since if you're using ACLs to control access, all
> > > connectors will have to be granted access to these topics and there can
> > > potentially be trouble from misbehaving/malicious connectors. Also, I
> > don't
> > > think these topics offer any advantages when it comes to
> centralization:
> > > the only reasons they exist is because the distributed mode has no
> other
> > > way to store information apart from inside kafka topics, whereas if
> > you're
> > > running connectors in containers on kubernetes, you'd store this
> > > configuration in the image/env-vars/configmaps or some other mechanism
> > you
> > > use, the point being that this once again becomes a concern for the
> > > container platform -- once again, just like with Kafka Streams.
> > >
> > > Given all of the above, I'm now inclined to either extend the
> standalone
> > > mode and enable scaling/offset storage in kafka OR better yet,
> introduce
> > a
> > > new runtime mode ("container"? not sure what should it be called). The
> > key
> > > points of this runtime mode would be:
> > >
> > >  - offset storage in a kafka topic
> > >  - only one statically configured connector launched during startup
> > >  - scaling happens via the consumer group protocol
> > >  - only a lightweight "health" REST API that simply informs if the
> > > connector is running
> > >
> > > Obviously this would extend the scope of the KIP, but I'd be willing to
> > > give this a shot.
> > > Waiting for your feedback, once a more clear vision is in place I could
> > > update the KIP.
> > >
> > > Thanks
> > >
> > > 2018-05-17 13:17 GMT+03:00 Stephane Maarek <
> steph...@simplemachines.com.
> > au
> > > >:
> > >
> > > > Hi Salius
> > > >
> > > > I think you're on the money, but you're not pushing things too far.
> > > > This is something I've hoped for a long time.
> > > > Let's talk Kafka Connect v2
> > > >
> > > > Kafka Connect Cluster, as you said, are not convenient to work with
> > (the
> > > > KIP details drawbacks well). I'm all about containerisation just like
> > > > stream apps support (and boasts!).
> > > >
> > > > Now, here's the problem with Kafka Connect. There are three backing
> > > topics.
> > > > Here's the analysis of how they can evolve:
> > > > - Config topic: this one is irrelevant if each connect cluster comes
> > > with a
> > > > config bundled with the corresponding JAR, as you mentioned in your
> KIP
> > > > - Status topic: this is something I wish was gone too. The consumers
> > > have a
> > > > coordinator, and I believe the connect workers should have a
> > coordinator
> > > > too, for task rebalancing.
> > > > - Source Offset topic: only relevant for sources. I wish there was a
> > > > __connect_offsets global topic just like for consumers and an
> > > > "ConnectOffsetCoordinator" to talk to to retrieve latest committed
> > > offset.
> > > >
> > > > If we look above, with a few back-end fundamental transformations, we
> > can
> > > > probably make Connect "cluster-less".
> > > >
> > > > What the community would get out of it is huge:
> > > > - Connect workers for a specific connector are independent and
> > isolated,
> > > > measurable (in CPU and Mem) and auto-scalable
> > > > - CI/CD is super easy to integrate, as it's just another container /
> > jar.
> > > > - You can roll restart a specific connector and upgrade a JAR without
> > > > interrupting your other connectors and while keeping the current
> > > connector
> > > > from running.
> > > > - The topics backing connect are removed except the global one, which
> > > > allows you to scale easily in terms of number of connectors
> > > > - Running a connector in dev or prod (for people offering connectors)
> > is
> > > as
> > > > easy as doing a simple "docker run".
> > > > - Each consumer / producer settings can be configured at the
> container
> > > > level.
> > > > - Each connect process is immutable in configuration.
> > > > - Each connect process has its own security identity (right now, you
> > > need a
> > > > connect cluster per service role, which is a lot of overhead in terms
> > of
> > > > backing topic)
> > > >
> > > > Now, I don't have the Kafka expertise to know exactly which changes
> to
> > > make
> > > > in the code, but I believe the final idea is achievable.
> > > > The change would be breaking for how Kafka Connect is run, but I
> think
> > > > there's a chance to make the change non breaking to how Connect is
> > > > programmed. I believe the same public API framework can be used.
> > > >
> > > > Finally, the REST API can be used for monitoring, or the JMX metrics
> as
> > > > usual.
> > > >
> > > > I may be completely wrong, but I would see such a change drive the
> > > > utilisation, management of Connect by a lot while lowering the
> barrier
> > to
> > > > adoption.
> > > >
> > > > This change may be big to implement but probably worthwhile. I'd be
> > happy
> > > > to provide more "user feedback" on a PR, but probably won't be able
> to
> > > > implement a PR myself.
> > > >
> > > > More than happy to discuss this
> > > >
> > > > Best,
> > > > Stephane
> > > >
> > > >
> > > > Kind regards,
> > > > Stephane
> > > >
> > > >
> > > > Stephane Maarek | Developer
> > > >
> > > > +61 416 575 980
> > > > steph...@simplemachines.com.au
> > > > simplemachines.com.au
> > > > Level 2, 145 William Street, Sydney NSW 2010
> > > >
> > > > On 17 May 2018 at 14:42, Saulius Valatka <saulius...@gmail.com>
> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > the only real usecase for the REST interface I can see is providing
> > > > > health/liveness checks for mesos/kubernetes. It's also true that
> the
> > > API
> > > > > can be left as is and e.g. not exposed publicly on the platform
> > level,
> > > > but
> > > > > this would still leave opportunities to accidentally mess something
> > up
> > > > > internally, so it's mostly a safety concern.
> > > > >
> > > > > Regarding the option renaming: I agree that it's not necessary as
> > it's
> > > > not
> > > > > clashing with anything, my reasoning is that assuming some other
> > offset
> > > > > storage appears in the future, having all config properties at the
> > root
> > > > > level of offset.storage.* _MIGHT_ introduce clashes in the future,
> so
> > > > this
> > > > > is just a suggestion for introducing a convention of
> > > > > offset.storage.<store>.<properties>, which the existing
> > > > > property offset.storage.file.filename already adheres to. But in
> > > general,
> > > > > yes -- this can be left as is.
> > > > >
> > > > >
> > > > >
> > > > > 2018-05-17 1:20 GMT+03:00 Jakub Scholz <ja...@scholz.cz>:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > What do you plan to use the read-only REST interface for? Is
> there
> > > > > > something what you cannot get through metrics interface?
> Otherwise
> > it
> > > > > might
> > > > > > be easier to just disable the REST interface (either in the code,
> > or
> > > > just
> > > > > > on the platform level - e.g. in Kubernetes).
> > > > > >
> > > > > > Also, I do not know what is the usual approach in Kafka ... but
> do
> > we
> > > > > > really have to rename the offset.storage.* options? The current
> > names
> > > > do
> > > > > > not seem to have any collision with what you are adding and they
> > > would
> > > > > get
> > > > > > "out of sync" with the other options used in connect
> > > (status.storage.*
> > > > > and
> > > > > > config.storage.*). So it seems a bit unnecessary change to me.
> > > > > >
> > > > > > Jakub
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, May 16, 2018 at 10:10 PM Saulius Valatka <
> > > saulius...@gmail.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I'd like to start a discussion on the following KIP:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-304%3A+Connect+runtime+mode+improvements+for+container+platforms
> > > > > > >
> > > > > > > Basically the idea is to make it easier to run separate
> instances
> > > of
> > > > > > Kafka
> > > > > > > Connect hosting isolated connectors on container platforms such
> > as
> > > > > Mesos
> > > > > > or
> > > > > > > Kubernetes.
> > > > > > >
> > > > > > > In particular it would be interesting to hear opinions about
> the
> > > > > proposed
> > > > > > > read-only REST API mode, more specifically I'm concerned about
> > the
> > > > > > > possibility to implement it in distributed mode as it appears
> the
> > > > > > framework
> > > > > > > is using it internally (
> > > > > > >
> > > > > > > https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1019
> > > > > > > ),
> > > > > > > however this particular API method appears to be
> undocumented(?).
> > > > > > >
> > > > > > > Looking forward for your feedback.
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
