[jira] [Created] (KAFKA-9808) Refactor State Store Hierarchy
Richard Yu created KAFKA-9808: - Summary: Refactor State Store Hierarchy Key: KAFKA-9808 URL: https://issues.apache.org/jira/browse/KAFKA-9808 Project: Kafka Issue Type: Improvement Components: streams Reporter: Richard Yu Over years of development, Kafka contributors have been adding more and more state store classes on top of each other without much regard for keeping the hierarchy approachable for future modification. For instance, it has become increasingly difficult to add new APIs to state store classes while at the same time preventing them from being exposed to users. In sum, the entire hierarchy is slowly spiraling out of control, and there is a growing need to consolidate the multiple state store types into a few more manageable ones for future Kafka developers. Note: There have already been a couple of attempts to simplify the state store hierarchy; the task itself isn't too complex, but the enormous scope of the change is what makes things difficult. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9733) Consider addition to Kafka's replication model
Richard Yu created KAFKA-9733: - Summary: Consider addition to Kafka's replication model Key: KAFKA-9733 URL: https://issues.apache.org/jira/browse/KAFKA-9733 Project: Kafka Issue Type: New Feature Components: clients, core Reporter: Richard Yu Note: Description still not finished. The feature I'm proposing might not offer much of a performance boost, but I think it is still worth considering. In our current replication model, we have a single leader and several followers (the ISR included). However, the current bottleneck is that once the leader goes down, it takes a while to bring the next leader online, which is a serious pain (and also leads to considerable write/read delay). To help alleviate this issue, we could consider multiple clusters independent of each other, i.e. each is its own leader/follower group for the _same partition set_. The difference is that these clusters can _communicate_ with one another. At first this might seem redundant, but there is a reasoning behind it: # Say we have two leader/follower groups for the same replicated partition. # One leader goes down, which means its followers would, under normal circumstances, be unable to receive new write updates. # However, in this situation, those followers can poll their write/read requests from the other group, whose leader has _not gone down._ It doesn't necessarily have to be that group's leader either; it can be other members of that group's ISR. # The idea is that if the members of these two groups detect that they are lagging behind one another, they can poll one another for updates. So what is the difference from just having multiple leaders in a single cluster? The answer is that each leader is responsible for ensuring consistency within _its own cluster,_ not within the other cluster it is in communication with. -- This message was sent by Atlassian Jira (v8.3.4#803005)
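The cross-group catch-up described in the numbered steps above can be sketched roughly as follows. All class and method names here are invented for illustration; this is a toy simulation of the idea, not Kafka's replication code.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one leader/follower group holding a replicated log.
class ReplicaGroupSketch {
    final List<String> log = new ArrayList<>(); // the replicated records, by offset
    boolean leaderAlive = true;

    // A group whose leader is down polls a healthy member of the *other*
    // group for any records past its own log end, instead of stalling
    // until a new leader is elected.
    static void catchUpFromOtherGroup(ReplicaGroupSketch stalled, ReplicaGroupSketch healthy) {
        if (stalled.leaderAlive) return; // only needed while our own leader is gone
        int from = stalled.log.size();
        List<String> missing = healthy.log.subList(from, healthy.log.size());
        stalled.log.addAll(missing);     // apply the records fetched from the peer group
    }
}
```

Consistency inside each group would still be the responsibility of that group's own leader, as the ticket notes; the sketch only covers the inter-group fetch.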
[jira] [Resolved] (KAFKA-9285) Implement failed message topic to account for processing lag during failure
[ https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu resolved KAFKA-9285. --- Resolution: Fixed Already resolved by Kafka Connect. > Implement failed message topic to account for processing lag during failure > Key: KAFKA-9285 > URL: https://issues.apache.org/jira/browse/KAFKA-9285 > Project: Kafka > Issue Type: New Feature > Components: consumer > Reporter: Richard Yu > Assignee: Richard Yu > Priority: Major > Labels: kip > Presently, when a consumer crashes, the user is typically responsible for both detecting and restarting the failed consumer. While the consumer is dead, no records are consumed, so lag accumulates. There have been previous attempts to resolve this problem: when a failure is detected by the broker, a substitute consumer is started (the so-called [Rebalance Consumer|https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing]) which continues processing records in the failed consumer's stead. > However, this has complications: records are only stored locally, and if this consumer fails as well, that data is lost. Instead, we need to consider how we can still process these records while effectively _persisting_ them. Hence I propose the concept of a _failed message topic._ At a high level it works like this: when we find that a consumer has failed, messages originally meant for that consumer are redirected to the failed message topic. The user can choose to assign consumers to this topic, which would consume the messages (that would otherwise have been processed by the failed consumers) from it. > Naturally, records from different topics cannot go into the same failed message topic, since we could not tell which records belong to which consumer. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9285) Implement failed message topic to account for processing lag during failure
Richard Yu created KAFKA-9285: - Summary: Implement failed message topic to account for processing lag during failure Key: KAFKA-9285 URL: https://issues.apache.org/jira/browse/KAFKA-9285 Project: Kafka Issue Type: New Feature Components: consumer Reporter: Richard Yu Presently, when a consumer crashes, the user is typically responsible for both detecting and restarting the failed consumer. While the consumer is dead, no records are consumed, so lag accumulates. There have been previous attempts to resolve this problem: when a failure is detected by the broker, a substitute consumer is started (the so-called [Rebalance Consumer|https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing]) which continues processing records in the failed consumer's stead. However, this has complications: records are only stored locally, and if this consumer fails as well, that data is lost. Instead, we need to consider how we can still process these records while effectively _persisting_ them. Hence I propose the concept of a _failed message topic._ At a high level it works like this: when we find that a consumer has failed, messages originally meant for that consumer are redirected to the failed message topic. The user can choose to assign consumers to this topic, which would consume messages from the failed consumers while those consumer threads are down. Naturally, records from different topics cannot go into the same failed message topic, since we could not tell which records belong to which consumer. -- This message was sent by Atlassian Jira (v8.3.4#803005)
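The redirection step of the proposal could be sketched as below. The `"<topic>.failed"` naming scheme and every class name are invented for illustration; no such routing exists in Kafka.

```java
import java.util.HashSet;
import java.util.Set;

// Toy router implementing the proposed failed-message redirection.
class FailedMessageRouter {
    private final Set<String> failedConsumers = new HashSet<>();

    void markFailed(String consumerId) { failedConsumers.add(consumerId); }
    void markRecovered(String consumerId) { failedConsumers.remove(consumerId); }

    // Records destined for a failed consumer are redirected to a per-topic
    // failed message topic (hypothetically "<topic>.failed"), which standby
    // consumers can be assigned to and drain.
    String routeTopic(String consumerId, String topic) {
        return failedConsumers.contains(consumerId) ? topic + ".failed" : topic;
    }
}
```

Keeping the failed topic per source topic matches the ticket's closing constraint: records from different topics must not be mixed into one failed message topic.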
[jira] [Created] (KAFKA-8516) Consider allowing all replicas to have read/write permissions
Richard Yu created KAFKA-8516: - Summary: Consider allowing all replicas to have read/write permissions Key: KAFKA-8516 URL: https://issues.apache.org/jira/browse/KAFKA-8516 Project: Kafka Issue Type: Improvement Reporter: Richard Yu Currently, in Kafka internals, a leader is responsible for all read and write operations requested by the user. This naturally creates a bottleneck, since one replica, as the leader, experiences a significantly heavier workload than the other replicas, and it also means that all client commands must pass through a single chokepoint. If a leader fails, all processing effectively comes to a halt until another leader election. To help solve this problem, we could think about redesigning Kafka core so that any replica is able to serve reads and writes as well. That is, the system would be changed so that _all_ replicas have read/write permissions. This has multiple positives, notably the following: - Workload can be more evenly distributed: today leader replicas are weighted more than follower replicas, whereas in this new design all replicas are equal. - Some failures would not be as catastrophic as in the leader-follower paradigm. With no single "leader", if one replica goes down, the others can still serve reads and writes as needed, and processing continues without interruption. The implementation of a change like this would be very extensive, and discussion is needed to decide whether the improvement described above warrants such a drastic redesign of Kafka internals. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8438) Add API to allow user to define end behavior of consumer failure
Richard Yu created KAFKA-8438: - Summary: Add API to allow user to define end behavior of consumer failure Key: KAFKA-8438 URL: https://issues.apache.org/jira/browse/KAFKA-8438 Project: Kafka Issue Type: New Feature Components: consumer Reporter: Richard Yu Recently, in a concerted effort to make Kafka's rebalances less painful, various approaches have been used to reduce the number and impact of rebalances. Often, the trigger of a rebalance is a failure of some sort, in which case the workload is redistributed among surviving threads. To reduce rebalances caused by transient consumer crashes, a recent change to Kafka internals (which introduces the concept of static membership) prevents a rebalance from occurring within {{session.timeout.ms}}, in the hope that the crashed consumer thread recovers within that interval. However, in some cases a consumer thread goes down permanently or remains dead for a long period. In these scenarios, Kafka users might not become aware of the crash until hours after it happened, forcing them to manually start a new KafkaConsumer process a considerable time after the failure occurred. That is where the addition of a callback such as {{onConsumerFailure}} would help. There are multiple use cases for this user-defined callback. {{onConsumerFailure}} is called when a particular consumer thread has been down for some specified time interval (i.e. a config called {{acceptable.consumer.failure.timeout.ms}}). When called, this method could be used to log the consumer failure or, should the user wish it, to create a new thread which would then rejoin the consumer group (supplying the required {{group.instance.id}} so that a rebalance wouldn't be re-triggered).
Should the old thread recover and attempt to rejoin the consumer group (with the substitute thread already part of the group), the old thread would be denied access and an exception would be thrown, indicating that another process has taken its place. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
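A minimal sketch of the proposed callback could look like the following. The {{ConsumerFailureListener}} interface, the {{FailureMonitor}} class, and the {{acceptable.consumer.failure.timeout.ms}} config are all hypothetical names from this ticket, not existing Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical user-defined callback from the proposal.
interface ConsumerFailureListener {
    void onConsumerFailure(String memberId); // e.g. log, alert, or start a substitute thread
}

// Toy monitor that fires the callback once a member has been silent longer
// than the proposed acceptable.consumer.failure.timeout.ms.
class FailureMonitor {
    private final long acceptableFailureTimeoutMs;
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();
    private final ConsumerFailureListener listener;

    FailureMonitor(long acceptableFailureTimeoutMs, ConsumerFailureListener listener) {
        this.acceptableFailureTimeoutMs = acceptableFailureTimeoutMs;
        this.listener = listener;
    }

    void heartbeat(String memberId, long nowMs) { lastHeartbeatMs.put(memberId, nowMs); }

    // Invoke the callback for every member whose last heartbeat is too old.
    void check(long nowMs) {
        for (Map.Entry<String, Long> e : lastHeartbeatMs.entrySet()) {
            if (nowMs - e.getValue() > acceptableFailureTimeoutMs) {
                listener.onConsumerFailure(e.getKey());
            }
        }
    }
}
```

A listener that starts a replacement thread with the same {{group.instance.id}} would cover the substitution scenario the ticket describes.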
[jira] [Resolved] (KAFKA-8434) Make global stream time consistent over all stream tasks
[ https://issues.apache.org/jira/browse/KAFKA-8434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu resolved KAFKA-8434. --- Resolution: Fixed > Make global stream time consistent over all stream tasks > Key: KAFKA-8434 > URL: https://issues.apache.org/jira/browse/KAFKA-8434 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Richard Yu > Priority: Major > Labels: kip, needs-discussion -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8434) Make global stream time consistent over all stream tasks
Richard Yu created KAFKA-8434: - Summary: Make global stream time consistent over all stream tasks Key: KAFKA-8434 URL: https://issues.apache.org/jira/browse/KAFKA-8434 Project: Kafka Issue Type: Improvement Reporter: Richard Yu -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8431) Add an onTimeoutExpired callback to Kafka Consumer
Richard Yu created KAFKA-8431: - Summary: Add an onTimeoutExpired callback to Kafka Consumer Key: KAFKA-8431 URL: https://issues.apache.org/jira/browse/KAFKA-8431 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Richard Yu Currently, after the changes introduced in KIP-266, many methods in Kafka Consumer have a bounded execution time given by a user-specified {{Duration}} parameter. However, in some cases a method cannot complete its operation within the allocated timeout. Here the user might wish for an {{onTimeoutExpired}} callback that is invoked whenever a blocking method times out before any results can be returned. The user can implement something like this themselves, but Kafka could spare users the need to code such a feature by supporting it natively. One possible use of this callback is to retry the method (e.g. the {{onTimeoutExpired}} callback triggers another call to the same method after some allotted time). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
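The hook could be sketched against a generic blocking call rather than the real KafkaConsumer API, since the callback does not exist yet; all names below are invented, and a timeout is modeled as the call returning an empty result.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical callback from the proposal.
interface TimeoutExpiredCallback {
    void onTimeoutExpired(Duration timeout); // e.g. log, back off, or retry the call
}

class BoundedCall {
    // Run the bounded call; if it times out (modeled here as an empty result),
    // invoke the user's callback instead of failing silently.
    static <T> Optional<T> run(Supplier<Optional<T>> call, Duration timeout,
                               TimeoutExpiredCallback callback) {
        Optional<T> result = call.get();
        if (!result.isPresent()) {
            callback.onTimeoutExpired(timeout);
        }
        return result;
    }
}
```

A retry, as the ticket suggests, would just be a callback whose body invokes `run` again after a delay.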
[jira] [Created] (KAFKA-8388) Add methods to query for entries in KTable using timestamp
Richard Yu created KAFKA-8388: - Summary: Add methods to query for entries in KTable using timestamp Key: KAFKA-8388 URL: https://issues.apache.org/jira/browse/KAFKA-8388 Project: Kafka Issue Type: Improvement Components: streams Reporter: Richard Yu -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8020) Consider making ThreadCache a time-aware LRU Cache
Richard Yu created KAFKA-8020: - Summary: Consider making ThreadCache a time-aware LRU Cache Key: KAFKA-8020 URL: https://issues.apache.org/jira/browse/KAFKA-8020 Project: Kafka Issue Type: Improvement Components: streams Reporter: Richard Yu Currently, in Kafka Streams, ThreadCache is used to store {{InternalProcessorContext}}s. Typically, these entries apply for only a limited time span. For example, in {{CachingWindowStore}}, a window is of fixed size: after it expires, it is no longer queried, yet it could stay in the ThreadCache for an unnecessarily long time if it is not evicted (e.g. when few new entries are being inserted). For better allocation of memory, it would be better to implement a time-aware LRU cache which takes the lifespan of an entry into account and removes it once it has expired. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
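The proposed eviction policy can be sketched with a plain access-ordered {{LinkedHashMap}}: expired entries are dropped first, and capacity-based LRU eviction only kicks in afterwards. This is a generic sketch, not the actual ThreadCache implementation.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;

// Toy time-aware LRU cache: each entry carries an expiry deadline.
class TimeAwareLruCache<K, V> {
    private static class Entry<V> {
        final V value;
        final long expiresAtMs;
        Entry(V value, long expiresAtMs) { this.value = value; this.expiresAtMs = expiresAtMs; }
    }

    private final int maxEntries;
    // accessOrder=true gives classic LRU iteration order (eldest first).
    private final LinkedHashMap<K, Entry<V>> map = new LinkedHashMap<>(16, 0.75f, true);

    TimeAwareLruCache(int maxEntries) { this.maxEntries = maxEntries; }

    void put(K key, V value, long ttlMs, long nowMs) {
        map.values().removeIf(e -> e.expiresAtMs <= nowMs); // time-based eviction first
        map.put(key, new Entry<>(value, nowMs + ttlMs));
        Iterator<K> it = map.keySet().iterator();           // then plain LRU if still over capacity
        while (map.size() > maxEntries && it.hasNext()) { it.next(); it.remove(); }
    }

    V get(K key, long nowMs) {
        Entry<V> e = map.get(key);
        if (e == null || e.expiresAtMs <= nowMs) { map.remove(key); return null; }
        return e.value;
    }
}
```

Passing `nowMs` explicitly keeps the sketch deterministic; a real cache would read the clock (or stream time) itself.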
[jira] [Created] (KAFKA-7850) Remove deprecated KStreamTestDriver
Richard Yu created KAFKA-7850: - Summary: Remove deprecated KStreamTestDriver Key: KAFKA-7850 URL: https://issues.apache.org/jira/browse/KAFKA-7850 Project: Kafka Issue Type: Improvement Components: unit tests Reporter: Richard Yu Assignee: Richard Yu Ever since a series of test improvements was made to KafkaStreams, KStreamTestDriver has been deprecated in favor of TopologyTestDriver. However, a couple of existing unit tests continue to use KStreamTestDriver. We wish to migrate all remaining classes to TopologyTestDriver so that we can remove the deprecated class. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7132) Consider adding multithreaded form of recovery
Richard Yu created KAFKA-7132: - Summary: Consider adding multithreaded form of recovery Key: KAFKA-7132 URL: https://issues.apache.org/jira/browse/KAFKA-7132 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Richard Yu Currently, when a consumer falls out of a consumer group, it restarts processing from the last checkpointed offset. However, this design can result in lag that some users cannot afford. For example, let's say a consumer crashed at offset 100, with the last checkpointed offset at 70. When it recovers at a later offset (say, 120), it is behind by a range of 50 offsets (120 - 70), because restarting at 70 forces it to reprocess old data. To prevent this, one option would be to let the current consumer start processing not from the last checkpointed offset (70 in the example) but from 120, where it recovers. Meanwhile, a new KafkaConsumer would be instantiated to read from offset 70 concurrently with the old process, and would be terminated once it reaches 120. In this manner a considerable amount of lag can be avoided, particularly since the old consumer can proceed as if nothing had happened. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
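The split described above amounts to partitioning the offset space into a bounded backfill range and an open-ended live range. A minimal sketch of that planning step, with invented names:

```java
import java.util.ArrayList;
import java.util.List;

class SplitRecoverySketch {
    // Returns two [startInclusive, endExclusive) offset ranges:
    // index 0 - the temporary backfill consumer, which replays
    //           [checkpoint, recoveryOffset) and then terminates;
    // index 1 - the recovered consumer, which continues from recoveryOffset
    //           onward as if nothing had happened.
    static List<long[]> plan(long checkpointOffset, long recoveryOffset) {
        List<long[]> ranges = new ArrayList<>();
        ranges.add(new long[] { checkpointOffset, recoveryOffset });
        ranges.add(new long[] { recoveryOffset, Long.MAX_VALUE });
        return ranges;
    }
}
```

In the ticket's example the backfill consumer would cover [70, 120) while the recovered consumer resumes at 120, so the two together leave no gap and no overlap.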
[jira] [Created] (KAFKA-7127) Add asynchronous support for methods in KafkaConsumer
Richard Yu created KAFKA-7127: - Summary: Add asynchronous support for methods in KafkaConsumer Key: KAFKA-7127 URL: https://issues.apache.org/jira/browse/KAFKA-7127 Project: Kafka Issue Type: Wish Components: clients Reporter: Richard Yu Currently, various methods in KafkaConsumer block on a remote call. It would be nice if we also added asynchronous versions of these methods, like what was done with {{poll()}} in KIP-266 and with {{commitAsync()}}, which takes an {{OffsetCommitCallback}} as an input argument. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
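One way to shape such async variants, sketched generically here with {{CompletableFuture}} rather than against the real KafkaConsumer (which is not thread-safe, so a production version would have to confine the consumer to a single thread):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

class AsyncWrapper {
    // Run a blocking consumer-style call on the given executor and expose the
    // result as a future, mirroring the commitAsync()-style callback pattern.
    static <T> CompletableFuture<T> callAsync(Supplier<T> blockingCall, Executor executor) {
        return CompletableFuture.supplyAsync(blockingCall, executor);
    }
}
```

Callers can then chain `.thenAccept(...)` instead of supplying a bespoke callback interface per method.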
[jira] [Created] (KAFKA-7118) KafkaConsumer's close() method might not be thread-safe when multiple threads call the same consumer
Richard Yu created KAFKA-7118: - Summary: KafkaConsumer's close() method might not be thread-safe when multiple threads call the same consumer Key: KAFKA-7118 URL: https://issues.apache.org/jira/browse/KAFKA-7118 Project: Kafka Issue Type: Bug Reporter: Richard Yu -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6608) Add timeout parameter to methods which fetch and reset offsets
Richard Yu created KAFKA-6608: - Summary: Add timeout parameter to methods which fetch and reset offsets Key: KAFKA-6608 URL: https://issues.apache.org/jira/browse/KAFKA-6608 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Richard Yu In KAFKA-4879, the Kafka Consumer hangs indefinitely because the Fetcher's {{timeout}} is set to {{Long.MAX_VALUE}}. While fixing that issue, it was pointed out that adding a timeout to the methods which update and fetch offset positions would give tighter control over time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
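The shape of the change can be sketched as replacing an unbounded wait with a deadline check. The fetcher below is a stand-in supplier (negative means "not available yet") and time is simulated, so this is only an illustration of the bounded-lookup contract, not Fetcher code.

```java
import java.time.Duration;
import java.util.OptionalLong;
import java.util.function.LongSupplier;

class BoundedOffsetFetch {
    // Poll the fetcher until it yields a non-negative offset or the deadline
    // passes; returns empty instead of hanging forever as with Long.MAX_VALUE.
    static OptionalLong fetchOffset(LongSupplier fetcher, Duration timeout, long nowMs) {
        long deadlineMs = nowMs + timeout.toMillis();
        for (long t = nowMs; t <= deadlineMs; t++) { // t stands in for advancing wall-clock time
            long offset = fetcher.getAsLong();
            if (offset >= 0) return OptionalLong.of(offset);
        }
        return OptionalLong.empty();
    }
}
```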
[jira] [Created] (KAFKA-6583) Metadata should include number of state stores for task
Richard Yu created KAFKA-6583: - Summary: Metadata should include number of state stores for task Key: KAFKA-6583 URL: https://issues.apache.org/jira/browse/KAFKA-6583 Project: Kafka Issue Type: Improvement Reporter: Richard Yu Currently, to balance clients more evenly, stateful tasks should be distributed so that state is spread equally. However, for task assignment to be aware of this, the rebalance protocol metadata would also need to contain the number of state stores in each task. This would allow us to "weight" tasks during assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
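With the store counts available in the metadata, a weighting scheme could be as simple as a greedy balance: heaviest task first, each to the currently least-loaded client. All names below are invented for illustration; this is not the Streams assignor.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WeightedAssignor {
    // taskWeights: taskId -> number of state stores (the proposed metadata field).
    // Greedily assign the heaviest remaining task to the least-loaded client.
    static Map<String, List<String>> assign(Map<String, Integer> taskWeights, List<String> clients) {
        Map<String, List<String>> assignment = new HashMap<>();
        Map<String, Integer> load = new HashMap<>();
        for (String c : clients) { assignment.put(c, new ArrayList<>()); load.put(c, 0); }

        List<Map.Entry<String, Integer>> tasks = new ArrayList<>(taskWeights.entrySet());
        tasks.sort(Map.Entry.<String, Integer>comparingByValue().reversed()); // heaviest first
        for (Map.Entry<String, Integer> t : tasks) {
            String target = clients.stream().min(Comparator.comparing(load::get)).get();
            assignment.get(target).add(t.getKey());
            load.merge(target, t.getValue(), Integer::sum);
        }
        return assignment;
    }
}
```

Without the weights, a count-based assignor would hand two clients two tasks each even if one task holds most of the state; with them, total store count per client evens out.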