Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #1980

2023-07-07 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 385797 lines...]
Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerLeft[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testOuterInner[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testOuterInner[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testOuterOuter[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testOuterOuter[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerWithRightVersionedOnly[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerWithRightVersionedOnly[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testLeftWithLeftVersionedOnly[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testLeftWithLeftVersionedOnly[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerWithLeftVersionedOnly[caching enabled = false] STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TableTableJoinIntegrationTest > [caching enabled = false] > 
testInnerWithLeftVersionedOnly[caching enabled = false] PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TaskAssignorIntegrationTest > shouldProperlyConfigureTheAssignor STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TaskAssignorIntegrationTest > shouldProperlyConfigureTheAssignor PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TaskMetadataIntegrationTest > shouldReportCorrectEndOffsetInformation STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TaskMetadataIntegrationTest > shouldReportCorrectEndOffsetInformation PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TaskMetadataIntegrationTest > shouldReportCorrectCommittedOffsetInformation 
STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
TaskMetadataIntegrationTest > shouldReportCorrectCommittedOffsetInformation 
PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
EmitOnChangeIntegrationTest > shouldEmitSameRecordAfterFailover() STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
EmitOnChangeIntegrationTest > shouldEmitSameRecordAfterFailover() PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
HighAvailabilityTaskAssignorIntegrationTest > 
shouldScaleOutWithWarmupTasksAndPersistentStores(TestInfo) STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
HighAvailabilityTaskAssignorIntegrationTest > 
shouldScaleOutWithWarmupTasksAndPersistentStores(TestInfo) PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
HighAvailabilityTaskAssignorIntegrationTest > 
shouldScaleOutWithWarmupTasksAndInMemoryStores(TestInfo) STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
HighAvailabilityTaskAssignorIntegrationTest > 
shouldScaleOutWithWarmupTasksAndInMemoryStores(TestInfo) PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
KStreamAggregationDedupIntegrationTest > shouldReduce(TestInfo) STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
KStreamAggregationDedupIntegrationTest > shouldReduce(TestInfo) PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
KStreamAggregationDedupIntegrationTest > shouldGroupByKey(TestInfo) STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
KStreamAggregationDedupIntegrationTest > shouldGroupByKey(TestInfo) PASSED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 
KStreamAggregationDedupIntegrationTest > shouldReduceWindowed(TestInfo) STARTED

Gradle Test Run :streams:integrationTest > Gradle Test Executor 183 > 

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1979

2023-07-07 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-15164) Extract reusable logic from OffsetsForLeaderEpochClient

2023-07-07 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15164:
--

 Summary: Extract reusable logic from OffsetsForLeaderEpochClient
 Key: KAFKA-15164
 URL: https://issues.apache.org/jira/browse/KAFKA-15164
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans


The OffsetsForLeaderEpochClient class is used for making asynchronous requests 
to the OffsetsForLeaderEpoch API. It encapsulates the logic for:
 * preparing the requests
 * sending them over the network using the network client
 * handling the response

The new KafkaConsumer implementation, based on a new threading model, requires 
the same logic for preparing the requests and handling the responses, with 
different behaviour for how the request is actually sent.

This task includes refactoring OffsetsForLeaderEpochClient by extracting out 
the logic for preparing the requests and handling the responses. There are no 
changes to the existing logic; the goal is just to make that functionality 
available for reuse.
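
As a rough illustration (not the actual refactoring; the class, method and 
placeholder types below are hypothetical), the extraction could separate the 
pure "prepare" and "handle" steps from the transport:

{code:java}
// Hypothetical sketch only: names and types are illustrative, not the real
// Kafka internals. The point is that prepareRequest/handleResponse carry no
// knowledge of how the request is actually sent.
final class OffsetsForLeaderEpochUtilsSketch {

    // Placeholder types standing in for the real request/response classes.
    record EpochRequest(String partitions) {}
    record EpochResponse(String payload) {}
    record EpochResult(String endOffsets) {}

    private OffsetsForLeaderEpochUtilsSketch() {}

    // Pure: build the request from caller-supplied partition state.
    static EpochRequest prepareRequest(String partitions) {
        return new EpochRequest(partitions);
    }

    // Pure: interpret the response, independently of the transport used.
    static EpochResult handleResponse(EpochResponse response) {
        return new EpochResult(response.payload());
    }
}

// The existing client and the new threading model can plug in their own way
// of sending while reusing the two methods above.
interface EpochRequestTransport {
    OffsetsForLeaderEpochUtilsSketch.EpochResponse send(
            OffsetsForLeaderEpochUtilsSketch.EpochRequest request);
}
{code}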



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15163) Implement validatePositions functionality for new KafkaConsumer

2023-07-07 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15163:
--

 Summary: Implement validatePositions functionality for new 
KafkaConsumer
 Key: KAFKA-15163
 URL: https://issues.apache.org/jira/browse/KAFKA-15163
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans


Introduce support for resetting positions in the new ListOffsetsRequestManager. 
This task will include a new event for the resetPositions calls performed from 
the new consumer, and the logic for handling such events in the 
ListOffsetRequestManager.

The reset positions implementation will keep the same behaviour as the one in 
the old consumer, but adapted to the new threading model. It is based on a 
RESET_POSITIONS event that is submitted to the background thread and then 
processed by the ApplicationEventProcessor. The processing itself is done by 
the ListOffsetRequestManager, given that it requires a LIST_OFFSETS request 
for the partitions awaiting reset.
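
A minimal sketch of that flow (simplified stand-ins only, not the actual 
consumer internals; the event type, queue and processing loop are assumptions 
made for illustration):

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-in for the application-event flow described above.
public class ResetPositionsFlowSketch {

    enum EventType { RESET_POSITIONS }

    record ApplicationEvent(EventType type) {}

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<ApplicationEvent> events = new LinkedBlockingQueue<>();

        // Background thread: stands in for the event processor handing the
        // event to the request manager, which would then issue a LIST_OFFSETS
        // request for the partitions awaiting reset.
        Thread background = new Thread(() -> {
            try {
                ApplicationEvent event = events.take();
                if (event.type() == EventType.RESET_POSITIONS) {
                    System.out.println("background thread processing " + event.type());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        background.start();

        // Application thread: a resetPositions() call just submits the event
        // and returns; it never touches the network itself.
        events.put(new ApplicationEvent(EventType.RESET_POSITIONS));

        background.join(1000L);
    }
}
{code}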



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15162) Reflective plugin scanning misses plugins which are in parent classloaders but not classpath

2023-07-07 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15162:
---

 Summary: Reflective plugin scanning misses plugins which are in 
parent classloaders but not classpath
 Key: KAFKA-15162
 URL: https://issues.apache.org/jira/browse/KAFKA-15162
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Greg Harris
Assignee: Greg Harris
 Fix For: 3.6.0


When the Plugins class is instantiated with a parent classloader other than the 
system classloader, as in these tests: 
[https://github.com/apache/kafka/blob/fd5b300b573dc41b94c710a791f9ee6f568992d4/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java#L493-L501]

plugins which are present in that parent classloader but not in the app 
classloader are not discovered. Their classes and resources are visible through 
the classloader, but the set of URLs used for reflective scanning only includes 
the classpath jars: 
[https://github.com/apache/kafka/blob/fd5b300b573dc41b94c710a791f9ee6f568992d4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java#L353]

This causes a discrepancy between the Reflection and ServiceLoading scanners, 
because the Reflection scanner relies on these URLs but the ServiceLoader 
relies on the classes and resources instead.

Either the tests should be rewritten to accommodate this difference in 
behavior, or the PluginUtils.pluginSources should be amended to make a 
best-effort evaluation of the parent classloader URLs.
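
For the second option, a best-effort sketch of walking the parent chain 
(hypothetical helper, not the actual PluginUtils.pluginSources change; it can 
only see URLs from parents that happen to be URLClassLoaders):

{code:java}
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

final class ParentClassLoaderUrlsSketch {

    // Best-effort: collect URLs from the given classloader and all of its
    // parents, skipping parents that do not expose their sources.
    static Set<URL> collectUrls(ClassLoader start) {
        Set<URL> urls = new LinkedHashSet<>();
        for (ClassLoader cl = start; cl != null; cl = cl.getParent()) {
            if (cl instanceof URLClassLoader urlClassLoader) {
                urls.addAll(Arrays.asList(urlClassLoader.getURLs()));
            }
        }
        return urls;
    }
}
{code}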



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.5 #30

2023-07-07 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1978

2023-07-07 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-944 Support async runtimes in consumer, votes needed!

2023-07-07 Thread Colin McCabe
Hi Erik,

It's not clear to me that it's safe to access the Kafka consumer or producer 
concurrently from different threads. There are data structures that aren't 
protected by locks, so I wouldn't necessarily expect accessing and mutating 
them in a concurrent way to work. This is true even if the accesses happen at 
different times, because modern CPUs require memory barriers to guarantee 
inter-thread visibility of loads and stores.

I am writing this without doing a detailed dive into the code (I haven't 
been into the consumer / producer code in a bit.) Someone who has worked more 
on the consumer recently might be able to give specific examples of things that 
wouldn't work.

I know that there are at least a few locks in the consumer code now, due to our 
need to send heartbeats from a worker thread. I don't think those would be 
sufficient to protect a client that is making calls from random threads.

There has been some discussion of moving to a more traditional model where 
people make calls to the client and the client passes the given data to a 
single background worker thread. This would avoid a lot of the footguns of the 
current model and probably better reflect how people actually use the client.

Another issue is that neither the producer nor the consumer is fully 
nonblocking. There are some corner cases where we do in fact block. From 
memory, the producer blocks in some "buffer full" cases, and the consumer 
blocks sometimes when fetching metadata.

I suspect it would be more appropriate for Kotlin coroutines, Zio coroutines 
and so on to adopt this "pass messages to and from a background worker thread" 
model than to try to re-engineer the Kafka client to work from random threads.

There is actually some good advice about how to handle multiple threads in 
the KafkaConsumer.java header file itself. Check the sections "One Consumer 
Per Thread" and "Decouple Consumption and Processing." What I'm recommending 
here is essentially the latter.
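
For illustration, here is a minimal sketch of the "Decouple Consumption and 
Processing" pattern, assuming a single polling thread that hands work to a 
plain thread pool (the broker address, group id and topic are placeholders):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DecoupledConsumption {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "example-group");           // placeholder
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        ExecutorService workers = Executors.newFixedThreadPool(4);

        // Exactly one thread ever touches the consumer.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> rec : records) {
                    // Processing happens on worker threads; they never call the consumer.
                    workers.submit(() -> process(rec));
                }
            }
        }
    }

    static void process(ConsumerRecord<String, String> rec) {
        System.out.printf("%s-%d@%d: %s%n", rec.topic(), rec.partition(), rec.offset(), rec.value());
    }
}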

I do understand that it's frustrating to not get a quick response. However, 
overall I think this one needs a lot more discussion before getting anywhere 
near a vote. I will leave a -1 just as a procedural step. Maybe some of the 
people working in the client area can also chime in.

best,
Colin


On Thu, Jul 6, 2023, at 12:02, Erik van Oosten wrote:
> Dear PMCs,
>
> So far there have been 0 responses to KIP-944. I understand this may not 
> be something that keeps you busy, but this KIP is important to people 
> that use async runtimes like Zio, Cats and Kotlin.
>
> Is there anything you need to come to a decision?
>
> Kind regards,
>      Erik.
>
>
> Op 05-07-2023 om 11:38 schreef Erik van Oosten:
>> Hello all,
>>
>> I'd like to call a vote on KIP-944 Support async runtimes in consumer. 
>> It has been 'under discussion' for 7 days now. 'Under discussion' 
>> between quotes, because there were 0 comments so far. I hope the KIP 
>> is clear!
>>
>> KIP description: https://cwiki.apache.org/confluence/x/chw0Dw
>>
>> Kind regards,
>>     Erik.
>>
> -- 
> Erik van Oosten
> e.vanoos...@grons.nl
> https://day-to-day-stuff.blogspot.com


[GitHub] [kafka-site] stevenbooke commented on pull request #521: KAFKA-14995: Automate asf.yaml collaborators refresh

2023-07-07 Thread via GitHub


stevenbooke commented on PR #521:
URL: https://github.com/apache/kafka-site/pull/521#issuecomment-1625706492

   @mimaison Resolved merge conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1977

2023-07-07 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-15149) Fix not sending UMR and LISR RPCs in dual-write mode when there are new partitions

2023-07-07 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant resolved KAFKA-15149.
--
Resolution: Fixed

> Fix not sending UMR and LISR RPCs in dual-write mode when there are new 
> partitions
> --
>
> Key: KAFKA-15149
> URL: https://issues.apache.org/jira/browse/KAFKA-15149
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Major
> Fix For: 3.5.1
>
>
> In AK in {{KRaftMigrationZkWriter}} 
> [here|https://github.com/apache/kafka/blame/trunk/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java#L294]
>  we keep references to both the new and changed partitions maps from the 
> {{TopicsDelta}} instance. We mutate {{changedPartitions}} resulting in 
> possibly mutating the {{TopicsDelta}} instance that is provided as input to 
> the method. After making the ZK writes, when we try to figure out the UMR and 
> LISR requests we need to make in 
> {{MigrationPropagator.sendRPCsToBrokersFromMetadataDelta}}, the 
> {{TopicsDelta}} has lost the changed partitions metadata. As a result, we 
> might not send the expected UMR and LISR requests. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[DISCUSS] KIP-949: Add flag to enable the usage of topic separator in MM2 DefaultReplicationPolicy

2023-07-07 Thread Omnia Ibrahim
Hi everyone,
I want to start the discussion of KIP-949. The proposal is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-949%3A+Add+flag+to+enable+the+usage+of+topic+separator+in+MM2+DefaultReplicationPolicy


Thanks for your time and feedback.
Omnia


[GitHub] [kafka-site] mimaison commented on pull request #521: KAFKA-14995: Automate asf.yaml collaborators refresh

2023-07-07 Thread via GitHub


mimaison commented on PR #521:
URL: https://github.com/apache/kafka-site/pull/521#issuecomment-1625421768

   @stevenbooke Your branch has conflicts with `asf-site`. Can you resolve 
them? Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15161) InvalidReplicationFactorException at connect startup

2023-07-07 Thread Viktor Somogyi-Vass (Jira)
Viktor Somogyi-Vass created KAFKA-15161:
---

 Summary: InvalidReplicationFactorException at connect startup
 Key: KAFKA-15161
 URL: https://issues.apache.org/jira/browse/KAFKA-15161
 Project: Kafka
  Issue Type: Improvement
  Components: clients, KafkaConnect
Affects Versions: 3.6.0
Reporter: Viktor Somogyi-Vass


h2. Problem description

In our system test environment Connect may, in certain cases, fail to start up 
due to a very specific timing issue around Kafka cluster and Connect 
start/restart. If the broker doesn't yet have metadata when a consumer in 
Connect starts and asks for topic metadata, the call returns the following 
exception and Connect fails:
{noformat}
[2023-07-07 13:56:47,994] ERROR [Worker clientId=connect-1, 
groupId=connect-cluster] Uncaught exception in herder work thread, exiting:  
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.common.KafkaException: Unexpected error fetching metadata for 
topic connect-offsets
at 
org.apache.kafka.clients.consumer.internals.TopicMetadataFetcher.getTopicMetadata(TopicMetadataFetcher.java:130)
at 
org.apache.kafka.clients.consumer.internals.TopicMetadataFetcher.getTopicMetadata(TopicMetadataFetcher.java:66)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:2001)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1969)
at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:251)
at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:230)
at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:151)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:363)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: 
Replication factor is below 1 or larger than the number of available brokers.
{noformat}

Due to this error the Connect node stops and has to be manually restarted 
(and of course it fails the test scenarios as well).

h2. Reproduction

In my test scenario I had:
- 1 broker
- 1 connect distributed node
- I also had a patch that I applied on the broker to make sure we don't have 
metadata

Steps to repro:
# start up a ZooKeeper-based broker without the patch
# put a breakpoint here: 
https://github.com/apache/kafka/blob/1d8b07ed6435568d3daf514c2d902107436d2ac8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataFetcher.java#L94
# start up a distributed connect node
# restart the Kafka broker with the patch to make sure there is no metadata
# once the broker is started, release the debugger in connect

It should run into the error cited above and shut down.

This is not desirable: the Connect cluster should retry to ensure its 
continuous operation, or the broker should handle this case differently, 
for instance by returning a RetriableException.
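
A minimal sketch of the client-side retry idea (the wrapper name is 
hypothetical and the timeout/backoff values are illustrative only):

{code:java}
import java.time.Duration;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;

final class MetadataRetrySketch {

    // Retry partitionsFor until it succeeds or the deadline passes, instead of
    // letting a transient metadata error shut the herder/worker down.
    static List<PartitionInfo> partitionsForWithRetry(Consumer<?, ?> consumer,
                                                      String topic,
                                                      Duration timeout) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (true) {
            try {
                return consumer.partitionsFor(topic);
            } catch (KafkaException e) {
                // Covers errors like the "Unexpected error fetching metadata" above.
                if (System.currentTimeMillis() >= deadline) {
                    throw e;
                }
                Thread.sleep(1_000L); // illustrative backoff
            }
        }
    }
}
{code}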

The earliest I've tried this is 2.8 but I think this affects versions before 
that as well (and after).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #1976

2023-07-07 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-932: Queues for Kafka

2023-07-07 Thread Dániel Urbán
Hi Matthias,
Can you please elaborate on this: "First, you need to understand that
aborted records are filtered client side, and thus for "read-committed" we
can never read beyond the LSO, and the same seems to apply for queuing."
I don't understand the connection here - what does skipping aborted records
have to do with the LSO? As you said, aborted message filtering is done on
the client side (in consumers, yes, but not sure if it has to be the same
for queues), but being blocked on the LSO is the responsibility of the
broker, isn't it? My thought was that the broker could act differently when
working with queues and read_committed isolation.
Thanks,
Daniel

On Thu, Jul 6, 2023 at 7:26 PM Matthias J. Sax  wrote:

> Thanks for the KIP.
>
> It seems we are in very early stage, and some very important sections in
> the KIP are still marked as TODO. In particular, I am curious about the
> protocol changes, how the "queuing state" will be represented and made
> durable, and all the error edge case / fail-over / fencing
> (broker/clients) that we need to put in place.
>
>
> A few other comments/question from my side:
>
> (1) Fetch from follower: this was already touched on, but the point is
> really that the consumer does not decide about it, but the broker does.
> When a consumer sends its first fetch request it will always go to the
> leader, and the broker would reply to the consumer "go and fetch from
> this other broker". -- I think it's ok to exclude fetch from follower in
> the first version of the KIP, but it would need a broker change such
> that the broker knows it's a "queue fetch" request. -- It would also be
> worth exploring how fetch from follower could work in the future and
> ensure that our initial design allows for it and is future proof.
>
>
> (2) Why do we not allow pattern subscription and what happens if
> different consumers subscribe to different topics? It's not fully
> explained in the KIP.
>
>
> (3) auto.offset.reset and SPSO/SPSE -- I don't understand why we would
> not allow auto.offset.reset? In the discussion, you mentioned that
> "first consumer would win, if two consumers have a different config" --
> while this is correct, it's the same for a consumer group right now.
> Maybe we should not try to solve a "non problem"? -- In general, my
> impression is that we are going to do Kafkaesque Queuing, which is fine,
> but it might be to our advantage to carry over as many established
> concepts as we can? And if not, have a very good reason not to.
>
> In the end, I find it very clumsy to only have an admin API to change
> the starting point of a consumer.
>
> (3B) What happens if lag grows and data is purged broker side?
>
> (3C) What happens if the broker released records (based on "timeout /
> exceeding delivery count"), and the "ack/reject" comes afterwards?
>
> (3D) How to find out what records got archived but were not acked (i.e.,
> lost) for re-processing/debugging purposes? The question was already
> asked and the answer was "not supported", but I think it would be a
> must-have before the feature is usable in production? We can of course
> also only do it in a future release and not the first "MVP"
> implementation, but the KIP should address it. In the end, the overall
> group monitoring story is missing.
>
>
> (4) I am also wondering about the overall design with regard to "per
> record" vs "per batch" granularity. In the end, queuing usually aims for
> "per records" semantics, but "per record" implies to keep track of a lot
> of metadata. Kafka is designed on a "per batch" granularity, and it's
> unclear to me how both will go together?
>
> (4A) Do we keep "ack/reject/..." state per-record, or per batch? It
> seems per record, but it would require holding a lot of meta-data. Also,
> how does it work for the current protocol if a batch is partially acked
> and we need to re-deliver? Would we add metadata and then let the client
> filter acked messages (similar to how "read-committed" mode works)?
>
> (4B) What does "the share-partition leader prefers to return complete
>   record batches." exactly mean? "Prefers" is a fuzzy word. What happens
> if we cannot return a complete record batch?
>
> (4C) What happens if different consumers of the same group configure
> different batch sizes for fetching records? How do we track the
> corresponding meta-data?
>
> (4D)
>
> > In the situation where some records in a batch have been released or
> rejected separately, subsequent fetches of those records are more likely to
> have gaps.
>
> What does this mean?
>
> (4E)
>
> > For efficiency, the consumer preferentially returns complete record sets
> with no gaps
>
> Can you elaborate on the details?
>
>
> API contract:
>
> (5A)
> > acks must be issued in the order in which the records appear
>
> Why is this the case? Sounds like an arbitrary restriction to me? Can
> you share your reasoning?
>
>
> (5B) How to "reject" (or just "release") all records of a batch at once?
> It seems the API only 

[jira] [Created] (KAFKA-15160) Message bytes duplication in Kafka headers when compression is enabled

2023-07-07 Thread Vikash Mishra (Jira)
Vikash Mishra created KAFKA-15160:
-

 Summary: Message bytes duplication in Kafka headers when 
compression is enabled
 Key: KAFKA-15160
 URL: https://issues.apache.org/jira/browse/KAFKA-15160
 Project: Kafka
  Issue Type: Bug
  Components: clients, compression, consumer
Affects Versions: 3.3.2, 3.2.3
Reporter: Vikash Mishra
 Attachments: java heap dump.png, wireshark-min.png

I created a Spring Kafka consumer using @KafkaListener.
During this, I encountered a scenario where, when data is compressed (with any 
compression: snappy/gzip) and consumed by the consumer, the heap dump shows a 
"byte" entry occupying the same amount of memory as the message value.

This behavior is seen only when compressed data is consumed by consumers, not 
in the case of uncompressed data.

I tried to capture Kafka's messages through Wireshark; there it shows the 
proper size of data incoming from the Kafka server and no extra bytes in 
headers. So this is definitely something in the Kafka client. Spring doesn't do 
anything with compression; the whole functionality is handled internally in the 
Kafka client library.

Attached is the screenshot of the heap dump and Wireshark.

This seems like a critical issue, as the message size in memory almost 
doubles, impacting consumer memory and performance. It feels like the actual 
message value is somehow being copied into the headers?

*To Reproduce*
 # Produce compressed data on any topic (see the producer sketch after this list).
 # Create a simple consumer consuming from the above-created topic.
 # Capture heap dump.
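
For step 1, a minimal producer sketch with compression enabled (the broker 
address, topic, and choice of snappy are placeholders):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CompressedProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("compression.type", "snappy"); // or "gzip", as in the report

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1_000; i++) {
                producer.send(new ProducerRecord<>("example-topic", "key-" + i, "value-" + i));
            }
        } // close() flushes pending records before returning
    }
}
{code}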

*Expected behavior*

Headers should not show bytes consuming memory equivalent to value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)