[jira] [Assigned] (KAFKA-17109) Reduce log message load for failed locking

2024-07-23 Thread Danica Fine (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danica Fine reassigned KAFKA-17109:
---

Assignee: Danica Fine

> Reduce log message load for failed locking
> --
>
> Key: KAFKA-17109
> URL: https://issues.apache.org/jira/browse/KAFKA-17109
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Bruno Cadonna
>Assignee: Danica Fine
>Priority: Major
>
> The following exception with stack traces is logged many times when state 
> updater is enabled:
> {code}
> 01:08:03 INFO  [KAFKA] TaskManager - stream-thread [acme-StreamThread-4] Encountered lock exception. Reattempting locking the state in the next iteration.
> org.apache.kafka.streams.errors.LockException: stream-thread [acme-StreamThread-4] standby-task [1_15] Failed to lock the state directory for task 1_15
>   at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)
>   at org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:114)
>   at org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1008)
>   at org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:995)
>   at org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)
>   at org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {code}
> The exception is expected: it occurs because, after an assignment, a 
> different stream thread on the same Kafka Streams client has not yet released 
> the lock on the task state directory. But with the state updater, acquiring 
> the lock is attempted in every poll iteration, which is every 100 ms by 
> default.
> One option to reduce the log messages is to reduce the rate of lock 
> acquisition attempts. The other is to reduce the logging.
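Both options can be sketched in one small helper. This is a minimal illustration under stated assumptions, not the fix adopted for KAFKA-17109: `LockRetryThrottle` and its methods are hypothetical names that do not exist in Kafka Streams, and the backoff value is an arbitrary choice. Option one gates lock re-attempts per task behind a backoff; option two logs the full INFO message with stack trace only on the first consecutive failure.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Kafka Streams) that throttles lock
// re-attempts per task and decides how verbosely to log each failure.
class LockRetryThrottle {
    private final long backoffMs;
    // Time of the last lock attempt, keyed by task id (e.g. "1_15").
    private final Map<String, Long> lastAttemptMs = new HashMap<>();
    // Number of consecutive failed attempts, keyed by task id.
    private final Map<String, Integer> failures = new HashMap<>();

    LockRetryThrottle(long backoffMs) {
        this.backoffMs = backoffMs;
    }

    // Option 1: allow a new lock attempt only once the backoff has elapsed.
    boolean shouldAttempt(String taskId, long nowMs) {
        Long last = lastAttemptMs.get(taskId);
        if (last != null && nowMs - last < backoffMs) {
            return false;
        }
        lastAttemptMs.put(taskId, nowMs);
        return true;
    }

    // Option 2: record a failure; returns true only for the first consecutive
    // failure, so later failures could be logged at DEBUG without a stack trace.
    boolean recordFailureAndShouldLogVerbosely(String taskId) {
        int count = failures.merge(taskId, 1, Integer::sum);
        return count == 1;
    }

    // Clear all state for the task once the lock has been acquired.
    void recordSuccess(String taskId) {
        lastAttemptMs.remove(taskId);
        failures.remove(taskId);
    }
}
```

With the default 100 ms poll interval and an assumed 500 ms backoff, ten polls over one second would produce two lock attempts instead of ten, and only the first failure would carry a stack trace.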



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-7371) Finally deprecate org.apache.kafka.common.serialization.ExtendedSerializer

2024-07-18 Thread Danica Fine (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danica Fine reassigned KAFKA-7371:
--

Assignee: Danica Fine

> Finally deprecate org.apache.kafka.common.serialization.ExtendedSerializer
> --
>
> Key: KAFKA-7371
> URL: https://issues.apache.org/jira/browse/KAFKA-7371
> Project: Kafka
>  Issue Type: Wish
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Jouni
>Assignee: Danica Fine
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> As mentioned in the javadocs: "Once Kafka drops support for Java 7, the 
> serialize() method introduced by this interface will be added to Serializer 
> with a default implementation so that backwards compatibility is maintained. 
> This interface may be deprecated once that happens." Support for Java 7 was 
> already dropped in Kafka 2.0.0, but this has not happened yet.
> The problem is that some external serializers from outside the project (for 
> example, org.springframework.kafka.support.serializer.JsonSerializer, which is 
> quite commonly used) already add message headers when used with the producer 
> API, one of them being __TypeId__, which contains the Java class name. But 
> when using the Streams DSL, there is no way to either access or modify those 
> headers, and according to KIP-244, they are simply copied to the sink. Also, 
> RecordCollectorImpl.send makes the calls
> final byte[] keyBytes = keySerializer.serialize(topic, key);
> final byte[] valBytes = valueSerializer.serialize(topic, value);
> rather than
> final byte[] keyBytes = keySerializer.serialize(topic, headers, key);
> final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
> which would become possible once the plain Serializer gets the default method 
> added. So, currently, there is no way for anyone to write a serializer that 
> modifies the headers, if necessary, when using Streams.
> In my case, the problem occurred when transforming an object from the input 
> stream into a different type of object in the output stream. It took a while 
> to debug where those (wrong) headers came from, and then to either disable 
> adding those headers on the producer side or, since I happened to be using 
> the Processor API, modify the headers myself in the ProcessorContext.
> This is an unfortunate side effect of two different projects making 
> decisions that affect each other. It is not exactly a bug in either one, but 
> it is a real nuisance to find out what is happening. I would prefer things to 
> work out of the box as much as possible. Then again, API changes sometimes 
> just have to be made.
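The plan quoted from the javadoc (a headers-aware overload with a default body on the plain serializer) can be sketched without any Kafka dependency. This is a simplified, hypothetical model: `SimpleHeaders`, `SimpleSerializer`, `StringSerializerDroppingTypeId` and `RecordSender` are illustrative stand-ins, not Kafka's `Headers`, `Serializer` or `RecordCollectorImpl`. It shows that existing serializers keep compiling because the new overload delegates to the old method, while a serializer that cares can override the overload to remove a stale `__TypeId__` header once the call site passes headers through.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a record's headers (not Kafka's Headers type).
class SimpleHeaders {
    private final Map<String, byte[]> values = new LinkedHashMap<>();

    void put(String key, byte[] value) { values.put(key, value); }
    void remove(String key) { values.remove(key); }
    boolean contains(String key) { return values.containsKey(key); }
}

// Hypothetical serializer interface: the headers-aware overload has a default
// body that delegates to the old two-argument method.
interface SimpleSerializer<T> {
    byte[] serialize(String topic, T data);

    default byte[] serialize(String topic, SimpleHeaders headers, T data) {
        // Implementations that ignore headers keep working unchanged.
        return serialize(topic, data);
    }
}

// A serializer that opts in to the overload to drop a stale type-id header.
class StringSerializerDroppingTypeId implements SimpleSerializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serialize(String topic, SimpleHeaders headers, String data) {
        headers.remove("__TypeId__");
        return serialize(topic, data);
    }
}

// A call site shaped like the proposed send() calls: headers are always passed,
// so serializers that override the overload can act on them.
class RecordSender {
    static <K, V> byte[][] send(String topic, SimpleHeaders headers, K key, V value,
                                SimpleSerializer<K> keySerializer,
                                SimpleSerializer<V> valueSerializer) {
        final byte[] keyBytes = keySerializer.serialize(topic, headers, key);
        final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
        return new byte[][] {keyBytes, valBytes};
    }
}
```

Because `SimpleSerializer` keeps a single abstract method, an old-style serializer can still be written as a lambda, which is the backwards-compatibility property the javadoc describes.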





[jira] [Assigned] (KAFKA-13499) Avoid restoring outdated records

2023-05-24 Thread Danica Fine (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danica Fine reassigned KAFKA-13499:
---

Assignee: Danica Fine

> Avoid restoring outdated records
> 
>
> Key: KAFKA-13499
> URL: https://issues.apache.org/jira/browse/KAFKA-13499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Danica Fine
>Priority: Major
>
> Kafka Streams has the config `windowstore.changelog.additional.retention.ms` 
> to allow for increased retention time.
> While increased retention time can be useful, it can also lead to 
> unnecessary restore cost, especially for stream-stream joins. Assume a 
> stream-stream join with a 1h window size and a grace period of 1h. In this 
> case, we only need 2h of data to restore. If we lag, 
> `windowstore.changelog.additional.retention.ms` helps prevent the broker 
> from truncating data too early. However, if we don't lag and need to 
> restore, we restore everything in the changelog.
> Instead of doing a seek-to-beginning, we could use the timestamp index to 
> seek to the earliest offset inside the 2h "window" of data that we need to 
> restore, avoiding unnecessary work.
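The arithmetic behind the proposal can be sketched as follows. This is a hypothetical, in-memory model, not Kafka Streams code: `RestoreStartOffset` and its methods are illustrative names, the changelog is assumed to be a list of record timestamps in offset order starting at offset 0 with non-decreasing timestamps, and the "2h" is assumed to be measured back from the current stream time (the ticket does not say which reference time to use). The lookup mimics the documented semantics of `KafkaConsumer#offsetsForTimes`, which returns the earliest offset whose timestamp is greater than or equal to the requested timestamp; a real implementation would call that and then `seek` instead of scanning a list.

```java
import java.util.List;

// Hypothetical model of the restore-start computation for a stream-stream join.
class RestoreStartOffset {

    // Earliest offset whose timestamp is >= targetTimestamp, or timestamps.size()
    // (the end offset) if there is none. Assumes offsets start at 0 and
    // timestamps are non-decreasing, so a binary search is valid.
    static long earliestOffsetAtOrAfter(List<Long> timestamps, long targetTimestamp) {
        int lo = 0;
        int hi = timestamps.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps.get(mid) < targetTimestamp) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    // Oldest timestamp the join still needs: window size plus grace period
    // before the (assumed) reference stream time.
    static long restoreStartTimestamp(long streamTimeMs, long windowSizeMs, long gracePeriodMs) {
        return streamTimeMs - (windowSizeMs + gracePeriodMs);
    }
}
```

For the ticket's example (1h window, 1h grace) and a changelog with one record per hour from 0h to 5h, the restore would start at the 3h record instead of offset 0, skipping half of the changelog.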





[jira] [Assigned] (KAFKA-14539) Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

2023-05-23 Thread Danica Fine (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danica Fine reassigned KAFKA-14539:
---

Assignee: Danica Fine

> Simplify StreamsMetadataState by replacing the Cluster metadata with 
> partition info map
> ---
>
> Key: KAFKA-14539
> URL: https://issues.apache.org/jira/browse/KAFKA-14539
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Danica Fine
>Priority: Major
>
> We can clean up the StreamsMetadataState class a bit by removing the 
> #onChange invocation that currently occurs within 
> StreamsPartitionAssignor#assign, which then lets us remove the `Cluster` 
> parameter from that callback. Instead of building a fake Cluster object from 
> the map of partition info when we invoke #onChange inside the 
> StreamsPartitionAssignor#onAssignment method, we can directly pass in 
> the `Map` and replace the usage of `Cluster` 
> everywhere in StreamsMetadataState.
> (I believe the current system is a historical artifact from when we used to 
> require passing in a {{Cluster}} for the default partitioning strategy, which 
> StreamsMetadataState needs to compute the partition for a key. At some 
> point in the past we provided a better way to get the default partition, so 
> we no longer need a {{Cluster}} parameter/field at all.)
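The key observation is that computing a key's default partition only needs the topic's partition count, not a full `Cluster`. A minimal sketch under that assumption: `KeyPartitionLookup` and its topic-to-partition-count map are hypothetical simplifications (the exact map type from the ticket is not shown in this message), and `Arrays.hashCode` is a stand-in for the murmur2 hash that Kafka's default partitioner applies to serialized keys, so the partition numbers here will not match Kafka's.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;

// Hypothetical sketch: derive a key's partition from a plain
// topic -> partition-count map, with no Cluster object involved.
class KeyPartitionLookup {
    private final Map<String, Integer> partitionCounts;

    KeyPartitionLookup(Map<String, Integer> partitionCounts) {
        this.partitionCounts = partitionCounts;
    }

    int partitionFor(String topic, byte[] keyBytes) {
        Integer numPartitions = partitionCounts.get(topic);
        if (numPartitions == null) {
            throw new IllegalArgumentException("Unknown topic: " + topic);
        }
        // Stand-in hash: Kafka's default partitioner uses murmur2, not Arrays.hashCode.
        // Masking the sign bit keeps the hash non-negative before the modulo.
        int positiveHash = Arrays.hashCode(keyBytes) & 0x7fffffff;
        return positiveHash % numPartitions;
    }
}
```

The only per-topic input is an integer, which is why a plain map of partition info is enough to replace the `Cluster` field.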


