[DISCUSS] KIP-1026: Handling producer snapshot when upgrading from < v2.8.0 for Tiered Storage

2024-03-08 Thread Arpit Goyal
Hi All,

I have created KIP-1026 for handling empty producer snapshot scenarios when
a topic is upgraded from Kafka versions < 2.8.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-1026%3A+Handling+producer+snapshot+when+upgrading+from+%3C+v2.8.0+for+Tiered+Storage

Feedback and suggestions are welcome.

Thanks and Regards
Arpit Goyal
8861094754


Re: [DISCUSS] KIP-955: Add stream-table join on foreign key

2024-03-08 Thread Matthias J. Sax

Igor,

I did drop the ball on this discussion. Sorry about this. Too many 
things are happening at the same time.




Also I do not think that emitting multiple records from the stream-table FK
join is a 'weird' behaviour because this is exactly how the standard SQL
behaves and many stream processing tools try to mimic SQL at the DSL layer
- for example Spark Structured Streaming or Hadoop. 


While the KS DSL is also partially inspired by SQL, we never intended to 
mimic SQL. KS always focused on eventing, and a KStream models an 
event-stream. If you have a single event, you can enrich it with 
auxiliary data. But a 1:n stream-table join would mean that an event is 
"duplicated", which is semantically questionable, because the thing the 
event models only happened once, not twice.


Of course, we also have flatMap() in case a "fat record" that actually 
contains multiple sub-events comes in, and we want to split out the 
sub-events. (I'll circle back to this below.)
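
For concreteness, here is a minimal sketch of that flatMap-style splitting 
(the Order/OrderItem types, topic names, and serde setup are made up and not 
from the KIP):

import java.util.List;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class FatRecordSplitSketch {

    // Hypothetical event types: an Order event carrying its line items.
    record OrderItem(String itemId, int quantity) {}
    record Order(String orderId, List<OrderItem> items) {}

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Assumes an "orders" topic with Order values (serdes omitted).
        KStream<String, Order> orders = builder.stream("orders");

        // flatMapValues() explicitly splits one input event into its sub-events.
        KStream<String, OrderItem> orderItems = orders.flatMapValues(Order::items);

        orderItems.to("order-items");
    }
}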




IMHO, SQL is not about eventing, and thus provides other operations than 
the KS DSL. If we wanted to mimic SQL, we would not have a KStream 
to begin with, but would model everything as a KTable. Additionally, 
our KTables are limited compared to SQL tables, because we always 
require a PK, something SQL does not require. So we would also need to 
extend KTables if we wanted to mimic SQL (but we don't intend to). I 
also want to point out that a KStream is NOT a key-less SQL table; it 
models events, something that is not in the scope of the relational 
model. (Of course, you can create a key-less table and put events into 
it. But SQL and the relational model do not really understand that they 
are events -- they are just rows in a table in the SQL world, and it's up 
to the application to interpret and treat the events correctly.)


The point of having a KStream is to extend the semantics and scope of 
the DSL, and to not bind ourselves to KTables and SQL semantics. -- This 
is also the reason why a stream-table join is stream-side driven; table-side 
updates don't trigger a join but only update the table, because the 
left-hand side is stateless. And already-emitted results are events, and 
thus immutable and cannot be updated any longer. -- A stream-table join, 
by its very nature, is not a SQL join (SQL joins are "symmetric" if you 
wish, while a stream-table join is inherently "asymmetric").
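
To make the asymmetry concrete, a minimal sketch of a stream-table join in 
the DSL (topic names are made up, serdes omitted):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamTableJoinSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Event stream: each record is a single, immutable event.
        KStream<String, String> orders = builder.stream("orders");

        // Table: the latest value per (primary) key.
        KTable<String, String> customers = builder.table("customers");

        // Asymmetric join: an arriving order event is enriched with the current
        // customer row; a customer update alone never emits a join result.
        KStream<String, String> enriched =
                orders.join(customers, (order, customer) -> order + " @ " + customer);

        enriched.to("enriched-orders");
    }
}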




For example we could stream all orderItems for completedOrders as
separate messages (result of FK join) and then could count them based on
time window and on orderItemId to update the current inventory of these
items in the warehouse.


Not sure if I understand this example. What is the nature of `orderItem` 
and `completedOrders`? It seems `orderItem` would belong to exactly one 
order. And `completedOrders` would be an "order table" used to see if an 
order is completed or not, and thus it's still a regular stream-table join? 
Why would an `orderItem` belong to more than one `order`?



Overall, I still don't think I fully understand the use case yet: for 
which use case would you *not* want to aggregate, but would want/need more 
than one result from the join? To me, the event does not duplicate (ie, 
happen twice), and in contrast to a flatMap() we don't split out 
sub-events in a join (or do we)?


Also wondering about a stream-stream join? They are used to correlate 
events that happen "close to each other", which is an n:m join. Not sure if 
this would help for your use case? I assume you did consider it and 
ruled it out, but as I still don't fully understand your use case, I 
might not fully understand why.



You did mention that you don't want to trigger the join when the table 
updates at some point: going back to your "orderItems" and 
"completedOrders" example from above, this actually does not align. If 
you want to keep inventory up-to-date, you actually want to emit all 
orderItems when an order completes so you can re-stock. Thus, orderItems 
would need to be modeled as a table, and you want a table-table join.


This example might actually indicate (not sure if my interpretation is 
correct) that you might actually want a table, but the issue with a 
KTable would be that the table grows unbounded(*). And for this reason, 
you try to fall back to a KStream. However, to me, that would not be the 
correct approach because you change _semantics_. If that's the case, the 
better KIP would be to add a TTL to KTable -- something that was asked 
for many times, and I think it would be beneficial to add. In the end, 
you might not want KStream semantics but just want to avoid unbounded 
growth of your KTable?


(*) For such an "orderItem" and "orderTable" case, if you modeled 
this with SQL tables, you would have the same issue. You always get new 
orders, and you need a way to eventually purge completed orders you are 
not interested in any longer.



Putting my lack of use-case understanding aside, I have a technical 
question: You propose to extract the

Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException

2024-03-08 Thread Matthias J. Sax

Hey Nick,

I am sorry to say that I am not a fan of this KIP. I see way 
too many foot-guns and complications that could be introduced.


I am also not sure if I understand the motivation. You say CONTINUE and 
FAIL are not good enough, but don't describe in detail why. If we 
understand the actual problem better, it might also become clear how 
task-pausing would help to address it.
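
For reference, today's contract only exposes CONTINUE and FAIL. A minimal 
sketch of a custom handler under the existing interface (the topic name and 
skip rule below are made up), plugged in via the 
default.deserialization.exception.handler config:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class LogAndDecideExceptionHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // Hypothetical rule: tolerate corrupt records on a low-value side topic,
        // fail the application for anything else.
        if ("side-telemetry".equals(record.topic())) {
            return DeserializationHandlerResponse.CONTINUE;
        }
        return DeserializationHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }
}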



The main problem I see, as already mentioned by Sophie, is time 
synchronization. However, it's not limited to joins, but affects all 
time-based operations, ie, also all windowed aggregations. If one task 
pauses but others keep running, we keep advancing stream-time downstream, 
and thus when the task resumes later, there is a very high 
probability that records are dropped because the window has already closed.


For the runtime itself, we also cannot really do a cascading downstream 
pause, because the runtime does not know anything about the semantics of 
operators. We don't know if we execute a DSL operator or a PAPI 
operator. (We could maybe track all downstream tasks independent of 
semantics, but in the end it might just imply we could also just pause 
all tasks...)


For the "skip record case", it's also not possible to skip over an 
offset from outside while the application is running. The offset in 
question is cached inside the consumer and the consumer would not go 
back to Kafka to re-read the offset (only when a partitions is 
re-assigned to a new consumer, the consumer would fetch the offset once 
to init itself). -- But even if the consumer would go back to read the 
offset, as long as the partition is assigned to a member of the group, 
it's not even possible to commit a new offset using some external tool. 
Only member of the group are allowed to commit offset, and all tools 
that allow to manipulate offsets require that the corresponding 
application is stopped, and that the consumer group is empty (and the 
tool will join the consumer group as only member and commit offsets).


Of course, we could pause all tasks, but that's kind of similar to shutting 
down? I agree though that `FAIL` is rather harsh, and it could be a 
good thing to introduce a graceful `SHUTDOWN` option (similar to what we 
have via the uncaught exception handler)?
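
For illustration, a minimal sketch of the existing uncaught-exception-handler 
mechanism that such a graceful SHUTDOWN option could mirror (the application 
id, bootstrap servers, and placeholder topology are made up):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ShutdownHandlerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "shutdown-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output"); // placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // On an unhandled error, request that *all* instances of the application
        // shut down gracefully, rather than failing a single thread hard.
        streams.setUncaughtExceptionHandler(
                exception -> StreamThreadExceptionResponse.SHUTDOWN_APPLICATION);

        streams.start();
    }
}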


If we pause all tasks, we would of course need to do this not just for a 
single instance, but for all... We do already have 
`KafkaStreams#pause()`, but it does not provide an application-wide pause, 
only an instance pause -- the assumption of this feature was that 
an external pause signal would be sent to all instances at the same 
time. Building it into KS was not done as it was considered potentially 
too complicated...
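
For reference, a minimal sketch of the existing instance-level pause from 
KIP-834; the external "pause signal" is hypothetical and would have to be 
delivered to every instance of the application by some outside mechanism:

import org.apache.kafka.streams.KafkaStreams;

public class PauseSignalSketch {

    // Called by some external coordination mechanism (REST endpoint, config
    // watcher, etc.) on each instance at roughly the same time.
    public static void onPauseSignal(KafkaStreams streams, boolean pause) {
        if (pause && !streams.isPaused()) {
            streams.pause();   // stops processing on this instance only
        } else if (!pause && streams.isPaused()) {
            streams.resume();  // resumes processing on this instance
        }
    }
}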


Other questions: if a task were paused, would we commit the current 
offset? What happens if we rebalance? Would we just lose the "pause" 
state, hit the same error again, and just pause again?



Right now, I would rather propose to discard this KIP (or change the 
scope drastically to add a "global pause" and/or "global shutdown" 
option). Of course, if you can provide convincing answers, I am happy to 
move forward with per-task pausing. But my gut feeling is that even if 
we found technically sound solutions, it would be way too 
complicated to use (and maybe also to implement inside KS) for too 
little benefit.




-Matthias



On 10/26/23 5:57 AM, Nick Telford wrote:

1.
Woops! I've fixed that now. Thanks for catching that.

2.
I agree, I'll remove the LogAndPause handler so it's clear this is an
advanced feature. I'll also add some documentation to
DeserializationExceptionResponse#SUSPEND explaining the care with which users
should approach it.

3a.
This is interesting. My main concern is that there may be situations where
skipping a single bad record is not sufficient, but the Task should still be
resumed without restarting the application. For example, there may be several
bad records in a row that should all be skipped.

3b.
Additionally, a Task may have multiple input topics, so we'd need some way
to indicate which record to skip.

These can probably be resolved by something like skipAndContinue(TaskId
task, String topic, int recordsToSkip) or even skipAndContinue(TaskId task,
Map<String, Integer> recordsToSkipByTopic)?

4.
Related to 2: I was thinking that users implementing their own handler may
want to be able to determine which Processors (i.e. which Subtopology/task
group) are being affected, so they can programmatically make a decision on
whether it's safe to PAUSE. ProcessorContext, which is already a parameter
to DeserializationExceptionHandler, provides the TaskId of the failed Task,
but doesn't provide metadata on the Processors that Task executes.

Since TaskIds are non-deterministic (they can change when you modify your
topology, with no influence over how they're assigned), a user cannot use
TaskId alone to determine which Processors would be affected.

What do you think would be the best way to provide this information to
exce


Re: [VOTE] KIP-956: Tiered Storage Quotas

2024-03-08 Thread Kamal Chandraprakash
+1 (non-binding), Thanks for the KIP, Abhijeet!

--
Kamal

On Fri, Mar 8, 2024 at 11:02 PM Jun Rao  wrote:

> Hi, Abhijeet,
>
> Thanks for the KIP. +1
>
> Jun
>
> On Fri, Mar 8, 2024 at 3:44 AM Abhijeet Kumar 
> wrote:
>
> > Hi All,
> >
> > I would like to start the vote for KIP-956 - Tiered Storage Quotas
> >
> > The KIP is here:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
> >
> > Regards.
> > Abhijeet.
> >
>


Re: [DISCUSS] KIP-1022 Formatting and Updating Features

2024-03-08 Thread Artem Livshits
Hi Justine,

>  Are you suggesting it should be called "transaction protocol version" or
"TPV"? I don't mind that, but just wanted to clarify if we want to include
protocol or if simply "transaction version" is enough.

My understanding is that "metadata version" is the version of metadata
records, which is fairly straightforward.  "Transaction version" may be
ambiguous.

-Artem

On Thu, Feb 29, 2024 at 3:39 PM Justine Olshan 
wrote:

> Hey folks,
>
> Thanks for the discussion. Let me try to cover everyone's comments.
>
> Artem --
> I can add the examples you mentioned. As for naming, right now the feature
> is called "transaction version" or "TV". Are you suggesting it should be
> called "transaction protocol version" or "TPV"? I don't mind that, but just
> wanted to clarify if we want to include protocol or if simply "transaction
> version" is enough.
>
> Jun --
>
> 10. With more features, would each of those be controlled by a separate
> feature or multiple features? For example, is the new transaction record
> format controlled only by MV with TV having a dependency on MV, or is it
> controlled by both MV and TV?
>
>
> I think this will need to be decided on a case by case basis. There should
> be a mechanism to set dependencies among features.
> For transaction version specifically, I have no metadata version
> dependencies besides requiring 3.3 to write the feature records and use the
> feature tools. I would suspect all new features would have this
> requirement.
>
>
> 11. Basically, if --release-version is not used, the command will just use
> the latest production version of every feature. Should we apply that logic
> to both tools?
>
>
> How would this work with the upgrade tool? I think we want a way to set a
> new feature version for one feature and not touch any of the others.
>
>
> 12. Should we remove --metadata METADATA from kafka-features? It does the
> same thing as --release-version.
>
>
> When I previously discussed this tool offline with Colin McCabe, he was
> strongly against deprecating or changing flags. I personally think it could
> be good to unify and not support a ton of flags, but I would want to make
> sure he is aligned.
>
>
> 13. KIP-853 also extends the tools to support a new feature, kraft.version.
> It would be useful to have alignment between that KIP and this one.
>
>
> Sounds good. Looks like Jose is in on the discussion so we can continue
> here. :)
>
>
>
> Jose --
>
>
> 1. KIP-853 uses --feature for kafka-storage instead of --features. This is
> consistent with the use of --feature in the "kafka-feature.sh upgrade"
> command.
>
>
> I wanted to include multiple features in one command, so it seems like
> features is a better name. I discuss more below about why I think we should
> allow setting multiple features at once.
>
>
> 2. I find it a bit inconsistent that --feature and --release-version are
> mutually exclusive in the kafka-feature CLI but not in the kafka-storage
> CLI. What is the reason for this decision?
>
>
> For the storage tool, we are setting all the features for the cluster. By
> default, all are set. For the upgrade tool, the default is to set one
> feature. In the storage tool, it is natural for --release-version to set
> the remaining features that --features didn't cover, since otherwise we
> would need to set them all if we use the flag. In the feature upgrade case,
> it is less necessary for
> all the features to be set at once and the tool can be run multiple times.
> I'm not opposed to allowing both flags, but it is less necessary in my
> opinion.
>
>
> 3. KIP-853 deprecates --metadata in kafka-features and makes it an alias
> for --release-version. In KIP-1022, what happens if the user specified both
> --metadata and --feature?
>
>
> See my note above (Jun's comment 12) about deprecating old commands. I
> would think as the KIP stands now, we would not accept both commands.
>
>
> 4. I would suggest keeping this consistent with kafka-features. It would
> avoid having to implement one more parser in Kafka.
>
>
> I sort of already implemented it as such, so I don't think it is too
> tricky. I'm not sure of an alternative. Kafka features currently only
> supports one feature at a time.
> I would like to support more than one for the storage tool. Do you have
> another suggestion for multiple features in the storage tool?
>
>
> 5. As currently described, trial and error seem to be the only mechanism.
> Should the Kafka documentation describe these dependencies? Is that good
> enough?
>
>
> The plan so far is documentation. The idea is that this is an advanced
> feature, so I think it is reasonable to ask folks to use the documentation.
>
>
> 6. Did you mean that 3.8-IV4 would map to TV2? If not, 3.8-IV3 would map
> to two different TV values.
>
>
> It was a typo. Each MV maps to a single other feature version.
>
>
> 7. For --release-ve

[jira] [Created] (KAFKA-16358) Update Connect Transformation documentation

2024-03-08 Thread Hector Geraldino (Jira)
Hector Geraldino created KAFKA-16358:


 Summary: Update Connect Transformation documentation
 Key: KAFKA-16358
 URL: https://issues.apache.org/jira/browse/KAFKA-16358
 Project: Kafka
  Issue Type: Bug
  Components: connect
Reporter: Hector Geraldino


When reading the [Kafka Connect 
docs|https://kafka.apache.org/documentation/#connect_included_transformation] 
for transformations, there are a few gaps that should be covered:
 * The Flatten, Cast and TimestampConverter transformations are not listed
 * HeadersFrom should be HeaderFrom
 * InsertHeader is not documented

Should be relatively easy to fix






Re: Requesting Feedback for a backup solution

2024-03-08 Thread Ralph Weires
Hi Karthick,

To share a few thoughts on this - I guess a lot depends on your actual
setup / requirements, such as:

   - The sort of potential outages you really need to tackle with this
   - The volume of data you have to continuously process
   - Constraints you may have in terms of messages that could be(come)
   duplicated or out-of-order in cases of outages. At least that will likely
   happen at the boundaries where you switch to / back from a secondary backup
   storage, and in the process of replaying data from that backup storage back
   to Kafka.

For certain kinds of network outages for example, your backup solution
could fail the same way as the access to Kafka - so you might not be able
to connect to Cassandra either.

At a previous job, we used a local on-disk storage buffer for specific
(critical) producers that couldn't afford even short outages in the
connection to Kafka. Producers would then replay buffered data from disk
back to Kafka once it came back up. That is of course just 'local' handling
(and limited by available disk-space), but it doesn't introduce a
dependency on another external system as backup.
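
To illustrate the idea, a minimal sketch (the paths, topic name, and replay 
strategy are made up, not the actual setup we used) of a producer that spools 
records to a local file when a send fails, so they can be replayed once Kafka 
is reachable again:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BufferingSender {
    private final KafkaProducer<String, String> producer;
    private final Path bufferFile = Path.of("/var/spool/kafka-buffer/events.log");

    public BufferingSender(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    public void send(String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>("events", key, value);
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                spoolToDisk(key, value); // Kafka unavailable: buffer locally
            }
        });
    }

    private synchronized void spoolToDisk(String key, String value) {
        try {
            Files.writeString(bufferFile, key + "\t" + value + "\n",
                    StandardCharsets.UTF_8, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            // Disk buffering also failed; real setups need monitoring/alerting here.
        }
    }

    // A separate replay job would read bufferFile and re-send its contents
    // once the cluster is reachable again (omitted here).
}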

If you are however certain to need another (external) system to serve as
backup, why not just use another (fully separate) Kafka cluster as your
backup, too? Using Kafka for this again seems only natural to me.
Introducing a fully different technology seems like overkill when it's
really just another message buffer you need - plus a different tech-stack
can have its own requirements and challenges to maintain. And as added
benefit, any tooling you may have to observe your actual Kafka cluster
could be used in similar ways also for data in the backup cluster (even if
that's ideally just empty).

Best,
Ralph


On Fri, Mar 8, 2024 at 12:54 PM Karthick  wrote:

> Hi guys, I'm still seeking a solution; I hope many face this problem in
> production. Any working solutions will help us.
>
> On Sat, Feb 17, 2024 at 2:16 PM Karthick 
> wrote:
>
> > Dear Kafka Community,
> >
> > I am reaching out to seek your valuable feedback and insights on a
> > proposed solution we are considering for managing Kafka outages using
> > Cassandra.
> >
> > At our organization, we heavily rely on Kafka for real-time data
> > processing and messaging. However, like any technology, Kafka is
> > susceptible to occasional outages which can disrupt our operations and
> > impact our services. To mitigate the impact of such outages and ensure
> > continuity, we are exploring the possibility of leveraging Cassandra as a
> > backup solution.
> >
> > Our proposed approach involves storing messages in Cassandra during Kafka
> > outages. Subsequently, we plan to implement a scheduler that will read
> from
> > Cassandra and attempt to write these messages back into Kafka once it is
> > operational again.
> >
> > We believe that by adopting this strategy, we can achieve the following
> > benefits:
> >
> >1.
> >
> >Improved Fault Tolerance: By having a backup mechanism in place, we
> >can reduce the risk of data loss and ensure continuity of operations
> during
> >Kafka outages.
> >2.
> >
> >Enhanced Reliability: Cassandra's distributed architecture and
> >built-in replication features make it well-suited for storing data
> >reliably, even in the face of failures.
> >3.
> >
> >Scalability: Both Cassandra and Kafka are designed to scale
> >horizontally, allowing us to handle increased loads seamlessly.
> >
> > Before proceeding further with this approach, we would greatly appreciate
> > any feedback, suggestions, or concerns from the community. Specifically,
> we
> > are interested in hearing about:
> >
> >- Potential challenges or drawbacks of using Cassandra as a backup
> >solution for Kafka outages.
> >- Best practices or recommendations for implementing such a backup
> >mechanism effectively.
> >- Any alternative approaches or technologies that we should consider?
> >
> > Your expertise and insights are invaluable to us, and we are eager to
> > learn from your experiences and perspectives. Please feel free to share
> > your thoughts or reach out to us with any questions or clarifications.
> >
> > Thank you for taking the time to consider our proposal, and we look
> > forward to hearing from you soon.
> > Thanks and regards,
> > Karthick
> >
>


Re: Requesting Feedback for a backup solution

2024-03-08 Thread Karthick
Hi guys, I'm still seeking a solution; I hope many face this problem in
production. Any working solutions will help us.

On Sat, Feb 17, 2024 at 2:16 PM Karthick  wrote:

> Dear Kafka Community,
>
> I am reaching out to seek your valuable feedback and insights on a
> proposed solution we are considering for managing Kafka outages using
> Cassandra.
>
> At our organization, we heavily rely on Kafka for real-time data
> processing and messaging. However, like any technology, Kafka is
> susceptible to occasional outages which can disrupt our operations and
> impact our services. To mitigate the impact of such outages and ensure
> continuity, we are exploring the possibility of leveraging Cassandra as a
> backup solution.
>
> Our proposed approach involves storing messages in Cassandra during Kafka
> outages. Subsequently, we plan to implement a scheduler that will read from
> Cassandra and attempt to write these messages back into Kafka once it is
> operational again.
>
> We believe that by adopting this strategy, we can achieve the following
> benefits:
>
>1.
>
>Improved Fault Tolerance: By having a backup mechanism in place, we
>can reduce the risk of data loss and ensure continuity of operations during
>Kafka outages.
>2.
>
>Enhanced Reliability: Cassandra's distributed architecture and
>built-in replication features make it well-suited for storing data
>reliably, even in the face of failures.
>3.
>
>Scalability: Both Cassandra and Kafka are designed to scale
>horizontally, allowing us to handle increased loads seamlessly.
>
> Before proceeding further with this approach, we would greatly appreciate
> any feedback, suggestions, or concerns from the community. Specifically, we
> are interested in hearing about:
>
>- Potential challenges or drawbacks of using Cassandra as a backup
>solution for Kafka outages.
>- Best practices or recommendations for implementing such a backup
>mechanism effectively.
>- Any alternative approaches or technologies that we should consider?
>
> Your expertise and insights are invaluable to us, and we are eager to
> learn from your experiences and perspectives. Please feel free to share
> your thoughts or reach out to us with any questions or clarifications.
>
> Thank you for taking the time to consider our proposal, and we look
> forward to hearing from you soon.
> Thanks and regards,
> Karthick
>


Re: [VOTE] KIP-956: Tiered Storage Quotas

2024-03-08 Thread Jun Rao
Hi, Abhijeet,

Thanks for the KIP. +1

Jun

On Fri, Mar 8, 2024 at 3:44 AM Abhijeet Kumar 
wrote:

> Hi All,
>
> I would like to start the vote for KIP-956 - Tiered Storage Quotas
>
> The KIP is here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
>
> Regards.
> Abhijeet.
>


[jira] [Created] (KAFKA-16357) Kafka Client JAR manifest breaks javac linting

2024-03-08 Thread Jacek Wojciechowski (Jira)
Jacek Wojciechowski created KAFKA-16357:
---

 Summary: Kafka Client JAR manifest breaks javac linting
 Key: KAFKA-16357
 URL: https://issues.apache.org/jira/browse/KAFKA-16357
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.7.0
 Environment: Linux, JDK 21 (Docker image eclipse-temurin:21-jdk-jammy)
Reporter: Jacek Wojciechowski


I upgraded kafka-clients from 3.6.1 to 3.7.0 and discovered that my project is 
not building anymore.

The reason is that kafka-clients-3.7.0.jar contains the following entry in its 
JAR manifest file:

Class-Path: zstd-jni-1.5.5-6.jar lz4-java-1.8.0.jar snappy-java-1.1.10
 .5.jar slf4j-api-1.7.36.jar

I'm using Maven repo to keep my dependencies and those files are not in the 
same directory as kafka-clients-3.7.0.jar, so the paths in the manifest's 
Class-Path are not correct. It fails my build because we build with javac with 
all linting options on, in particular -Xlint:-path. It produces the following 
warnings coming from javac:
[WARNING] COMPILATION WARNING : 
[INFO] -
[WARNING] [path] bad path element 
"/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/zstd-jni-1.5.5-6.jar":
 no such file or directory
[WARNING] [path] bad path element 
"/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/lz4-java-1.8.0.jar":
 no such file or directory
[WARNING] [path] bad path element 
"/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/snappy-java-1.1.10.5.jar":
 no such file or directory
[WARNING] [path] bad path element 
"/home/ci/.m2/repository/org/apache/kafka/kafka-clients/3.7.0/slf4j-api-1.7.36.jar":
 no such file or directory
Since we also have the {{-Werror}} option enabled, it turns warnings into errors 
and fails our build.

I think our setup is quite typical: using a Maven repo to store dependencies, 
having linting on and -Werror. Unfortunately, it doesn't work with the latest 
kafka-clients because of the entries in the manifest's Class-Path. And I think 
it might affect quite a lot of projects set up in a similar way.

I don't know what the reason was for adding the Class-Path entry to the JAR 
manifest file - but perhaps this effect was not considered.

It would be great if you removed the Class-Path entry from the JAR manifest 
file.

 





[jira] [Created] (KAFKA-16356) Remove class-name dispatch in RemoteLogMetadataSerde

2024-03-08 Thread Greg Harris (Jira)
Greg Harris created KAFKA-16356:
---

 Summary: Remove class-name dispatch in RemoteLogMetadataSerde
 Key: KAFKA-16356
 URL: https://issues.apache.org/jira/browse/KAFKA-16356
 Project: Kafka
  Issue Type: Task
  Components: Tiered-Storage
Affects Versions: 3.7.0
Reporter: Greg Harris


The RemoteLogMetadataSerde#serialize receives a RemoteLogMetadata object, and 
has to dispatch to one of four serializers depending on its type. This is done 
by taking the class name of the RemoteLogMetadata and looking it up in maps to 
find the corresponding serializer for that class.

This later requires an unchecked cast, because the RemoteLogMetadataTransform 
is generic. This is all type-unsafe, and can be replaced with type-safe 
if-elseif-else statements that may also be faster than the double-indirect map 
lookups.
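
A hedged sketch of the refactoring idea: replace the class-name-to-serializer 
map (which needs an unchecked cast) with an instanceof chain. The types below 
are simplified stand-ins, not Kafka's actual RemoteLogMetadata classes.

import java.nio.charset.StandardCharsets;

public class DispatchSketch {

    sealed interface Metadata permits SegmentMetadata, SegmentUpdate, PartitionDelete {}
    record SegmentMetadata(String id) implements Metadata {}
    record SegmentUpdate(String id) implements Metadata {}
    record PartitionDelete(String partition) implements Metadata {}

    // Type-safe dispatch: each branch statically knows the concrete type,
    // so no lookup by class name and no unchecked cast is needed.
    static byte[] serialize(Metadata metadata) {
        if (metadata instanceof SegmentMetadata m) {
            return ("segment:" + m.id()).getBytes(StandardCharsets.UTF_8);
        } else if (metadata instanceof SegmentUpdate u) {
            return ("update:" + u.id()).getBytes(StandardCharsets.UTF_8);
        } else if (metadata instanceof PartitionDelete d) {
            return ("delete:" + d.partition()).getBytes(StandardCharsets.UTF_8);
        }
        throw new IllegalArgumentException("Unknown metadata type: " + metadata.getClass());
    }
}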





[jira] [Resolved] (KAFKA-16126) Kcontroller dynamic configurations may fail to apply at startup

2024-03-08 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya resolved KAFKA-16126.
--
Resolution: Fixed

> Kcontroller dynamic configurations may fail to apply at startup
> ---
>
> Key: KAFKA-16126
> URL: https://issues.apache.org/jira/browse/KAFKA-16126
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
> Fix For: 3.6.2, 3.7.0
>
>
> Some kcontroller dynamic configurations may fail to apply at startup. This 
> happens because there is a race between registering the reconfigurables to 
> the DynamicBrokerConfig class, and receiving the first update from the 
> metadata publisher. We can fix this by registering the reconfigurables first. 
> This seems to have been introduced by the "MINOR: Install ControllerServer 
> metadata publishers sooner" change.





[jira] [Created] (KAFKA-16355) ConcurrentModificationException in InMemoryTimeOrderedKeyValueBuffer.evictWhile

2024-03-08 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-16355:
--

 Summary: ConcurrentModificationException in 
InMemoryTimeOrderedKeyValueBuffer.evictWhile
 Key: KAFKA-16355
 URL: https://issues.apache.org/jira/browse/KAFKA-16355
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.5.1
Reporter: Mickael Maison


While a Streams application was restoring its state after an outage, it hit the 
following:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=0_16, processor=KSTREAM-SOURCE-00, topic=, partition=16, 
offset=454875695, stacktrace=java.util.ConcurrentModificationException
at java.base/java.util.TreeMap$PrivateEntryIterator.remove(TreeMap.java:1507)
at 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.evictWhile(InMemoryTimeOrderedKeyValueBuffer.java:423)
at 
org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.enforceConstraints(KTableSuppressProcessorSupplier.java:178)
at 
org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier$KTableSuppressProcessor.process(KTableSuppressProcessorSupplier.java:165)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at 
org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$4(MeteredWindowStore.java:181)
at 
org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:124)
at 
org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:99)
at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:158)
at 
org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:252)
at 
org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:302)
at 
org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:179)
at 
org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:173)
at 
org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:47)
at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$put$5(MeteredWindowStore.java:201)
at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:200)
at 
org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$WindowStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:201)
at 
org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:138)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
at 
org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
at 
org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:159)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
at 
org.apache.kafka.streams.
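
For readers less familiar with this failure mode, a self-contained sketch 
(our illustration, not the Streams code itself): the iterator's fail-fast 
check fires when the TreeMap is structurally modified by anything other than 
the iterator in between iterator calls.

import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class CmeSketch {
    public static void main(String[] args) {
        TreeMap<Integer, String> buffer = new TreeMap<>(Map.of(1, "a", 2, "b", 3, "c"));

        Iterator<Map.Entry<Integer, String>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, String> entry = it.next();
            // Structural modification that bypasses the iterator...
            buffer.remove(entry.getKey());
            // ...so the next iterator operation fails fast, as in the
            // evictWhile() stack trace above.
            it.remove(); // throws ConcurrentModificationException
        }
    }
}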

[VOTE] KIP-956: Tiered Storage Quotas

2024-03-08 Thread Abhijeet Kumar
Hi All,

I would like to start the vote for KIP-956 - Tiered Storage Quotas

The KIP is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas

Regards.
Abhijeet.


Re: [DISCUSS] KIP-956: Tiered Storage Quotas

2024-03-08 Thread Abhijeet Kumar
Thank you all for your comments. As all the comments in the thread are
addressed, I am starting a Vote thread for the KIP. Please have a look.

Regards.

On Thu, Mar 7, 2024 at 12:34 PM Luke Chen  wrote:

> Hi Abhijeet,
>
> Thanks for the update and the explanation.
> I had another look, and it LGTM now!
>
> Thanks.
> Luke
>
> On Tue, Mar 5, 2024 at 2:50 AM Jun Rao  wrote:
>
> > Hi, Abhijeet,
> >
> > Thanks for the reply. Sounds good to me.
> >
> > Jun
> >
> >
> > On Sat, Mar 2, 2024 at 7:40 PM Abhijeet Kumar <
> abhijeet.cse@gmail.com>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > Thanks for pointing it out. It makes sense to me. We can have the
> > following
> > > metrics instead. What do you think?
> > >
> > >- remote-(fetch|copy)-throttle-time-avg (The average time in ms remote
> > >fetches/copies were throttled by a broker)
> > >- remote-(fetch|copy)-throttle-time-max (The maximum time in ms remote
> > >fetches/copies were throttled by a broker)
> > >
> > > These are similar to the fetch-throttle-time-avg and
> > > fetch-throttle-time-max metrics we have for Kafka consumers?
> > > The Avg and Max are computed over the (sliding) window as defined by the
> > > configuration metrics.sample.window.ms on the server.
> > >
> > > (Also, I will update the config and metric names to be consistent)
> > >
> > > Regards.
> > >
> > > On Thu, Feb 29, 2024 at 2:51 AM Jun Rao 
> > wrote:
> > >
> > > > Hi, Abhijeet,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > The issue with recording the throttle time as a gauge is that it's
> > > > transient. If the metric is not read immediately, the recorded value
> > > could
> > > > be reset to 0. The admin won't realize that throttling has happened.
> > > >
> > > > For client quotas, the throttle time is tracked as the average
> > > > throttle-time per user/client-id. This makes the metric less
> transient.
> > > >
> > > > Also, the configs use read/write whereas the metrics use fetch/copy.
> > > Could
> > > > we make them consistent?
> > > >
> > > > Jun
> > > >
> > > > On Wed, Feb 28, 2024 at 6:49 AM Abhijeet Kumar <
> > > abhijeet.cse@gmail.com
> > > > >
> > > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Clarified the meaning of the two metrics. Also updated the KIP.
> > > > >
> > > > > kafka.log.remote:type=RemoteLogManager,
> name=RemoteFetchThrottleTime
> > ->
> > > > The
> > > > > duration of time required at a given moment to bring the observed
> > fetch
> > > > > rate within the allowed limit, by preventing further reads.
> > > > > kafka.log.remote:type=RemoteLogManager, name=RemoteCopyThrottleTime
> > ->
> > > > The
> > > > > duration of time required at a given moment to bring the observed
> > > remote
> > > > > copy rate within the allowed limit, by preventing further copies.
> > > > >
> > > > > Regards.
> > > > >
> > > > > On Wed, Feb 28, 2024 at 12:28 AM Jun Rao  >
> > > > wrote:
> > > > >
> > > > > > Hi, Abhijeet,
> > > > > >
> > > > > > Thanks for the explanation. Makes sense to me now.
> > > > > >
> > > > > > Just a minor comment. Could you document the exact meaning of the
> > > > > following
> > > > > > two metrics? For example, is the time accumulated? If so, is it
> > from
> > > > the
> > > > > > start of the broker or over some window?
> > > > > >
> > > > > > kafka.log.remote:type=RemoteLogManager,
> > name=RemoteFetchThrottleTime
> > > > > > kafka.log.remote:type=RemoteLogManager,
> name=RemoteCopyThrottleTime
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Tue, Feb 27, 2024 at 1:39 AM Abhijeet Kumar <
> > > > > abhijeet.cse@gmail.com
> > > > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > The existing quota system for consumers is designed to throttle
> > the
> > > > > > > consumer if it exceeds the allowed fetch rate.
> > > > > > > The additional quota we want to add works on the broker level.
> If
> > > the
> > > > > > > broker-level remote read quota is being
> > > > > > > exceeded, we prevent additional reads from the remote storage
> but
> > > do
> > > > > not
> > > > > > > prevent local reads for the consumer.
> > > > > > > If the consumer has specified other partitions to read, which
> can
> > > be
> > > > > > served
> > > > > > > from local, it can continue to read those
> > > > > > > partitions. To elaborate more, we make a check for quota
> exceeded
> > > if
> > > > we
> > > > > > > know a segment needs to be read from
> > > > > > > remote. If the quota is exceeded, we simply skip the partition
> > and
> > > > move
> > > > > > to
> > > > > > > other segments in the fetch request.
> > > > > > > This way consumers can continue to read the local data as long
> as
> > > > they
> > > > > > have
> > > > > > > not exceeded the client-level quota.
> > > > > > >
> > > > > > > Also, when we choose the appropriate consumer-level quota, we
> > would
> > > > > > > typically look at what kind of local fetch
> > > > > > > throughput is supported. If we were to reuse the

Re: [DISCUSS] KIP-932: Queues for Kafka

2024-03-08 Thread Andrew Schofield
Hi Manikumar,
Thanks for your queries.

1) Delivery count is added to the ConsumerRecord class so that a consumer can
tell how often a record has been processed. I imagine that some applications 
might
want to take different actions based on whether a record has previously failed. 
This
enables richer error handling for bad records. In the future, I plan another 
KIP to
enhance error handling.

2) It is only possible to delete a share group which is empty. As a result, all
well-behaved consumers will have closed their share sessions. After a short 
while,
the share-partition leaders will discard the share-partition information from 
memory.

In the presence of badly behaved consumers, a consumer would have to pretend to
be a member of a share group. There are several cases:

a) If the share-partition leader still has in-memory state for the deleted 
share-group, it will
continue to fetch records but it will be fenced by the share coordinator when 
it attempts to
write its persistent state.

b) If the share-partition leader does not have in-memory state, it will attempt 
to read it
from the share coordinator and this will fail.

3) I will add metrics for the share coordinator today. This was an omission. 
Thanks for catching it.

Thanks,
Andrew

> On 6 Mar 2024, at 17:53, Manikumar  wrote:
> 
> Hi Andrew,
> 
> Thanks for the updated KIP. Few queries below:
> 
> 1. What is the use-case of deliveryCount in ShareFetchResponse?
> 2. During delete share groups, Do we need to clean any in-memory state from
> share-partition leaders?
> 3. Any metrics for the share-coordinator?
> 
> Thanks
> Manikumar
> 
> On Wed, Feb 21, 2024 at 12:11 AM Andrew Schofield <
> andrew_schofield_j...@outlook.com> wrote:
> 
>> Hi Manikumar,
>> Thanks for your comments.
>> 
>> 1. I believe that in general, there are not situations in which a dynamic
>> config
>> change is prevented because of the existence of a resource. So, if we
>> prevented
>> setting config `group.type=consumer` on resource G of GROUP type
>> if there was a share group G in existence, it would be a bit weird.
>> 
>> I wonder whether changing the config name to `new.group.type` would help.
>> It’s
>> ensuring the type of a new group created.
>> 
>> 2. The behaviour for a DEAD share group is intended to be the same as a
>> DEAD
>> consumer group. The group cannot be “reused” again as such, but the group
>> ID
>> can be used by a new group.
>> 
>> 3. Yes. AlterShareGroupOffsets will cause a new SHARE_CHECKPOINT.
>> 
>> 4. In common with Admin.deleteConsumerGroups, the underlying Kafka RPC
>> for Admin.deleteShareGroups is DeleteGroups. This is handled by the group
>> coordinator and it does this by writing control records (a tombstone in
>> this case).
>> The KIP doesn’t say anything about this because it’s the same as consumer
>> groups.
>> Perhaps it would be sensible to add a GroupType to DeleteGroupsRequest so
>> we can
>> make sure we are deleting the correct type of group. The fact that there
>> is not a specific
>> RPC for DeleteShareGroups seems correct to me.
>> 
>> 5. I prefer using “o.a.k.clients.consumer” because it’s already a public
>> package and
>> many of the classes and interfaces such as ConsumerRecord are in that
>> package.
>> 
>> I definitely need to add more information about how the Admin operations
>> work.
>> I will add a section to the KIP in the next version, later today. This
>> will fill in details for
>> your questions (3) and (4).
>> 
>> Thanks,
>> Andrew
>> 
>>> On 14 Feb 2024, at 18:04, Manikumar  wrote:
>>> 
>>> Hi Andrew,
>>> 
>>> Thanks for the KIP. A few comments below.
>>> 
>>> 1. kafka-configs.sh (incrementalAlterConfigs) allows you to dynamically
>>> change the configs. Maybe in this case, we should not allow the user to
>>> change `group.type` if it's already set.
>>> 2. What's the behaviour after a group transitions into DEAD state. Do we
>>> add new control records to reset the state? Can we reuse the group again?
>>> 3. Are we going to write new control records after the
>>> AlterShareGroupOffsets API to reset the state?
>>> 4. Is there any API for DeleteShareGroups? I assume, group co-ordinator
>> is
>>> going to handle the API. If so, Does this mean the group co-ordinator
>> also
>>> needs to write control records?
>>> 5. How about using "org.apache.kafka.clients.consumer.share" package for
>>> new interfaces/classes?
>>> 
>>> 
>>> Thanks,
>>> Manikumar
>> 
>>