[jira] [Created] (KAFKA-9808) Refactor State Store Hierarchy

2020-04-02 Thread Richard Yu (Jira)
Richard Yu created KAFKA-9808:
-

 Summary: Refactor State Store Hierarchy
 Key: KAFKA-9808
 URL: https://issues.apache.org/jira/browse/KAFKA-9808
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Richard Yu


Over years of development, Kafka contributors have been adding more and more 
state store classes on top of each other without much regard for keeping the 
hierarchy approachable for future modifications. For instance, it has become 
increasingly difficult to add new APIs to state store classes without exposing 
them to users. 

In sum, the entire hierarchy is slowly spiraling out of control, and there is a 
growing need to consolidate the multiple state store types into a few more 
manageable ones for future Kafka developers. 

Note: There have already been a couple of attempts to simplify the state store 
hierarchy; the task itself isn't too complex, but the enormous scope of the 
change makes things difficult.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9733) Consider addition to Kafka's replication model

2020-03-18 Thread Richard Yu (Jira)
Richard Yu created KAFKA-9733:
-

 Summary: Consider addition to Kafka's replication model
 Key: KAFKA-9733
 URL: https://issues.apache.org/jira/browse/KAFKA-9733
 Project: Kafka
  Issue Type: New Feature
  Components: clients, core
Reporter: Richard Yu


Note: Description still not finished.

The feature I'm proposing might not offer much of a performance boost, but I 
think it is still worth considering. In our current replication model, we have 
a single leader and several followers (including our ISR). However, the current 
bottleneck is that once the leader goes down, it takes a while to bring the next 
leader online, which is a serious pain (and also leads to considerable 
write/read delays).

To help alleviate this issue, we can consider multiple clusters that are 
independent of each other, i.e. each of them is its own leader/follower group 
for the _same partition set_. The difference here is that these clusters can 
_communicate_ with one another. 

At first, this might seem redundant, but there is reasoning behind it:
 # Let's say we have two leader/follower groups for the same replicated 
partition.
 # One leader goes down, which means that, under normal circumstances, its 
followers would be unable to receive new writes.
 # However, in this situation, we can have those followers poll their 
write/read requests from the other group, whose leader has _not gone down._ It 
doesn't necessarily have to be the leader either; it can be another member of 
that group's ISR. 
 # The idea here is that if the members of these two groups detect that they 
are lagging behind one another, they would be able to poll one another for 
updates.

So what is the difference between this and just having multiple leaders in a 
single cluster?

The answer is that each leader is responsible for ensuring consistency within 
_its own cluster_, not within the other cluster it communicates with.  
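The fallback order described in the numbered steps can be sketched as a small, in-memory Java model. Everything here is hypothetical: `Replica`, `Group` and `fetchSource` are illustrative names rather than Kafka APIs, and actual fetching, offsets and cross-group consistency are left out.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical model of the proposal: two independent leader/follower groups
// replicate the same partition set and may fetch from one another.
final class PeerGroupFetch {
    record Replica(String id, boolean alive) {}

    record Group(Replica leader, List<Replica> isr) {}

    // Chooses where a follower in `own` fetches from: its own leader while it is
    // alive; otherwise the peer group's leader, or any live member of the peer ISR.
    static Optional<Replica> fetchSource(Group own, Group peer) {
        if (own.leader().alive()) {
            return Optional.of(own.leader());
        }
        if (peer.leader().alive()) {
            return Optional.of(peer.leader());
        }
        return peer.isr().stream().filter(Replica::alive).findFirst();
    }
}
```

The peer group's leader is preferred over its other ISR members only to keep the sketch deterministic; step 3 allows either.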





[jira] [Resolved] (KAFKA-9285) Implement failed message topic to account for processing lag during failure

2019-12-07 Thread Richard Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu resolved KAFKA-9285.
---
Resolution: Fixed

Already resolved by Kafka Connect.

> Implement failed message topic to account for processing lag during failure
> ---
>
> Key: KAFKA-9285
> URL: https://issues.apache.org/jira/browse/KAFKA-9285
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Richard Yu
>Assignee: Richard Yu
>Priority: Major
>  Labels: kip
>
> Presently, in Kafka's current failure semantics, when a consumer crashes, the 
> user is typically responsible for both detecting and restarting the failed 
> consumer. While the consumer is dead, no records are consumed, so lag builds 
> up. There have been previous attempts to resolve this problem: when the broker 
> detects a failure, a substitute consumer is started (the so-called [Rebalance 
> Consumer|https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing])
> which continues processing records in the failed consumer's stead. 
> However, this has complications, as records will only be stored locally, and 
> if this consumer fails as well, that data will be lost. Instead, we need to 
> consider how we can still process these records while also effectively 
> _persisting_ them. It is here that I propose the concept of a _failed message 
> topic._ At a high level, it works like this: when we find that a consumer has 
> failed, messages originally meant for that consumer are redirected to this 
> failed message topic. The user can choose to assign consumers to this topic, 
> which would consume messages (that would've originally been processed by the 
> failed consumers) from it.
> Naturally, records from different topics cannot go into the same failed 
> message topic, since we cannot tell which records belong to which consumer.
>  





[jira] [Created] (KAFKA-9285) Implement failed message topic to account for processing lag during failure

2019-12-06 Thread Richard Yu (Jira)
Richard Yu created KAFKA-9285:
-

 Summary: Implement failed message topic to account for processing 
lag during failure
 Key: KAFKA-9285
 URL: https://issues.apache.org/jira/browse/KAFKA-9285
 Project: Kafka
  Issue Type: New Feature
  Components: consumer
Reporter: Richard Yu


Presently, in Kafka's current failure semantics, when a consumer crashes, the 
user is typically responsible for both detecting and restarting the failed 
consumer. While the consumer is dead, no records are consumed, so lag builds 
up. There have been previous attempts to resolve this problem: when the broker 
detects a failure, a substitute consumer is started (the so-called [Rebalance 
Consumer|https://cwiki.apache.org/confluence/display/KAFKA/KIP-333%3A+Add+faster+mode+of+rebalancing])
which continues processing records in the failed consumer's stead. 

However, this has complications, as records will only be stored locally, and if 
this consumer fails as well, that data will be lost. Instead, we need to 
consider how we can still process these records while also effectively 
_persisting_ them. It is here that I propose the concept of a _failed message 
topic._ At a high level, it works like this: when we find that a consumer has 
failed, messages originally meant for that consumer are redirected to this 
failed message topic. The user can choose to assign consumers to this topic, 
which would consume the failed consumers' messages while those consumer threads 
are down. 

Naturally, records from different topics cannot go into the same failed 
message topic, since we cannot tell which records belong to which consumer.
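A minimal sketch of the redirection rule, assuming a hypothetical `<topic>.failed` naming convention that gives each source topic its own failed message topic (which also respects the restriction in the last paragraph). `FailedMessageRouter`, `route` and the `topic-partition` ownership map are illustrative, not part of any Kafka API; detecting that a consumer has failed is assumed to happen elsewhere.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical routing rule for the proposed failed message topic.
final class FailedMessageRouter {
    // Assumed naming convention: one failed message topic per source topic,
    // because records from different topics cannot share one.
    static String failedTopicFor(String sourceTopic) {
        return sourceTopic + ".failed";
    }

    // Returns the topic a record should land in: its original topic while the
    // consumer that owns the partition is alive, otherwise the failed message topic.
    static String route(String topic, int partition,
                        Map<String, String> ownerByTopicPartition,
                        Set<String> failedConsumers) {
        String owner = ownerByTopicPartition.get(topic + "-" + partition);
        if (owner != null && failedConsumers.contains(owner)) {
            return failedTopicFor(topic);
        }
        return topic;
    }
}
```

Partitions with no known owner keep flowing to their original topic, so the rule only diverts traffic away from consumers that have been positively identified as failed.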

 





[jira] [Created] (KAFKA-8516) Consider allowing all replicas to have read/write permissions

2019-06-10 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-8516:
-

 Summary: Consider allowing all replicas to have read/write 
permissions
 Key: KAFKA-8516
 URL: https://issues.apache.org/jira/browse/KAFKA-8516
 Project: Kafka
  Issue Type: Improvement
Reporter: Richard Yu


Currently, in Kafka internals, a leader is responsible for all the read and 
write operations requested by the user. This naturally creates a bottleneck: 
one replica, as the leader, experiences a significantly heavier workload than 
the other replicas, and all client commands must pass through a single 
chokepoint. If a leader fails, all processing effectively comes to a halt until 
a new leader is elected. To help solve this problem, we could think about 
redesigning Kafka core so that any replica is able to perform read and write 
operations as well. That is, the system would be changed so that _all_ 
replicas have read/write permissions.
 
This has multiple positives, notably the following:
- Workload can be more evenly distributed, since leader replicas currently 
carry more load than follower replicas (in this new design, all replicas are 
equal).
- Some failures would not be as catastrophic as in the leader-follower 
paradigm. There is no single "leader": if one replica goes down, others are 
still able to read/write as needed, and processing could continue without 
interruption.
 
Implementing a change like this would be very extensive, and discussion would 
be needed to decide whether the improvements described above warrant such a 
drastic redesign of Kafka internals.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8438) Add API to allow user to define end behavior of consumer failure

2019-05-27 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-8438:
-

 Summary: Add API to allow user to define end behavior of consumer 
failure
 Key: KAFKA-8438
 URL: https://issues.apache.org/jira/browse/KAFKA-8438
 Project: Kafka
  Issue Type: New Feature
  Components: consumer
Reporter: Richard Yu


Recently, in a concerted effort to make Kafka's rebalances less painful, 
various approaches have been used to reduce both the number and the impact of 
rebalances. Often, a rebalance is triggered by a failure of some sort, in which 
case the workload is redistributed among the surviving threads. To reduce 
rebalances caused by random consumer crashes, a recent change to Kafka 
internals (which introduced the concept of static membership) prevents a 
rebalance from occurring within {{session.timeout.ms}}, in the hope that the 
crashed consumer thread recovers within that interval.

However, in some cases, consumer threads go down permanently or remain dead for 
long periods of time. In these scenarios, users of Kafka might not become aware 
of the crash until hours after it happened, which forces them to manually start 
a new KafkaConsumer process long after the failure occurred. That is where a 
callback such as {{onConsumerFailure}} would help. There are multiple use cases 
for this user-defined callback. {{onConsumerFailure}} is called when a 
particular consumer thread has been down for some specified time interval (e.g. 
set by a config called {{acceptable.consumer.failure.timeout.ms}}). When 
called, this method could be used to log the consumer failure or, should the 
user wish, to create a new thread that would then rejoin the consumer group 
(which could also carry the required {{group.instance.id}} so that a rebalance 
wouldn't be re-triggered). 

Should the old thread recover and attempt to rejoin the consumer group (while 
the substitute thread is part of the group), the old thread will be denied 
access and an exception will be thrown (to indicate that another process has 
already taken its place).
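Neither {{onConsumerFailure}} nor {{acceptable.consumer.failure.timeout.ms}} exists in the Kafka consumer API; both are proposals in this ticket. The sketch below models only the detection side with a hypothetical `ConsumerFailureMonitor` that records heartbeats per `group.instance.id` and fires the callback once a member has been silent longer than the configured timeout. Timestamps are passed in explicitly to keep it deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical failure detector; not part of the Kafka consumer API.
final class ConsumerFailureMonitor {
    // Hypothetical user-defined callback proposed in this ticket.
    interface ConsumerFailureListener {
        void onConsumerFailure(String groupInstanceId);
    }

    // Plays the role of the proposed acceptable.consumer.failure.timeout.ms config.
    private final long acceptableFailureTimeoutMs;
    private final ConsumerFailureListener listener;
    private final Map<String, Long> lastHeartbeatMs = new LinkedHashMap<>();

    ConsumerFailureMonitor(long acceptableFailureTimeoutMs, ConsumerFailureListener listener) {
        this.acceptableFailureTimeoutMs = acceptableFailureTimeoutMs;
        this.listener = listener;
    }

    void heartbeat(String groupInstanceId, long nowMs) {
        lastHeartbeatMs.put(groupInstanceId, nowMs);
    }

    // Fires the callback once for each member silent for longer than the timeout,
    // then stops tracking it until a (possibly substitute) member heartbeats again.
    void check(long nowMs) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeatMs.entrySet()) {
            if (nowMs - entry.getValue() > acceptableFailureTimeoutMs) {
                failed.add(entry.getKey());
            }
        }
        for (String groupInstanceId : failed) {
            lastHeartbeatMs.remove(groupInstanceId);
            listener.onConsumerFailure(groupInstanceId);
        }
    }
}
```

A callback registered this way could log the failure or start a substitute consumer carrying the same `group.instance.id`; denying the old thread when it returns would need broker-side support and is not modeled here.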

 

 

 





[jira] [Resolved] (KAFKA-8434) Make global stream time consistent over all stream tasks

2019-05-26 Thread Richard Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu resolved KAFKA-8434.
---
Resolution: Fixed

> Make global stream time consistent over all stream tasks
> 
>
> Key: KAFKA-8434
> URL: https://issues.apache.org/jira/browse/KAFKA-8434
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Richard Yu
>Priority: Major
>  Labels: kip, needs-discussion
>






[jira] [Created] (KAFKA-8434) Make global stream time consistent over all stream tasks

2019-05-26 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-8434:
-

 Summary: Make global stream time consistent over all stream tasks
 Key: KAFKA-8434
 URL: https://issues.apache.org/jira/browse/KAFKA-8434
 Project: Kafka
  Issue Type: Improvement
Reporter: Richard Yu








[jira] [Created] (KAFKA-8431) Add a onTimeoutExpired callback to Kafka Consumer

2019-05-26 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-8431:
-

 Summary: Add a onTimeoutExpired callback to Kafka Consumer
 Key: KAFKA-8431
 URL: https://issues.apache.org/jira/browse/KAFKA-8431
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Richard Yu


Currently, after the changes introduced in KIP-266, many methods in 
KafkaConsumer have a bounded execution time given by a user-specified 
{{Duration}} parameter. However, in some cases, a method cannot complete its 
operation within the allotted timeout. In this case, the user might wish to 
have an {{onTimeoutExpired}} callback that would be called should a blocking 
method time out before any results could be returned. 

Users can implement something like this themselves, but Kafka could spare them 
the effort of coding such a feature by supporting it natively.

One possible use of this callback is to retry the method (e.g. the 
{{onTimeoutExpired}} callback triggers another call to the same method after a 
specified delay).
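KafkaConsumer signals an expired {{Duration}} by throwing `org.apache.kafka.common.errors.TimeoutException`, so a user-side version of this callback can be built by catching that exception. The sketch below is hypothetical and dependency-free: `java.util.concurrent.TimeoutException` stands in for Kafka's exception, and `BoundedCall`, `OnTimeoutExpired`, `callWithCallback` and `retrying` are illustrative names, not a proposed API.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;

final class TimeoutCallbacks {
    // A blocking operation bounded by a Duration, e.g. a wrapped consumer call.
    @FunctionalInterface
    interface BoundedCall<T> {
        T call(Duration timeout) throws TimeoutException;
    }

    // Hypothetical callback invoked when the bounded call times out.
    @FunctionalInterface
    interface OnTimeoutExpired<T> {
        T onTimeoutExpired(BoundedCall<T> call, Duration timeout) throws TimeoutException;
    }

    static <T> T callWithCallback(BoundedCall<T> call, Duration timeout,
                                  OnTimeoutExpired<T> callback) throws TimeoutException {
        try {
            return call.call(timeout);
        } catch (TimeoutException e) {
            return callback.onTimeoutExpired(call, timeout);
        }
    }

    // One possible callback: retry the same call up to maxRetries more times,
    // rethrowing the last timeout if every retry also expires.
    static <T> OnTimeoutExpired<T> retrying(int maxRetries) {
        return (call, timeout) -> {
            TimeoutException last = new TimeoutException("no retries configured");
            for (int i = 0; i < maxRetries; i++) {
                try {
                    return call.call(timeout);
                } catch (TimeoutException e) {
                    last = e;
                }
            }
            throw last;
        };
    }
}
```

The `retrying` callback retries immediately; a callback matching the retry-after-a-delay use case would sleep or back off between attempts.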





[jira] [Created] (KAFKA-8388) Add methods to query for entries in KTable using timestamp

2019-05-17 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-8388:
-

 Summary: Add methods to query for entries in KTable using timestamp
 Key: KAFKA-8388
 URL: https://issues.apache.org/jira/browse/KAFKA-8388
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Richard Yu








[jira] [Created] (KAFKA-8020) Consider making ThreadCache a time-aware LRU Cache

2019-02-28 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-8020:
-

 Summary: Consider making ThreadCache a time-aware LRU Cache
 Key: KAFKA-8020
 URL: https://issues.apache.org/jira/browse/KAFKA-8020
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Richard Yu


Currently, in Kafka Streams, ThreadCache is used to store 
{{InternalProcessorContext}}s. Typically, these entries are relevant for only a 
limited time span. For example, in {{CachingWindowStore}}, a window has a fixed 
size. Once it expires, it is no longer queried, but it could potentially stay in 
the ThreadCache for an unnecessarily long time if it is not evicted (e.g. when 
few new entries are being inserted). To allocate memory more efficiently, we 
could implement a time-aware LRU Cache that takes into account the lifespan of 
an entry and removes it once it has expired. 
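The idea can be illustrated with a minimal, hypothetical cache built on `LinkedHashMap` in access order, where each entry also carries an expiry time. This is not how ThreadCache is implemented; the class name, the per-entry `ttlMs` and the explicit `nowMs` arguments are assumptions made so that either stream time or wall-clock time could drive expiry.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal time-aware LRU cache: an entry is dropped when it is the least recently
// used one and capacity is exceeded, or once its lifespan has elapsed.
final class TimeAwareLruCache<K, V> {
    private record Timed<T>(T value, long expiresAtMs) {}

    private final int capacity;
    private final LinkedHashMap<K, Timed<V>> entries;

    TimeAwareLruCache(int capacity) {
        this.capacity = capacity;
        // accessOrder = true: iteration runs from least to most recently accessed.
        this.entries = new LinkedHashMap<K, Timed<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timed<V>> eldest) {
                return size() > TimeAwareLruCache.this.capacity;
            }
        };
    }

    void put(K key, V value, long nowMs, long ttlMs) {
        entries.put(key, new Timed<>(value, nowMs + ttlMs));
    }

    // Returns null for missing or expired entries; expired ones are removed eagerly.
    V get(K key, long nowMs) {
        Timed<V> timed = entries.get(key);
        if (timed == null) {
            return null;
        }
        if (nowMs >= timed.expiresAtMs()) {
            entries.remove(key);
            return null;
        }
        return timed.value();
    }

    // Drops every expired entry, e.g. windows that can no longer be queried,
    // even if the cache never reaches capacity.
    void evictExpired(long nowMs) {
        entries.values().removeIf(timed -> nowMs >= timed.expiresAtMs());
    }

    int size() {
        return entries.size();
    }
}
```

`evictExpired` is what addresses the scenario above: an expired window is dropped even when the cache is far below capacity and LRU eviction alone would never reach it.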





[jira] [Created] (KAFKA-7850) Remove deprecated KStreamTestDriver

2019-01-21 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-7850:
-

 Summary: Remove deprecated KStreamTestDriver
 Key: KAFKA-7850
 URL: https://issues.apache.org/jira/browse/KAFKA-7850
 Project: Kafka
  Issue Type: Improvement
  Components: unit tests
Reporter: Richard Yu
Assignee: Richard Yu


Ever since a series of test improvements were made to Kafka Streams, 
KStreamTestDriver has been deprecated in favor of TopologyTestDriver. However, a 
couple of existing unit tests continue to use KStreamTestDriver. We wish to 
migrate all remaining classes to TopologyTestDriver so we can remove the 
deprecated class.





[jira] [Created] (KAFKA-7132) Consider adding multithreaded form of recovery

2018-07-04 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-7132:
-

 Summary: Consider adding multithreaded form of recovery
 Key: KAFKA-7132
 URL: https://issues.apache.org/jira/browse/KAFKA-7132
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Richard Yu


Currently, when a consumer falls out of a consumer group, it restarts 
processing from the last checkpointed offset. However, this design can result 
in lag that some users cannot afford. For example, let's say a consumer crashed 
at offset 100, with the last checkpointed offset being 70. When it recovers at a 
later offset (say, 120), it will be behind by an offset range of 50 (120 - 70). 
This is because it restarts at 70, forcing it to reprocess old data. To avoid 
this, one option would be to allow the current consumer to start processing not 
from the last checkpointed offset (70 in the example), but from 120, where it 
recovers. Meanwhile, a new KafkaConsumer would be instantiated to read from 
offset 70 concurrently with the old process, and would be terminated once it 
reaches 120. In this manner, a considerable amount of lag can be avoided, 
particularly since the old consumer can proceed as if nothing had happened. 
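The offset arithmetic from the example can be simulated with two threads: one resumes at the recovery offset (120) while a catch-up reader covers [70, 120) and stops there. This is a hypothetical sketch: `ParallelRecovery`, `reader` and the `logEnd` parameter (for instance 150) are assumptions, plain threads stand in for the two consumers, and in practice each range would be read by its own KafkaConsumer positioned with `seek()`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simulates the proposed recovery with plain threads standing in for consumers.
final class ParallelRecovery {
    // Hypothetical reader: "processes" offsets [from, toExclusive) in order.
    private static Runnable reader(long from, long toExclusive, ConcurrentLinkedQueue<Long> processed) {
        return () -> {
            for (long offset = from; offset < toExclusive; offset++) {
                processed.add(offset);
            }
        };
    }

    // The original consumer resumes at recoveryOffset while a catch-up reader
    // covers [lastCheckpoint, recoveryOffset) and stops there.
    static List<Long> recover(long lastCheckpoint, long recoveryOffset, long logEnd) {
        ConcurrentLinkedQueue<Long> processed = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(reader(recoveryOffset, logEnd, processed));
        pool.submit(reader(lastCheckpoint, recoveryOffset, processed));
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("recovery did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during recovery", e);
        }
        List<Long> sorted = new ArrayList<>(processed);
        Collections.sort(sorted);
        return sorted;
    }
}
```

Together the two readers cover every offset exactly once, but records 70 to 119 are no longer guaranteed to be processed before record 120, so this approach gives up per-partition ordering during recovery.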





[jira] [Created] (KAFKA-7127) Add asynchronous support for methods in KafkaConsumer

2018-07-02 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-7127:
-

 Summary: Add asynchronous support for methods in KafkaConsumer
 Key: KAFKA-7127
 URL: https://issues.apache.org/jira/browse/KAFKA-7127
 Project: Kafka
  Issue Type: Wish
  Components: clients
Reporter: Richard Yu


Currently, in KafkaConsumer, various methods block while waiting on a remote 
call. It would be nice if we also added asynchronous versions of these methods, 
similar to what was done with {{poll()}} in KIP-266 and to {{commitAsync()}}, 
which takes an {{OffsetCommitCallback}} as an input argument.
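One way to get such behavior on the client side today is to run blocking calls on a dedicated thread and hand back a `CompletableFuture`. The sketch below is hypothetical (`AsyncAdapter` and `submit` are illustrative names); it uses a single-thread executor because KafkaConsumer is not safe for multi-threaded access, so every call on a given consumer would have to go through the same adapter.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical adapter: runs blocking calls on one dedicated thread and returns futures.
final class AsyncAdapter implements AutoCloseable {
    // A single thread, because KafkaConsumer must not be used from several threads at once.
    private final ExecutorService callerThread = Executors.newSingleThreadExecutor();

    <T> CompletableFuture<T> submit(Supplier<T> blockingCall) {
        // Calls run one at a time, in submission order, on the single thread.
        return CompletableFuture.supplyAsync(blockingCall, callerThread);
    }

    @Override
    public void close() {
        callerThread.shutdown();
    }
}
```

For example, `adapter.submit(() -> consumer.position(partition))` would return a future instead of blocking the caller, assuming `consumer` and `partition` are defined and the consumer is not touched from any other thread.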





[jira] [Created] (KAFKA-7118) Currently, it was discovered that KafkaConsumer's close() method might not be multi-thread safe when multiple cores are calling the same consumer.

2018-06-28 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-7118:
-

 Summary: Currently, it was discovered that KafkaConsumer's close() 
method might not be multi-thread safe when multiple cores are calling the same 
consumer. 
 Key: KAFKA-7118
 URL: https://issues.apache.org/jira/browse/KAFKA-7118
 Project: Kafka
  Issue Type: Bug
Reporter: Richard Yu








[jira] [Created] (KAFKA-6608) Add timeout parameter to methods which fetches and resets offsets

2018-03-03 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-6608:
-

 Summary: Add timeout parameter to methods which fetches and resets 
offsets
 Key: KAFKA-6608
 URL: https://issues.apache.org/jira/browse/KAFKA-6608
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Richard Yu


In KAFKA-4879, KafkaConsumer hangs indefinitely because Fetcher's {{timeout}} 
is set to {{Long.MAX_VALUE}}. While fixing this issue, it was pointed out that 
adding a timeout to methods that update and fetch offset positions would allow 
tighter control over time.
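A minimal, hypothetical sketch of a deadline-bounded lookup: instead of retrying against an effectively infinite {{Long.MAX_VALUE}} timeout, the helper retries until a caller-supplied `Duration` elapses. `DeadlineRetry.retryUntil` is an illustrative name; the `attempt` supplier is assumed to wrap a single non-blocking offset lookup, and `IllegalStateException` stands in for the timeout exception a real client would throw.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class DeadlineRetry {
    // Repeats `attempt` until it yields a value or `timeout` elapses, sleeping
    // `backoff` (capped at the remaining time) between attempts.
    static <T> T retryUntil(Supplier<Optional<T>> attempt, Duration timeout, Duration backoff) {
        // System.nanoTime() is monotonic, so wall-clock adjustments cannot stretch the deadline.
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            Optional<T> result = attempt.get();
            if (result.isPresent()) {
                return result.get();
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new IllegalStateException("no result within " + timeout.toMillis() + " ms");
            }
            try {
                Thread.sleep(Math.min(backoff.toMillis(), TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }
}
```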





[jira] [Created] (KAFKA-6583) Metadata should include number of state stores for task

2018-02-22 Thread Richard Yu (JIRA)
Richard Yu created KAFKA-6583:
-

 Summary: Metadata should include number of state stores for task
 Key: KAFKA-6583
 URL: https://issues.apache.org/jira/browse/KAFKA-6583
 Project: Kafka
  Issue Type: Improvement
Reporter: Richard Yu


Currently, to balance clients more evenly, stateful tasks should be 
distributed so that they are spread equally. However, implementing such 
awareness during task assignment would require the present rebalance protocol 
metadata to also contain the number of state stores in each task. This would 
allow us to "weight" tasks during assignment. 


