[jira] [Created] (KAFKA-13924) purging a Kafka topic at a specific time and interval

2022-05-21 Thread Amin Qurjili (Jira)
Amin Qurjili created KAFKA-13924:


 Summary: purging a Kafka topic at a specific time and interval
 Key: KAFKA-13924
 URL: https://issues.apache.org/jira/browse/KAFKA-13924
 Project: Kafka
  Issue Type: Improvement
  Components: log cleaner
Reporter: Amin Qurjili


In our use case, we had the idea of using Kafka's compaction strategy as a persisted 
cache and as a disaster-recovery mechanism:

We are developing a trading platform where order notifications are reset at 
00:00 every day, and trade times are not regular enough to handle this with the 
retention.ms parameter.

It would therefore be great to have a built-in mechanism for purging a topic at a 
specific time (e.g. 00:00) and at a configurable interval (e.g. every day or every two days).
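
For reference, today this can only be approximated from outside the broker, for 
example by a scheduled job (cron or similar) that calls the Admin API's deleteRecords. 
A minimal sketch of such a workaround, assuming a hypothetical topic name, partition 
count and bootstrap address:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class NightlyPurge {
    // Placeholder values; the real topic, partition count and bootstrap servers differ.
    static final String TOPIC = "order-notifications";
    static final int PARTITIONS = 6;

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Look up the current end offset of every partition of the topic.
            Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
            for (int p = 0; p < PARTITIONS; p++) {
                latest.put(new TopicPartition(TOPIC, p), OffsetSpec.latest());
            }
            Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
                    admin.listOffsets(latest).all().get();

            // Delete everything before the current end offset, i.e. purge the whole topic.
            Map<TopicPartition, RecordsToDelete> toDelete = new HashMap<>();
            endOffsets.forEach((tp, info) ->
                    toDelete.put(tp, RecordsToDelete.beforeOffset(info.offset())));
            admin.deleteRecords(toDelete).all().get();
        }
    }
}
{code}

Running something like this from an external scheduler at 00:00 works, but a 
built-in, broker-side schedule would remove the need for the extra service.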

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13925) Integration of information and financial systems

2022-05-21 Thread Yudin Nikita (Jira)
Yudin Nikita created KAFKA-13925:


 Summary: Integration of information and financial systems
 Key: KAFKA-13925
 URL: https://issues.apache.org/jira/browse/KAFKA-13925
 Project: Kafka
  Issue Type: Test
Reporter: Yudin Nikita






--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] KIP-839: Provide builders for KafkaProducer/KafkaConsumer and KafkaStreams

2022-05-21 Thread Chris Egerton
Hi François,

Thanks for writing this up! A few thoughts now that I've had some time to
think this over:

1. The KIP outlines methods for the KafkaStreamsBuilder class to add
producer interceptors and consumer interceptors. Since Streams instantiates
many producers and consumers, we should probably leave these methods out of
the builder due to the concerns noted in the KIP about shared resources
being closed when a client is shut down. The existing KafkaClientSupplier
interface should be sufficient to cover the case of interceptors already,
but if not, we can explore alternatives that don't risk conflict over
shared resources.

2. In a similar vein, the KIP also outlines a method for the
KafkaStreamsBuilder class to add metrics reporters. Unlike interceptors, I
think there's still some value to this method, since a KafkaStreams
instance does instantiate its own set of metrics reporters. However, we
should probably be careful to note in the docs for this method (and in the
KIP itself) that this method only controls the metrics reporters for
Streams-specific metrics, and not for the Kafka clients that Streams uses.
We could also point people towards the KafkaClientSupplier interface if
they'd like to add metrics reporters for these Kafka clients; a rough sketch of
that route follows below.

3. We should probably add a builder for the Admin interface as well, since
it's also a Kafka client class and comes with the same "metric.reporters"
property (which takes a list of classes) that the other two come with.

4. Can you specify the exact package(s) that the new builders will be added
to, and the exact type signatures for their constructors and methods? You
can see KIP-585 [1] for a good example of how KIPs usually define new
classes/interfaces. You don't need to add Javadocs to everything, but if
there's something worth mentioning like the behavior proposed in item 2,
it's nice to include in a Javadoc for the relevant class/method.

5. Just for aesthetics, I wonder if we can add static builder() methods to
the KafkaProducer, KafkaConsumer, KafkaStreams, and Admin classes/interface
that return a new builder? Seems cleaner to write
"KafkaProducer.builder(props)" than "new KafkaProducerBuilder(props)". Not
a huge thing, though.

[1] -
https://cwiki.apache.org/confluence/display/KAFKA/KIP-585%3A+Filter+and+Conditional+SMTs#KIP585:FilterandConditionalSMTs-PublicInterfaces
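
To make points 1 and 2 a bit more concrete, here is a minimal sketch of the
existing KafkaClientSupplier route, which already lets users attach interceptors
(or client-level metrics reporters) to the clients that Streams creates. The
interceptor class name below is just a placeholder, and the exact interface
methods can vary slightly between Kafka versions:

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.KafkaClientSupplier;

public class InterceptingClientSupplier implements KafkaClientSupplier {

    @Override
    public Producer<byte[], byte[]> getProducer(Map<String, Object> config) {
        Map<String, Object> copy = new HashMap<>(config);
        // "com.example.MyProducerInterceptor" is a placeholder interceptor class.
        copy.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.example.MyProducerInterceptor");
        return new KafkaProducer<>(copy, new ByteArraySerializer(), new ByteArraySerializer());
    }

    @Override
    public Consumer<byte[], byte[]> getConsumer(Map<String, Object> config) {
        return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    @Override
    public Consumer<byte[], byte[]> getRestoreConsumer(Map<String, Object> config) {
        return getConsumer(config);
    }

    @Override
    public Consumer<byte[], byte[]> getGlobalConsumer(Map<String, Object> config) {
        return getConsumer(config);
    }

    @Override
    public Admin getAdmin(Map<String, Object> config) {
        return Admin.create(config);
    }
}
{code}

The supplier is then passed to the KafkaStreams constructor, which is why a
builder method for per-client interceptors may not be needed at all.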

Cheers,

Chris

On Thu, May 19, 2022 at 4:59 PM François Rosière 
wrote:

> @Kirk,
>
> 1. No defaults are expected. Serializers/deserializers would need to be
> provided using either the config or the builder.
> 2. As some properties would need to be closed, maybe the build method
> should only be called once. We still need to see whether a check for that is
> really required.
>
> Kr,
>
> François.
>
> On Thu, May 19, 2022 at 00:02, Kirk True  wrote:
>
> > Hi François,
> >
> > Thanks for the KIP!
> >
> > A couple of questions:
> >
> >  1. Do the builders have defaults for the serializers/deserializers?
> >  2. Can the build method be called more than once on a given builder?
> >
> > Thanks,
> > Kirk
> >
> > On Wed, May 18, 2022, at 10:11 AM, François Rosière wrote:
> > > Hi all,
> > >
> > > KIP to create builders for
> > >
> > >- KafkaProducer
> > >- KafkaConsumer
> > >- KafkaStreams
> > >
> > >
> > > This KIP can be seen as the continuity of the KIP-832.
> > >
> > > KIP details:  
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211884640
> > > Jira issue: https://issues.apache.org/jira/browse/KAFKA-13913
> > >
> > > Kr,
> > >
> > > F.
> > >
> >
>


Re: [DISCUSS] KIP-821: Connect Transforms support for nested structures

2022-05-21 Thread Chris Egerton
Hi Jorge,

I really appreciate the effort you've made to simplify the syntax and
feature set of a JSONPath-based approach as much as possible. I'm still
hesitant to continue with it, though.

1. The syntax is much less friendly. Just compare "top.mid.bottom" to
"$['top']['mid']['bottom']"... not everyone uses JSONPath or even JSON, and
the learning curve for the latter is going to be steeper. The examples in
the new KIP draft you published demonstrate this pretty well, and this is
without diving into the details of what escape syntax would look like.

2. The three new major features that this syntax adds (array accesses, deep
scans, and multi-value paths) could all be added pretty easily to the dot
notation syntax without introducing brackets and dollar signs. Array
accesses can be described using the same syntax as struct/map field access,
deep scans can be described using '*', and multi-value paths can be
described by referencing the name of a field that's expected to have
children. These are all top-of-the-head ideas and can probably be refined,
but hopefully they demonstrate that we can keep the syntax simple without
sacrificing features. Of course, the question of leaving room for future
features might arise... given that these are the out-of-the-box SMTs that
are likely to be the first that many people encounter, I'd err on the side
of simplicity and a gentle learning curve; if people need to get more out
of transforms, the option to implement their own is still there. If we can
address 95% of use cases with something easy to use, it's not worth making
the feature harder for everyone to use just to accommodate the remaining 5%.

3. The advantage of leveraging an existing syntax is twofold: users who are
already familiar with that syntax don't need to learn a new syntax, and
maintainers of the syntax get to leverage existing libraries and
documentation for the syntax. With the current proposal, reusing libraries
is off the table, which means that we're going to have to parse this
ourselves (including all the escape syntax edge cases), and we won't be
able to automatically leverage new features added to that syntax. And given
how stripped-down the syntax is in comparison to full JSONPath, there's
still going to be a learning curve for users who are already familiar with
it, and we'll still have to document how Connect's variant of JSONPath
works either instead of or in addition to just linking to a well-maintained
third-party docs site.

On the topic of "field.path" and "include.path" vs. "field.style", I
actually think that a single property per SMT is cleaner and simpler.
Allowing users to mix and match styles within the same SMT config seems
like a recipe for confusion, and with a single property that dictates field
syntax behavior, we leave the door open to change the default at a later
date. We could even fully remove support for plain field notation at some
point and still be able to retain the simple property names of "field",
"include", etc. instead of forcing people to use "field.path" and the like.
That said, the term "field.style" and the permitted values might be a
little ambiguous. One alternative, though a little heavy-handed, is to
change it to "field.syntax.version" with permitted values of "V1" (default,
equivalent to "field.style = plain") and "V2" (equivalent to "field.style =
nested"). This would leave us room in the future to make further changes to
the syntax without having to come up with new names, although it does
sacrifice a little bit in that the permitted values are no longer
self-describing.
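
To make the comparison a bit more tangible, here is roughly what a dot-notation
config with an explicit style switch could look like, written as a Java map for
concreteness. Only MaskField$Value and its "fields" property exist today; the
"field.style" key, the "nested" value, and the array/deep-scan path forms are
assumptions drawn from this discussion, not a finalized design:

{code:java}
import java.util.Map;

public class NestedMaskFieldConfigSketch {
    // Hypothetical SMT config illustrating the dot-notation ideas above.
    static final Map<String, String> MASK_NESTED_FIELDS = Map.of(
            "transforms", "mask",
            "transforms.mask.type", "org.apache.kafka.connect.transforms.MaskField$Value",
            // opt in to nested-path interpretation of "fields" (proposed, not in Connect today)
            "transforms.mask.field.style", "nested",
            // plain nested access, array index access, and a '*' deep scan,
            // all using the same dot separator
            "transforms.mask.fields", "top.mid.bottom,orders.0.amount,customer.*.email"
    );
}
{code}

A "field.syntax.version" switch with values like "V1"/"V2" would slot into the
same place as "field.style" here.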

Cheers,

Chris

On Sun, May 15, 2022 at 4:51 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Thank you all for your feedback, and sorry for the long wait for a reply.
>
> I would like to explore the idea of JSONPath-inspired/subset notation a bit
> further:
>
> It will need to be a much-reduced version of JSONPath:
> - No full support for JsonPath, and therefore no additional dependency.
> - All paths must start with '$'
> - No functions support or other operators allowed.
> - JsonPath dotted and square-bracket notations can be supported to avoid
> escaping dots or other characters: `$a.b.c` and `$['a.b']['c']` - or we
> could only support the second one as it's more complete.
> - Add support for arrays with `[]`, e.g. `$a.[1].b`
> - Add support for multiple-value paths using array access `$a.*.b` or deep
> scan `$a..b`.
> - Some SMTs that will benefit from this: `MaskField`, `Cast`,
> `ReplaceField`.
> - We could introduce a `path[s]` config under the current configurations to
> apply this feature so no compatibility issues are introduced: e.g.
> `field.path`, `fields.paths`, `exclude.path`.
>
> With these,  100, 101, 102, and 103 will be effectively solved.
>
> The main challenge that I see at the moment is that by being JSON
> path-like, there may be some edge cases that I can't foresee at the moment,
> that could make this hard to implement, test, and maintain 

[jira] [Resolved] (KAFKA-13921) Flaky test TopicCommandIntegrationTest testDescribeAtMinIsrPartitions(String).quorum=kraft

2022-05-21 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya resolved KAFKA-13921.
--
  Reviewer: Jason Gustafson
Resolution: Fixed

> Flaky test TopicCommandIntegrationTest 
> testDescribeAtMinIsrPartitions(String).quorum=kraft
> --
>
> Key: KAFKA-13921
> URL: https://issues.apache.org/jira/browse/KAFKA-13921
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Divij Vaidya
>Assignee: Divij Vaidya
>Priority: Minor
>  Labels: flaky-test
> Fix For: 3.3.0
>
>
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12184/1/tests]
> {code:java}
> org.opentest4j.AssertionFailedError: expected: <1> but was: <7>   at 
> app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)   
> at 
> app//org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
>at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)  
> at 
> app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)  
> at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:527)
>   at 
> app//kafka.admin.TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(TopicCommandIntegrationTest.scala:704)
>at 
> java.base@11.0.12/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)   at 
> java.base@11.0.12/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base@11.0.12/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@11.0.12/java.lang.reflect.Method.invoke(Method.java:566)   
> at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
>   at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
> at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:92)
>at 
> app//org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
> at 
> app//org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> at 
> app//org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>  at 
> app//org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
>at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
> at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
>  at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
>   at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>   at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>   at 
> app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>   at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(Throw

[jira] [Resolved] (KAFKA-13889) Fix AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL

2022-05-21 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-13889.
-
Fix Version/s: 3.3.0
   Resolution: Fixed

> Fix AclsDelta to handle ACCESS_CONTROL_ENTRY_RECORD quickly followed by 
> REMOVE_ACCESS_CONTROL_ENTRY_RECORD for same ACL
> ---
>
> Key: KAFKA-13889
> URL: https://issues.apache.org/jira/browse/KAFKA-13889
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew Grant
>Priority: Major
> Fix For: 3.3.0
>
>
> In 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/image/AclsDelta.java#L64]
>  we store the pending deletion in the changes map. This could override a 
> creation that might have just happened. This is an issue because in 
> BrokerMetadataPublisher this results in us making a removeAcl call which 
> finally results in 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java#L203]
>  being executed, and this code throws an exception if the ACL isn't in the Map 
> yet. If the ACCESS_CONTROL_ENTRY_RECORD event never got processed by 
> BrokerMetadataPublisher, then the ACL won't be in the Map yet.
> My feeling is we might want to make removeAcl idempotent in that it returns 
> success if the ACL doesn't exist: no matter how many times removeAcl is 
> called it returns success if the ACL is deleted. Maybe we’d just log a 
> warning or something?
> Note, I don't think the AclControlManager has this issue because it doesn't 
> batch the events like AclsDelta does. However, we still do throw a 
> RuntimeException here 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java#L197]
>  - maybe we should still follow the same logic (if we make the fix suggested 
> above) and just log a warning if the ACL doesn't exist in the Map?
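
For illustration only, a toy model of the idempotent removal suggested above; the
class and method names are made up and are not the real AclsDelta or
StandardAuthorizerData code:

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentAclStore {
    private final Map<String, String> aclsById = new ConcurrentHashMap<>();

    public void addAcl(String id, String acl) {
        aclsById.put(id, acl);
    }

    // Removing an ACL that is not present logs a warning and still reports success,
    // so replaying a REMOVE record after the ACL is already gone is harmless.
    public boolean removeAcl(String id) {
        if (aclsById.remove(id) == null) {
            System.err.println("removeAcl(" + id + "): ACL not found; treating as already removed");
        }
        return true;
    }
}
{code}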



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] KIP-831: Add metric for log recovery progress

2022-05-21 Thread Luke Chen
Hi Tom and Raman,

Thanks for your comments.

> 1. There's not a JIRA for this KIP (or the JIRA link needs updating).
2. Similarly the link to this discussion thread needs updating.
> Please update the links to JIRA and the discussion thread.

Yes, thanks for the reminder. I've updated the KIP.

> 3. I wonder whether we need to keep these metrics (with value 0) once the
broker enters the running state. Do you see it as valuable? A benefit of
removing the metrics would be a reduction in storage required for metric
stores which are recording these metrics.

Yes, removing the metrics after log recovery has completed is a good idea.
Updated the KIP.

> 4. I think the KIP's public interfaces section could be a bit clearer.
Previous KIPs which added metrics usually used a table, with the MBean
name, metric type and description. See KIP-551 for example (or KIP-748,
KIP-608). Similarly you could use a table in the proposed changes section
rather than describing the tree you'd see in an MBean console.

Good point! Updated the KIP to use a table to list the MBean name, metric
type and descriptions.
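
As an aside, once the final MBean names are in that table, operators could poll
them over JMX roughly like the sketch below; the ObjectName and the host/port are
guesses, so the names in the KIP's table should be used instead:

{code:java}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class RecoveryProgressProbe {
    public static void main(String[] args) throws Exception {
        // Placeholder host/port; point this at the broker's JMX endpoint.
        JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Guessed MBean name; check the KIP's metrics table for the real one.
            ObjectName remainingLogs =
                    new ObjectName("kafka.log:type=LogManager,name=remainingLogsToRecover");
            System.out.println("Remaining logs to recover: "
                    + conn.getAttribute(remainingLogs, "Value"));
        }
    }
}
{code}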


Thank you.
Luke

On Fri, May 20, 2022 at 9:13 AM Raman Verma 
wrote:

> Hi Luke,
>
> The change is useful and simple. Thanks.
> Please update the links to JIRA and the discussion thread.
>
> Best Regards,
> Raman Verma
>
> On Thu, May 19, 2022 at 8:57 AM Tom Bentley  wrote:
> >
> > Hi Luke,
> >
> > Thanks for the KIP. I think the idea makes sense and would provide useful
> > observability of log recovery. I have a few comments.
> >
> > 1. There's not a JIRA for this KIP (or the JIRA link needs updating).
> > 2. Similarly the link to this discussion thread needs updating.
> > 3. I wonder whether we need to keep these metrics (with value 0) once the
> > broker enters the running state. Do you see it as valuable? A benefit of
> > removing the metrics would be a reduction in storage required for metric
> > stores which are recording these metrics.
> > 4. I think the KIP's public interfaces section could be a bit clearer.
> > Previous KIPs which added metrics usually used a table, with the MBean
> > name, metric type and description. See KIP-551 for example (or KIP-748,
> > KIP-608). Similarly you could use a table in the proposed changes section
> > rather than describing the tree you'd see in an MBean console.
> >
> > Kind regards,
> >
> > Tom
> >
> > On Wed, 11 May 2022 at 09:08, Luke Chen  wrote:
> >
> > > > And if people start using RemainingLogs and RemainingSegments and
> then
> > > REALLY FEEL like they need RemainingBytes, then we can always add it
> in the
> > > future.
> > >
> > > +1
> > >
> > > Thanks James!
> > > Luke
> > >
> > > On Wed, May 11, 2022 at 3:57 PM James Cheng 
> wrote:
> > >
> > > > Hi Luke,
> > > >
> > > > Thanks for the detailed explanation. I agree that the current
> proposal of
> > > > RemainingLogs and RemainingSegments will greatly improve the
> situation,
> > > and
> > > > that we can go ahead with the KIP as is.
> > > >
> > > > If RemainingBytes were straight-forward to implement, then I’d like
> to
> > > > have it. But we can live without it for now. And if people start
> using
> > > > RemainingLogs and RemainingSegments and then REALLY FEEL like they
> need
> > > > RemainingBytes, then we can always add it in the future.
> > > >
> > > > Thanks Luke, for the detailed explanation, and for responding to my
> > > > feedback!
> > > >
> > > > -James
> > > >
> > > > Sent from my iPhone
> > > >
> > > > > On May 10, 2022, at 6:48 AM, Luke Chen  wrote:
> > > > >
> > > > > Hi James and all,
> > > > >
> > > > > I checked again and I can see when creating UnifiedLog, we
> expected the
> > > > > logs/indexes/snapshots are in good state.
> > > > > So, I don't think we should break the current design to expose the
> > > > > `RemainingBytesToRecovery`
> > > > > metric.
> > > > >
> > > > > If there is no other comments, I'll start a vote within this week.
> > > > >
> > > > > Thank you.
> > > > > Luke
> > > > >
> > > > >> On Fri, May 6, 2022 at 6:00 PM Luke Chen 
> wrote:
> > > > >>
> > > > >> Hi James,
> > > > >>
> > > > >> Thanks for your input.
> > > > >>
> > > > >> For the `RemainingBytesToRecovery` metric proposal, I think
> there's
> > > one
> > > > >> thing I didn't make clear.
> > > > >> Currently, when log manager start up, we'll try to load all logs
> > > > >> (segments), and during the log loading, we'll try to recover logs
> if
> > > > >> necessary.
> > > > >> And the logs loading is using "thread pool" as you thought.
> > > > >>
> > > > >> So, here's the problem:
> > > > >> All segments in each log folder (partition) will be loaded in each log
> > > > >> recovery thread, and not until it's loaded can we know how many segments
> > > > >> (or how many bytes) need to be recovered.
> > > > >> That means, if we have 10 partition logs in one broker, and we
> have 2
> > > > log
> > > > >> recovery threads (num.recovery.threads.per.data.dir=2), before the
> > > > >> threads load the segments in eac