Re: [VOTE] KIP-891: Running multiple versions of Connector plugins

2024-10-07 Thread Ashwin
Hi Snehasis,

Thanks for the KIP - +1 (non-binding)

I just had a question regarding the sample config in
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=235834793#KIP891:RunningmultipleversionsofConnectorplugins-Configuration

>  "connector.version": "3.8"
> "transforms.flatten-old.version": "3.8.0"

How will the feature be implemented so that first config treated as an
exact version requirement, while the second one is a
"use-only-if-you-have-it" version ?

Thanks,
Ashwin


On Tue, Oct 8, 2024 at 12:19 AM Chris Egerton 
wrote:

> Hi Snehashis,
>
> Thanks for the KIP. I'm +1 (binding) but have one more non-blocking thought
> I wanted to share.
>
> On the off chance that an existing plugin is designed to accept a "version"
> property, could we either 1) keep passing that property to plugins instead
> of stripping it, or 2) rename our new property to something like
> "plugin.version"?
>
> Feel free to close the vote (if/when it gets a third binding+1) without
> addressing this if you believe the tradeoffs with the existing design are
> superior.
>
> Cheers,
>
> Chris
>
> On Mon, Oct 7, 2024, 11:22 Greg Harris 
> wrote:
>
> > Hey Snehashis,
> >
> > Thanks for the KIP! +1 (binding)
> >
> > Greg
> >
> > On Fri, Oct 4, 2024 at 10:14 PM Snehashis 
> > wrote:
> >
> > > Hi everyone
> > >
> > > I would like to call a vote for KIP-891. Please take a moment to review
> > the
> > > proposal and submit your vote. Special thanks to Greg who helped to
> > expand
> > > this to make it much more broadly useful, and everyone else who
> > > participated in both discussion threads.
> > >
> > > KIP
> > > KIP-891: Running multiple versions of Connector plugins - Apache Kafka
> -
> > > Apache Software Foundation
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-891%3A+Running+multiple+versions+of+Connector+plugins
> > > >
> > >
> > > Thanks
> > > Snehashis
> > >
> >
>


Re: [VOTE] KIP-995: Allow users to specify initial offsets while creating connectors

2024-03-11 Thread Ashwin
Thanks Folks,

Renamed the field to `offsets_status`.

And now that we have 3 binding votes, I will update the KIP status to
`accepted` .

Thanks again for all the valuable feedback.

Ashwin

On Wed, Mar 6, 2024 at 2:42 PM Chris Egerton 
wrote:

> Hi Yash,
>
> Thanks for the follow-up, I like the benefits it's yielded. I too think
> "offsets_status" would be a better name for the response field.
> @Ashwin--thoughts?
>
> Cheers,
>
> Chris
>
>
> On Wed, Mar 6, 2024, 03:08 Ashwin  wrote:
>
> > Thanks Yash,
> >
> > Yes , I think we can use @JsonInclude(JsonInclude.Include.NON_NULL) to
> > exclude “initial_offsets_response” from the create response if offset is
> > not specified.
> >
> > I’ll close the voting this week , if there are no further comments.
> >
> > Thanks for voting, everyone!
> >
> >
> > Ashwin
> >
> > On Tue, Mar 5, 2024 at 11:20 PM Yash Mayya  wrote:
> >
> > > Hi Chris,
> > >
> > > I followed up with Ashwin offline and I believe he wanted to take a
> > closer
> > > look at the `ConnectorInfoWithInitialOffsetsResponse` stuff he
> mentioned
> > in
> > > the previous email and whether or not that'll be required
> (alternatively
> > > using some Jackson JSON tricks). However, that's an implementation
> detail
> > > and shouldn't hold up the KIP. Bikeshedding a little on the
> > > "initial_offsets_response" field - I'm wondering if something like
> > > "offsets_status" might be more appropriate, what do you think? I don't
> > > think the current name is terrible though, so I'm +1 (binding) if
> > everyone
> > > else agrees that it's suitable.
> > >
> > > Thanks,
> > > Yash
> > >
> > > On Tue, Mar 5, 2024 at 9:51 PM Chris Egerton 
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > Wanted to bump this and see if it looks good enough for a third vote.
> > > Yash,
> > > > any thoughts?
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Mon, Jan 29, 2024 at 2:55 AM Ashwin  >
> > > > wrote:
> > > >
> > > > > Thanks for reviewing this KIP,  Yash.
> > > > >
> > > > > Could you please elaborate on the cleanup steps? For instance, if
> we
> > > > > > encounter an error after wiping existing offsets but before
> writing
> > > the
> > > > > new
> > > > > > offsets, there's not really any good way to "revert" the wiped
> > > offsets.
> > > > > > It's definitely extremely unlikely that a user would expect the
> > > > previous
> > > > > > offsets for a connector to still be present (by creating a new
> > > > connector
> > > > > > with the same name but without initial offsets for instance)
> after
> > > > such a
> > > > > > failed operation, but it would still be good to call this out
> > > > > explicitly. I
> > > > > > presume that we'd want to wipe the newly written initial offsets
> if
> > > we
> > > > > fail
> > > > > > while writing the connector's config however?
> > > > >
> > > > >
> > > > > Agree - I have clarified the cleanup here -
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-995%3A+Allow+users+to+specify+initial+offsets+while+creating+connectors#KIP995:Allowuserstospecifyinitialoffsetswhilecreatingconnectors-ProposedChanges
> > > > > .
> > > > >
> > > > > The `PATCH /connectors/{connector}/offsets` and `DELETE
> > > > > > /connectors/{connector}/offsets` endpoints have two possible
> > success
> > > > > > messages in the response depending on whether or not the
> connector
> > > > plugin
> > > > > > has implemented the `alterOffsets` connector method. Since we're
> > > > > proposing
> > > > > > to utilize the same offset validation during connector creation
> if
> > > > > initial
> > > > > > offsets are specified, I think it would be valuable to surface
> > > similar
> > > > > > information to users here as well
> > > > >
> > >

Re: [VOTE] KIP-995: Allow users to specify initial offsets while creating connectors

2024-03-05 Thread Ashwin
Thanks Yash,

Yes , I think we can use @JsonInclude(JsonInclude.Include.NON_NULL) to
exclude “initial_offsets_response” from the create response if offset is
not specified.

I’ll close the voting this week , if there are no further comments.

Thanks for voting, everyone!


Ashwin

On Tue, Mar 5, 2024 at 11:20 PM Yash Mayya  wrote:

> Hi Chris,
>
> I followed up with Ashwin offline and I believe he wanted to take a closer
> look at the `ConnectorInfoWithInitialOffsetsResponse` stuff he mentioned in
> the previous email and whether or not that'll be required (alternatively
> using some Jackson JSON tricks). However, that's an implementation detail
> and shouldn't hold up the KIP. Bikeshedding a little on the
> "initial_offsets_response" field - I'm wondering if something like
> "offsets_status" might be more appropriate, what do you think? I don't
> think the current name is terrible though, so I'm +1 (binding) if everyone
> else agrees that it's suitable.
>
> Thanks,
> Yash
>
> On Tue, Mar 5, 2024 at 9:51 PM Chris Egerton 
> wrote:
>
> > Hi all,
> >
> > Wanted to bump this and see if it looks good enough for a third vote.
> Yash,
> > any thoughts?
> >
> > Cheers,
> >
> > Chris
> >
> > On Mon, Jan 29, 2024 at 2:55 AM Ashwin 
> > wrote:
> >
> > > Thanks for reviewing this KIP,  Yash.
> > >
> > > Could you please elaborate on the cleanup steps? For instance, if we
> > > > encounter an error after wiping existing offsets but before writing
> the
> > > new
> > > > offsets, there's not really any good way to "revert" the wiped
> offsets.
> > > > It's definitely extremely unlikely that a user would expect the
> > previous
> > > > offsets for a connector to still be present (by creating a new
> > connector
> > > > with the same name but without initial offsets for instance) after
> > such a
> > > > failed operation, but it would still be good to call this out
> > > explicitly. I
> > > > presume that we'd want to wipe the newly written initial offsets if
> we
> > > fail
> > > > while writing the connector's config however?
> > >
> > >
> > > Agree - I have clarified the cleanup here -
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-995%3A+Allow+users+to+specify+initial+offsets+while+creating+connectors#KIP995:Allowuserstospecifyinitialoffsetswhilecreatingconnectors-ProposedChanges
> > > .
> > >
> > > The `PATCH /connectors/{connector}/offsets` and `DELETE
> > > > /connectors/{connector}/offsets` endpoints have two possible success
> > > > messages in the response depending on whether or not the connector
> > plugin
> > > > has implemented the `alterOffsets` connector method. Since we're
> > > proposing
> > > > to utilize the same offset validation during connector creation if
> > > initial
> > > > offsets are specified, I think it would be valuable to surface
> similar
> > > > information to users here as well
> > >
> > >
> > > Thanks for pointing this out. I have updated the response to include a
> > new
> > > field “initial_offsets_response” which will contain the response based
> on
> > > whether the connector implements alterOffsets or not. This also means
> > that
> > > if initial_offsets is set in the ConnectorCreate request, we will
> return
> > a
> > > new REST entity (ConnectorInfoWithInitialOffsetsResponse ?) which will
> > be a
> > > child class of ConnectorInfo.
> > >
> > > (
> > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java#L28-L28
> > > )
> > >
> > > Thanks,
> > > Ashwin
> > >
> > > On Wed, Jan 17, 2024 at 4:48 PM Yash Mayya 
> wrote:
> > >
> > > > Hi Ashwin,
> > > >
> > > > Thanks for the KIP.
> > > >
> > > > > If Connect runtime encounters an error in any of these steps,
> > > > > it will cleanup (if required) and return an error response
> > > >
> > > > Could you please elaborate on the cleanup steps? For instance, if we
> > > > encounter an error after wiping existing offsets but before writing
> the
> > > new
> > > > offsets, there's not really any g

Re: [VOTE] KIP-995: Allow users to specify initial offsets while creating connectors

2024-01-28 Thread Ashwin
Thanks for reviewing this KIP,  Yash.

Could you please elaborate on the cleanup steps? For instance, if we
> encounter an error after wiping existing offsets but before writing the new
> offsets, there's not really any good way to "revert" the wiped offsets.
> It's definitely extremely unlikely that a user would expect the previous
> offsets for a connector to still be present (by creating a new connector
> with the same name but without initial offsets for instance) after such a
> failed operation, but it would still be good to call this out explicitly. I
> presume that we'd want to wipe the newly written initial offsets if we fail
> while writing the connector's config however?


Agree - I have clarified the cleanup here -
https://cwiki.apache.org/confluence/display/KAFKA/KIP-995%3A+Allow+users+to+specify+initial+offsets+while+creating+connectors#KIP995:Allowuserstospecifyinitialoffsetswhilecreatingconnectors-ProposedChanges
.

The `PATCH /connectors/{connector}/offsets` and `DELETE
> /connectors/{connector}/offsets` endpoints have two possible success
> messages in the response depending on whether or not the connector plugin
> has implemented the `alterOffsets` connector method. Since we're proposing
> to utilize the same offset validation during connector creation if initial
> offsets are specified, I think it would be valuable to surface similar
> information to users here as well


Thanks for pointing this out. I have updated the response to include a new
field “initial_offsets_response” which will contain the response based on
whether the connector implements alterOffsets or not. This also means that
if initial_offsets is set in the ConnectorCreate request, we will return a
new REST entity (ConnectorInfoWithInitialOffsetsResponse ?) which will be a
child class of ConnectorInfo.

(
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorInfo.java#L28-L28
)

Thanks,
Ashwin

On Wed, Jan 17, 2024 at 4:48 PM Yash Mayya  wrote:

> Hi Ashwin,
>
> Thanks for the KIP.
>
> > If Connect runtime encounters an error in any of these steps,
> > it will cleanup (if required) and return an error response
>
> Could you please elaborate on the cleanup steps? For instance, if we
> encounter an error after wiping existing offsets but before writing the new
> offsets, there's not really any good way to "revert" the wiped offsets.
> It's definitely extremely unlikely that a user would expect the previous
> offsets for a connector to still be present (by creating a new connector
> with the same name but without initial offsets for instance) after such a
> failed operation, but it would still be good to call this out explicitly. I
> presume that we'd want to wipe the newly written initial offsets if we fail
> while writing the connector's config however?
>
> > Validate the offset using the same checks performed while
> > altering connector offsets (PATCH /$connector/offsets ) as
> > specified in KIP-875
>
> The `PATCH /connectors/{connector}/offsets` and `DELETE
> /connectors/{connector}/offsets` endpoints have two possible success
> messages in the response depending on whether or not the connector plugin
> has implemented the `alterOffsets` connector method. Since we're proposing
> to utilize the same offset validation during connector creation if initial
> offsets are specified, I think it would be valuable to surface similar
> information to users here as well. Thoughts?
>
> Thanks,
> Yash
>
> On Wed, Jan 17, 2024 at 3:31 PM Ashwin 
> wrote:
>
> > Hi All ,
> >
> > Can I please get one more binding vote, so that the KIP is approved ?
> > Thanks for the votes Chris and Mickael !
> >
> >
> > - Ashwin
> >
> >
> > On Thu, Jan 11, 2024 at 3:55 PM Mickael Maison  >
> > wrote:
> >
> > > Hi Ashwin,
> > >
> > > +1 (binding), thanks for the KIP
> > >
> > > Mickael
> > >
> > > On Tue, Jan 9, 2024 at 4:54 PM Chris Egerton 
> > > wrote:
> > > >
> > > > Thanks for the KIP! +1 (binding)
> > > >
> > > > On Mon, Jan 8, 2024 at 9:35 AM Ashwin 
> > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I would like to start  a vote on KIP-995.
> > > > >
> > > > >
> > > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-995%3A+Allow+users+to+specify+initial+offsets+while+creating+connectors
> > > > >
> > > > > Discussion thread -
> > > > > https://lists.apache.org/thread/msorbr63scglf4484yq764v7klsj7c4j
> > > > >
> > > > > Thanks!
> > > > >
> > > > > Ashwin
> > > > >
> > >
> >
>


Re: [VOTE] KIP-995: Allow users to specify initial offsets while creating connectors

2024-01-17 Thread Ashwin
Hi All ,

Can I please get one more binding vote, so that the KIP is approved ?
Thanks for the votes Chris and Mickael !


- Ashwin


On Thu, Jan 11, 2024 at 3:55 PM Mickael Maison 
wrote:

> Hi Ashwin,
>
> +1 (binding), thanks for the KIP
>
> Mickael
>
> On Tue, Jan 9, 2024 at 4:54 PM Chris Egerton 
> wrote:
> >
> > Thanks for the KIP! +1 (binding)
> >
> > On Mon, Jan 8, 2024 at 9:35 AM Ashwin 
> wrote:
> >
> > > Hi All,
> > >
> > > I would like to start  a vote on KIP-995.
> > >
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-995%3A+Allow+users+to+specify+initial+offsets+while+creating+connectors
> > >
> > > Discussion thread -
> > > https://lists.apache.org/thread/msorbr63scglf4484yq764v7klsj7c4j
> > >
> > > Thanks!
> > >
> > > Ashwin
> > >
>


[VOTE] KIP-995: Allow users to specify initial offsets while creating connectors

2024-01-08 Thread Ashwin
Hi All,

I would like to start  a vote on KIP-995.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-995%3A+Allow+users+to+specify+initial+offsets+while+creating+connectors

Discussion thread -
https://lists.apache.org/thread/msorbr63scglf4484yq764v7klsj7c4j

Thanks!

Ashwin


Re: [DISCUSS] KIP-995: Allow users to specify initial offsets while creating connectors

2024-01-03 Thread Ashwin
Thanks Chris,

It makes sense to call out the cleaning up of existing offsets in the KIP.
I have made the change and will initiate the voting in a day.

Regards,
Ashwin

On Tue, Dec 12, 2023 at 6:57 PM Chris Egerton 
wrote:

> Hi Ashwin,
>
> LGTM! One small adjustment I'd suggest but we don't have to block on--it
> may be clearer to put "Wipe all existing offsets for the connector" in
> between steps 2 and 3 of the proposed changes section.
>
> Cheers,
>
> Chris
>
> On Mon, Dec 11, 2023 at 11:24 PM Ashwin 
> wrote:
>
> > Thanks for pointing this out Chris.
> >
> > I have updated the KIP with the correct sequence of steps.
> >
> > Thanks,
> > Ashwin
> >
> > On Wed, Dec 6, 2023 at 11:48 PM Chris Egerton 
> > wrote:
> >
> > > Hi Ashwin,
> > >
> > > Regarding point 4--I think this is still a bit unwise. When workers
> pick
> > up
> > > a new connector config from the config topic, they participate in a
> > > rebalance. It may be safe to write offsets during that rebalance, but
> in
> > > the name of simplicity, do you think we can write the offsets for the
> > > connector before its config? The sequence of steps would be something
> > like
> > > this:
> > >
> > > 1. Validate offsets and connector config (can be done in any order)
> > > 2. Write offsets
> > > 3. Write connector config (with whatever initial state is specified in
> > the
> > > request, or the default if none is specified)
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Wed, Dec 6, 2023 at 9:13 AM Ashwin 
> > > wrote:
> > >
> > > > Hello Chris,
> > > >
> > > > Thanks for the quick and detailed review. Please see my responses
> below
> > > >
> > > > High-level thoughts:
> > > >
> > > > 1. I had not thought of this till now, thanks for pointing it out. I
> > > > would lean towards the second option of cleaning previous offsets as
> > > > it will result in the fewer surprises for the user.
> > > >
> > > > 2 and 3. I agree and have updated the wiki to that effect. I just
> > > > wanted to use the connector name as a mutex - we can handle the race
> > > > in other ways.
> > > >
> > > > 4. Yes, I meant submitting config to the config topic. Have updated
> > > > the wiki to make it clearer.
> > > >
> > > >
> > > > Nits:
> > > >
> > > > Thanks for pointing these - I have made the necessary changes.
> > > >
> > > >
> > > > Thanks again,
> > > >
> > > > Ashwin
> > > >
> > > >
> > > > On Mon, Dec 4, 2023 at 9:05 PM Chris Egerton  >
> > > > wrote:
> > > >
> > > > > Hi Ashwin,
> > > > >
> > > > > Thanks for the KIP! This would be a nice simplification to the
> > process
> > > > for
> > > > > migrating connectors enabled by KIP-980, and would also add global
> > > > support
> > > > > for a feature I've seen implemented by hand in at least a couple
> > > > connectors
> > > > > over the years.
> > > > >
> > > > >
> > > > > High-level thoughts:
> > > > >
> > > > > 1. If there are leftover offsets from a previous connector, what
> will
> > > the
> > > > > impact on those offsets (if any) be if a new connector is created
> > with
> > > > the
> > > > > same name with initial offsets specified? I can think of at least
> two
> > > > > options: we leave those offsets as-are but allow any of the initial
> > > > offsets
> > > > > in the new connector request to overwrite them, or we automatically
> > > wipe
> > > > > all existing offsets for the connector first before writing its
> > initial
> > > > > offsets and then creating it. I have a slight preference for the
> > first
> > > > > because it's simpler to implement and aligns with existing
> precedent
> > > for
> > > > > offset handling where we never wipe them unless explicitly
> requested
> > by
> > > > the
> > > > > user or connector, but it could be argued that the second is less
> > > likely
> > > > to
> > > > > generate footguns

Re: [DISCUSS] KIP-995: Allow users to specify initial offsets while creating connectors

2023-12-11 Thread Ashwin
Thanks for pointing this out Chris.

I have updated the KIP with the correct sequence of steps.

Thanks,
Ashwin

On Wed, Dec 6, 2023 at 11:48 PM Chris Egerton 
wrote:

> Hi Ashwin,
>
> Regarding point 4--I think this is still a bit unwise. When workers pick up
> a new connector config from the config topic, they participate in a
> rebalance. It may be safe to write offsets during that rebalance, but in
> the name of simplicity, do you think we can write the offsets for the
> connector before its config? The sequence of steps would be something like
> this:
>
> 1. Validate offsets and connector config (can be done in any order)
> 2. Write offsets
> 3. Write connector config (with whatever initial state is specified in the
> request, or the default if none is specified)
>
> Cheers,
>
> Chris
>
> On Wed, Dec 6, 2023 at 9:13 AM Ashwin 
> wrote:
>
> > Hello Chris,
> >
> > Thanks for the quick and detailed review. Please see my responses below
> >
> > High-level thoughts:
> >
> > 1. I had not thought of this till now, thanks for pointing it out. I
> > would lean towards the second option of cleaning previous offsets as
> > it will result in the fewer surprises for the user.
> >
> > 2 and 3. I agree and have updated the wiki to that effect. I just
> > wanted to use the connector name as a mutex - we can handle the race
> > in other ways.
> >
> > 4. Yes, I meant submitting config to the config topic. Have updated
> > the wiki to make it clearer.
> >
> >
> > Nits:
> >
> > Thanks for pointing these - I have made the necessary changes.
> >
> >
> > Thanks again,
> >
> > Ashwin
> >
> >
> > On Mon, Dec 4, 2023 at 9:05 PM Chris Egerton 
> > wrote:
> >
> > > Hi Ashwin,
> > >
> > > Thanks for the KIP! This would be a nice simplification to the process
> > for
> > > migrating connectors enabled by KIP-980, and would also add global
> > support
> > > for a feature I've seen implemented by hand in at least a couple
> > connectors
> > > over the years.
> > >
> > >
> > > High-level thoughts:
> > >
> > > 1. If there are leftover offsets from a previous connector, what will
> the
> > > impact on those offsets (if any) be if a new connector is created with
> > the
> > > same name with initial offsets specified? I can think of at least two
> > > options: we leave those offsets as-are but allow any of the initial
> > offsets
> > > in the new connector request to overwrite them, or we automatically
> wipe
> > > all existing offsets for the connector first before writing its initial
> > > offsets and then creating it. I have a slight preference for the first
> > > because it's simpler to implement and aligns with existing precedent
> for
> > > offset handling where we never wipe them unless explicitly requested by
> > the
> > > user or connector, but it could be argued that the second is less
> likely
> > to
> > > generate footguns for users. Interested in your thoughts!
> > >
> > > 2. IMO preflight validation (i.e., the "Validate initial_offset before
> > > creating the connector in STOPPED state rejected alternative) is a
> > > must-have for this kind of feature. I acknowledge the possible race
> > > condition (and I don't think it's worth it to track in-flight connector
> > > creation requests in the herder in order to prevent this race, since
> > > ever-blocking Connector operations would be cumbersome to deal with),
> and
> > > the extra implementation effort. But I don't think either of those tip
> > the
> > > scales far enough to override the benefit of ensuring the submitted
> > offsets
> > > are valid before creating the connector.
> > >
> > > 3. On the topic of preflight validation--I also think it's important
> that
> > > we validate both the connector config and the initial offsets before
> > either
> > > creating the connector or storing the initial offsets. I don't think
> this
> > > point requires any changes to the KIP that aren't already proposed with
> > > point 2. above, but wanted to see if we could adopt this as a primary
> > goal
> > > for the design of the feature and keep it in mind with future changes.
> > >
> > > 4. In the proposed changes section, the first step is "Create a
> connector
> > > in STOPPED state", and the second step is "Validate the offs

Re: [DISCUSS] KIP-995: Allow users to specify initial offsets while creating connectors

2023-12-06 Thread Ashwin
Hello Chris,

Thanks for the quick and detailed review. Please see my responses below

High-level thoughts:

1. I had not thought of this till now, thanks for pointing it out. I
would lean towards the second option of cleaning previous offsets as
it will result in the fewer surprises for the user.

2 and 3. I agree and have updated the wiki to that effect. I just
wanted to use the connector name as a mutex - we can handle the race
in other ways.

4. Yes, I meant submitting config to the config topic. Have updated
the wiki to make it clearer.


Nits:

Thanks for pointing these - I have made the necessary changes.


Thanks again,

Ashwin


On Mon, Dec 4, 2023 at 9:05 PM Chris Egerton 
wrote:

> Hi Ashwin,
>
> Thanks for the KIP! This would be a nice simplification to the process for
> migrating connectors enabled by KIP-980, and would also add global support
> for a feature I've seen implemented by hand in at least a couple connectors
> over the years.
>
>
> High-level thoughts:
>
> 1. If there are leftover offsets from a previous connector, what will the
> impact on those offsets (if any) be if a new connector is created with the
> same name with initial offsets specified? I can think of at least two
> options: we leave those offsets as-are but allow any of the initial offsets
> in the new connector request to overwrite them, or we automatically wipe
> all existing offsets for the connector first before writing its initial
> offsets and then creating it. I have a slight preference for the first
> because it's simpler to implement and aligns with existing precedent for
> offset handling where we never wipe them unless explicitly requested by the
> user or connector, but it could be argued that the second is less likely to
> generate footguns for users. Interested in your thoughts!
>
> 2. IMO preflight validation (i.e., the "Validate initial_offset before
> creating the connector in STOPPED state rejected alternative) is a
> must-have for this kind of feature. I acknowledge the possible race
> condition (and I don't think it's worth it to track in-flight connector
> creation requests in the herder in order to prevent this race, since
> ever-blocking Connector operations would be cumbersome to deal with), and
> the extra implementation effort. But I don't think either of those tip the
> scales far enough to override the benefit of ensuring the submitted offsets
> are valid before creating the connector.
>
> 3. On the topic of preflight validation--I also think it's important that
> we validate both the connector config and the initial offsets before either
> creating the connector or storing the initial offsets. I don't think this
> point requires any changes to the KIP that aren't already proposed with
> point 2. above, but wanted to see if we could adopt this as a primary goal
> for the design of the feature and keep it in mind with future changes.
>
> 4. In the proposed changes section, the first step is "Create a connector
> in STOPPED state", and the second step is "Validate the offset...". What
> exactly is entailed by "Create a connector"? Submit the config to the
> config topic (presumably after a preflight validation of the config)?
> Participate in the ensuing rebalance? I'm a little hesitant to start
> performing rebalances inside REST requests, wondering if we can find a
> lighter-weight way to implement this.
>
>
> Nits:
>
> 5. You can remove the italicized "This page is meant as a template for
> writing a KIP" section after the table of contents.
>
> 6. Can you file a JIRA ticket for this change and add a link to it in the
> KIP under the status section?
>
> 7. What do you think about changing the name for the new field from
> "initial_offset" (singular) to "initial_offsets" (plural)? This is
> especially relevant for connectors that read from multiple source
> partitions, like MM2 and the various JDBC source connectors out there.
>
> 8. IMO it's not necessary to include this section: "Please note that sink
> and source connectors have different schemas for offset." While it's
> technically true that the fields of the objects inside the "partition" and
> "offset" fields will likely differ between sink and source connectors,
> they'll also likely differ in the exact same way across different sink
> connectors. I think it's enough to link to the relevant section in KIP-875
> on the details for the format.
>
> 9. Instead of "Connector-defined source partition" and "Connector-defined
> source offset" in the comments for the sample connector creation body
> (which aren't strictly accurate for sink connec

[jira] [Created] (KAFKA-15976) KIP-995: Allow users to specify initial offsets while creating connectors

2023-12-06 Thread Ashwin Pankaj (Jira)
Ashwin Pankaj created KAFKA-15976:
-

 Summary: KIP-995: Allow users to specify initial offsets while 
creating connectors
 Key: KAFKA-15976
 URL: https://issues.apache.org/jira/browse/KAFKA-15976
 Project: Kafka
  Issue Type: Improvement
  Components: connect
Reporter: Ashwin Pankaj
Assignee: Ashwin Pankaj


Allow setting the initial offset for a connector in the connector creation API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] KIP-995: Allow users to specify initial offsets while creating connectors

2023-11-29 Thread Ashwin
Hi all,

I'd like to begin discussion on KIP-995 which proposes to allow users
to specify initial offset as part of the request to create a connector

https://cwiki.apache.org/confluence/display/KAFKA/KIP-995%3A+Allow+users+to+specify+initial+offsets+while+creating+connectors

During the discussion for KIP-980
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-980%3A+Allow+creating+connectors+in+a+stopped+state>,
which proposed the creation of connectors in STOPPED state, there was
a suggestion to also allow setting the initial offset for a connector
in the connector creation API. The proposal was deemed valid
<https://lists.apache.org/thread/tq79l3975hy69wsycvwoh9n8gbp5j26f>
(point no.4) but was deferred to a future KIP. This KIP proposes to
implement that change and builds upon the changes introduced in
KIP-875 
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Alteringoffsets(request)>
and KIP-980 
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-980%3A+Allow+creating+connectors+in+a+stopped+state>.

Thanks,
Ashwin


Re: [DISCUSS] KIP-1006: Remove SecurityManager Support

2023-11-20 Thread Ashwin
Hi Greg,

Thanks for writing this KIP.
I agree with you that handling this now will help us react to the
deprecation of SecurityManager, whenever it happens.

I had a question regarding how we deprecate JDKs supported by Apache Kafka.
When we drop support for JDK 17, will we set the “-target” option of Javac
such that the resulting JARs will not load in JVMs which are lesser than or
equal to that version ?

Thanks,
Ashwin


On Tue, Nov 21, 2023 at 6:18 AM Greg Harris 
wrote:

> Hi all,
>
> I'd like to invite you all to discuss removing SecurityManager support
> from Kafka. This affects the client and server SASL mechanism, Tiered
> Storage, and Connect classloading.
>
> Find the KIP here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1006%3A+Remove+SecurityManager+Support
>
> I think this is a "code higiene" effort that doesn't need to be dealt
> with urgently, but it would prevent a lot of headache later when Java
> does decide to remove support.
>
> If you are currently using the SecurityManager with Kafka, I'd really
> appreciate hearing how you're using it, and how you're planning around
> its removal.
>
> Thanks!
> Greg Harris
>


Re: [DISCUSS] KIP-967: Support custom SSL configuration for Kafka Connect RestServer

2023-10-09 Thread Ashwin
Hello Taras,

> Do you think that something needs to be corrected in KIP to make it more
understandable without PR? Do you have any advice?
Ha - no. I just wanted to thank you for sharing the PR which helped me as a
newbie.

> If I understood the question correctly:
I was referring to (and did not understand) the removal of L141
in clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
in https://github.com/apache/kafka/pull/14203/files.
But you are right - this can be discussed in the final PR.

> There might be a better place for this public static method that creates
the SslEngineFactory.
Yes, I think this class should be moved to something like `server-common`
module - but would like any of the committers to comment on this.

Thanks,
Ashwin


On Fri, Sep 29, 2023 at 9:22 PM Taras Ledkov  wrote:

> Hi Ashwin,
>
> Thanks a lot for your review.
>
> > Thanks for the KIP and the PR (which helped me understand the change).
> Do you think that something needs to be corrected in KIP to make it more
> understandable without PR? Do you have any advice?
>
> > I could not understand one thing though - In
> https://github.com/apache/kafka/pull/14203/,
> > why did you have to remove the code which sets sslEngineFactoryConfig in
> instantiateSslEngineFactory?
> If I understood the question correctly:
> I've refactored this method a bit.
> SslFactory#instantiateSslEngineFactory was a private not-static method.
> I've separated the code that really creates new instance of the
> SslEngineFactory and place it into a public static method. There might be a
> better place for this public static method that creates the
> SslEngineFactory. I think we will discuss this at the final PR. Current PR
> is just a demo / prototype to play.
>


Re: [DISCUSS] KIP-967: Support custom SSL configuration for Kafka Connect RestServer

2023-09-28 Thread Ashwin
Hi Taras,

Thanks for the KIP and the PR (which helped me understand the change).

This is a useful feature, change is small and reuses existing functionality
in clients/../SslFactory.java - so hopefully, this KIP will get accepted.

I could not understand one thing though - In
https://github.com/apache/kafka/pull/14203/,  why did you have to remove
the code which sets sslEngineFactoryConfig in instantiateSslEngineFactory ?

Thanks,
Ashwin

On Tue, Sep 26, 2023 at 6:39 PM Taras Ledkov  wrote:

> Hi Kafka Team.
>
> Ping...
>


Re: [ANNOUNCE] New committer: Yash Mayya

2023-09-21 Thread Ashwin
Awesome ! Congratulations Yash !!

On Thu, Sep 21, 2023 at 9:25 PM Edoardo Comar  wrote:

> Congratulations Yash
>
> On Thu, 21 Sept 2023 at 16:28, Bruno Cadonna  wrote:
> >
> > Hi all,
> >
> > The PMC of Apache Kafka is pleased to announce a new Kafka committer
> > Yash Mayya.
> >
> > Yash's major contributions are around Connect.
> >
> > Yash authored the following KIPs:
> >
> > KIP-793: Allow sink connectors to be used with topic-mutating SMTs
> > KIP-882: Kafka Connect REST API configuration validation timeout
> > improvements
> > KIP-970: Deprecate and remove Connect's redundant task configurations
> > endpoint
> > KIP-980: Allow creating connectors in a stopped state
> >
> > Overall, Yash is known for insightful and friendly input to discussions
> > and his high quality contributions.
> >
> > Congratulations, Yash!
> >
> > Thanks,
> >
> > Bruno (on behalf of the Apache Kafka PMC)
>


Re: [DISCUSS] KIP-980: Allow creating connectors in a stopped state

2023-09-20 Thread Ashwin
Thanks Yash! This is very useful for migrating connectors from one cluster
to another.

I had the following comments/questions

1. Is the offset read using `GET /offsets` api always guaranteed to be in a
format accepted by `PATCH /offsets` ?
2. I had to tackle a similar migration situation but the two connect
clusters in question were using the same backing Kafka cluster. The
challenge in this case is that when I delete the original connector, I want
to retain offsets and config topics. Do you think we should support
deletion of a connector without removal of these topics as part of this KIP
?
3. In the case of a downgrade, how will Connect worker handle the optional
“state” field in config topic ?

Thanks,
Ashwin




On Sun, Sep 17, 2023 at 11:09 PM Yash Mayya  wrote:

> Hi all,
>
> I'd like to begin discussion on a KIP to allow creating connectors in a
> stopped state -
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-980%3A+Allow+creating+connectors+in+a+stopped+state
>
>
> Thanks,
> Yash
>


Re: KIP-976: Cluster-wide dynamic log adjustment for Kafka Connect

2023-09-06 Thread Ashwin
Hi Chris,

> I agree with Yash here: we can cross that bridge when we come to it,
unless
there are problems that we'd encounter then that would be better addressed
by adding request forwarding now. One potential argument I can think of is
that the UX would be a little strange if the semantics for this endpoint
differ depending on the value of the scope parameter (some values would be
affected by in-progress rebalances, and some would not), but I don't know
if making scope=cluster more brittle in the name of consistency with, e.g.,
scope=connectorType:foo is really a worthwhile tradeoff. Thoughts?

Yes, the inconsistency in UX was what drove me to comment. I understand
that this will add complexity (unnecessary for the current implementation)
and am fine with deferring it for later.


> Ah, great call! I love the new testing plan section. I also share Yash's
concerns about adding a new system test though (at this point, they're so
painful to write, test, and debug that in most circumstances I consider
them a last resort). Do you think it'd be reasonable to add end-to-end
verification for this with an integration test instead?

Sounds good to me. Honestly, I did not know that system tests are more
"heavy weight" than integration tests - so thanks for letting me know Yash
& Chris.

Cheers,
Ashwin

On Tue, Sep 5, 2023 at 8:51 PM Chris Egerton 
wrote:

> Hi all,
>
> Thank you so much for the generous review comments! Happy to see interest
> in this feature. Inline responses follow.
>
>
> Ashwin:
>
> > Don't you foresee a future scope type which may require cluster metadata
> ?
> In that case, isn't it better to forward the requests to the leader in the
> initial implementation ?
>
> I agree with Yash here: we can cross that bridge when we come to it, unless
> there are problems that we'd encounter then that would be better addressed
> by adding request forwarding now. One potential argument I can think of is
> that the UX would be a little strange if the semantics for this endpoint
> differ depending on the value of the scope parameter (some values would be
> affected by in-progress rebalances, and some would not), but I don't know
> if making scope=cluster more brittle in the name of consistency with, e.g.,
> scope=connectorType:foo is really a worthwhile tradeoff. Thoughts?
>
> > I would also recommend an additional system test for Standalone herder to
> ensure that the new scope parameter is honored and the response contains
> the last modified time.
>
> Ah, great call! I love the new testing plan section. I also share Yash's
> concerns about adding a new system test though (at this point, they're so
> painful to write, test, and debug that in most circumstances I consider
> them a last resort). Do you think it'd be reasonable to add end-to-end
> verification for this with an integration test instead?
>
>
> Yash:
>
> > From the proposed changes section, it isn't very clear to me how we'll be
> tracking this last modified timestamp to be returned in responses for the
> *GET
> /admin/loggers* and *GET /admin/loggers/{logger}* endpoints. Could you
> please elaborate on this? Also, will we track the last modified timestamp
> even for worker scoped modifications where we won't write any records to
> the config topic and the requests will essentially be processed
> synchronously?
>
> RE timestamp tracking: I was thinking we'd store the timestamp for each
> level in-memory and, whenever we change the level for a namespace, update
> its timestamp to the current wallclock time. Concretely, this means we'd
> want the timestamp for some logger `logger` to be as soon as possible after
> the call to `logger.setLevel(level)` for some level `level`. I'm honestly
> unsure how to clarify this further in the KIP; is there anything in there
> that strikes you as particularly ambiguous that we can tweak to be more
> clear?
>
> RE scope distinction for timestamps: I've updated the KIP to clarify this
> point, adding this sentence: "Timestamps will be updated regardless of
> whether the namespace update was applied using scope=worker or
> scope=cluster.". Let me know what you think
>
> > In the current synchronous implementation for the *PUT
> /admin/loggers/{logger} *endpoint, we return a 404 error if the level is
> invalid (i.e. not one of TRACE, DEBUG, WARN etc.). Since the new cluster
> scoped variant of the endpoint will be asynchronous, can we also add a
> validation to synchronously surface erroneous log levels to users?
>
> Good call! I think we don't have to be explicit about this in the proposed
> changes section, but it's a great fit for the testing plan, where I've
&

Re: KIP-976: Cluster-wide dynamic log adjustment for Kafka Connect

2023-09-03 Thread Ashwin
Hi Chris,

Thanks for thinking about this useful feature !
I had a question regarding

> Since cluster metadata is not required to handle these types of request,
they will not be forwarded to the leader

And later, we also mention about supporting more scope types in the future.
Don't you foresee a future scope type which may require cluster metadata ?
In that case, isn't it better to forward the requests to the leader in the
initial implementation ?

I would also recommend an additional system test for Standalone herder to
ensure that the new scope parameter is honored and the response contains
the last modified time.

Thanks,
Ashwin

On Sat, Sep 2, 2023 at 5:12 AM Chris Egerton 
wrote:

> Hi all,
>
> Can't imagine a worse time to publish a new KIP (it's late on a Friday and
> we're in the middle of the 3.6.0 release), but I wanted to put forth
> KIP-976 for discussion:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-976%3A+Cluster-wide+dynamic+log+adjustment+for+Kafka+Connect
>
> TL;DR: The API to dynamically adjust log levels at runtime with Connect is
> great, and I'd like to augment it with support to adjust log levels for
> every worker in the cluster (instead of just the worker that receives the
> REST request).
>
> I look forward to everyone's thoughts, but expect that this will probably
> take a bump or two once the dust has settled on 3.6.0. Huge thanks to
> everyone that's contributed to that release so far, especially our release
> manager Satish!
>
> Cheers,
>
> Chris
>


[jira] [Resolved] (KAFKA-14598) Fix flaky ConnectRestApiTest

2023-08-04 Thread Ashwin Pankaj (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashwin Pankaj resolved KAFKA-14598.
---
Resolution: Fixed

Did not observe this recently

> Fix flaky ConnectRestApiTest
> 
>
> Key: KAFKA-14598
> URL: https://issues.apache.org/jira/browse/KAFKA-14598
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>    Reporter: Ashwin Pankaj
>Assignee: Ashwin Pankaj
>Priority: Minor
>  Labels: flaky-test
>
> ConnectRestApiTest sometimes fails with the message
> {{ConnectRestError(404, '\n\n content="text/html;charset=ISO-8859-1"/>\nError 404 Not 
> Found\n\nHTTP ERROR 404 Not 
> Found\n\nURI:/connector-plugins/\nSTATUS:404\nMESSAGE:Not
>  
> Found\nSERVLET:-\n\n\n\n\n',
>  'http://172.31.1.75:8083/connector-plugins/')}}
> This happens because ConnectDistributedService.start() by default waits till 
> the the line
> {{Joined group at generation ..}} is visible in the logs.
> In most cases this is sufficient. But in the cases where the test fails, we 
> see that this message appears even before Connect RestServer has finished 
> initialization.
>  {quote}   - [2022-12-15 15:40:29,064] INFO [Worker clientId=connect-1, 
> groupId=connect-cluster] Joined group at generation 2 with protocol version 1 
> and got assignment: Assignment{error=0, 
> leader='connect-1-07d9da63-9acb-4633-aee4-1ab79f4ab1ae', 
> leaderUrl='http://worker34:8083/', offset=-1, connectorIds=[], taskIds=[], 
> revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> - [2022-12-15 15:40:29,560] INFO 172.31.5.66 - - [15/Dec/2022:15:40:29 
> +] "GET /connector-plugins/ HTTP/1.1" 404 375 "-" 
> "python-requests/2.24.0" 71 (org.apache.kafka.connect.runtime.rest.RestServer)
> - [2022-12-15 15:40:29,579] INFO REST resources initialized; server is 
> started and ready to handle requests 
> (org.apache.kafka.connect.runtime.rest.RestServer)
> {quote}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-882: Kafka Connect REST API timeout improvements

2023-03-01 Thread Ashwin
Thanks for the KIP Yash - +1 (non-binding)

One nitpick -
In one of the responses Chris had mentioned that the timeout param could be
like 'timeout=10s'.
The KIP seems to favour a millisecond timeout value and has an unwieldy
value in the example - `POST /connectors?timeout=12`

I like the idea of having smaller timeout values and the unit as part of
the value eg: '2m', '10s' or '500ms'.
Is it too late to incorporate this change?

Thanks,
Ashwin


On Wed, Mar 1, 2023 at 6:32 PM Yash Mayya  wrote:

> Hi all,
>
> I'd like to call for a vote on the (hopefully) straightforward KIP-882
> which adds support for configuring request timeouts on Kafka Connect REST
> APIs via query parameters along with a couple of related small
> improvements.
>
> KIP -
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-882%3A+Kafka+Connect+REST+API+timeout+improvements
>
> Discussion thread -
> https://lists.apache.org/thread/cygy115qmwpc3nj5omnj0crws2dw8nor
>
> Thanks,
> Yash
>


[jira] [Created] (KAFKA-14598) Fix flaky ConnectRestApiTest

2023-01-06 Thread Ashwin Pankaj (Jira)
Ashwin Pankaj created KAFKA-14598:
-

 Summary: Fix flaky ConnectRestApiTest
 Key: KAFKA-14598
 URL: https://issues.apache.org/jira/browse/KAFKA-14598
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Ashwin Pankaj
Assignee: Ashwin Pankaj


ConnectRestApiTest sometimes fails with the message

```
ConnectRestError(404, '\n\n\nError 404 Not 
Found\n\nHTTP ERROR 404 Not 
Found\n\nURI:/connector-plugins/\nSTATUS:404\nMESSAGE:Not
 
Found\nSERVLET:-\n\n\n\n\n',
 'http://172.31.1.75:8083/connector-plugins/')
```

This happens because ConnectDistributedService.start() by default waits till 
the the line

```
Joined group at generation ..
```
 is visible in the logs.

In most cases this is sufficient. But in the cases where the test fails, we see 
that this message appears even before Connect RestServer has finished 
initialization.

```
- [2022-12-15 15:40:29,064] INFO [Worker clientId=connect-1, 
groupId=connect-cluster] Joined group at generation 2 with protocol version 1 
and got assignment: Assignment{error=0, 
leader='connect-1-07d9da63-9acb-4633-aee4-1ab79f4ab1ae', 
leaderUrl='http://worker34:8083/', offset=-1, connectorIds=[], taskIds=[], 
revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)
- [2022-12-15 15:40:29,560] INFO 172.31.5.66 - - [15/Dec/2022:15:40:29 
+] "GET /connector-plugins/ HTTP/1.1" 404 375 "-" "python-requests/2.24.0" 
71 (org.apache.kafka.connect.runtime.rest.RestServer)
- [2022-12-15 15:40:29,579] INFO REST resources initialized; server is 
started and ready to handle requests 
(org.apache.kafka.connect.runtime.rest.RestServer)

```

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-891: Running multiple versions of a connector.

2022-11-22 Thread Ashwin
Hi Snehasis,

> IIUC (please correct me if I am wrong here), what you highlighted above,
is
a versioning scheme for a connector config for the same connector (and not
different versions of a connector plugin).

Sorry for not being more precise in my wording -  I meant registering
versions of schema for connector config.

Let's take the example of a fictional connector which uses a fictional AWS
service.

Fictional Connector Config schema version:2.0
---
{
  "$schema": "http://json-schema.org/draft-04/schema#";,
  "type": "object",
  "properties": {
"name": {
  "type": "string"
},
"schema_version": {
  "type": "string"
},
"aws_access_key": {
  "type": "string"
},
"aws_secret_key": {
  "type": "string"
}
  },
  "required": [
"name",
"schema_version",
"aws_access_key",
"aws_secret_key"
  ]
}

Fictional Connector config schema version:3.0
---
{
  "$schema": "http://json-schema.org/draft-04/schema#";,
  "type": "object",
  "properties": {
"name": {
  "type": "string"
},
"schema_version": {
  "type": "string"
},
"iam_role": {
  "type": "string"
}
  },
  "required": [
"name",
"schema_version",
"iam_role"
  ]
}

The connector which supports Fictional config schema 2.0  will validate the
access key and secret key.
Whereas a connector which supports config with schema version 3.0 will only
validate the IAM role.

This is the alternative which I wanted to suggest. Each plugin will
register the schema versions of connector config which it supports.

The plugin paths may be optionally different i.e  we don't have to
mandatorily add a new plugin path to support a new schema version.

Thanks,
Ashwin

On Tue, Nov 22, 2022 at 12:47 PM Snehashis  wrote:

> Thanks for the input Ashwin.
>
> > 1. Can you elaborate on the rejected alternatives ? Suppose connector
> > config is versioned and has a schema. Then a single plugin (whose
> > dependencies have not changed) can handle multiple config versions for
> the
> > same connector class.
>
> IIUC (please correct me if I am wrong here), what you highlighted above, is
> a versioning scheme for a connector config for the same connector (and not
> different versions of a connector plugin). That is a somewhat tangential
> problem. While it is definitely a useful feature to have, like a log to
> check what changes were made over time to the config which might make it
> easier to do rollbacks, it is not the focus here. Here by version we mean
> to say what underlying version of the plugin should the given configuration
> of the connector use. Perhaps it is better to change the name of the
> parameter from connector.version to connector.plugin.version or
> plugin.version if it was confusing. wdyt?
>
> >  2. Any plans to support assisted migration e.g if a user invokes "POST
> > connector/config?migrate=latest", the latest version __attempts__ to
> > transform the existing config to the newer version. This would require
> > adding a method like "boolean migrate(Version fromVersion)" to the
> > connector interface.
>
> This is an enhancement we can think of doing in future. Users can simply do
> a PUT call with the updated config which has the updated version number.
> The assisted mode could be handy as the user does not need to know the
> config but beyond this it does not seem to justify its existence.
>
> Regards
> Snehashis
>
> On Tue, Nov 22, 2022 at 10:50 AM Ashwin 
> wrote:
>
> > Hi Snehasis,
> >
> > This is a really useful feature and thanks for initiating this
> discussion.
> >
> > I had the following questions -
> >
> >
> > 1. Can you elaborate on the rejected alternatives ? Suppose connector
> > config is versioned and has a schema. Then a single plugin (whose
> > dependencies have not changed) can handle multiple config versions for
> the
> > same connector class.
> >
> > 2. Any plans to support assisted migration e.g if a user invokes "POST
> > connector/config?migrate=latest", the latest version __attempts__ to
> > transform the existing config to the newer version. This would require
> > adding a method like "boolean migrate(Version fromVersion)" to the
> > connector interface.
> >
> > Thanks,
> > Ashwin
> >
> > On Mon, Nov 21, 2022 at 2:27 PM Snehashis 
> > wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start a discussion thread on KIP-891: Running multiple
> > versions
> > > of a connector.
> > >
> > > The KIP aims to add the ability for the connect runtime to run multiple
> > > versions of a connector.
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-891%3A+Running+multiple+versions+of+a+connector
> > >
> > > Please take a look and let me know what you think.
> > >
> > > Thank you
> > > Snehashis Pal
> > >
> >
>


Re: [DISCUSS] KIP-891: Running multiple versions of a connector.

2022-11-21 Thread Ashwin
Hi Snehasis,

This is a really useful feature and thanks for initiating this discussion.

I had the following questions -


1. Can you elaborate on the rejected alternatives ? Suppose connector
config is versioned and has a schema. Then a single plugin (whose
dependencies have not changed) can handle multiple config versions for the
same connector class.

2. Any plans to support assisted migration e.g if a user invokes "POST
connector/config?migrate=latest", the latest version __attempts__ to
transform the existing config to the newer version. This would require
adding a method like "boolean migrate(Version fromVersion)" to the
connector interface.

Thanks,
Ashwin

On Mon, Nov 21, 2022 at 2:27 PM Snehashis  wrote:

> Hi all,
>
> I'd like to start a discussion thread on KIP-891: Running multiple versions
> of a connector.
>
> The KIP aims to add the ability for the connect runtime to run multiple
> versions of a connector.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-891%3A+Running+multiple+versions+of+a+connector
>
> Please take a look and let me know what you think.
>
> Thank you
> Snehashis Pal
>


Re: [DISCUSS] KIP-875: First-class offsets support in Kafka Connect

2022-10-13 Thread Ashwin
Thanks for KIP Chris - I think this is a useful feature.

Can you please elaborate on the following in the KIP -

1. How would the response of GET /connectors/{connector}/offsets look like
if the worker has both global and connector specific offsets topic ?

2. How can we pass the reset options like shift-by , to-date-time etc.
using a REST API like DELETE /connectors/{connector}/offsets ?

3. Today PAUSE operation on a connector invokes its stop method - will
there be a change here to reduce confusion with the new proposed STOPPED
state ?

Thanks,
Ashwin

On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton 
wrote:

> Hi all,
>
> I noticed a fairly large gap in the first version of this KIP that I
> published last Friday, which has to do with accommodating connectors
> that target different Kafka clusters than the one that the Kafka Connect
> cluster uses for its internal topics and source connectors with dedicated
> offsets topics. I've since updated the KIP to address this gap, which has
> substantially altered the design. Wanted to give a heads-up to anyone
> that's already started reviewing.
>
> Cheers,
>
> Chris
>
> On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton  wrote:
>
> > Hi all,
> >
> > I'd like to begin discussion on a KIP to add offsets support to the Kafka
> > Connect REST API:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
> >
> > Cheers,
> >
> > Chris
> >
>


Re: [DISCUSS] KIP-850: REST API for filtering Connector plugins by type

2022-06-23 Thread Ashwin
No worries Chris - I understand what you are getting at.

I will pursue updating the current REST API.

Cheers,
Ashwin


On Fri, Jun 24, 2022 at 8:57 AM Chris Egerton 
wrote:

> Hi Ashwin,
>
> I just want to be clear that I do think there might be some value to being
> able to address the edge case you've raised; I just want to understand
> exactly what users might do with that information. Driving a programmatic
> UI with a dropdown menu for selecting header/key/value converters could be
> one such use case, for example.
>
> If we do decide to tweak the REST API to expose multiple types for classes
> that implement multiple plugins, we'll still need a KIP for that, and I'd
> be happy to review if you wanted to pursue that. I'm also not dead-set
> against the filter API you've proposed, I just don't know of any cases yet
> where the implementation cost would be outweighed by potential benefit to
> users.
>
> I don't want to sound unwelcoming, especially since this is a newbie task.
> Hope this has been helpful so far, and thank you for your interest in
> contributing to Kafka Connect!
>
> Cheers,
>
> Chris
>
> On Thu, Jun 23, 2022 at 11:16 PM Ashwin 
> wrote:
>
> > Hi Chris,
> >
> > Sure, if JIRA is obsolete and does not add value, I am fine with
> abandoning
> > this change.
> > I will close the JIRA as well.
> > Thanks for helping out!
> >
> > Ashwin
> >
> > On Fri, Jun 24, 2022 at 4:23 AM Chris Egerton 
> > wrote:
> >
> > > Hi Ashwin,
> > >
> > > Thanks for the clarification, which is certainly useful. However, it's
> > not
> > > an actual use case. What will users try to do now that would have been
> > > difficult/impossible in the future because of this ambiguity? I ask
> > because
> > > the Jira ticket linked in the KIP is very old and may be obsolete, and
> > > there seems to be a much simpler way to address the edge case of a
> class
> > > implementing multiple plugin interfaces: we can just expose each type
> in
> > > the REST API, instead of picking one at random.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Thu, Jun 23, 2022 at 12:55 AM Ashwin 
> > > wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > Thanks for the quick response.
> > > >
> > > > > It seems pretty straightforward to get this via client-side
> filtering
> > > >
> > > > I assume that you meant  GET /collector-plugins + jq filtering on
> type
> > > > attribute.
> > > >
> > > > One issue that I see with that approach is that we cannot filter
> > plugins
> > > if
> > > > they implement more than one of Converter, HeaderConverter, Sink,
> > Source
> > > > interfaces.
> > > >
> > > > The type field in PluginDesc field is set based on the order of enum
> > > values
> > > > in PluginType -
> > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/22534424d22e5a4d4d512b0ac1b5f312b867a5bf/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java#L49
> > > > .
> > > >
> > > > Eg: org.apache.kafka.connect.json.JsonConverter implements both
> > Converter
> > > > and HeaderConverter interfaces, but the type value is always set to
> > > > Converter as that value is defined first in PluginType enum.
> > > >
> > > > The new API will list JsonConverter when we query either for
> > > > HeaderConverter or Converter plugin types.
> > > >
> > > > Thanks,
> > > > Ashwin
> > > >
> > > >
> > > > On Thu, Jun 23, 2022 at 12:03 AM Chris Egerton <
> > fearthecel...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Ashwin,
> > > > >
> > > > > Can you provide a use case for this new behavior? It seems pretty
> > > > > straightforward to get this via client-side filtering, I'm a little
> > > > > skeptical that we need to implement it server-side.
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Wed, Jun 22, 2022 at 4:25 AM Ashwin
>  > >
> > > > > wrote:
> > > > >
> > > > > > Hello Folks,
> > > > > >
> > > > > > I have picked up a newbie task and request you provide your
> > valuable
> > > > > > feedback.
> > > > > >
> > > > > > KIP:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-850%3A+REST+API+for+filtering+Connector+plugins+by+type
> > > > > > Initial PR: https://github.com/apache/kafka/pull/12330
> > > > > >
> > > > > > Though https://issues.apache.org/jira/browse/KAFKA-4279 was for
> an
> > > > > > endpoint
> > > > > > to list converter plugins, I propose that we implement a generic
> > > > "filter"
> > > > > > API which will list connector plugins matched by type (one of
> > > > > > SOURCE,SINK,CONVERTER,HEADER_CONVERTER).
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Ashwin
> > > > > >
> > > > >
> > > >
> > >
> >
>


[jira] [Resolved] (KAFKA-4279) REST endpoint to list converter plugins

2022-06-23 Thread Ashwin Pankaj (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashwin Pankaj resolved KAFKA-4279.
--
Resolution: Won't Do

Obsolete - https://lists.apache.org/thread/hvjr2pclhstpmdr164sc73sb55lhbl1m

> REST endpoint to list converter plugins
> ---
>
> Key: KAFKA-4279
> URL: https://issues.apache.org/jira/browse/KAFKA-4279
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Gwen Shapira
>Assignee: Ashwin Pankaj
>Priority: Minor
>  Labels: needs-kip, newbie
>
> We have a REST resource that allows users to see the available plugins, but 
> we have no equivalent that allows listing available converters.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] KIP-850: REST API for filtering Connector plugins by type

2022-06-23 Thread Ashwin
Hi Chris,

Sure, if JIRA is obsolete and does not add value, I am fine with abandoning
this change.
I will close the JIRA as well.
Thanks for helping out!

Ashwin

On Fri, Jun 24, 2022 at 4:23 AM Chris Egerton 
wrote:

> Hi Ashwin,
>
> Thanks for the clarification, which is certainly useful. However, it's not
> an actual use case. What will users try to do now that would have been
> difficult/impossible in the future because of this ambiguity? I ask because
> the Jira ticket linked in the KIP is very old and may be obsolete, and
> there seems to be a much simpler way to address the edge case of a class
> implementing multiple plugin interfaces: we can just expose each type in
> the REST API, instead of picking one at random.
>
> Cheers,
>
> Chris
>
> On Thu, Jun 23, 2022 at 12:55 AM Ashwin 
> wrote:
>
> > Hi Chris,
> >
> > Thanks for the quick response.
> >
> > > It seems pretty straightforward to get this via client-side filtering
> >
> > I assume that you meant  GET /collector-plugins + jq filtering on type
> > attribute.
> >
> > One issue that I see with that approach is that we cannot filter plugins
> if
> > they implement more than one of Converter, HeaderConverter, Sink, Source
> > interfaces.
> >
> > The type field in PluginDesc field is set based on the order of enum
> values
> > in PluginType -
> >
> >
> https://github.com/apache/kafka/blob/22534424d22e5a4d4d512b0ac1b5f312b867a5bf/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java#L49
> > .
> >
> > Eg: org.apache.kafka.connect.json.JsonConverter implements both Converter
> > and HeaderConverter interfaces, but the type value is always set to
> > Converter as that value is defined first in PluginType enum.
> >
> > The new API will list JsonConverter when we query either for
> > HeaderConverter or Converter plugin types.
> >
> > Thanks,
> > Ashwin
> >
> >
> > On Thu, Jun 23, 2022 at 12:03 AM Chris Egerton 
> > wrote:
> >
> > > Hi Ashwin,
> > >
> > > Can you provide a use case for this new behavior? It seems pretty
> > > straightforward to get this via client-side filtering, I'm a little
> > > skeptical that we need to implement it server-side.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Wed, Jun 22, 2022 at 4:25 AM Ashwin 
> > > wrote:
> > >
> > > > Hello Folks,
> > > >
> > > > I have picked up a newbie task and request you provide your valuable
> > > > feedback.
> > > >
> > > > KIP:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-850%3A+REST+API+for+filtering+Connector+plugins+by+type
> > > > Initial PR: https://github.com/apache/kafka/pull/12330
> > > >
> > > > Though https://issues.apache.org/jira/browse/KAFKA-4279 was for an
> > > > endpoint
> > > > to list converter plugins, I propose that we implement a generic
> > "filter"
> > > > API which will list connector plugins matched by type (one of
> > > > SOURCE,SINK,CONVERTER,HEADER_CONVERTER).
> > > >
> > > >
> > > > Thanks,
> > > > Ashwin
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-850: REST API for filtering Connector plugins by type

2022-06-22 Thread Ashwin
Hi Chris,

Thanks for the quick response.

> It seems pretty straightforward to get this via client-side filtering

I assume that you meant  GET /collector-plugins + jq filtering on type
attribute.

One issue that I see with that approach is that we cannot filter plugins if
they implement more than one of Converter, HeaderConverter, Sink, Source
interfaces.

The type field in PluginDesc field is set based on the order of enum values
in PluginType -
https://github.com/apache/kafka/blob/22534424d22e5a4d4d512b0ac1b5f312b867a5bf/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java#L49
.

Eg: org.apache.kafka.connect.json.JsonConverter implements both Converter
and HeaderConverter interfaces, but the type value is always set to
Converter as that value is defined first in PluginType enum.

The new API will list JsonConverter when we query either for
HeaderConverter or Converter plugin types.

Thanks,
Ashwin


On Thu, Jun 23, 2022 at 12:03 AM Chris Egerton 
wrote:

> Hi Ashwin,
>
> Can you provide a use case for this new behavior? It seems pretty
> straightforward to get this via client-side filtering, I'm a little
> skeptical that we need to implement it server-side.
>
> Cheers,
>
> Chris
>
> On Wed, Jun 22, 2022 at 4:25 AM Ashwin 
> wrote:
>
> > Hello Folks,
> >
> > I have picked up a newbie task and request you provide your valuable
> > feedback.
> >
> > KIP:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-850%3A+REST+API+for+filtering+Connector+plugins+by+type
> > Initial PR: https://github.com/apache/kafka/pull/12330
> >
> > Though https://issues.apache.org/jira/browse/KAFKA-4279 was for an
> > endpoint
> > to list converter plugins, I propose that we implement a generic "filter"
> > API which will list connector plugins matched by type (one of
> > SOURCE,SINK,CONVERTER,HEADER_CONVERTER).
> >
> >
> > Thanks,
> > Ashwin
> >
>


[DISCUSS] KIP-850: REST API for filtering Connector plugins by type

2022-06-22 Thread Ashwin
Hello Folks,

I have picked up a newbie task and request you provide your valuable
feedback.

KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-850%3A+REST+API+for+filtering+Connector+plugins+by+type
Initial PR: https://github.com/apache/kafka/pull/12330

Though https://issues.apache.org/jira/browse/KAFKA-4279 was for an endpoint
to list converter plugins, I propose that we implement a generic "filter"
API which will list connector plugins matched by type (one of
SOURCE,SINK,CONVERTER,HEADER_CONVERTER).


Thanks,
Ashwin


Requesting permissions to contribute to Apache Kafka

2022-06-20 Thread Ashwin
Hello,

Please approve my request for permissions to contribute.

wiki id : ashwin pankaj
jira id: ashwinpankaj

Thanks,
Ashwin


Re: [VOTE] KIP-162: Enable topic deletion by default

2017-06-06 Thread Ashwin Sinha
+1

On Tue, Jun 6, 2017 at 11:20 PM, Mickael Maison 
wrote:

> +1 (non binding), thanks
>
> On Tue, Jun 6, 2017 at 2:16 PM, Bill Bejeck  wrote:
> > +1
> >
> > -Bill
> >
> > On Tue, Jun 6, 2017 at 9:08 AM, Ismael Juma  wrote:
> >
> >> Thanks for the KIP, Gwen. +1 (binding).
> >>
> >> Ismael
> >>
> >> On Tue, Jun 6, 2017 at 5:37 AM, Gwen Shapira  wrote:
> >>
> >> > Hi,
> >> >
> >> > The discussion has been quite positive, so I posted a JIRA, a PR and
> >> > updated the KIP with the latest decisions.
> >> >
> >> > Lets officially vote on the KIP:
> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > 162+-+Enable+topic+deletion+by+default
> >> >
> >> > JIRA is here: https://issues.apache.org/jira/browse/KAFKA-5384
> >> >
> >> > Gwen
> >> >
> >>
>



-- 
Thanks and Regards,
Ashwin


[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2015-03-18 Thread Ashwin Jayaprakash (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367361#comment-14367361
 ] 

Ashwin Jayaprakash commented on KAFKA-1716:
---

Sorry for the delay, I will get back to you on this ASAP.

> hang during shutdown of ZookeeperConsumerConnector
> --
>
> Key: KAFKA-1716
> URL: https://issues.apache.org/jira/browse/KAFKA-1716
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1
>Reporter: Sean Fay
>Assignee: Neha Narkhede
> Attachments: after-shutdown.log, before-shutdown.log, 
> kafka-shutdown-stuck.log
>
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
> wedge in the case that some consumer fetcher threads receive messages during 
> the shutdown process.
> Shutdown thread:
> {code}-- Parking to wait for: 
> java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
> at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at 
> kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> at 
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> at 
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> at 
> scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at 
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
> at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
> at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
> at 
> scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at 
> kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
> at 
> kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> at 
> kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
> at 
> kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
> ConsumerFetcherThread:
> {code}-- Parking to wait for: 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> at 
> java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> at 
> kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
> at 
> scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractF

[jira] [Updated] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2015-03-12 Thread Ashwin Jayaprakash (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashwin Jayaprakash updated KAFKA-1716:
--
Attachment: after-shutdown.log
before-shutdown.log

[~becket_qin] Here are the thread dumps collected before and after the shutdown 
process was initiated.

> hang during shutdown of ZookeeperConsumerConnector
> --
>
> Key: KAFKA-1716
> URL: https://issues.apache.org/jira/browse/KAFKA-1716
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1
>Reporter: Sean Fay
>Assignee: Neha Narkhede
> Attachments: after-shutdown.log, before-shutdown.log, 
> kafka-shutdown-stuck.log
>
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
> wedge in the case that some consumer fetcher threads receive messages during 
> the shutdown process.
> Shutdown thread:
> {code}-- Parking to wait for: 
> java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
> at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at 
> kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> at 
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> at 
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> at 
> scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at 
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
> at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
> at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
> at 
> scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at 
> kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
> at 
> kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> at 
> kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
> at 
> kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
> ConsumerFetcherThread:
> {code}-- Parking to wait for: 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> at 
> java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> at 
> kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
> at 
> scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> at 
> kafka/server/AbstractFetcherThread$$anonfun

[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2015-03-10 Thread Ashwin Jayaprakash (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356326#comment-14356326
 ] 

Ashwin Jayaprakash commented on KAFKA-1716:
---

I'm beginning to wonder if the {{shutdown()}} method on the consumer is safe to 
call from a different thread than the one that is currently using it.

> hang during shutdown of ZookeeperConsumerConnector
> --
>
> Key: KAFKA-1716
> URL: https://issues.apache.org/jira/browse/KAFKA-1716
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1
>Reporter: Sean Fay
>Assignee: Neha Narkhede
> Attachments: kafka-shutdown-stuck.log
>
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
> wedge in the case that some consumer fetcher threads receive messages during 
> the shutdown process.
> Shutdown thread:
> {code}-- Parking to wait for: 
> java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
> at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at 
> kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> at 
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> at 
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> at 
> scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at 
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
> at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
> at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
> at 
> scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at 
> kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
> at 
> kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> at 
> kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
> at 
> kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
> ConsumerFetcherThread:
> {code}-- Parking to wait for: 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> at 
> java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> at 
> kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
> at 
> scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> at 
> kafka/server/AbstractFetcherThread$$a

[jira] [Updated] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2015-03-10 Thread Ashwin Jayaprakash (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashwin Jayaprakash updated KAFKA-1716:
--
Attachment: kafka-shutdown-stuck.log

> hang during shutdown of ZookeeperConsumerConnector
> --
>
> Key: KAFKA-1716
> URL: https://issues.apache.org/jira/browse/KAFKA-1716
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.1.1
>Reporter: Sean Fay
>Assignee: Neha Narkhede
> Attachments: kafka-shutdown-stuck.log
>
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
> wedge in the case that some consumer fetcher threads receive messages during 
> the shutdown process.
> Shutdown thread:
> {code}-- Parking to wait for: 
> java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
> at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at 
> kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> at 
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> at 
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> at 
> scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at 
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at 
> scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
> at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
> at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
> at 
> scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at 
> kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
> at 
> kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> at 
> kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
> at 
> kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
> ConsumerFetcherThread:
> {code}-- Parking to wait for: 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at 
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> at 
> java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> at 
> kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
> at 
> scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> at 
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)

[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2015-03-10 Thread Ashwin Jayaprakash (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356289#comment-14356289
 ] 

Ashwin Jayaprakash commented on KAFKA-1716:
---

We upgraded to Kafka 0.8.2 last week and now we can reproduce this issue every 
time on our Kafka consumer JVMs.

Our setup is like this. We start {{ConsumerConnector}} instances dynamically 
based on a configurable property. Each of those {{ConsumerConnector}} instances 
creates a {{ConsumerIterator}}. Right now we have 4 such instances in each JVM. 
Naturally we have 4 separate threads consuming from those 4 iterators in 
parallel. 

All this worked ok until recently, where we faced some issues with consumer 
rebalancing and an overloaded ZK subtree, see 
http://markmail.org/thread/gnodacjjya6r573m. While we were trying to address 
that we changed the defaults to these {{rebalance.max.retries 16}} and 
{{rebalance.backoff.ms 1}}. Note that we also upgraded to 0.8.2.

Everytime we shutdown the JVM, we first try to shutdown the consumers one by 
one before exiting. With these recent changes, the JVM exit gets stuck because:
# The shutdown thread is different from the 4 consumer threads (in addition to 
the background threads that ZK and Kafka create)
# The shutdown thread shuts down the first consumer and so that consumer exits 
quickly and gracefully
# In the meanwhile the second, third and fourth consumers are trying to 
rebalance the partitions
# Shutdown thread proceeds to call shutdown on the second consumer
## The shutdown thread appears to make some progress in shutting down the 
second consumer but then gets stuck on a monitor that has been acquired by the 
{{xx_watcher_executor}}
## This appears to be a deadlock because the {{xx_watcher_executor}} thread has 
acquired the monitor lock and gone to sleep
# The shutdown then takes a long time because all the 3 remaining consumers 
retry for {{16}} times and then give up

The thread dumps here should make it clear.

{code}
"Thread-13@8222" prio=5 tid=0x53 nid=NA waiting for monitor entry
  java.lang.Thread.State: BLOCKED
 waiting for 
indexer-group_on-localhost-pid-58357-kafka-message-source-id-820_watcher_executor@8160
 to release lock on <0x2b5e> (a java.lang.Object)
  at 
kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:191)
  at 
kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:119)
  at 
xx.yy.zz.processor.kafka.consumer.KafkaMessageSource.close(KafkaMessageSource.java:239)
  at 
xx.yy.zz.pipeline.source.MessageSourceStage.stop(MessageSourceStage.java:162)
  at 
xx.yy.common.util.lifecycle.LifecycleHelper.stopAll(LifecycleHelper.java:53)
  at xx.yy.zz.pipeline.framework.Pipeline.stop(Pipeline.java:205)
  at 
xx.yy.common.util.lifecycle.LifecycleHelper.stopAll(LifecycleHelper.java:53)
  at xx.yy.zz.pipeline.framework.Pipelines.stop(Pipelines.java:225)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at xx.yy.AppLifecycle.callAnnotatedMethods(AppLifecycle.java:163)
  at xx.yy.AppLifecycle.stop(AppLifecycle.java:144)
  - locked <0x2b31> (a xx.yy.AppLifecycle)
  at xx.yy.AppLifecycle$6.stop(AppLifecycle.java:247)
  at io.dropwizard.lifecycle.JettyManaged.doStop(JettyManaged.java:32)
  at 
org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:90)
  - locked <0x2b83> (a java.lang.Object)
  at 
org.eclipse.jetty.util.component.ContainerLifeCycle.stop(ContainerLifeCycle.java:129)
  at 
org.eclipse.jetty.util.component.ContainerLifeCycle.doStop(ContainerLifeCycle.java:148)
  at 
org.eclipse.jetty.server.handler.AbstractHandler.doStop(AbstractHandler.java:71)
  at org.eclipse.jetty.server.Server.doStop(Server.java:410)
  at 
org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:90)
  - locked <0x2b84> (a java.lang.Object)
  at 
org.eclipse.jetty.util.thread.ShutdownThread.run(ShutdownThread.java:133)

"indexer-group_on-localhost-pid-58357-kafka-message-source-id-820_watcher_executor@8160"
 daemon prio=5 tid=0x74 nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
 blocks Thread-13@8222
  at java.lang.Thread.sleep(Thread.java:-1)
  at 
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:627)
  at