[jira] [Assigned] (KAFKA-17109) Reduce log message load for failed locking
[ https://issues.apache.org/jira/browse/KAFKA-17109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Danica Fine reassigned KAFKA-17109:
-----------------------------------

    Assignee: Danica Fine

> Reduce log message load for failed locking
> ------------------------------------------
>
>                 Key: KAFKA-17109
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17109
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.8.0
>            Reporter: Bruno Cadonna
>            Assignee: Danica Fine
>            Priority: Major
>
> The following exception with stack trace is logged many times when the
> state updater is enabled:
> {code}
> 01:08:03 INFO [KAFKA] TaskManager - stream-thread [acme-StreamThread-4] Encountered lock exception. Reattempting locking the state in the next iteration.
> org.apache.kafka.streams.errors.LockException: stream-thread [acme-StreamThread-4] standby-task [1_15] Failed to lock the state directory for task 1_15
> 	at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)
> 	at org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:114)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1008)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:995)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {code}
> The exception is expected, since it happens because the lock on the task
> state directory has not yet been freed by a different stream thread on the
> same Kafka Streams client after an assignment. But with the state updater,
> acquiring the lock is attempted in each poll iteration, which occurs every
> 100 ms by default.
> One option to reduce the log messages is to lower the rate at which lock
> acquisition is attempted. The other is to reduce the logging itself.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
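The second option (reducing the logging) could be sketched as a simple rate limiter that lets a repeated message through at most once per interval and counts the suppressed occurrences in between. This is only an illustrative sketch; `RateLimitedLogger` and its methods are hypothetical names, not part of the Kafka code base.

```java
import java.time.Duration;

// Hypothetical sketch of log throttling; not Kafka's actual implementation.
// Lets a repeated message through at most once per interval and counts the
// occurrences suppressed in between.
class RateLimitedLogger {
    private final long intervalMs;
    private long lastLoggedMs = Long.MIN_VALUE;
    private long suppressed = 0;

    RateLimitedLogger(Duration interval) {
        this.intervalMs = interval.toMillis();
    }

    // Returns true if the caller should log now; otherwise records one
    // suppressed occurrence of the message.
    boolean shouldLog(long nowMs) {
        if (lastLoggedMs == Long.MIN_VALUE || nowMs - lastLoggedMs >= intervalMs) {
            lastLoggedMs = nowMs;
            suppressed = 0;
            return true;
        }
        suppressed++;
        return false;
    }

    // Number of occurrences suppressed since the last emitted message.
    long suppressedCount() {
        return suppressed;
    }
}
```

With the default 100 ms poll iteration, throttling the message to, say, once per minute would cut its volume by roughly a factor of 600 while still surfacing that locking keeps failing (the suppressed count could be appended to the emitted message).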
[jira] [Assigned] (KAFKA-7371) Finally deprecate org.apache.kafka.common.serialization.ExtendedSerializer
[ https://issues.apache.org/jira/browse/KAFKA-7371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Danica Fine reassigned KAFKA-7371:
----------------------------------

    Assignee: Danica Fine

> Finally deprecate org.apache.kafka.common.serialization.ExtendedSerializer
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-7371
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7371
>             Project: Kafka
>          Issue Type: Wish
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Jouni
>            Assignee: Danica Fine
>            Priority: Major
>              Labels: beginner, needs-kip, newbie
>
> As mentioned in the javadocs: "Once Kafka drops support for Java 7, the
> serialize() method introduced by this interface will be added to Serializer
> with a default implementation so that backwards compatibility is maintained.
> This interface may be deprecated once that happens." Support for Java 7 was
> already dropped in Kafka 2.0.0, but this hasn't happened yet.
> The problem is that some external serializers from outside the project (for
> example, org.springframework.kafka.support.serializer.JsonSerializer, which
> is quite commonly used) already add message headers when using the producer
> API, one of them being __TypeId__, which contains the Java class name. But
> when using the Streams DSL, there's no way to either access or modify those
> headers, and according to KIP-244 they just get copied to the sink.
> Moreover, RecordCollectorImpl.send contains the calls
> {code}
> final byte[] keyBytes = keySerializer.serialize(topic, key);
> final byte[] valBytes = valueSerializer.serialize(topic, value);
> {code}
> and not
> {code}
> final byte[] keyBytes = keySerializer.serialize(topic, headers, key);
> final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
> {code}
> which would become possible once the plain Serializer gets the default
> method added. So, currently, there's no way for anyone to write a serializer
> that modifies the headers if necessary when using Streams.
> In my case, the problem occurred when transforming an object from an input
> stream into a different type of object in an output stream. It took a while
> to debug where those (wrong) headers came from, and then to either disable
> adding them on the producer side or, since I happened to be using the
> Processor API, modify the headers myself in ProcessorContext.
> This is an unfortunate side effect of two different projects making
> decisions that affect each other. It is not exactly a bug in either one, but
> a really big nuisance to find out what's happening. I'd prefer things to
> work as much as possible out of the box. Ok, API changes must sometimes
> just be made.
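The migration the javadoc describes can be sketched with simplified stand-ins for Kafka's `Serializer` and `Headers` types (the real interfaces live under `org.apache.kafka.common`; everything below is an illustrative assumption, not the actual Kafka code):

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for org.apache.kafka.common.header.Headers.
class Headers {
    private final Map<String, byte[]> entries = new LinkedHashMap<>();
    void add(String key, byte[] value) { entries.put(key, value); }
    byte[] lastHeader(String key) { return entries.get(key); }
}

// Sketch of a Serializer that gains a header-aware default method, so that
// existing header-less implementations keep working unchanged. This is the
// idea behind folding ExtendedSerializer's method into Serializer.
interface Serializer<T> {
    byte[] serialize(String topic, T data);

    // New default method: implementations that ignore headers inherit this
    // and remain backwards compatible.
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }
}

// With the default in place, a serializer that modifies headers becomes
// possible simply by overriding it (hypothetical example class).
class TypeTaggingStringSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serialize(String topic, Headers headers, String data) {
        // Tag the record with the Java type, similar to Spring's __TypeId__.
        headers.add("__TypeId__", String.class.getName().getBytes(StandardCharsets.UTF_8));
        return serialize(topic, data);
    }
}
```

Once `RecordCollectorImpl.send` calls the header-aware overload, an override like the one above would see (and could fix) the headers that are currently copied to the sink untouched.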
[jira] [Assigned] (KAFKA-13499) Avoid restoring outdated records
[ https://issues.apache.org/jira/browse/KAFKA-13499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Danica Fine reassigned KAFKA-13499:
-----------------------------------

    Assignee: Danica Fine

> Avoid restoring outdated records
> --------------------------------
>
>                 Key: KAFKA-13499
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13499
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Danica Fine
>            Priority: Major
>
> Kafka Streams has the config `windowstore.changelog.additional.retention.ms`
> to allow for an increased retention time.
> While an increased retention time can be useful, it can also lead to
> unnecessary restore cost, especially for stream-stream joins. Assume a
> stream-stream join with a 1h window size and a grace period of 1h. For this
> case, we only need 2h of data to restore. If we lag,
> `windowstore.changelog.additional.retention.ms` helps to prevent the broker
> from truncating data too early. However, if we don't lag and we need to
> restore, we restore everything from the changelog.
> Instead of doing a seek-to-beginning, we could use the timestamp index to
> seek to the first offset of the 2h "window" of data that we need to
> restore, and avoid the unnecessary work.
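The bound described above is just window size plus grace period; a restore could start from the timestamp that far in the past instead of from the beginning. A minimal sketch of that arithmetic (the `RestoreBound` helper is an illustrative assumption; in practice the resulting timestamp would still have to be resolved to a changelog offset, e.g. via the consumer's timestamp lookup):

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: compute the earliest changelog timestamp needed to
// restore a windowed store for a stream-stream join.
class RestoreBound {
    static Instant earliestNeeded(Instant now, Duration windowSize, Duration gracePeriod) {
        // Only windowSize + gracePeriod of data is required for a correct
        // restore; anything older is already expired.
        return now.minus(windowSize.plus(gracePeriod));
    }
}
```

For the 1h window and 1h grace period in the example, this yields "now minus 2h", so everything the changelog retains beyond that (due to the additional retention) could be skipped during restore.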
[jira] [Assigned] (KAFKA-14539) Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
[ https://issues.apache.org/jira/browse/KAFKA-14539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Danica Fine reassigned KAFKA-14539:
-----------------------------------

    Assignee: Danica Fine

> Simplify StreamsMetadataState by replacing the Cluster metadata with
> partition info map
> --------------------------------------------------------------------
>
>                 Key: KAFKA-14539
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14539
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: Danica Fine
>            Priority: Major
>
> We can clean up the StreamsMetadataState class a bit by removing the
> #onChange invocation that currently occurs within
> StreamsPartitionAssignor#assign, which then lets us remove the `Cluster`
> parameter in that callback. Instead of building a fake Cluster object from
> the map of partition info when we invoke #onChange inside the
> StreamsPartitionAssignor#onAssignment method, we can just directly pass in
> the `Map` and replace the usage of `Cluster` everywhere in
> StreamsMetadataState.
> (I believe the current system is a historical artifact from when we used to
> require passing in a {{Cluster}} for the default partitioning strategy,
> which the StreamsMetadataState needs to compute the partition for a key. At
> some point in the past we provided a better way to get the default
> partition, so we no longer need a {{Cluster}} parameter/field at all.)
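The shape of the refactoring can be sketched with simplified stand-ins (not Kafka's real `Cluster`, `TopicPartition`, or `PartitionInfo` classes, and not the actual StreamsMetadataState code): instead of wrapping partition info into a fake Cluster just so the metadata state can look up partitions per topic, the callback can take the map directly.

```java
import java.util.List;
import java.util.Map;

// Simplified stand-in for Kafka's PartitionInfo; illustrative only.
record PartitionInfo(String topic, int partition) {}

// Sketch of the proposed shape: the metadata state keeps the partition info
// map directly instead of a Cluster object built from the same data.
class MetadataState {
    private Map<String, List<PartitionInfo>> partitionsByTopic = Map.of();

    // The #onChange-style callback receives the map directly; no fake
    // Cluster needs to be constructed by the assignor.
    void onChange(Map<String, List<PartitionInfo>> partitionsByTopic) {
        this.partitionsByTopic = partitionsByTopic;
    }

    // The lookups previously served by Cluster come straight from the map.
    int numPartitions(String topic) {
        return partitionsByTopic.getOrDefault(topic, List.of()).size();
    }
}
```

The point of the sketch is only that every query the class answered via `Cluster` reduces to a map lookup once the default-partitioner dependency is gone.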