[jira] [Created] (KAFKA-13848) Clients remaining connected after SASL re-authentication fails

2022-04-22 Thread Andras Csaki (Jira)
Andras Csaki created KAFKA-13848:


 Summary: Clients remaining connected after SASL re-authentication fails
 Key: KAFKA-13848
 URL: https://issues.apache.org/jira/browse/KAFKA-13848
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.1.0
 Environment: https://github.com/acsaki/kafka-sasl-reauth
Reporter: Andras Csaki


Clients remain connected and able to produce or consume despite an expired 
OAUTHBEARER token.

The problem can be reproduced with the 
https://github.com/acsaki/kafka-sasl-reauth project: start the embedded 
OAuth2 server and Kafka, run the long-running consumer in OAuthBearerTest, 
then kill the OAuth2 server so the client can no longer re-authenticate.

The root cause seems to be 
SaslServerAuthenticator#calcCompletionTimesAndReturnSessionLifetimeMs failing 
to set ReauthInfo#sessionExpirationTimeNanos when tokens have already expired 
(when the session lifetime goes negative), in turn causing 
KafkaChannel#serverAuthenticationSessionExpired to return false and finally 
SocketServer not to close the channel.

The issue is observed with OAUTHBEARER but seems to have a wider impact on SASL 
re-authentication.
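
For illustration, a minimal sketch of the suspected control flow (not the actual Kafka source; the class and method names come from the report above, the bodies are simplified assumptions):

{code:java}
// Hypothetical, simplified sketch of the suspected bug, not actual Kafka code.
class ReauthSketch {
    Long sessionExpirationTimeNanos; // stays null when the token already expired

    long calcCompletionTimesAndReturnSessionLifetimeMs(long tokenExpiryMs, long nowMs) {
        long sessionLifetimeMs = tokenExpiryMs - nowMs;
        if (sessionLifetimeMs > 0) {
            // The expiration time is only recorded on the positive branch.
            sessionExpirationTimeNanos = (nowMs + sessionLifetimeMs) * 1_000_000L;
        }
        // A negative lifetime (already-expired token) leaves the field unset.
        return sessionLifetimeMs;
    }

    boolean serverAuthenticationSessionExpired(long nowNanos) {
        // A null expiration time is treated as "never expires", so the
        // channel stays open even though re-authentication failed.
        return sessionExpirationTimeNanos != null
            && nowNanos - sessionExpirationTimeNanos > 0;
    }
}
{code}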





Re: [DISCUSS] KIP-827: Expose logdirs total and usable space via Kafka API

2022-04-22 Thread Tom Bentley
Hi Mickael,

Thanks for the KIP, I can see this would be useful.

I guess you could have used optional tagged fields, rather than bumping the
version, but then again I don't see it being particularly advantageous in
this case either.

Kind regards,

Tom

On Tue, 19 Apr 2022 at 10:23, Divij Vaidya  wrote:

> I have a minor suggestion below but overall KIP looks good to me to start a
> vote.
>
> *Reg#6* Would you consider replacing UNKNOWN_SPACE with UNSUPPORTED?
> UNSUPPORTED tells the user explicitly that the value is missing due to
> client/server version mismatch whereas with UNKNOWN_SPACE, the user is left
> wondering whether it is a problem with underlying storage not providing
> space information or something else.
>
> Divij Vaidya
>
>
>
> On Fri, Apr 15, 2022 at 3:40 PM Mickael Maison 
> wrote:
>
> > Hi Luke,
> >
> > 7. I've updated the KIP to clarify these sizes are in bytes.
> >
> > Thanks,
> > Mickael
> >
> > On Fri, Apr 15, 2022 at 12:16 PM Luke Chen  wrote:
> > >
> > > Hi Mickael,
> > >
> > > Thanks for the KIP!
> > > This is a good improvement.
> > >
> > > (3) +1 for not adding the number of files in the directory. Counting the
> > > file numbers could be slow.
> > > (7) Could you make the fields clear in `DescribeLogDirsResponse`, to
> > > mention that the returned number is a size in bytes (or not)?
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Fri, Apr 15, 2022 at 5:27 PM Mickael Maison <
> mickael.mai...@gmail.com
> > >
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Thanks for the feedback.
> > > >
> > > > 3. Yes that's right. Also the number of file descriptors is really not
> > > > a property of log directories. Administrators typically track that
> > > > count per process and for the whole operating system.
> > > >
> > > > 5. That's a good point, I've updated the KIP to mention sizes will be
> > > > capped to Long.MAX_VALUE even if the actual storage is larger.
> > > >
> > > > 6. Brokers would never return UNKNOWN_SPACE. When new clients query
> > > > older brokers via the admin API, the admin client will use
> > > > UNKNOWN_SPACE to indicate these values weren't provided by brokers.
> > > >
> > > > Thanks,
> > > > Mickael
> > > >
> > > > On Fri, Apr 8, 2022 at 5:00 PM Divij Vaidya  >
> > > > wrote:
> > > > >
> > > > > Thanks for replying. I still have a few lingering
> questions/comments.
> > > > >
> > > > > *Reg#1* Understood. I checked and the underlying system call is
> > > > > statvfs for unix systems, which should be ok to call here.
> > > > > *Reg#2* Fair point. I checked again and yes, log.dir always means
> > local
> > > > > storage even when tiered storage is enabled.
> > > > > *Reg#3* The rationale for adding these new (size) fields to the
> > > > > `DescribeLogDirs` is to allow the administrator to monitor or
> perhaps
> > > > take
> > > > > automated action based on results. Doesn't monitoring the number of
> > file
> > > > > descriptors fall in the same category of use cases? I am assuming
> > that we
> > > > > want to add the size information in the API response because JVM
> > makes it
> > > > > possible to get this information in a platform agnostic manner
> which
> > is
> > > > not
> > > > > true for open file descriptors, correct?
> > > > > *Reg#4* Agree.
> > > > > *New#5*: As an FYI, Java FileStore API breaks on large storage
> sizes.
> > > > See:
> > > > > https://bugs.openjdk.java.net/browse/JDK-8162520. ElasticSearch
> has
> > been
> > > > > hit by these limitations in the past. For JDK 11, you will probably
> > have
> > > > to
> > > > > add defensive checks such as
> > > > >
> > > >
> >
> https://github.com/opensearch-project/OpenSearch/blob/b74d71fb747cc2873d4c2ffae825944da4d06e1b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java#L148
> > > > .
> > > > > The documentation of the API mentioned in KIP will also be modified
> > to
> > > > > account for this edge case.
> > > > > *New#6*: Can you please provide an example where the return for
> these
> > > > APIs
> > > > > would be UNKNOWN_SPACE? Doesn't JVM guarantee that this API will
> > > > definitely
> > > > > return results (else it throws an IOException)? I would propose
> that
> > we
> > > > get
> > > > > rid of default since JVM guarantees that this would work on all
> > > > platforms.
> > > > > If it doesn't then it's a bug and should be uncovered via an
> > exception.
> > > > >
> > > > > Also, I would like to volunteer to code review (of course, it would
> > be
> > > > > non-binding) your implementation once this KIP is approved.
> > > > >
> > > > > Regards,
> > > > > Divij Vaidya
> > > > >
> > > > > On Fri, Apr 8, 2022 at 11:35 AM Mickael Maison <
> > mickael.mai...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Divij,
> > > > > >
> > > > > > Thanks for taking a look!
> > > > > >
> > > > > > 1. In order to retrieve the sizes, the plan is to use getTotalSpace()
> > > > > > and getUsableSpace() from java.nio.file.FileStore. The implementations
> > > > > > may vary
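
For context, a small standalone sketch of that approach (java.nio.file.FileStore is the JDK API named above; the Long.MAX_VALUE capping reflects the defensive check for JDK-8162520 discussed earlier in the thread):

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class LogDirSpace {
    // JDK-8162520: very large file stores can overflow into negative sizes,
    // so treat any negative value as Long.MAX_VALUE.
    static long capped(long size) {
        return size < 0 ? Long.MAX_VALUE : size;
    }

    public static void main(String[] args) throws IOException {
        Path logDir = Paths.get(args.length > 0 ? args[0] : ".");
        FileStore store = Files.getFileStore(logDir);
        System.out.println("total bytes:  " + capped(store.getTotalSpace()));
        System.out.println("usable bytes: " + capped(store.getUsableSpace()));
    }
}
```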

Re: [DISCUSS] KIP-821: Connect Transforms support for nested structures

2022-04-22 Thread Tom Bentley
Hi Jorge,

Thanks for the KIP, especially for the examples which are super-clear.

100. The name `field.style` isn't so clear for something like ReplaceField:
it's not so obvious that field.style applies to `include` and `exclude`.

101. The permitted values for `field.style` don't seem terribly intuitive
(to me anyway): the meaning of `plain` isn't very guessable. Why not
`top-level` or `root` instead? Also `nested` could be misconstrued to mean
nested-but-not-top-level, so perhaps `recursive` or `cascading` might be
better?

102. I'm torn on whether making the interpretation of existing configs like
`field` be dependent on `field.style` is a good idea. I can see that it's
the simplest thing to do, but it just feels a bit odd that sometimes the
`field` would actually be a path and have different escaping rules. An
alternative would be to come up with a parallel set of config names (e.g.
as well as "field" an SMT might support "path") which were defined to
always take paths, thus avoiding the changeable interpretation of the
existing configs. The SMT's #configure() would need to throw in the case
that both configs were given. I can see that that would be more work in
implementation, but it feels cleaner.

103. I think in order to allow for supporting arrays in a later KIP (which
certainly seems like it could be useful), we'd want to specify the syntax
now, even if it wasn't implemented under this KIP. That's because I don't
think you can exclude the possibility that characters such as `[` and `]`
appear in field names. So you'd have a compatibility problem if people
started using the features of this KIP to access such fields, only for
those characters to change their meaning under a later KIP.

104. I also wonder whether making paths into a public Java API, for use by
3rd party SMTs, would be valuable.

Thanks again,

Tom



On Wed, 20 Apr 2022 at 17:53, Chris Egerton  wrote:

> 💯 Thanks Jorge, LGTM!
>
> On Wed, Apr 20, 2022, 12:40 Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
>
> > Thank you, Chris! Not possible without your feedback.
> >
> > On Tue, 19 Apr 2022 at 23:04, Chris Egerton 
> > wrote:
> >
> > > Hi Jorge,
> > >
> > > Thank you for sticking through this. I have one small remark and one
> > small
> > > clarification; assuming you agree with me on them then I'm ready to
> vote
> > on
> > > the KIP.
> > >
> > > 1. InsertField: The "field.on.missing.parent" and
> > "field.on.existing.field"
> > > docs both mention a permitted value of "ingore"; this should be
> "ignore",
> > > right?
> > >
> >
> > Of course, one more typo :)
> >
> >
> > > 2. InsertField: The examples are still missing the "field.style"
> property
> > > from the configurations. They should all include the property
> > > "transforms.smt1.field.style": "nested", correct?
> > >
> >
> > Yes, it is there. I think I know what you mean now; it seems that Confluence
> > is putting everything on one line when it's on separate lines in the
> > editor.
> > Hopefully, it's fixed now.
> >
> >
> > >
> > > Thanks again for working through this, and congratulations on a
> > > well-written KIP!
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Tue, Apr 19, 2022 at 2:06 PM Jorge Esteban Quilcate Otoya <
> > > quilcate.jo...@gmail.com> wrote:
> > >
> > > > Thank you, Chris! I applied these improvements to the KIP; let me know
> > > > how it looks now.
> > > >
> > > > On Mon, 11 Apr 2022 at 23:43, Chris Egerton  >
> > > > wrote:
> > > >
> > > > > Hi Jorge,
> > > > >
> > > > > Wow, those examples are great! A few more remarks, but I think
> we're
> > > > > getting close:
> > > > >
> > > > > 1. The examples differ across SMTs with the name of the
> > > newly-introduced
> > > > > style property; some of them use "field.style", and some use
> > > > > "fields.style". I think for consistency's sake we should stick with
> > > just
> > > > > "field.style"; otherwise it could be painful for users to have to
> > > > remember
> > > > > which to use.
> > > > >
> > > >
> > > > Great catch. Agree, I fixed the config names to `field.style`.
> > > >
> > > >
> > > > >
> > > > > 2. Some of the examples are off:
> > > > > - TimestampConverter: the input in the second example ("when field
> > > names
> > > > > include dots") doesn't contain a field with a dotted name
> > > > > - ValueToKey: the config in the third example ("when field names
> > > include
> > > > > dots") should probably use "parent..child.k2" as the
> > > > > "transforms.smt1.fields" property
> > > > >
> > > >
> > > > Fixed. Thanks!
> > > >
> > > >
> > > > >
> > > > > 3. RE changes to InsertField:
> > > > > - The InsertField SMT should also come with the new "field.style"
> > > > > property in order to preserve backwards compatibility, right? I don't
> > > > > see it included in the example configs for that one, just want to make
> > > > > sure
> > > > > - I don't know of any cases where we use snake_case for property names
> > > > > in Kafka; we should probably us
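
To make the dotted-path escaping discussed above concrete, a minimal sketch (not the KIP's actual parser; the splitting rules are assumed from the `parent..child.k2` example, where a doubled dot escapes a literal dot in a field name):

```java
import java.util.ArrayList;
import java.util.List;

public class DottedPath {
    // Split "parent..child.k2" into ["parent.child", "k2"]: a doubled dot is
    // an escaped literal dot, a single dot separates nesting levels.
    static List<String> split(String path) {
        List<String> segments = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < path.length(); i++) {
            char c = path.charAt(i);
            if (c == '.' && i + 1 < path.length() && path.charAt(i + 1) == '.') {
                current.append('.'); // escaped literal dot
                i++;
            } else if (c == '.') {
                segments.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        segments.add(current.toString());
        return segments;
    }

    public static void main(String[] args) {
        System.out.println(split("parent..child.k2")); // [parent.child, k2]
    }
}
```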

Re: [VOTE] KIP-797: Accept duplicate listener on port for IPv4/IPv6

2022-04-22 Thread Tom Bentley
Hi Matthew,

Thanks for the KIP, +1 (binding).

Kind regards,

Tom

On Thu, 14 Apr 2022 at 12:15, Matthew de Detrich
 wrote:

> Hi David,
>
> Thanks for the response.
>
> > 1. In the public interface section, could we spell out
> > the configurations that we are changing with this
> > KIP? The name does not change but the semantics do,
> > so it is good to be clear.
>
> Done
>
> > 2. In the proposed changes section, I would rather
> > mention the configuration that we need to change the
> > validation for instead of saying "loosening the validation
> > on listenerListToEndPoints in kafka.utils.CoreUtils.scala"
> > as this is specific to the implementation.
>
> This is already done with examples later down in the same section, or am I
> missing something? Would you like me to just remove the
> kafka.utils.CoreUtils.scala reference so its not implying an implementation
> detail?
>
> > 3. For my understanding, using the same port with two
> > different DNS entries would fail, right? e.g.
> > "PLAINTEXT://foo:9092,PLAINTEXT://bar:9092"
>
> Correct, the idea is that it checks whether the listener host is an IP address
> and if it's not then it doesn't even consider it (i.e. it short-circuits to
> the current behaviour). The proposed KIP changes only apply if the
> hostnames in the listener are IP addresses, otherwise no change is
> observable.
>
> Regards
>
> On Mon, Feb 21, 2022 at 10:42 AM David Jacot 
> wrote:
>
> > Hi Matthew,
> >
> > Thanks for the KIP. I have a few minor comments:
> >
> > 1. In the public interface section, could we spell out
> > the configurations that we are changing with this
> > KIP? The name does not change but the semantics do,
> > so it is good to be clear.
> >
> > 2. In the proposed changes section, I would rather
> > mention the configuration that we need to change the
> > validation for instead of saying "loosening the validation
> > on listenerListToEndPoints in kafka.utils.CoreUtils.scala"
> > as this is specific to the implementation.
> >
> > 3. For my understanding, using the same port with two
> > different DNS entries would fail, right? e.g.
> > "PLAINTEXT://foo:9092,PLAINTEXT://bar:9092"
> >
> > Best,
> > David
> >
> > On Fri, Feb 11, 2022 at 10:35 AM Luke Chen  wrote:
> > >
> > > Hi Matthew,
> > >
> > > Thanks for the update.
> > > I'm +1 (binding)
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Fri, Feb 11, 2022 at 3:32 PM Matthew de Detrich
> > >  wrote:
> > >
> > > > Hi Luke,
> > > >
> > > > I have just updated the KIP with the changes you requested.
> > > >
> > > > Regards
> > > >
> > > > On Fri, Feb 11, 2022 at 4:47 AM Luke Chen  wrote:
> > > >
> > > > > Hi Matthew,
> > > > >
> > > > > I checked again the KIP, and it LGTM.
> > > > >
> > > > > Just a minor comment:
> > > > > Maybe add some examples into the KIP to show how users can set both
> > IPv4
> > > > > and IPv6 on the same port.
> > > > > And some examples to show how the validation will fail like you
> > listed in
> > > > > `Proposed Changes`.
> > > > >
> > > > > Thank you.
> > > > > Luke
> > > > >
> > > > >
> > > > > On Fri, Feb 11, 2022 at 8:54 AM Matthew de Detrich
> > > > >  wrote:
> > > > >
> > > > > > Hello everyone
> > > > > >
> > > > > > I have just updated/rebased the PR against the latest Kafka
> trunk.
> > Let
> > > > me
> > > > > > know if anything else is required/missing.
> > > > > >
> > > > > > Regards
> > > > > >
> > > > > > On Thu, Jan 13, 2022 at 10:28 AM Matthew de Detrich <
> > > > > > matthew.dedetr...@aiven.io> wrote:
> > > > > >
> > > > > > > Does anyone have any additional comments/remarks to help get
> > > > > > > this PR voted through?
> > > > > > >
> > > > > > > On Tue, Nov 23, 2021 at 7:46 AM Josep Prat
> > > >  > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi Matthew,
> > > > > > >>
> > > > > > >> Thank you for the PR.
> > > > > > >>
> > > > > > >> +1 (non binding) from my side.
> > > > > > >>
> > > > > > >>
> > > > > > >> Best,
> > > > > > >>
> > > > > > >> ———
> > > > > > >> Josep Prat
> > > > > > >>
> > > > > > >> Aiven Deutschland GmbH
> > > > > > >>
> > > > > > >> Immanuelkirchstraße 26, 10405 Berlin
> > > > > > >>
> > > > > > >> Amtsgericht Charlottenburg, HRB 209739 B
> > > > > > >>
> > > > > > >> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> > > > > > >>
> > > > > > >> m: +491715557497
> > > > > > >>
> > > > > > >> w: aiven.io
> > > > > > >>
> > > > > > >> e: josep.p...@aiven.io
> > > > > > >>
> > > > > > >> On Tue, Nov 23, 2021, 07:11 Ivan Yurchenko <
> > > > ivan0yurche...@gmail.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> > Hi,
> > > > > > >> >
> > > > > > >> > Thank you for the KIP.
> > > > > > >> >
> > > > > > >> > +1 (non-binding)
> > > > > > >> >
> > > > > > >> > Ivan
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Tue, 23 Nov 2021 at 04:18, Luke Chen 
> > > > wrote:
> > > > > > >> >
> > > > > > >> > > Hi Matthew,
> > > > > > >> > > Thanks for the KIP.
> > > > > > >> > > It makes sense to allow IPv4 and IPv6 listening on the
> same
> >
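
To make the described behaviour concrete, a rough sketch of the relaxed check (not the actual CoreUtils change; a hypothetical helper, and a real implementation would first verify the hosts are literal IP addresses rather than DNS names):

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;

public class DuplicatePortCheck {
    // Two endpoints may share a port only if both hosts are IP addresses and
    // one is IPv4 while the other is IPv6; anything else keeps the old rule.
    static boolean mayShare(String host1, String host2) {
        try {
            InetAddress a = InetAddress.getByName(host1);
            InetAddress b = InetAddress.getByName(host2);
            // Note: getByName also resolves hostnames, so a literal-address
            // check would be needed first in a real implementation.
            return (a instanceof Inet4Address) != (b instanceof Inet4Address);
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(mayShare("127.0.0.1", "::1"));      // true
        System.out.println(mayShare("127.0.0.1", "10.0.0.1")); // false
    }
}
```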

Re: [DISCUSS] KIP-821: Connect Transforms support for nested structures

2022-04-22 Thread Chris Egerton
Hi Tom,

Thanks for taking a look at this, and for your thoughtful comments. I'll
leave it up to Jorge to address most of your comments but I wanted to share
a couple quick thoughts I had regarding 103 and 104.

103. Like you, I was envisioning a possible syntax for array access that
used classic C-style brackets; e.g., `arr[index]`. However, I wonder if we
could keep things simple and use the same syntax that we're proposing for
nested field access? In other words, instead of `arr[index]`, you'd write
`arr.index`. It'd save us and users the headache of reserving characters
now that would need to be escaped even if their unescaped brethren aren't
used for anything, and also avoid the question of what exactly we should do
when we see a config that uses reserved characters that aren't yet
supported (throwing an exception seems pretty unfriendly for new users).

104. This would probably be useful, but it would come with some nasty
compatibility questions that would need to be addressed if we'd want SMTs
that leverage this new API to be viable for older versions of Connect. If
we package and distribute this feature as a library (either via an entirely
new artifact, or as part of the existing connect-transforms or connect-api
artifacts), then we'd have to either sidestep the existing plugin isolation
logic [1] that basically makes it impossible for Connect plugins to ship
their own versions of Connect artifacts, or issue a big warning to people
that any SMT that uses this API won't work with any older versions of
Connect. There's also some other features we might want to add in an
SMT-utils library such as the existing, internal, utils that Connect uses
right now [2]. It may be worth exploring this in a separate KIP of its own.

[1] -
https://github.com/apache/kafka/blob/d480c4aa6e513e36050d8e067931de2270525d18/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java#L46-L143

[2] -
https://github.com/apache/kafka/tree/d480c4aa6e513e36050d8e067931de2270525d18/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util

Cheers,

Chris

On Fri, Apr 22, 2022 at 6:55 AM Tom Bentley  wrote:

> Hi Jorge,
>
> Thanks for the KIP, especially for the examples which are super-clear.
>
> 100. The name `field.style` isn't so clear for something like ReplaceField:
> it's not so obvious that field.style applies to `include` and `exclude`.
>
> 101. The permitted values for `field.style` don't seem terribly intuitive
> (to me anyway): the meaning of `plain` isn't very guessable. Why not
> `top-level` or `root` instead? Also `nested` could be misconstrued to mean
> nested-but-not-top-level, so perhaps `recursive` or `cascading` might be
> better?
>
> 102. I'm torn on whether making the interpretation of existing configs like
> `field` be dependent on `field.style` is a good idea. I can see that it's
> the simplest thing to do, but it just feels a bit odd that sometimes the
> `field` would actually be a path and have different escaping rules. An
> alternative would be to come up with a parallel set of config names (e.g.
> as well as "field" an SMT might support "path") which were defined to
> always take paths, thus avoiding the changeable interpretation of the
> existing configs. The SMT's #configure() would need to throw in the case
> that both configs were given. I can see that that would be more work in
> implementation, but it feels cleaner.
>
> 103. I think in order to allow for supporting arrays in a later KIP (which
> certainly seems like it could be useful), we'd want to specify the syntax
> now, even if it wasn't implemented under this KIP. That's because I don't
> think you can exclude the possibility that characters such as `[` and `]`
> appear in field names. So you'd have a compatibility problem if people
> started using the features of this KIP to access such fields, only for
> those characters to change their meaning under a later KIP.
>
> 104. I also wonder whether making paths into a public Java API, for use by
> 3rd party SMTs, would be valuable.
>
> Thanks again,
>
> Tom
>
>
>
> On Wed, 20 Apr 2022 at 17:53, Chris Egerton 
> wrote:
>
> > 💯 Thanks Jorge, LGTM!
> >
> > On Wed, Apr 20, 2022, 12:40 Jorge Esteban Quilcate Otoya <
> > quilcate.jo...@gmail.com> wrote:
> >
> > > Thank you, Chris! Not possible without your feedback.
> > >
> > > On Tue, 19 Apr 2022 at 23:04, Chris Egerton 
> > > wrote:
> > >
> > > > Hi Jorge,
> > > >
> > > > Thank you for sticking through this. I have one small remark and one
> > > small
> > > > clarification; assuming you agree with me on them then I'm ready to
> > vote
> > > on
> > > > the KIP.
> > > >
> > > > 1. InsertField: The "field.on.missing.parent" and
> > > "field.on.existing.field"
> > > > docs both mention a permitted value of "ingore"; this should be
> > "ignore",
> > > > right?
> > > >
> > >
> > > Of course, one more typo :)
> > >
> > >
> > > > 2. InsertField: The examples are still missing the "field.style"
> > property
> >

[jira] [Created] (KAFKA-13849) All topics whose cleanup policy contains `compact` are not executing deleteLogStartOffsetBreachedSegments as expected

2022-04-22 Thread RivenSun (Jira)
RivenSun created KAFKA-13849:


 Summary: All topics whose cleanup policy contains `compact` are not executing deleteLogStartOffsetBreachedSegments as expected
 Key: KAFKA-13849
 URL: https://issues.apache.org/jira/browse/KAFKA-13849
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.0.1
Reporter: RivenSun


This should be a *regression* bug.
https://issues.apache.org/jira/browse/KAFKA-7400

Currently the logic in LogManager.cleanupLogs() does not follow the above JIRA.


Analyzing the source code of LogManager.cleanupLogs():

1. When getting deletableLogs, the logs with `compact` enabled are filtered out 
in the two branches of the if-else below.
{code:java}
// clean current logs.
val deletableLogs = {
  if (cleaner != null) {
    // prevent cleaner from working on same partitions when changing cleanup policy
    cleaner.pauseCleaningForNonCompactedPartitions()
  } else {
    currentLogs.filter {
      case (_, log) => !log.config.compact
    }
  }
}{code}
2. But in the subsequent UnifiedLog.deleteOldSegments method, judging from the 
comments and code logic of this method, topics with `compact` enabled are also 
expected to execute the deleteLogStartOffsetBreachedSegments method.
{code:java}
/**
 * If topic deletion is enabled, delete any local log segments that have either expired due to time based retention
 * or because the log size is > retentionSize.
 *
 * Whether or not deletion is enabled, delete any local log segments that are before the log start offset
 */
def deleteOldSegments(): Int = {
  if (config.delete) {
    deleteLogStartOffsetBreachedSegments() +
      deleteRetentionSizeBreachedSegments() +
      deleteRetentionMsBreachedSegments()
  } else {
    deleteLogStartOffsetBreachedSegments()
  }
} {code}





Re: [DISCUSS] KIP-821: Connect Transforms support for nested structures

2022-04-22 Thread Joshua Grisham
Hello all!

Sorry that I come a bit late to the party here, but I am the one who wrote
KIP-683 [1] for recursive support (simply looping through all child
non-primitive structures for the same matching name(s)), which is a slightly
different way to solve a similar requirement. Unfortunately, at the time the
dev community was not quite as active, and I also got busy with work and
life in general, so I wasn't able to follow up or push it.

I do think it is a very good idea to have some kind of path-like expression
to be able to specifically address a nested field, as I can see that the
simple "recursive" case could potentially result in unwanted or unexpected
behavior, plus there is the potential to introduce a bit of a performance
hit to always loop through everything in cases where the schemas/values
might be quite large.

One thing I wanted to ask: instead of creating a new "path parser"
including some bespoke or "borrowed" syntax, why not just use something
that already exists? Specifically here I am thinking about JsonPath (
https://github.com/json-path/JsonPath)

There is already quite nice support in JsonPath for handling special
characters in field names, for handling different non-primitive types
(arrays etc), for handling multiple levels of nesting, etc etc.  Would it
be possible to instead to re-think this and maybe have some kind of
JsonPath-based Schema selector / updater and/or JsonPath-based Value
selector / updater? Conceptually this feels like it makes sense to me, as
from the top of my head it would be quite a natural fit to map a Json data
structure to the Connect API data structure (and you could potentially even
try to leverage the existing Json-to-Connect serializer/deserializer to
help out with this even in a more "out of the box"-feeling kind of way).

Maybe also as Tom mentioned, this part (in my example, this JsonPath-based
"thing") could even be a generic API that could be used by any SMT,
including used in custom ones built by the community.  Then I think to use
a completely separate config property somehow related to "path" (as Tom
also mentioned) would also make a lot of sense here as well. This way, if
you select based on "path" then this JsonPath-based API would be used,
otherwise it could use something similar to the existing get-field based
approach (which I guess could also be refactored into some kind of utility
/ API as well if it made sense?)

And with that in mind, if this was the kind of direction to go, then a
"recursive" capability like I pitched in KIP-683 would also become
unnecessary because you could easily write a JsonPath expression like
"$..someRecuriveField" and it would do the same thing (on top of anything
else you would want to do that is already supported by JsonPath). Then we
could also kill that older KIP and do a bit of clean-up :)

[1] -
https://cwiki.apache.org/confluence/display/KAFKA/KIP-683%3A+Add+recursive+support+to+Connect+Cast+and+ReplaceField+transforms%2C+and+support+for+casting+complex+types+to+either+a+native+or+JSON+string
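
To illustrate the suggestion, a small sketch using the JsonPath library linked above (the JSON document and paths here are made up; bracket notation handles field names containing dots, and `$..` is the recursive deep-scan operator):

```java
import com.jayway.jsonpath.JsonPath;
import java.util.List;

public class JsonPathSketch {
    public static void main(String[] args) {
        String json = "{\"parent.with.dots\": {\"k2\": \"v2\"},"
            + " \"a\": {\"deep\": 1}, \"b\": {\"deep\": 2}}";
        // Bracket notation addresses a field whose name contains dots.
        String v2 = JsonPath.read(json, "$['parent.with.dots'].k2");
        // Deep scan collects every field with a matching name, recursively.
        List<Integer> deeps = JsonPath.read(json, "$..deep");
        System.out.println(v2 + " " + deeps); // v2 [1, 2]
    }
}
```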

Just some extra food for thought. All-in-all I think this is a super great
initiative!

Best,

Joshua


Den fre 22 apr. 2022 kl 14:50 skrev Chris Egerton :

> Hi Tom,
>
> Thanks for taking a look at this, and for your thoughtful comments. I'll
> leave it up to Jorge to address most of your comments but I wanted to share
> a couple quick thoughts I had regarding 103 and 104.
>
> 103. Like you, I was envisioning a possible syntax for array access that
> used classic C-style brackets; e.g., `arr[index]`. However, I wonder if we
> could keep things simple and use the same syntax that we're proposing for
> nested field access? In other words, instead of `arr[index]`, you'd write
> `arr.index`. It'd save us and users the headache of reserving characters
> now that would need to be escaped even if their unescaped brethren aren't
> used for anything, and also avoid the question of what exactly we should do
> when we see a config that uses reserved characters that aren't yet
> supported (throwing an exception seems pretty unfriendly for new users).
>
> 104. This would probably be useful, but it would come with some nasty
> compatibility questions that would need to be addressed if we'd want SMTs
> that leverage this new API to be viable for older versions of Connect. If
> we package and distribute this feature as a library (either via an entirely
> new artifact, or as part of the existing connect-transforms or connect-api
> artifacts), then we'd have to either sidestep the existing plugin isolation
> logic [1] that basically makes it impossible for Connect plugins to ship
> their own versions of Connect artifacts, or issue a big warning to people
> that any SMT that uses this API won't work with any older versions of
> Connect. There's also some other features we might want to add in an
> SMT-utils library such as the existing, internal, utils that Connect uses
> right now [2]. It may be worth exploring this

[jira] [Resolved] (KAFKA-13849) All topics whose cleanup policy contains `compact` are not executing deleteLogStartOffsetBreachedSegments as expected

2022-04-22 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun resolved KAFKA-13849.
--
Resolution: Not A Bug

> All topics whose cleanup policy contains `compact` are not executing 
> deleteLogStartOffsetBreachedSegments as expected
> --
>
> Key: KAFKA-13849
> URL: https://issues.apache.org/jira/browse/KAFKA-13849
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.0.1
>Reporter: RivenSun
>Priority: Major
>
> This should be a *regression* bug.
> https://issues.apache.org/jira/browse/KAFKA-7400
> Currently the logic in LogManager.cleanupLogs() does not follow the above
> JIRA.
> Analyzing the source code of LogManager.cleanupLogs():
> 1. When getting deletableLogs, the logs with `compact` enabled are filtered
> out in the two branches of the if-else below.
> {code:java}
> // clean current logs.
> val deletableLogs = {
>   if (cleaner != null) {
>     // prevent cleaner from working on same partitions when changing cleanup policy
>     cleaner.pauseCleaningForNonCompactedPartitions()
>   } else {
>     currentLogs.filter {
>       case (_, log) => !log.config.compact
>     }
>   }
> }{code}
> 2. But in the subsequent UnifiedLog.deleteOldSegments method, judging from the
> comments and code logic of this method, topics with `compact` enabled are also
> expected to execute the deleteLogStartOffsetBreachedSegments method.
> {code:java}
> /**
>  * If topic deletion is enabled, delete any local log segments that have either expired due to time based retention
>  * or because the log size is > retentionSize.
>  *
>  * Whether or not deletion is enabled, delete any local log segments that are before the log start offset
>  */
> def deleteOldSegments(): Int = {
>   if (config.delete) {
>     deleteLogStartOffsetBreachedSegments() +
>       deleteRetentionSizeBreachedSegments() +
>       deleteRetentionMsBreachedSegments()
>   } else {
>     deleteLogStartOffsetBreachedSegments()
>   }
> } {code}





[jira] [Created] (KAFKA-13850) kafka-metadata-shell is missing some record types

2022-04-22 Thread David Arthur (Jira)
David Arthur created KAFKA-13850:


 Summary: kafka-metadata-shell is missing some record types
 Key: KAFKA-13850
 URL: https://issues.apache.org/jira/browse/KAFKA-13850
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Reporter: David Arthur


Noticed while working on feature flags in KRaft: the in-memory tree of the 
metadata (MetadataNodeManager) is missing support for a few record types.
 * DelegationTokenRecord
 * UserScramCredentialRecord (should we include this?)
 * FeatureLevelRecord
 * AccessControlEntryRecord

 





Re: [DISCUSS] KIP-829: (console-consumer) add print.topic property

2022-04-22 Thread Evans Jahja
Hi, everyone,

Thanks for the comments on adding the example to the KIP. I'm glad this KIP
is well received.
There's another message queue called `nsq` whose consumer program `nsq_tail`
has the ability to print the topic, which is useful when consuming from a
wildcard (includes); that's what led me to create this KIP.

I've updated examples in the KIP, and I've also added the unit test for
this.

As stated in the KIP, this is the expected output when we specify the topic:

Topic:topic	value

and this is the expected output when we specify all of the print properties:

NO_TIMESTAMP	Partition:0	Offset:123	Topic:topic	key	value

I think we can discuss whether there are any requirements on the ordering.
Personally, I wouldn't care so much about the ordering, as long as the
topic is included, but let's discuss if there are any suggestions
regarding this.
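
For reference, a hypothetical invocation (the --include flag and the other print.* properties already exist in kafka-console-consumer.sh; print.topic is the property this KIP proposes):

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --include 'orders.*' \
  --property print.topic=true \
  --property print.partition=true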

Thank you,

Evans


On Wed, Apr 20, 2022 at 1:59 PM Luke Chen  wrote:

> Hi Evans,
>
> This is a good feature that we missed before.
> +1 to add an example.
> I saw your WIP PR already added it, please put them into the KIP.
>
> Thank you.
> Luke
>
> On Sat, Apr 16, 2022 at 7:39 AM John Roesler  wrote:
>
> > Hi Evans,
> >
> > Thanks for the KIP!
> >
> > I agree with Mickael; it would be good to see an example in the KIP.
> >
> > Otherwise, I'm in favor.
> >
> > Thanks,
> > -John
> >
> > On Fri, Apr 15, 2022, at 04:33, Mickael Maison wrote:
> > > Hi,
> > >
> > > I can't believe this was missing! Thanks for addressing this.
> > > Can you add an example of the expected output with the topic in the
> KIP?
> > >
> > > Thanks,
> > > Mickael
> > >
> > > On Sat, Apr 9, 2022 at 12:11 PM Evans Jahja 
> > wrote:
> > >>
> > >> Good evening.
> > >>
> > >> My name is Evans, I am proposing KIP-829.
> > >>
> > >> Brief: Using `kafka-console-consumer` with `--include` argument
> listens
> > >> from multiple topics, but there is no way to know which topic the
> > message
> > >> came from. This proposal allows printing the topic name.
> > >>
> > >> Requesting for comments.
> > >>
> > >> Confluence
> > >> <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-829%3A+%28console-consumer%29+add+print.topic+property
> > >
> > >> Github 
> > >>
> > >> Best Regards,
> > >>
> > >> Evans Jahja
> >
>


contributor access

2022-04-22 Thread András Csáki
Hi Kafka Devs,

I'd like to start contributing to Apache Kafka, first with a fix for
KAFKA-13848  I've just
reported.
Please add me to the list of contributors so I can assign this task to
myself and submit a PR! :)

My jira ID is: acsaki

Thank you,
Andras


Build failed in Jenkins: Kafka » kafka-2.7-jdk8 #189

2022-04-22 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 6.93 MB...]

Re: [DISCUSS] KIP-830: Allow disabling JMX Reporter

2022-04-22 Thread Federico Valeri
Hi Mickael, what about making JmxReporter the default value of
metric.reporters while also maintaining the old behavior? When a user sets
metric.reporters without explicitly including the JmxReporter, we
could simply print a warning message which says "The automatic
addition of the JmxReporter is deprecated. Add it explicitly to the
metric.reporters list if needed.", or something like that. That way we
can avoid adding a new property, and drop the implicit setting in the
next major release.
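
A minimal sketch of that resolution logic (illustrative only, not the actual KafkaConfig code; only the class name of the built-in JmxReporter is taken from Kafka):

```java
import java.util.ArrayList;
import java.util.List;

public class MetricReportersSketch {
    static final String JMX_REPORTER = "org.apache.kafka.common.metrics.JmxReporter";

    // Keep the old implicit-JmxReporter behavior, but warn so the implicit
    // addition can be dropped in the next major release.
    static List<String> resolveReporters(List<String> configured) {
        List<String> reporters = new ArrayList<>(configured);
        if (!reporters.contains(JMX_REPORTER)) {
            System.err.println("The automatic addition of the JmxReporter is "
                + "deprecated. Add it explicitly to the metric.reporters list if needed.");
            reporters.add(JMX_REPORTER); // old behavior preserved for now
        }
        return reporters;
    }

    public static void main(String[] args) {
        System.out.println(resolveReporters(List.of("com.example.MyReporter")));
    }
}
```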

On Wed, Apr 13, 2022 at 11:49 AM Mickael Maison
 wrote:
>
> Hi,
>
> I've created a small KIP to allow disabling JMXreporter:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-830%3A+Allow+disabling+JMX+Reporter
>
> Let me know if you have any feedback.
>
> Thanks,
> Mickael


[jira] [Created] (KAFKA-13851) Tests are missing for DeleteRecords API

2022-04-22 Thread Divij Vaidya (Jira)
Divij Vaidya created KAFKA-13851:


 Summary: Tests are missing for DeleteRecords API
 Key: KAFKA-13851
 URL: https://issues.apache.org/jira/browse/KAFKA-13851
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Divij Vaidya
Assignee: Divij Vaidya


I noticed that tests are missing for the DeleteRecords API, whereas we have tests 
for the rest of the APIs (such as DeleteTopic) over here: 
[https://github.com/apache/kafka/tree/trunk/core/src/test/scala/unit/kafka/server]
 

The completion of this Jira item will ensure that we have test coverage for the 
DeleteRecords API.
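
For context, a minimal usage sketch of the API in question through the Java Admin client (the topic name and offset are illustrative):

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class DeleteRecordsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            // Delete all records in partition 0 before offset 5.
            admin.deleteRecords(Map.of(tp, RecordsToDelete.beforeOffset(5L)))
                 .lowWatermarks().get(tp).get();
        }
    }
}
{code}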





Re: contributor access

2022-04-22 Thread Konstantine Karantasis
Just added you.

Thanks for your interest in contributing to the Apache Kafka project, Andras!

Konstantine

On Fri, Apr 22, 2022 at 7:25 AM András Csáki  wrote:

> Hi Kafka Devs,
>
> I'd like to start contributing to Apache Kafka, first with a fix for
> KAFKA-13848  I've just
> reported.
> Please add me to the list of contributors so I can assign this task to
> myself and submit a PR! :)
>
> My jira ID is: acsaki
>
> Thank you,
> Andras
>


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #884

2022-04-22 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-819: Merge multiple KStreams in one operation

2022-04-22 Thread Matthias J. Sax

Nick, how should we proceed?

On 3/29/22 9:21 PM, Chris Egerton wrote:

Hi all,

The Named-first variant seems pretty appealing:
 merge(Named named, KStream... streams)
It doesn't align with the existing merge methods, but it does at least
follow precedent set by the (now-deprecated) branch method [1].

A Collection-based alternative seems slightly less appealing, but only
because I'm guessing it'll be more common for the set of to-be-merged
streams to be known at compile time. In that case, the syntactic sugar
provided by a variadic method is preferable to having to wrap your set of
streams in a call to, e.g., Arrays::asList [2].

An issue I have with the split variant:
 merge(KStream first, Named named, KStream... rest)
is that it doesn't seem very intuitive to users who aren't familiar with
Streams and don't have the context of how/when the overloaded variants to
the merge method were introduced.

If we really want things to be consistent, one possibility is to add a
Named-first variant:
 merge(Named named, KStream... streams)
Deprecate the existing Named variant:
 merge(KStream stream, Named named)
And change the existing single-arg merge method:
 merge(KStream stream)
To become variadic (like proposed by Matthias earlier in the thread):
 merge(KStream... streams)

In the end, we'd have three methods, possibly reduced to two by the next
major release:
 merge(Named named, KStream... streams)
 merge(KStream... streams)
 merge(KStream stream, Named named) (deprecated)

RE supporting both a Collection-based and a variadic method: it doesn't
look like this is too common in the Streams API right now and considering
how trivial it is to convert from one style to another in most cases
(either with the aforementioned Arrays::asList to turn a static
compile-time set of streams into a Collection, or with Collection::toArray
[3] to turn a runtime Collection of streams into an array which can be
passed to a variadic method), it doesn't seem worth it to pollute the API
space with multiple methods that provide the exact same behavior.

[1] -
https://kafka.apache.org/31/javadoc/org/apache/kafka/streams/kstream/KStream.html#branch(org.apache.kafka.streams.kstream.Named,org.apache.kafka.streams.kstream.Predicate..
.)
[2] -
https://docs.oracle.com/javase/8/docs/api/java/util/Arrays.html#asList-T...-
[3] -
https://docs.oracle.com/javase/8/docs/api/java/util/Collection.html#toArray-T:A-

Cheers,

Chris
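
As a quick illustration of the Arrays::asList and Collection::toArray conversions mentioned above (plain JDK calls; a String stands in for the KStream type here):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class ConversionSketch {
    // Stand-in for a variadic merge(KStream... streams).
    static String merge(String... streams) {
        return String.join("+", streams);
    }

    public static void main(String[] args) {
        // Compile-time set of streams -> Collection, via Arrays::asList.
        List<String> asCollection = Arrays.asList("s1", "s2", "s3");
        // Runtime Collection -> array, via Collection::toArray, for varargs.
        Collection<String> streams = asCollection;
        System.out.println(merge(streams.toArray(new String[0]))); // s1+s2+s3
    }
}
```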

On Tue, Mar 29, 2022 at 11:14 PM Matthias J. Sax  wrote:


My understanding was that the original proposal was to have:

merge(KStream stream);
merge(KStream... streams);

Maybe I misunderstood.


I am not really a fan of

merge(KStream stream, KStream... streams);

because it seems to break the `Collection` pattern. If I have a
collection of KStreams, I need to artificially extract one and pass as
first argument and pass all others using the second argument.

On the other hand, _if_ I have a collection, going back to the original
proposal of

merge(Collection streams);

would work, too. Maybe the bottom line is that we might want to have both
(`Collection` and vararg) to optimize for both cases? On the other hand,
it feels rather redundant? Also not sure if both are compatible?



The issue with Named is interesting. Wondering if we should just flip
the argument order:

merge(Named name, KStream... streams);

Falling back to `Collection` would also avoid this issue.



-Matthias


On 3/29/22 1:43 AM, Nick Telford wrote:

Yeah, the Named parameter makes it a little trickier. My suggestion would
be to add an additional overload that looks like:

KStream merge(KStream first, Named named, KStream... rest);


It's not ideal having the Named parameter split the other parameters; we
could alternatively move the Named parameter to be first, but then that
wouldn't align with the rest of the API.

Nick

On Tue, 29 Mar 2022 at 05:20, Chris Egerton 

wrote:



Hi all,

Java permits the overload. Simple test class to demonstrate:

```
public class Test {
    private final String field;

    public Test(String field) {
        this.field = field;
    }

    public Test merge(Test that) {
        return new Test("Single-arg merge: " + this.field + ", " + that.field);
    }

    public Test merge(Test that, Test... those) {
        String newField = "Varargs merge: " + this.field + ", " + that.field;
        for (Test test : those) newField += ", " + test.field;
        return new Test(newField);
    }

    public static void main(String[] args) {
        Test t1 = new Test("t1"), t2 = new Test("t2"), t3 = new Test("t3");
        Test merge1 = t1.merge(t2), merge2 = t1.merge(t2, t3);
        System.out.println(merge1.field); // Single-arg merge: t1, t2
        System.out.println(merge2.field); // Varargs merge: t1, t2, t3
    }
}
```

There's a great StackOverflow writeup on the subject [1], which explains
that during method resolution, priority is give

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2022-04-22 Thread Matthias J. Sax

Ivan,

are you still interested in this KIP? I think it would be a good addition.


-Matthias

On 8/16/21 5:30 PM, Matthias J. Sax wrote:

Your point about the IQ problem is an interesting one. I missed the
point that the "new key" would be a "superkey", and thus, it should
always be possible to compute the original key from the superkey. (As a
matter of fact, for windowed-table the windowed-key is also a superkey...)

I am not sure if we need to follow the "use the head" idea or if we need
a "CompositeKey" interface? It seems we can just allow for any types and
be agnostic to it?

KStream stream = ...
KStream stream2 =
    stream.selectKey(/*set superkey*/)
          .markAsPartitioned()

We only need a `Function` without any restrictions on the type,
to map the "superkey" to the original "partition key"?


Do you propose to provide the "reverse mapper" via the
`markAsPartitioned()` method (or config object), or via the IQ methods?
Not sure which one is better?


However, I am not sure if it would solve the join problem? At least not
easily: if one has two KStream and one is properly
partitioned by `Tuple` while the other one is "marked-as-partitioned",
the join would just fail. -- Similar for a stream-table join. -- The
only fix would be to do the re-partitioning anyway, effectively ignoring
the "user hint", but it seems to defeat the purpose? Again, I would
argue that it is ok to not handle this case, but leave it as the
responsibility for the user to not mess it up.


-Matthias

On 8/9/21 2:32 PM, Ivan Ponomarev wrote:

Hi Matthias and Sophie!

==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==

I don't have a strong opinion here, both Sophie's and Matthias' points
look convincing for me.

I think we should estimate the following: what is the probability that
we will ever need to extend `selectKey` etc. with a config for the
purposes other than `markAsPartitioned`?

If we find this probability high, then it's just a refactoring to
deprecate overloads with `Named` and introduce overloads with dedicated
configs, and we should do it this way.

If it's low or zero, maybe it's better not to mess with the existing
APIs and to introduce a single `markAsPartitioned()` method, which
itself can be easily deprecated if we find a better solution later!


==2. The IQ problem==


it then has to be the case that

Partitioner.partition(key) == Partitioner.partition(map(key))

Sophie, you got this wrong, and Matthias already explained why.

The actual required property for the mapping function is:

\forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))

or, by contraposition law,

\forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )


(look at the whiteboard photo that I attached to the KIP).

There is a big class of such mappings: key -> Tuple(key, anyValue). This
is actually what we often do before aggregation, and this mapping does
not require repartition.

But of course we can extract the original key from Tuple(key, anyValue),
and this can save IQ and joins!

This is what I'm talking about when I talk about 'CompositeKey' idea.

We can do the following:

1. implement a 'partitioner wrapper' that recognizes tuples
(CompositeKeys) and uses only the 'head' to calculate the partition,

2. implement

selectCompositeKey(BiFunction tailSelector) {
   selectKey((k, v) -> new CompositeKey(k, tailSelector.apply(k, v)));
   //MARK_AS_PARTITIONED call here,
   //but this call is an implementation detail and we do not expose
   //markAsPartitioned publicly!
}

WDYT? (it's just a brainstorming idea)
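
As a rough sketch of idea 1, the 'partitioner wrapper' (CompositeKey is the hypothetical type from the brainstorm above; StreamPartitioner is the existing Streams interface):

```java
import org.apache.kafka.streams.processor.StreamPartitioner;

// Hypothetical composite key: 'head' is the original partition key.
record CompositeKey<H, T>(H head, T tail) { }

// Wrapper that partitions CompositeKeys by their head only, so selecting a
// composite key does not change which partition a record maps to.
class CompositeKeyPartitioner<H, T, V> implements StreamPartitioner<CompositeKey<H, T>, V> {
    private final StreamPartitioner<H, V> inner;

    CompositeKeyPartitioner(StreamPartitioner<H, V> inner) {
        this.inner = inner;
    }

    @Override
    public Integer partition(String topic, CompositeKey<H, T> key, V value, int numPartitions) {
        return inner.partition(topic, key.head(), value, numPartitions);
    }
}
```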

On 09.08.2021 2:38, Matthias J. Sax wrote:

Hi,

I originally had a similar thought about `markAsPartitioned()` vs
extending `selectKey()` et al. with a config. While I agree that it
might be conceptually cleaner to use a config object, I did not propose
it as the API impact (deprecating stuff and adding new stuff) is quite
big... If we think it's an acceptable price to pay, I am ok with it
though.

I also do think, that `markAsPartitioned()` could actually be
categorized as an operator... We don't expose it in the API as
first-class citizen atm, but in fact we have two types of `KStream` -- a
"PartitionedKStream" and a "NonPartitionedKStream". Thus,
`markAsPartitioned()` can be seen as a "cast operator" that converts the
one into the other.

I also think that the raised concern about "forgetting to remove
`markAsPartitioned()`" might not be very strong though. If you have
different places in the code that link stuff together, a call to eg.
`selectKey().markAsPartitioned()` must always go together. If you have
some other place in the code that get a `KStream` passed an input, it
would be "invalid" to blindly call `markAsPartitioned()` as you don't
know anything about the upstream code. Of course, it requires some
"coding discipline" to follow this pattern... Also, you can shoot
themselves into the foot if they want with the config object pattern,
too: if you get a `KStrea

Re: [DISCUSS] KIP-831: Add metric for log recovery progress

2022-04-22 Thread James Cheng
The KIP describes RemainingLogsToRecovery, which seems to be the number of 
partitions in each log.dir. 

We have some partitions which are much, much larger than others. Those large 
partitions have many more segments than others.

Is there a way the metric can reflect partition size? Could it be 
RemainingSegmentsToRecover? Or even RemainingBytesToRecover?

-James

Sent from my iPhone

> On Apr 20, 2022, at 2:01 AM, Luke Chen  wrote:
> 
> Hi all,
> 
> I'd like to propose a KIP to expose a metric for log recovery progress.
> This metric would let the admins have a way to monitor the log recovery
> progress.
> Details can be found here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-831%3A+Add+metric+for+log+recovery+progress
> 
> Any feedback is appreciated.
> 
> Thank you.
> Luke


[jira] [Resolved] (KAFKA-13398) The caller program will be shut down directly when the execution of Kafka script is abnormal

2022-04-22 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun resolved KAFKA-13398.
--
Resolution: Not A Problem

> The caller program will be shut down directly when the execution of Kafka 
> script is abnormal
> 
>
> Key: KAFKA-13398
> URL: https://issues.apache.org/jira/browse/KAFKA-13398
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 3.0.0
>Reporter: RivenSun
>Priority: Major
>
> hello [~showuon] and [~guozhang]
> Kafka has some key functions that have not yet been integrated into the
> Java AdminClient, so I have to use some Scala classes from the Kafka server's
> `kafka.admin` package in my Java program, such as
> `ReassignPartitionsCommand` and `ConsumerGroupCommand` (reset group offsets),
> calling their `*main(args: Array[String])*` methods in order to
> achieve specific functions.
> *Problem*:
> 1. In different Kafka versions, these Scala classes may have different
> requirements for input parameters, or they may treat the results of
> command execution differently.
>  1) `ReassignPartitionsCommand` requires --bootstrap-server in recent
> versions, but requires --zookeeper in older versions.
>  Once the parameter verification fails, the *Exit.exit(1, Some(message))*
> method will be called, which will cause my process to shut down directly.
>  2) In Kafka 3.0.0, there is this code at the end of the `*main(args:
> Array[String])*` method of `ReassignPartitionsCommand`:
> {code:java}
> // If the command failed, exit with a non-zero exit code.
> if (failed) {
>   Exit.exit(1)
> }{code}
> This will also make my process shut down directly
> So I hope that the Kafka community can print the reason and stack trace
> of the corresponding exception when parameter verification fails or
> command execution is abnormal, and then return from the command's
> `*main(args: Array[String])*` method instead of calling the
> `*Exit.exit(...)*` method. Of course, when the script is executed
> directly on a machine, there is no problem with exiting.
>  
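
As an aside, callers embedding these commands can already intercept such exits via the public org.apache.kafka.common.utils.Exit hooks, which the Scala kafka.utils.Exit delegates to in recent versions (a sketch; the exception type is an arbitrary choice):

{code:java}
import org.apache.kafka.common.utils.Exit;

public class EmbeddedCommandRunner {
    public static void main(String[] args) {
        // Replace the JVM-terminating exit with an exception the caller can catch.
        Exit.setExitProcedure((statusCode, message) -> {
            throw new IllegalStateException("Command exited with status " + statusCode
                + (message == null ? "" : ": " + message));
        });
        try {
            // e.g. kafka.admin.ReassignPartitionsCommand.main(commandArgs);
        } finally {
            Exit.resetExitProcedure(); // restore normal behavior
        }
    }
}
{code}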


