Thanks, Vinoth. Looks like we are on the same page. I will add some of these 
explanations to the KIP as well. I have assigned KAFKA-6144 to myself, and 
KAFKA-8994 is closed (by you). As suggested, we will replace "replica" with 
"standby".

In the new API, "StreamsMetadataState::allMetadataForKey(boolean 
enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" Do 
we really need a per key configuration? or a new StreamsConfig is good 
enough?

>> Coming from experience: when teams build a platform on Kafka Streams and 
these APIs serve data to multiple teams, we can't have one generalized config 
that says whether the platform as a whole supports stale reads. Whoever calls 
the APIs should choose whether they are OK with stale reads. Makes sense?
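
To make the per-call choice concrete, here is a minimal Java sketch. It is not the Kafka Streams implementation: the class name, the plain String hosts standing in for StreamsMetadata, and the parameter name enableStandbyServing are all assumptions for illustration. The only idea taken from this thread is that the caller, rather than a platform-wide config, decides whether standbys are included, and that the active host comes first in the returned list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; these are not the real Kafka Streams classes.
final class PerCallStandbySketch {

    // Returns candidate hosts for a key, active host first.
    // Standby hosts are appended only when this particular caller
    // has said it can tolerate stale reads.
    static List<String> allMetadataForKey(boolean enableStandbyServing,
                                          String activeHost,
                                          List<String> standbyHosts) {
        List<String> result = new ArrayList<>();
        result.add(activeHost);
        if (enableStandbyServing) {
            result.addAll(standbyHosts);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> standbys = List.of("B");
        // A team that accepts stale reads can fall back to the standby on B.
        System.out.println(allMetadataForKey(true, "A", standbys));
        // A team that does not accept stale reads only ever sees the active on A.
        System.out.println(allMetadataForKey(false, "A", standbys));
    }
}
```

Two teams sharing one platform can call the same method with different flags, which is the case a single StreamsConfig value could not cover.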
    On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar 
<vchan...@confluent.io> wrote:  
 
 Looks like we are covering ground :)

>>Only if it is within a permissible  range(say 10000) we will serve from
Restoring state of active.
+1 on having a knob like this. My reasoning is as follows.

Look at the Streams state as a read-only distributed kv store. With
num_standby = f, we should be able to tolerate f failures, and if there is
an (f+1)th failure, the system should be unavailable.

A) So with num_standby=0, the system should be unavailable even if there is
1 failure, and that's my argument for not allowing querying in restoration
state, especially since in this case it will be a total rebuild of the state
(which IMO cannot be considered a normal fault-free operational state).

B) Even if there are standbys, say num_standby=2, if the user decides to shut
down all 3 instances, then the only outcome should be unavailability until all
of them come back or state is rebuilt on other nodes in the cluster. In
normal operations, f <= 2, and when a failure does happen we can then either
choose C over A and fail IQs until replication is fully caught up, or
choose A over C by serving in restoring state as long as lag is minimal. If,
even with f=1 say, all the standbys are lagging a lot due to some issue,
then that should be considered a failure, since that is different from the
normal/expected operational mode. Serving reads with unbounded replication
lag and calling it "available" may not be very usable or even desirable :)
IMHO, since it gives the user no way to reason about the app that is going
to query this store.

So there is definitely a need to distinguish between replication catch-up
while in a fault-free state and restoration of state when we lose more
than f standbys. This knob is a great starting point towards this.
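
A rough Java sketch of the two checks discussed above, offered only as an illustration. The class, method, and parameter names are hypothetical and do not exist in Kafka Streams; the threshold of 10000 is the example value mentioned in this thread, not a recommended default.

```java
// Hypothetical sketch of the "knob"; not an existing Kafka Streams API.
final class RestoringLagSketch {

    // A restoring active may serve queries only while its offset lag
    // (last committed offset minus current restore position) stays
    // within the caller-supplied bound.
    static boolean canServeWhileRestoring(long lastCommittedOffset,
                                          long currentPosition,
                                          long maxAllowedLag) {
        long lag = Math.max(0L, lastCommittedOffset - currentPosition);
        return lag <= maxAllowedLag;
    }

    // With numStandby = f, up to f failed instances are tolerated;
    // the (f+1)th failure makes the store unavailable.
    static boolean toleratesFailures(int numStandby, int failedInstances) {
        return failedInstances <= numStandby;
    }

    public static void main(String[] args) {
        // Slightly behind: lag of 5000 is within the 10000 bound.
        System.out.println(canServeWhileRestoring(50_000L, 45_000L, 10_000L));
        // Rebuilding from scratch: lag of 50000 exceeds the bound.
        System.out.println(canServeWhileRestoring(50_000L, 0L, 10_000L));
        // num_standby=2 and all 3 instances down: unavailable.
        System.out.println(toleratesFailures(2, 3));
    }
}
```

The first check separates bounded catch-up from a full rebuild; the second captures the f versus f+1 failure argument.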

If you agree with some of the explanation above, please feel free to
include it in the KIP as well, since this is sort of our design principle
here.

Small nits :

- let's standardize on "standby" instead of "replica", KIP or code,  to be
consistent with rest of Streams code/docs?
- Can we merge KAFKA-8994 into KAFKA-6144 now and close the former?
Eventually need to consolidate KAFKA-6555 as well
- In the new API, "StreamsMetadataState::allMetadataForKey(boolean
enableReplicaServing, String storeName, K key, Serializer<K> keySerializer)" Do
we really need a per key configuration? or a new StreamsConfig is good
enough?

On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar
<navinder_b...@yahoo.com.invalid> wrote:

> @Vinoth, I have incorporated a few of the discussions we have had in the
> KIP.
>
> In the current code, t0 and t1 serve queries from Active(Running)
> partition. For case t2, we are planning to return List<StreamsMetadata>
> such that it returns <StreamsMetadata(A), StreamsMetadata(B)> so that if IQ
> fails on A, the replica on B can serve the data by enabling serving from
> replicas. This still does not solve case t3 and t4 since B has been
> promoted to active but it is in Restoring state to catchup till A’s last
> committed position as we don’t serve from Restoring state in Active and new
> Replica on R is building itself from scratch. Both these cases can be
> solved if we start serving from Restoring state of active as well since it
> is almost equivalent to previous Active.
>
> There could be a case where all replicas of a partition become unavailable
> and the active and all replicas of that partition are building themselves
> from scratch. In this case, the state in Active is far behind even though it
> is in Restoring state. To make sure we don't serve from this state in such
> cases, we can either add another state before Restoring or check the
> difference between last committed offset and current position. Only if it
> is within a permissible range (say 10000) will we serve from the Restoring
> state of Active.
>
>
>    On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar <
> vchan...@confluent.io> wrote:
>
>  Thanks for the updates on the KIP, Navinder!
>
> Few comments
>
> - AssignmentInfo is not public API? But we will change it and thus need to
> increment the version and test for version_probing etc. Good to separate
> that from StreamsMetadata changes (which is public API)
> - From what I see, there is going to be choice between the following
>
>  A) introducing a new *KafkaStreams::allMetadataForKey()* API that
> potentially returns List<StreamsMetadata> ordered from most up-to-date to
> least up-to-date replicas. Today we cannot fully implement this ordering,
> since all we know is which hosts are active and which are standbys.
> However, this aligns well with the future. KIP-441 adds the lag information
> to the rebalancing protocol. We could also sort replicas based on the
> reported lags eventually. This is fully backwards compatible with existing
> clients. Only drawback I see is the naming of the existing method
> KafkaStreams::metadataForKey, not conveying the distinction that it simply
> returns the active replica i.e allMetadataForKey.get(0).
>  B) Change KafkaStreams::metadataForKey() to return a List. It's a breaking
> change.
>
> I prefer A, since none of the semantics/behavior changes for existing
> users. Love to hear more thoughts. Can we also work this into the KIP?
> I already implemented A to unblock myself for now. Seems feasible to do.
>
>
> On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar <vchan...@confluent.io>
> wrote:
>
> > >>I get your point. But suppose there is a replica which has just become
> > active, so in that case replica will still be building itself from scratch
> > and this active will go to restoring state till it catches up with previous
> > active, wouldn't serving from a restoring active make more sense than a
> > replica in such case.
> >
> > KIP-441 will change this behavior such that promotion to active happens
> > based on how caught up a replica is. So, once we have that (work underway
> > already for 2.5 IIUC) and user sets num.standby.replicas > 0, then the
> > staleness window should not be that long as you describe. IMO if user wants
> > availability for state, then should configure num.standby.replicas > 0. If
> > not, then on a node loss, few partitions would be unavailable for a while
> > (there are other ways to bring this window down, which I won't bring in
> > here). We could argue for querying a restoring active (say a new node added
> > to replace a faulty old node) based on AP vs CP principles. But not sure
> > reading really really old values for the sake of availability is useful. No
> > AP data system would be inconsistent for such a long time in practice.
> >
> > So, I still feel just limiting this to standby reads provides best
> > semantics.
> >
> > Just my 2c. Would love to see what others think as well.
> >
> > On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar
> > <navinder_b...@yahoo.com.invalid> wrote:
> >
> >> Hi Vinoth,
> >> Thanks for the feedback.
> >> Can we link the JIRA, discussion thread also to the KIP.
> >> >> Added.
> >>
> >> Based on the discussion on KAFKA-6144, I was under the impression that
> >> this KIP is also going to cover exposing of the standby information in
> >> StreamsMetadata and thus subsume KAFKA-8994 . That would require a public
> >> API change?
> >> >> Sure, I can add changes for 8994 in this KIP and link KAFKA-6144 to
> >> KAFKA-8994 as well.
> >>
> >> KIP seems to be focussing on restoration when a new node is added.
> >> KIP-441 is underway and has some major changes proposed for this. It would
> >> be good to clarify dependencies if any. Without KIP-441, I am not very sure
> >> if we should allow reads from nodes in RESTORING state, which could amount
> >> to many minutes/few hours of stale reads?  This is different from allowing
> >> querying standby replicas, which could be mostly caught up and the
> >> staleness window could be much smaller/tolerable. (once again the focus on
> >> KAFKA-8994).
> >> >> I get your point. But suppose there is a replica which has just become
> >> active, so in that case replica will still be building itself from scratch
> >> and this active will go to restoring state till it catches up with previous
> >> active, wouldn't serving from a restoring active make more sense than a
> >> replica in such case.
> >>
> >> Finally, we may need to introduce a configuration to control this. Some
> >> users may prefer errors to stale data. Can we also add it to the KIP?
> >> >> Will add this.
> >>
> >> Regards,
> >> Navinder
> >>
> >>
> >> On 2019/10/14 16:56:49, Vinoth Chandar <v...@confluent.io> wrote:
> >>
> >> > Hi Navinder,
> >> >
> >> > Thanks for sharing the KIP! Few thoughts
> >> >
> >> > - Can we link the JIRA, discussion thread also to the KIP
> >> > - Based on the discussion on KAFKA-6144, I was under the impression that
> >> > this KIP is also going to cover exposing of the standby information in
> >> > StreamsMetadata and thus subsume KAFKA-8994 . That would require a public
> >> > API change?
> >> > - KIP seems to be focussing on restoration when a new node is added.
> >> > KIP-441 is underway and has some major changes proposed for this. It would
> >> > be good to clarify dependencies if any. Without KIP-441, I am not very sure
> >> > if we should allow reads from nodes in RESTORING state, which could amount
> >> > to many minutes/few hours of stale reads?  This is different from allowing
> >> > querying standby replicas, which could be mostly caught up and the
> >> > staleness window could be much smaller/tolerable. (once again the focus on
> >> > KAFKA-8994)
> >> > - Finally, we may need to introduce a configuration to control this. Some
> >> > users may prefer errors to stale data. Can we also add it to the KIP?
> >> >
> >> > Thanks
> >> > Vinoth
> >> >
> >> > On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar
> >> > <na...@yahoo.com.invalid> wrote:
> >> >
> >> >> Hi,
> >> >> Starting a discussion on the KIP to Allow state stores to serve stale
> >> >> reads during rebalance (
> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> >> >> ).
> >> >> Thanks & Regards, Navinder
> >> >> LinkedIn
> >
> >
>  
