[jira] [Resolved] (KAFKA-10657) Incorporate Envelope into auto-generated JSON schema

2023-03-09 Thread Vinoth (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth resolved KAFKA-10657.

Resolution: Resolved

This has been fixed as part of https://issues.apache.org/jira/browse/KAFKA-10525

> Incorporate Envelope into auto-generated JSON schema
> 
>
> Key: KAFKA-10657
> URL: https://issues.apache.org/jira/browse/KAFKA-10657
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>    Assignee: Vinoth
>Priority: Major
>
> We need to add support for outputting the embedded request inside an
> Envelope in JSON format, to enable better request logging.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-673%3A+Emit+JSONs+with+new+auto-generated+schema]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: A query on log truncation.

2023-03-06 Thread Vinoth
Hi Luke,
  Thanks for acknowledging my mail. Sorry for the late reply.
My query was not about keeping uncommitted records but about how teams manage
the loss of committed data in the case of an unclean leader election. Is there a
means to track lost data? Is this a common problem? I am asking based on
KIP-320, which mentions that committed data might be lost when unclean leader
election is enabled.

Regards,
Vinoth

On Mon, 16 Jan 2023 at 10:37, Luke Chen  wrote:

> Hi Vinoth,
>
> I'm wondering what's the use case or pain point you're trying to resolve?
> Like you said, the client will be notified the data is not successfully
> sent or propagated and handle the error, why should we keep the un-commited
> records?
> Could you elaborate more on the motivation?
>
> Thank you.
> Luke
>
> On Mon, Jan 16, 2023 at 12:33 PM Vinoth  wrote:
>
> > I was reading through about kafka , the way leader election works , log
> > truncation etc. One thing that kind of struck me was how records which
> were
> > written to log but then were not committed (It has not propagated
> > successfully through to all of the isr and and the high watermark has not
> > increased and so not committed ) ,were truncated following the
> replication
> > reconciliation logic . In case they are not committed they would not be
> > available for the consumer since the reads are  only upto to the high
> > watermark. the producer client will also be notified or will eventually
> > know if the message has not successfully propagated and it should be
> > handled thru application logic. It seems straight forward in this case.
> >
> > KIP-405
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage
> > >
> > talks about tiered storage and kafka being an important part of and an
> > entry point for data infrastructure . Else where i have read that kafka
> > also serves as way of replaying data to restore state / viewing data.
> > KIP-320
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation
> > >
> > mentions users wanting higher availability opting for unclean leader
> > election.
> >
> > Would it be fair to assume that users might be interested in a feature or
> > at least  one that can be user enabled where a write to kafka (even with
> a
> > 0 or no acks configuration or unlcean leader election ) will remain
> written
> > until the event where clean or delete config is acted upon?.
> >
> > If this is a valid use case , i have thoughts of suggesting a kip around
> > picking up the data that is to be truncated at time of truncation and
> > replaying it as if it came through a fresh produce request. That is a
> > truncation of data will not result in the data being removed from kafka
> but
> > rather be placed differently at a different offset.
> >
> > Regards,
> > Vinoth
> >
>


[jira] [Created] (KAFKA-14778) Kafka Streams 2.7.1 to 3.3.1 rolling upgrade with static membership triggers a rebalance

2023-03-06 Thread Vinoth Rengarajan (Jira)
Vinoth Rengarajan created KAFKA-14778:
-

 Summary: Kafka Streams 2.7.1 to 3.3.1 rolling upgrade with static 
membership triggers a rebalance
 Key: KAFKA-14778
 URL: https://issues.apache.org/jira/browse/KAFKA-14778
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.1, 2.7.1
Reporter: Vinoth Rengarajan


Trying to upgrade a Kafka Streams application from 2.7.1 to 3.3.1 with static 
membership, but it triggers a rebalance.

Brokers are running on Kafka 2.7.1. Static membership is enabled in the 
application. Below are the configs {*}(Streams Config & Consumer Config){*}.

Followed the steps below to upgrade:
 * Brokers are running on Kafka 2.7.1 (also tried with brokers on 3.3.1; the 
rebalance still happens).
 * The application is running with 2.7.1 Kafka Streams libraries.
 * Deployed the latest version of the application with 3.3.1 Kafka Streams 
libraries, and configured the *upgrade.from* property to 2.7 (based on the 
upgrade documentation available at 
[https://kafka.apache.org/33/documentation/streams/upgrade-guide]); see the 
configuration sketch after this list.
 * While doing a rolling bounce with these changes, a rebalance is triggered 
on the other instances in the cluster.
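For reference, a minimal sketch of the first-bounce configuration described above (this is not the reporter's actual code; the broker address and topology are placeholders, and whether a given Streams release accepts "2.7" as an *upgrade.from* value depends on the version):
{code:java}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class UpgradeBounceSketch {
    public static KafkaStreams start(Topology topology) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Kafka_Upgrade_Test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder address
        // First rolling bounce: run the 3.3.1 jars but tell Streams the group is being
        // upgraded from 2.7, so it keeps using the older rebalance metadata version.
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "2.7");
        // Second rolling bounce, once every instance runs 3.3.1: remove upgrade.from again.
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        return streams;
    }
}
{code}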

Below are the logs from the instance being bounced, which forces a rebalance on 
the others.

*Logs:*

 
{code:java}
INFO  2023-02-27 09:52:16.805 | streams.KafkaStreams stream-client 
[kafka_upgrade.Kafka_Upgrade_Test] State transition from CREATED to REBALANCING
INFO  2023-02-27 09:52:16.946 | internals.ConsumerCoordinator [Consumer 
instanceId=kafka_upgrade.Kafka_Upgrade_Test-4, 
clientId=kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer, 
groupId=kafka_upgrade.Kafka_Upgrade_Test] Notifying assignor about the new 
Assignment(partitions=[kafka_upgrade.Kafka_Upgrade_Test-version-updates-11, 
kafka_upgrade.Kafka_Upgrade_Test-version-updates-23], userDataSize=56)
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor 
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer] Sent a 
version 11 subscription and got version 8 assignment back (successful version 
probing). Downgrade subscription metadata to commonly supported version 8 and 
trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor 
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer] Sent a 
version 11 subscription and got version 8 assignment back (successful version 
probing). Downgrade subscription metadata to commonly supported version 8 and 
trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor 
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer] Sent a 
version 11 subscription and got version 8 assignment back (successful version 
probing). Downgrade subscription metadata to commonly supported version 8 and 
trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor 
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer] Sent a 
version 11 subscription and got version 8 assignment back (successful version 
probing). Downgrade subscription metadata to commonly supported version 8 and 
trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor 
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer] 
Requested to schedule immediate rebalance due to version probing.
INFO  2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor 
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer] 
Requested to schedule immediate rebalance due to version probing.
INFO  2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor 
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer] 
Requested to schedule immediate rebalance due to version probing.
INFO  2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor 
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer] 
Requested to schedule immediate rebalance due to version probing. {code}
 

 

*Streams Config:*

 
{code:java}
acceptable.recovery.lag = 1
application.id = Kafka_Upgrade_Test
application.server =
bootstrap.servers = [broker1, broker2, broker3]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id = kafka_upgrade.Kafka_Upgrade_Test
commit.interval.ms = 3
connections.max.idle.ms = 54
default.deserialization.exception.handler = class 
org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.dsl.store = rocksDB
default.key.serde = null
default.list.key.serde.inner = null
default.list.key.serde.type = null
default.list.value.serde.inner = null
default.list.value.serde.type = null
default.production.exception.handler = class 
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = c

A query on log truncation.

2023-01-15 Thread Vinoth
I was reading about Kafka: the way leader election works, log truncation, etc.
One thing that struck me was how records which were written to the log but not
committed (they have not propagated to all of the ISR, so the high watermark
has not advanced and they are not committed) are truncated by the replication
reconciliation logic. Since they are not committed, they would not be available
to consumers, as reads only go up to the high watermark. The producer client
will also be notified, or will eventually know, that the message has not
successfully propagated, and this should be handled through application logic.
It seems straightforward in this case.
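
For concreteness, a minimal sketch of that handling (placeholder topic, key and
value; not taken from any particular application):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProduceWithCallback {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.ACKS_CONFIG, "all"); // only count fully replicated writes as successful
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
                    if (exception != null) {
                        // The write never became committed (e.g. it timed out before reaching the ISR);
                        // application logic decides whether to retry, log, or route it elsewhere.
                    }
                });
            }
        }
    }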

KIP-405
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage>
talks about tiered storage and Kafka being an important part of, and an entry
point for, data infrastructure. Elsewhere I have read that Kafka also serves as
a way of replaying data to restore state or to view data.
KIP-320
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation>
mentions that users wanting higher availability may opt for unclean leader
election.

Would it be fair to assume that users might be interested in a feature, or at
least one that can be enabled by the user, where a write to Kafka (even with
acks=0 / no acks or unclean leader election) will remain written until the
cleanup or delete configuration acts on it?

If this is a valid use case, I am thinking of suggesting a KIP around picking
up the data that is about to be truncated at truncation time and replaying it
as if it came through a fresh produce request. That is, truncation would not
result in the data being removed from Kafka; instead it would be placed at a
different offset.

Regards,
Vinoth


Request permissions to contribute to kafka

2022-10-09 Thread Vinoth
Jira ID : Vinoth96

Regards,
Vinoth


Testing Kafka on laptop

2022-09-20 Thread vinoth rajagopalan
I want to know if I can run the benchmark tests for Kafka on a single laptop.
If not, is there anything provided by the Apache Software Foundation that open
source contributors can use to run their tests?
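
For what it's worth, the performance tools that ship with the Kafka distribution
can be run against a local single-broker setup; a typical invocation (topic name,
record count and size below are placeholders) looks something like:

    bin/kafka-producer-perf-test.sh --topic perf-test \
        --num-records 1000000 --record-size 100 --throughput -1 \
        --producer-props bootstrap.servers=localhost:9092

Absolute numbers from a laptop will not match a real cluster, of course.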

Regards,
Vinoth


[jira] [Created] (KAFKA-10258) Get rid of use_zk_connection flag in kafka.py public methods

2020-07-09 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-10258:
--

 Summary: Get rid of use_zk_connection flag in kafka.py public 
methods
 Key: KAFKA-10258
 URL: https://issues.apache.org/jira/browse/KAFKA-10258
 Project: Kafka
  Issue Type: Sub-task
Reporter: Vinoth Chandar






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10213) Prefer --bootstrap-server in ducktape tests for Kafka clients

2020-06-29 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-10213:
--

 Summary: Prefer --bootstrap-server in ducktape tests for Kafka 
clients
 Key: KAFKA-10213
 URL: https://issues.apache.org/jira/browse/KAFKA-10213
 Project: Kafka
  Issue Type: Sub-task
Reporter: Vinoth Chandar
Assignee: Vinoth Chandar






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10138) Prefer --bootstrap-server for reassign_partitions command in ducktape tests

2020-06-22 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar resolved KAFKA-10138.

Resolution: Fixed

> Prefer --bootstrap-server for reassign_partitions command in ducktape tests
> ---
>
> Key: KAFKA-10138
> URL: https://issues.apache.org/jira/browse/KAFKA-10138
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Vinoth Chandar
>    Assignee: Vinoth Chandar
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10174) Prefer --bootstrap-server ducktape tests using kafka_configs.sh

2020-06-16 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-10174:
--

 Summary: Prefer --bootstrap-server ducktape tests using 
kafka_configs.sh
 Key: KAFKA-10174
 URL: https://issues.apache.org/jira/browse/KAFKA-10174
 Project: Kafka
  Issue Type: Sub-task
Reporter: Vinoth Chandar
Assignee: Vinoth Chandar






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10138) Prefer --bootstrap-server for reassign_partitions command in ducktape tests

2020-06-10 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-10138:
--

 Summary: Prefer --bootstrap-server for reassign_partitions command 
in ducktape tests
 Key: KAFKA-10138
 URL: https://issues.apache.org/jira/browse/KAFKA-10138
 Project: Kafka
  Issue Type: Sub-task
Reporter: Vinoth Chandar
Assignee: Vinoth Chandar






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10131) Minimize use of --zookeeper flag in ducktape tests

2020-06-09 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-10131:
--

 Summary: Minimize use of --zookeeper flag in ducktape tests
 Key: KAFKA-10131
 URL: https://issues.apache.org/jira/browse/KAFKA-10131
 Project: Kafka
  Issue Type: Improvement
  Components: system tests
Reporter: Vinoth Chandar
Assignee: Vinoth Chandar


Get the ducktape tests working without the --zookeeper flag (except for SCRAM).

(Note: When doing compat testing we'll still use the old flags.)

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10071) TopicCommand tool should make more efficient metadata calls to Kafka Servers

2020-05-29 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-10071:
--

 Summary: TopicCommand tool should make more efficient metadata 
calls to Kafka Servers
 Key: KAFKA-10071
 URL: https://issues.apache.org/jira/browse/KAFKA-10071
 Project: Kafka
  Issue Type: Improvement
Reporter: Vinoth Chandar
Assignee: Vinoth Chandar


This is a follow-up from the discussion on KAFKA-9945:

[https://github.com/apache/kafka/pull/8737]

Today, alter, describe and delete all pull down the entire topic list in order
to support regex matching. We need to make these commands much more efficient
(there is also the issue that the regex support includes the period character,
so we may need two different switches going forward).
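
For illustration, a rough sketch of the kind of targeted metadata call this is about (not the TopicCommand implementation itself; broker address and topic name are placeholders):
{code:java}
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeOnlyWhatYouNeed {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Current pattern: pull down every topic name so regex matching can be done client-side.
            Set<String> allTopics = admin.listTopics().names().get();

            // Cheaper when the caller passes explicit topic names rather than a regex.
            Map<String, TopicDescription> described =
                    admin.describeTopics(List.of("my-topic")).all().get();

            System.out.println(described.keySet() + " described; " + allTopics.size() + " topics listed");
        }
    }
}
{code}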



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9512) Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration

2020-02-18 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar resolved KAFKA-9512.
---
Resolution: Fixed

Closing since the PR has now landed.

> Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration
> ---
>
> Key: KAFKA-9512
> URL: https://issues.apache.org/jira/browse/KAFKA-9512
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.5.0
>Reporter: Matthias J. Sax
>Assignee: Vinoth Chandar
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.5.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/497/testReport/junit/org.apache.kafka.streams.integration/LagFetchIntegrationTest/shouldFetchLagsDuringRestoration/]
> {quote}java.lang.NullPointerException at 
> org.apache.kafka.streams.integration.LagFetchIntegrationTest.shouldFetchLagsDuringRestoration(LagFetchIntegrationTest.java:306){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-21 Thread Vinoth Chandar
Chiming in a bit late here.

+1, this is a very valid improvement. Avoiding gets on irrelevant
partitions will improve performance and efficiency for IQs.

As an incremental improvement to the current APIs, adding an option to
filter by partition makes sense.
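
For concreteness, a sketch along the lines of the proposal, using the
StoreQueryParameters API this KIP ultimately introduced in 2.5; it assumes a
running KafkaStreams instance named streams, and the store name, key and
partition are placeholders:

    int partition = 3; // the partition the key is known to map to
    ReadOnlyKeyValueStore<String, Long> store = streams.store(
            StoreQueryParameters.fromNameAndType("counts-store",
                    QueryableStoreTypes.<String, Long>keyValueStore())
                    .withPartition(partition));
    Long value = store.get("some-key"); // only that one local partition is consulted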







On Mon, Jan 20, 2020 at 3:13 AM Navinder Brar
 wrote:

> Thanks John. If there are no other comments to be addressed, I will start
> a vote today so that we are on track for this release.~Navinder
>
>
> On Monday, January 20, 2020, 8:32 AM, John Roesler 
> wrote:
>
> Thanks, Navinder,
>
> The Param object looks a bit different than I would have done, but it
> certainly is explicit. We might have to deprecate those particular factory
> methods and move to a builder pattern if we need to add any more options in
> the future, but I’m fine with that possibility.
>
> The KIP also discusses some implementation details that aren’t necessary
> here. We really only need to see the public interfaces. We can discuss the
> implementation in the PR.
>
> That said, the public API part of the current proposal looks good to me! I
> would be a +1 if you called for a vote.
>
> Thanks,
> John
>
> On Sun, Jan 19, 2020, at 20:50, Navinder Brar wrote:
> > I have made some edits in the KIP, please take another look. It would
> > be great if we can push it in 2.5.0.
> > ~Navinder
> >
> >
> > On Sunday, January 19, 2020, 12:59 AM, Navinder Brar
> >  wrote:
> >
> > Sure John, I will update the StoreQueryParams with static factory
> > methods.
> > @Ted, we would need to create taskId only in case a user provides one
> > single partition. In case user wants to query all partitions of an
> > instance the current code is good enough where we iterate over all
> > stream threads and go over all taskIds to match the store. But in case
> > a user requests for a single partition-based store, we need to create a
> > taskId out of that partition and store name(using
> > internalTopologyBuilder class) and match with the taskIds belonging to
> > that instance. I will add the code in the KIP.
> >
> > On Sunday, 19 January, 2020, 12:47:08 am IST, Ted Yu
> >  wrote:
> >
> >  Looking at the current KIP-562:
> >
> > bq. Create a taskId from the combination of store name and partition
> > provided by the user
> >
> > I wonder if a single taskId would be used for the “all partitions” case.
> > If so, we need to choose a numerical value for the partition portion of
> the
> > taskId.
> >
> > On Sat, Jan 18, 2020 at 10:27 AM John Roesler 
> wrote:
> >
> > > Thanks, Ted!
> > >
> > > This makes sense, but it seems like we should lean towards explicit
> > > semantics in the public API. ‘-1’ meaning “all partitions” is
> reasonable,
> > > but not explicit. That’s why I suggested the Boolean for “all
> partitions”.
> > > I guess this also means getPartition() should either throw an
> exception or
> > > return null if the partition is unspecified.
> > >
> > > Thanks,
> > > John
> > >
> > > On Sat, Jan 18, 2020, at 08:43, Ted Yu wrote:
> > > > I wonder if the following two methods can be combined:
> > > >
> > > > Integer getPartition() // would be null if unset or if "all
> partitions"
> > > > boolean getAllLocalPartitions() // true/false if "all partitions"
> > > requested
> > > >
> > > > into:
> > > >
> > > > Integer getPartition() // would be null if unset or -1 if "all
> > > partitions"
> > > >
> > > > Cheers
> > > >
> > > > On Fri, Jan 17, 2020 at 9:56 PM John Roesler 
> > > wrote:
> > > >
> > > > > Thanks, Navinder!
> > > > >
> > > > > I took a look at the KIP.
> > > > >
> > > > > We tend to use static factory methods instead of public
> constructors,
> > > and
> > > > > also builders for optional parameters.
> > > > >
> > > > > Given that, I think it would be more typical to have a factory
> method:
> > > > > storeQueryParams()
> > > > >
> > > > > and also builders for setting the optional parameters, like:
> > > > > withPartitions(List partitions)
> > > > > withStaleStoresEnabled()
> > > > > withStaleStoresDisabled()
> > > > >
> > > > >
> > > > > I was also thinking this over today, and it really seems like
> there are
> > > > > two main cases for specifying partitions,
> > > > > 1. you know exactly what partition you want. In this case, you'll
> only
> > > > > pass in a single number.
> > > > > 2. you want to get a handle on all the stores for this instance
> (the
> > > > > current behavior). In this case, it's not clear how to use
> > > withPartitions
> > > > > to achieve the goal, unless you want to apply a-priori knowledge
> of the
> > > > > number of partitions in the store. We could consider an empty
> list, or
> > > a
> > > > > null, to indicate "all", but that seems a little complicated.
> > > > >
> > > > > Thus, maybe it would actually be better to eschew withPartitions
> for
> > > now
> > > > > and instead just offer:
> > > > > withPartition(int partition)
> > > > > withAllLocalPartitions()
> > > > >
> > > > > and the getters:
> > > > > Integer getPartition() /

[jira] [Created] (KAFKA-9431) Expose API in KafkaStreams to fetch all local offset lags

2020-01-14 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-9431:
-

 Summary: Expose API in KafkaStreams to fetch all local offset lags
 Key: KAFKA-9431
 URL: https://issues.apache.org/jira/browse/KAFKA-9431
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Vinoth Chandar
Assignee: Vinoth Chandar
 Fix For: 2.5.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9430) Tighten up lag estimates when source topic optimization is on

2020-01-14 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-9430:
-

 Summary: Tighten up lag estimates when source topic optimization 
is on 
 Key: KAFKA-9430
 URL: https://issues.apache.org/jira/browse/KAFKA-9430
 Project: Kafka
  Issue Type: Sub-task
Reporter: Vinoth Chandar
Assignee: Vinoth Chandar


Right now, we use _endOffsets_ of the source topic for the computation. Since 
the source topics can also have user events produced to them, this is an 
overestimate.

 

From John:

For "optimized" changelogs, this will be wrong, strictly speaking, but it's an 
over-estimate (which seems better than an under-estimate), and it's also still 
an apples-to-apples comparison, since all replicas would use the same upper 
bound to compute their lags, so the "pick the freshest" replica is still going 
to pick the right one. We can add a new 2.5 blocker ticket to really fix it, 
and not worry about it until after this KSQL stuff is done.

 

For active tasks: we need to use the consumed offsets, not the end offset of the source topic.
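
Illustrative numbers only (not Streams internals) to make the over-estimate concrete:
{code:java}
// With source-topic optimization the source topic doubles as the changelog, so its
// end offset advances with every user-produced event, not just with store updates.
long sourceEndOffset   = 1_000L; // end offset of the optimized source/changelog topic
long restoredOffset    =   900L; // offset a standby/restoring replica has applied
long reportedOffsetLag = Math.max(0, sourceEndOffset - restoredOffset); // upper bound; may over-estimate
// All replicas use the same upper bound, so "pick the freshest replica" still works;
// for active tasks the consumed (committed) offset should be used instead.
{code}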



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9429) Allow ability to control whether stale reads out of state stores are desirable

2020-01-14 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-9429:
-

 Summary: Allow ability to control whether stale reads out of state 
stores are desirable
 Key: KAFKA-9429
 URL: https://issues.apache.org/jira/browse/KAFKA-9429
 Project: Kafka
  Issue Type: Sub-task
Reporter: Vinoth Chandar
Assignee: Vinoth Chandar
 Fix For: 2.5.0


From John:

 

I also meant to talk with you about the change to allow querying recovering 
stores. I think you might have already talked with Matthias a little about this 
in the scope of KIP-216, but it's probably not ok to just change the default 
from only allowing query while running, since there are actually people 
depending on full-consistency queries for correctness right now.

 

What we can do is add an overload {{KafkaStreams.store(name, 
QueriableStoreType, QueriableStoreOptions)}}, with one option: 
{{queryStaleState(true/false)}} (your preference on the name, I just made that 
up right now). The default would be false, and KSQL would set it to true. While 
false, it would not allow querying recovering stores OR standbys. This 
basically allows a single switch to preserve existing behavior.
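
For reference, a rough sketch of such an opt-in switch as it ended up in the released API (Kafka Streams 2.5), where the option landed on StoreQueryParameters rather than a separate options class; it assumes a running KafkaStreams instance named streams, and the store name and types are placeholders:
{code:java}
ReadOnlyKeyValueStore<String, Long> store = streams.store(
        StoreQueryParameters.fromNameAndType("counts-store",
                QueryableStoreTypes.<String, Long>keyValueStore())
                .enableStaleStores()); // default (not calling this) preserves existing behavior:
                                       // no querying of recovering stores or standbys
{code}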

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9428) Expose standby information in KafkaStreams via queryMetadataForKey API

2020-01-14 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-9428:
-

 Summary: Expose standby information in KafkaStreams via 
queryMetadataForKey API
 Key: KAFKA-9428
 URL: https://issues.apache.org/jira/browse/KAFKA-9428
 Project: Kafka
  Issue Type: Sub-task
Reporter: Vinoth Chandar
Assignee: Vinoth Chandar
 Fix For: 2.5.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS]KIP-216: IQ should throw different exceptions for different errors

2020-01-10 Thread Vinoth Chandar
+1 on a Streams config to control this behavior. I have no strong opinion
on the default, but I would pick allowing queries if standbys are enabled
and throwing the exception otherwise.
But we can keep it simpler: throw the exception by default and have a flag to
turn it off, as you suggest as well.
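
For concreteness, a sketch of the retry semantics being discussed; only the
existing parent type InvalidStateStoreException is referenced directly, the
finer-grained subclasses are what KIP-216 proposes, and the store handle is
assumed to come from KafkaStreams#store():

    try {
        Long value = store.get("some-key"); // placeholder key
    } catch (InvalidStateStoreException e) {
        // The KIP-216 direction: a rebalancing/recovering-type subclass means the same
        // store handle can simply be retried, while a migrated/not-available-type subclass
        // means the caller must rediscover the store (and possibly the owning instance) first.
    }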

On Thu, Jan 9, 2020 at 12:01 PM Matthias J. Sax 
wrote:

> Good question about `StreamsRebalancingException` -- when this KIP was
> started, KIP-535 was not on the horizon yet.
>
> What I am wondering is, if we should allow people to opt-in into
> querying during a rebalance, or to be more precise during a restore (if
> a state store is not migrated, it will be up-to-date during a rebalance
> and can be queried returning correct, ie, non-stall, data)?
>
> Otherwise, if people want to get only correct results, ie, they never
> want to query stall state, they have no way to implement it, because
> they are always subject to a race condition.
>
> For this case, we could have a `StateStoreIsRecoveringException` (or
> similar) that is only throw during a restore phases (and people can
> opt-in / opt-out if this exception should be throws or not, ie, if they
> want to query stall state during recovery or not).
>
> It's unclear to me though atm, how a user would opt-in/opt-out and what
> the default should be (maybe better to throw the exception by default to
> have strong consistency guarantees by default?)
>
>
> -Matthias
>
>
> On 1/9/20 11:35 AM, Vinoth Chandar wrote:
> > +1 on merging `StreamsNotRunningException` and
> `StateStoreNotAvailableException`, both exceptions are fatal anyway. IMO
> its best to have these exceptions be about the state store (and not streams
> state), to easier understanding.
> >
> > Additionally, KIP-535 allows for querying of state stores in rebalancing
> state. So do we need the StreamsRebalancingException?
> >
> >
> > On 2020/01/09 03:38:11, "Matthias J. Sax" 
> wrote:
> >> Sorry that I dropped the ball on this...
> >>
> >> Thanks for updating the KIP. Overall LGTM now. Feel free to start a VOTE
> >> thread.
> >>
> >> What is still unclear to me is, what we gain by having both
> >> `StreamsNotRunningException` and `StateStoreNotAvailableException`. Both
> >> exception are thrown when KafkaStreams is in state PENDING_SHUTDOWN /
> >> NOT_RUNNING / ERROR. Hence, as a user what do I gain to know if the
> >> state store is closed on not -- I can't query it anyway? Maybe I miss
> >> something thought?
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 11/3/19 6:07 PM, Vito Jeng wrote:
> >>> Sorry for the late reply, thanks for the review.
> >>>
> >>>
> >>>> About `StateStoreMigratedException`:
> >>>>
> >>>> Why is it only thrown if the state is REBALANCING? A store might be
> >>>> migrated during a rebalance, and Kafka Streams might resume back to
> >>>> RUNNING state and afterward somebody tries to use an old store handle.
> >>>> Also, if state is REBALANCING, should we throw
> >>>> `StreamThreadRebalancingException`? Hence, I think
> >>>> `StateStoreMigratedException` does only make sense during `RUNNING`
> state.
> >>>>
> >>>
> >>> Thank you point this, already updated.
> >>>
> >>>
> >>> Why do we need to distinguish between `KafkaStreamsNotRunningException`
> >>>> and `StateStoreNotAvailableException`?
> >>>>
> >>>
> >>> `KafkaStreamsNotRunningException` may be caused by various reasons, I
> think
> >>> it would be helpful that the
> >>> user can distinguish whether it is caused by the state store closed.
> >>> (Maybe I am wrong...)
> >>>
> >>>
> >>> Last, why do we distinguish between `KafkaStreams` instance and
> >>>> `StreamsThread`? To me, it seems we should always refer to the
> instance,
> >>>> because that is the level of granularity in which we enable/disable
> IQ atm.
> >>>>
> >>>
> >>> Totally agree. Do you mean the naming of state store exceptions?
> >>> I don't have special reason to distinguish these two.
> >>> Your suggestion look more reasonable for the exception naming.
> >>>
> >>>
> >>> Last, for `StateStoreMigratedException`, I would add that a user need
> to
> >>>> rediscover the store and cannot blindly retry as the store handle is
> >>>> invalid and a new store 

Re: [DISCUSS]KIP-216: IQ should throw different exceptions for different errors

2020-01-09 Thread Vinoth Chandar
+1 on merging `StreamsNotRunningException` and
`StateStoreNotAvailableException`; both exceptions are fatal anyway. IMO it is
best to have these exceptions be about the state store (and not the Streams
state), for easier understanding.

Additionally, KIP-535 allows querying state stores while in the rebalancing
state. So do we need the StreamsRebalancingException?


On 2020/01/09 03:38:11, "Matthias J. Sax"  wrote: 
> Sorry that I dropped the ball on this...
> 
> Thanks for updating the KIP. Overall LGTM now. Feel free to start a VOTE
> thread.
> 
> What is still unclear to me is, what we gain by having both
> `StreamsNotRunningException` and `StateStoreNotAvailableException`. Both
> exception are thrown when KafkaStreams is in state PENDING_SHUTDOWN /
> NOT_RUNNING / ERROR. Hence, as a user what do I gain to know if the
> state store is closed on not -- I can't query it anyway? Maybe I miss
> something thought?
> 
> 
> -Matthias
> 
> 
> On 11/3/19 6:07 PM, Vito Jeng wrote:
> > Sorry for the late reply, thanks for the review.
> > 
> > 
> >> About `StateStoreMigratedException`:
> >>
> >> Why is it only thrown if the state is REBALANCING? A store might be
> >> migrated during a rebalance, and Kafka Streams might resume back to
> >> RUNNING state and afterward somebody tries to use an old store handle.
> >> Also, if state is REBALANCING, should we throw
> >> `StreamThreadRebalancingException`? Hence, I think
> >> `StateStoreMigratedException` does only make sense during `RUNNING` state.
> >>
> > 
> > Thank you point this, already updated.
> > 
> > 
> > Why do we need to distinguish between `KafkaStreamsNotRunningException`
> >> and `StateStoreNotAvailableException`?
> >>
> > 
> > `KafkaStreamsNotRunningException` may be caused by various reasons, I think
> > it would be helpful that the
> > user can distinguish whether it is caused by the state store closed.
> > (Maybe I am wrong...)
> > 
> > 
> > Last, why do we distinguish between `KafkaStreams` instance and
> >> `StreamsThread`? To me, it seems we should always refer to the instance,
> >> because that is the level of granularity in which we enable/disable IQ atm.
> >>
> > 
> > Totally agree. Do you mean the naming of state store exceptions?
> > I don't have special reason to distinguish these two.
> > Your suggestion look more reasonable for the exception naming.
> > 
> > 
> > Last, for `StateStoreMigratedException`, I would add that a user need to
> >> rediscover the store and cannot blindly retry as the store handle is
> >> invalid and a new store handle must be retrieved. That is a difference
> >> to `StreamThreadRebalancingException` that allows for "blind" retries
> >> that either resolve (if the store is still on the same instance after
> >> rebalancing finishes, or changes to `StateStoreMigratedException` if the
> >> store was migrated away during rebalancing).
> >>
> > 
> > Nice, it's great! Thank you.
> > 
> > 
> > The KIP already updated, please take a look. :)
> > 
> > 
> > 
> > On Wed, Oct 23, 2019 at 1:48 PM Matthias J. Sax 
> > wrote:
> > 
> >> Any update on this KIP?
> >>
> >> On 10/7/19 3:35 PM, Matthias J. Sax wrote:
> >>> Sorry for the late reply. The 2.4 deadline kept us quite busy.
> >>>
> >>> About `StateStoreMigratedException`:
> >>>
> >>> Why is it only thrown if the state is REBALANCING? A store might be
> >>> migrated during a rebalance, and Kafka Streams might resume back to
> >>> RUNNING state and afterward somebody tries to use an old store handle.
> >>> Also, if state is REBALANCING, should we throw
> >>> `StreamThreadRebalancingException`? Hence, I think
> >>> `StateStoreMigratedException` does only make sense during `RUNNING`
> >> state.
> >>>
> >>>
> >>> Why do we need to distinguish between `KafkaStreamsNotRunningException`
> >>> and `StateStoreNotAvailableException`?
> >>>
> >>>
> >>> Last, why do we distinguish between `KafkaStreams` instance and
> >>> `StreamsThread`? To me, it seems we should always refer to the instance,
> >>> because that is the level of granularity in which we enable/disable IQ
> >> atm.
> >>>
> >>>
> >>> Last, for `StateStoreMigratedException`, I would add that a user need to
> >>> rediscover the store and cannot blindly retry as the store handle is
> >>> invalid and a new store handle must be retrieved. That is a difference
> >>> to `StreamThreadRebalancingException` that allows for "blind" retries
> >>> that either resolve (if the store is still on the same instance after
> >>> rebalancing finishes, or changes to `StateStoreMigratedException` if the
> >>> store was migrated away during rebalancing).
> >>>
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 8/9/19 10:20 AM, Vito Jeng wrote:
>  My bad. The short link `https://shorturl.at/CDNT9`
> >> 
>   seems incorrect.
> 
>  Please use the following instead: https://shorturl.at/bkKQU
> 
> 
>  ---
>  Vito
> 
> 
>  On Fri, Aug 9, 2019 at 10:53 AM Vito Jeng  wrote:
>

Re: [VOTE] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-18 Thread Vinoth Chandar
Thanks, everyone involved!

On Mon, Nov 18, 2019 at 7:51 AM John Roesler  wrote:

> Thanks to you, also, Navinder!
>
> Looking forward to getting this feature in.
> -John
>
> On Sun, Nov 17, 2019 at 11:34 PM Navinder Brar
>  wrote:
> >
> >  Hello all,
> >
> > With 4 binding +1 votes from Guozhang Wang, Matthias J. Sax, Bill Bejeck,
> > and John Roesler, the vote passes.
> > Thanks Guozhang, Matthias, Bill, John, Sophie for the healthy
> discussions and Vinoth for all the help on this KIP.
> > Best,
> > Navinder
> >
> > On Friday, 15 November, 2019, 11:32:31 pm IST, John Roesler <
> j...@confluent.io> wrote:
> >
> >  I'm +1 (binding) as well.
> >
> > Thanks,
> > -John
> >
> > On Fri, Nov 15, 2019 at 6:20 AM Bill Bejeck  wrote:
> > >
> > > +1 (binding)
> > >
> > > On Fri, Nov 15, 2019 at 1:11 AM Matthias J. Sax  >
> > > wrote:
> > >
> > > > +1 (binding)
> > > >
> > > >
> > > > On 11/14/19 3:48 PM, Guozhang Wang wrote:
> > > > > +1 (binding), thanks for the KIP!
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Fri, Nov 15, 2019 at 4:38 AM Navinder Brar
> > > > >  wrote:
> > > > >
> > > > >> Hello all,
> > > > >>
> > > > >> I'd like to propose a vote for serving interactive queries during
> > > > >> Rebalancing, as it is a big deal for applications looking for high
> > > > >> availability. With this change, users will have control over the
> > > > tradeoff
> > > > >> between consistency and availability during serving.
> > > > >> The full KIP is provided here:
> > > > >>
> > > > >>
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> > > > >>
> > > > >>
> > > > >> Thanks,
> > > > >> Navinder
> > > > >
> > > > >
> > > > >
> > > >
> > > >
> >
>


Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Vinoth Chandar
>during restoring state the active might have some lag

Great catch, yes; we cannot assume lag = 0 for the active. Let's report active
lag as well, then. If the active is too laggy, the app can then deem that store
partition unavailable (based on what the application is willing to
tolerate).
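
For concreteness, a sketch of how a router could apply such a lag check (method
names follow the allLocalStorePartitionLags() API this KIP ended up adding; the
store name, partition variable and threshold are application-side placeholders,
and a running KafkaStreams instance named streams is assumed):

    Map<String, Map<Integer, LagInfo>> lags = streams.allLocalStorePartitionLags();
    LagInfo lag = lags.get("counts-store").get(partition);
    boolean serveable = lag.offsetLag() <= MAX_ACCEPTABLE_LAG; // same rule for restoring actives and standbys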

@matthias do you agree? We can then begin the vote.

On Thu, Nov 14, 2019 at 9:03 AM Navinder Brar
 wrote:

> I agree with Vinoth and John on having "allLocalStoreOffsetLags()", all
> actives don't have 0 lag, as during restoring state the active might have
> some lag and one of the features of this KIP is to provide an option to
> query from active (which might be in restoring state).
> I will update the KIP with rejected alternatives and post this will start
> a vote if everyone agrees on this.
> On Thursday, 14 November, 2019, 09:34:52 pm IST, John Roesler <
> j...@confluent.io> wrote:
>
>  Hi all,
>
> Thanks for the "reset", Vinoth. It brings some needed clarity to the
> discussion.
>
> 10. My 2 cents: we might as well include the lags for the active
> copies as well. This is a more context-free API. If we only include
> standbys, this choice won't make sense to users unless they understand
> that the active task cannot lag in the steady state, since it's the
> source of updates. This isn't a bad thing to realize, but it's just
> more mental overhead for the person who wants to list the lags for
> "all local stores".
>
> Another reason is that we could consider also reporting the lag for
> actives during recovery (when they would have non-zero lag). We don't
> have to now, but if we choose to call the method "standby lags", then
> we can't make this choice in the future.
>
> That said, it's just my opinion. I'm fine either way.
>
> 20. Vinoth's reply makes sense to me, fwiw.
>
> Beyond these two points, I'm happy with the current proposal.
>
> Thanks again,
> -John
>
> On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar 
> wrote:
> >
> > 10. I considered that. Had to pick one or the other. Can just return
> > standby too and rename method to may be “allLocalStandbyOffsetLags()” to
> > have it explicit. (Standby should implicitly convey that we are talking
> > about stores)
> >
> > 20. What I meant was, we are returning HostInfo instead of
> StreamsMetadata
> > since thats sufficient to route query; same for “int partition “ vs topic
> > partition before. Previously KeyQueryMetadata had similar structure but
> > used StreamsMetadata and TopicPartition objects to convey same
> information
> >
> > @navinder KIP is already upto date with the email I sent, except for the
> > reasonings I was laying out. +1 on revisiting rejected alternatives.
> > Please make the follow up changes
> >
> > On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax 
> > wrote:
> >
> > > Thanks for the summary Vinoth!
> > >
> > > I buy the overall argument. Couple of clarification questions:
> > >
> > >
> > > 10. Why do we need to include the active stores in
> > > `allLocalStoreOffsetLags()`? Would it not be simpler to just return lag
> > > for standbys?
> > >
> > >
> > > 20: What does
> > >
> > > > Thin the KeyQueryMetadata object to just contain the minimum
> information
> > > > needed.
> > >
> > > exaclty mean? What is the "minimum information needed" ?
> > >
> > >
> > > @Navinder: if you agree, can you update the KIP accoringly? With all
> the
> > > proposals, it's hard to keep track and it would be great to have the
> > > current proposal summarized in the wiki page.
> > >
> > > Please also update the "Rejected alternative" sections to avoid that we
> > > cycle back to old proposal (including the reason _why_ they got
> rejected).
> > >
> > >
> > > Thanks a lot!
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 11/13/19 7:10 PM, Vinoth Chandar wrote:
> > > > Given we have had a healthy discussion on this topic for a month now
> and
> > > > still with many loose ends and open ended conversations, I thought It
> > > would
> > > > be worthwhile to take a step back and re-evaluate everything in the
> > > context
> > > > of the very real use-case and its specific scenarios.
> > > >
> > > > First, let's remind ourselves of the query routing flow of the
> streams
> > > > applic

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-14 Thread Vinoth Chandar
10. I considered that. I had to pick one or the other. We can just return the
standbys too and rename the method to, say, “allLocalStandbyOffsetLags()” to
make it explicit. (“Standby” should implicitly convey that we are talking
about stores.)

20. What I meant was: we are returning HostInfo instead of StreamsMetadata,
since that is sufficient to route the query; likewise “int partition” instead
of the TopicPartition used before. Previously, KeyQueryMetadata had a similar
structure but used StreamsMetadata and TopicPartition objects to convey the
same information.
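
For concreteness, a sketch of the thinned-down routing lookup along the lines
that eventually shipped (KafkaStreams#queryMetadataForKey); a running
KafkaStreams instance named streams is assumed, and the store name and key are
placeholders:

    KeyQueryMetadata md = streams.queryMetadataForKey("counts-store", "some-key",
            Serdes.String().serializer());
    HostInfo active = md.activeHost();          // where the active copy lives
    Set<HostInfo> standbys = md.standbyHosts(); // fallback candidates, chosen by lag
    int partition = md.partition();             // enough to look up the matching lag entry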

@navinder The KIP is already up to date with the email I sent, except for the
reasoning I was laying out. +1 on revisiting the rejected alternatives.
Please make the follow-up changes.

On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax 
wrote:

> Thanks for the summary Vinoth!
>
> I buy the overall argument. Couple of clarification questions:
>
>
> 10. Why do we need to include the active stores in
> `allLocalStoreOffsetLags()`? Would it not be simpler to just return lag
> for standbys?
>
>
> 20: What does
>
> > Thin the KeyQueryMetadata object to just contain the minimum information
> > needed.
>
> exaclty mean? What is the "minimum information needed" ?
>
>
> @Navinder: if you agree, can you update the KIP accoringly? With all the
> proposals, it's hard to keep track and it would be great to have the
> current proposal summarized in the wiki page.
>
> Please also update the "Rejected alternative" sections to avoid that we
> cycle back to old proposal (including the reason _why_ they got rejected).
>
>
> Thanks a lot!
>
>
> -Matthias
>
>
>
> On 11/13/19 7:10 PM, Vinoth Chandar wrote:
> > Given we have had a healthy discussion on this topic for a month now and
> > still with many loose ends and open ended conversations, I thought It
> would
> > be worthwhile to take a step back and re-evaluate everything in the
> context
> > of the very real use-case and its specific scenarios.
> >
> > First, let's remind ourselves of the query routing flow of the streams
> > application ("app" here on)
> >
> >1. queries get routed to any random streams instance in the cluster
> >("router" here on)
> >2. router then uses Streams metadata to pick active/standby instances
> >for that key's store/partition
> >3. router instance also maintains global lag information for all
> stores
> >and all their partitions, by a gossip/broadcast/heartbeat protocol
> (done
> >outside of Streams framework), but using KafkaStreams#allMetadata()
> for
> >streams instance discovery.
> >4. router then uses information in 2 & 3 to determine which instance
> to
> >send the query to  : always picks active instance if alive or the most
> >in-sync live standby otherwise.
> >
> > Few things to note :
> >
> > A) We choose to decouple how the lag information is obtained (control
> > plane) from query path (data plane), since that provides more flexibility
> > in designing the control plane. i.e pick any or combination of gossip,
> > N-way broadcast, control the rate of propagation, piggybacking on request
> > responses
> > B) Since the app needs to do its own control plane, talking to other
> > instances directly for failure detection & exchanging other metadata, we
> > can leave the lag APIs added to KafkaStreams class itself local and
> simply
> > return lag for all store/partitions on that instance.
> > C) Streams preserves its existing behavior of instances only talking to
> > each other through the Kafka brokers.
> > D) Since the router treats active/standby differently, it would be good
> for
> > the KafkaStreams APIs to hand them back explicitly, with no additional
> > logic needed for computing them. Specifically, the router only knows two
> > things - key and store and if we just return a
> Collection
> > back, it cannot easily tease apart active and standby. Say, a streams
> > instance hosts the same store as both active and standby for different
> > partitions, matching by just storename the app will find it in both
> active
> > and standby lists.
> > E) From above, we assume the global lag estimate (lag per store topic
> > partition) are continuously reported amongst application instances and
> > already available on the router during step 2 above. Hence, attaching lag
> > APIs to StreamsMetadata is unnecessary and does not solve the needs
> anyway.
> > F) Currently returned StreamsMetadata object is really information about
> a
> > streams instance, that is not very specific to the key being queried.
> &

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-12 Thread Vinoth Chandar
> query a key without noticing.
> >
> >
> >
> > -Matthias
> >
> >
> > On 11/8/19 6:07 AM, Navinder Brar wrote:
> > > Thanks, Guozhang for going through it again.
> > >
> > >- 1.1 & 1.2: The main point of adding topicPartition in
> > KeyQueryMetadata is not topicName, but the partition number. I agree
> > changelog topicNames and store names will have 1-1 mapping but we also
> need
> > the partition number of the changelog for which are calculating the lag.
> > Now we can add partition number in StreamsMetadata but it will be
> > orthogonal to the definition of StreamsMetadata i.e.- “Represents the
> state
> > of an instance (process) in a {@link KafkaStreams} application.”  If we
> add
> > partition number in this, it doesn’t stay metadata for an instance,
> because
> > now it is storing the partition information for a key being queried. So,
> > having “KeyQueryMetadata” simplifies this as now it contains all the
> > metadata and also changelog and partition information for which we need
> to
> > calculate the lag.
> > >
> > > Another way is having another function in parallel to metadataForKey,
> > which returns the partition number for the key being queried. But then we
> > would need 2 calls to StreamsMetadataState, once to fetch metadata and
> > another to fetch partition number. Let me know if any of these two ways
> > seem more intuitive than KeyQueryMetadata then we can try to converge on
> > one.
> > >- 1.3:  Again, it is required for the partition number. We can drop
> > store name though.
> > >- 2.1: I think this was done in accordance with the opinion from
> John
> > as time lag would be better implemented with a broker level change and
> > offset change is readily implementable. @vinoth?
> > >- 2.2.1: Good point.  +1
> > >- 2.2.2: I am not well aware of it, @vinoth any comments?
> > >- 3.1: I think we have already agreed on dropping this, we need to
> > KIP. Also, is there any opinion on lagInfoForStore(String storeName) vs
> > lagInfoForStore(String storeName, int partition)
> > >- 3.2: But in functions such as onAssignment(),
> > onPartitionsAssigned(), for standbyTasks also the topicPartitions we use
> > are input topic partitions and not changelog partitions. Would this be
> > breaking from that semantics?
> > >
> > >
> > >On Thursday, 7 November, 2019, 11:33:19 pm IST, Guozhang Wang <
> > wangg...@gmail.com> wrote:
> > >
> > >  Hi Navinder, Vinoth, thanks for the updated KIP!
> > >
> > > Read through the discussions so far and made another pass on the wiki
> > page,
> > > and here are some more comments:
> > >
> > > 1. About the public APIs:
> > >
> > > 1.1. It is not clear to me how allStandbyMetadataForStore
> > > and allStandbyMetadata would be differentiated from the original APIs
> > given
> > > that we will augment StreamsMetadata to include both active and standby
> > > topic-partitions and store names, so I think we can still use
> allMetadata
> > > and allMetadataForStore to get the collection of instance metadata that
> > > host the store both as active and standbys. Are there any specific use
> > > cases where we ONLY want to get the standby's metadata? And even if
> there
> > > are, we can easily filter it out from the allMetadata /
> > allMetadataForStore
> > > right?
> > >
> > > 1.2. Similarly I'm wondering for allMetadataForKey, can we return the
> > same
> > > type: "Collection" which includes 1 for active, and
> N-1
> > > for standbys, and callers can easily identify them by looking inside
> the
> > > StreamsMetadata objects? In addition I feel the "topicPartition" field
> > > inside "KeyQueryMetadata" is not very important since the changelog
> > > topic-name is always 1-1 mapping to the store name, so as long as the
> > store
> > > name matches, the changelog topic name should always match (i.e. in
> > > the pseudo code, just checking store names should be sufficient). If
> all
> > of
> > > the above assumption is true, I think we can save us from introducing
> one
> > > more public class here.
> > >
> > > 1.3. Similarly in StoreLagInfo, seems not necessary to include the
> topic
> > > partition name in addition to the store name.
> > >
> > > 2. About querying store lags: we've discussed about separating t

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-06 Thread Vinoth Chandar
most likely buffer up all writes to changelogs
> anyway and are only flushed on commit (including a flush of the
> producer). Hence, I would suggest to update the time-lag information
> based on the same strategy in the background. This way there is no
> additional config or methods and the user does not need to worry about
> it at all.
>
> To avoid refresh overhead if we don't need it (a user might not use IQ
> to begin with), it might be worth to maintain an internal flag
> `updateTimeLagEnabled` that is set to `false` initially and only set to
> `true` on the first call of a user to get standby-metadata.
>
>
> -Matthias
>
>
>
> On 11/4/19 5:13 PM, Vinoth Chandar wrote:
> >>>  I'm having some trouble wrapping my head around what race conditions
> > might occur, other than the fundamentally broken state in which different
> > instances are running totally different topologies.
> > 3. @both Without the topic partitions that the tasks can map back to, we
> > have to rely on topology/cluster metadata in each Streams instance to map
> > the task back. If the source topics are wild carded for e,g then each
> > instance could have different source topics in topology, until the next
> > rebalance happens. You can also read my comments from here
> >
> https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
> >
> >
> >>> seems hard to imagine how encoding arbitrarily long topic names plus an
> > integer for the partition number could be as efficient as task ids, which
> > are just two integers.
> > 3. if you still have concerns about the efficacy of dictionary encoding,
> > happy to engage. The link above also has some benchmark code I used.
> > Theoretically, we would send each topic name atleast once, so yes if you
> > compare a 10-20 character topic name + an integer to two integers, it
> will
> > be more bytes. But its constant overhead proportional to size of topic
> name
> > and with 4,8,12, partitions the size difference between baseline
> (version 4
> > where we just repeated topic names for each topic partition) and the two
> > approaches becomes narrow.
> >
> >>> Plus, Navinder is going to implement a bunch of protocol code that we
> > might just want to change when the discussion actually does take place,
> if
> > ever.
> >>> it'll just be a mental burden for everyone to remember that we want to
> > have this follow-up discussion.
> > 3. Is n't people changing same parts of code and tracking follow ups a
> > common thing, we need to deal with anyway?  For this KIP, is n't it
> enough
> > to reason about whether the additional map on top of the topic dictionary
> > would incur more overhead than the sending task_ids? I don't think it's
> > case, both of them send two integers. As I see it, we can do a separate
> > follow up to (re)pursue the task_id conversion and get it working for
> both
> > maps within the next release?
> >
> >>> Can you elaborate on "breaking up the API"? It looks like there are
> > already separate API calls in the proposal, one for time-lag, and another
> > for offset-lag, so are they not already broken up?
> > The current APIs (e.g lagInfoForStore) for lags return StoreLagInfo
> objects
> > which has both time and offset lags. If we had separate APIs, say (e.g
> > offsetLagForStore(), timeLagForStore()), we can implement offset version
> > using the offset lag that the streams instance already tracks i.e no need
> > for external calls. The time based lag API would incur the kafka read for
> > the timestamp. makes sense?
> >
> > Based on the discussions so far, I only see these two pending issues to
> be
> > aligned on. Is there any other open item people want to bring up?
> >
> > On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman  >
> > wrote:
> >
> >> Regarding 3) I'm wondering, does your concern still apply even now
> >> that the pluggable PartitionGrouper interface has been deprecated?
> >> Now that we can be sure that the DefaultPartitionGrouper is used to
> >> generate
> >> the taskId -> partitions mapping, we should be able to convert any
> taskId
> >> to any
> >> partitions.
> >>
> >> On Mon, Nov 4, 2019 at 11:17 AM John Roesler  wrote:
> >>
> >>> Hey Vinoth, thanks for the reply!
> >>>
> >>> 3.
> >>> I get that it's not the main focus of this KIP, but if it's

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-05 Thread Vinoth Chandar
Ping :) Any thoughts?

On Mon, Nov 4, 2019 at 5:13 PM Vinoth Chandar  wrote:

> >>  I'm having some trouble wrapping my head around what race conditions
> might occur, other than the fundamentally broken state in which different
> instances are running totally different topologies.
> 3. @both Without the topic partitions that the tasks can map back to, we
> have to rely on topology/cluster metadata in each Streams instance to map
> the task back. If the source topics are wild carded for e,g then each
> instance could have different source topics in topology, until the next
> rebalance happens. You can also read my comments from here
> https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
>
>
> >> seems hard to imagine how encoding arbitrarily long topic names plus an
> integer for the partition number could be as efficient as task ids, which
> are just two integers.
> 3. if you still have concerns about the efficacy of dictionary encoding,
> happy to engage. The link above also has some benchmark code I used.
> Theoretically, we would send each topic name atleast once, so yes if you
> compare a 10-20 character topic name + an integer to two integers, it will
> be more bytes. But its constant overhead proportional to size of topic name
> and with 4,8,12, partitions the size difference between baseline (version 4
> where we just repeated topic names for each topic partition) and the two
> approaches becomes narrow.
>
> >>Plus, Navinder is going to implement a bunch of protocol code that we
> might just want to change when the discussion actually does take place, if
> ever.
> >>it'll just be a mental burden for everyone to remember that we want to
> have this follow-up discussion.
> 3. Is n't people changing same parts of code and tracking follow ups a
> common thing, we need to deal with anyway?  For this KIP, is n't it enough
> to reason about whether the additional map on top of the topic dictionary
> would incur more overhead than the sending task_ids? I don't think it's
> case, both of them send two integers. As I see it, we can do a separate
> follow up to (re)pursue the task_id conversion and get it working for both
> maps within the next release?
>
> >>Can you elaborate on "breaking up the API"? It looks like there are
> already separate API calls in the proposal, one for time-lag, and another
> for offset-lag, so are they not already broken up?
> The current APIs (e.g lagInfoForStore) for lags return StoreLagInfo
> objects which has both time and offset lags. If we had separate APIs, say
> (e.g offsetLagForStore(), timeLagForStore()), we can implement offset
> version using the offset lag that the streams instance already tracks i.e
> no need for external calls. The time based lag API would incur the kafka
> read for the timestamp. makes sense?
>
> Based on the discussions so far, I only see these two pending issues to be
> aligned on. Is there any other open item people want to bring up?
>
> On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman 
> wrote:
>
>> Regarding 3) I'm wondering, does your concern still apply even now
>> that the pluggable PartitionGrouper interface has been deprecated?
>> Now that we can be sure that the DefaultPartitionGrouper is used to
>> generate
>> the taskId -> partitions mapping, we should be able to convert any taskId
>> to any
>> partitions.
>>
>> On Mon, Nov 4, 2019 at 11:17 AM John Roesler  wrote:
>>
>> > Hey Vinoth, thanks for the reply!
>> >
>> > 3.
>> > I get that it's not the main focus of this KIP, but if it's ok, it
>> > would be nice to hash out this point right now. It only came up
>> > because this KIP-535 is substantially extending the pattern in
>> > question. If we push it off until later, then the reviewers are going
>> > to have to suspend their concerns not just while voting for the KIP,
>> > but also while reviewing the code. Plus, Navinder is going to
>> > implement a bunch of protocol code that we might just want to change
>> > when the discussion actually does take place, if ever. Finally, it'll
>> > just be a mental burden for everyone to remember that we want to have
>> > this follow-up discussion.
>> >
>> > It makes sense what you say... the specific assignment is already
>> > encoded in the "main" portion of the assignment, not in the "userdata"
>> > part. It also makes sense that it's simpler to reason about races if
>> > you simply g

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-04 Thread Vinoth Chandar
>>  I'm having some trouble wrapping my head around what race conditions
might occur, other than the fundamentally broken state in which different
instances are running totally different topologies.
3. @both Without the topic partitions that the tasks can map back to, we
have to rely on topology/cluster metadata in each Streams instance to map
the task back. If the source topics are wildcarded, for example, then each
instance could have different source topics in its topology until the next
rebalance happens. You can also read my comments from here
https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106


>> seems hard to imagine how encoding arbitrarily long topic names plus an
integer for the partition number could be as efficient as task ids, which
are just two integers.
3. If you still have concerns about the efficacy of dictionary encoding,
happy to engage. The link above also has some benchmark code I used.
Theoretically, we would send each topic name at least once, so yes, if you
compare a 10-20 character topic name + an integer to two integers, it will
be more bytes. But it's a constant overhead proportional to the size of the
topic name, and with 4, 8, or 12 partitions the size difference between the
baseline (version 4, where we just repeated topic names for each topic
partition) and the two approaches becomes narrow.

>>Plus, Navinder is going to implement a bunch of protocol code that we
might just want to change when the discussion actually does take place, if
ever.
>>it'll just be a mental burden for everyone to remember that we want to
have this follow-up discussion.
3. Isn't people changing the same parts of code and tracking follow-ups a
common thing we need to deal with anyway? For this KIP, isn't it enough
to reason about whether the additional map on top of the topic dictionary
would incur more overhead than sending task_ids? I don't think that's the
case; both of them send two integers. As I see it, we can do a separate
follow-up to (re)pursue the task_id conversion and get it working for both
maps within the next release?

>>Can you elaborate on "breaking up the API"? It looks like there are
already separate API calls in the proposal, one for time-lag, and another
for offset-lag, so are they not already broken up?
The current lag APIs (e.g. lagInfoForStore) return StoreLagInfo objects
which carry both time and offset lags. If we had separate APIs, say
offsetLagForStore() and timeLagForStore(), we could implement the offset
version using the offset lag that the Streams instance already tracks, i.e.
no need for external calls. Only the time-based lag API would incur the
Kafka read for the timestamp. Makes sense?

Based on the discussions so far, I only see these two pending issues to be
aligned on. Is there any other open item people want to bring up?

On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman 
wrote:

> Regarding 3) I'm wondering, does your concern still apply even now
> that the pluggable PartitionGrouper interface has been deprecated?
> Now that we can be sure that the DefaultPartitionGrouper is used to
> generate
> the taskId -> partitions mapping, we should be able to convert any taskId
> to any
> partitions.
>
> On Mon, Nov 4, 2019 at 11:17 AM John Roesler  wrote:
>
> > Hey Vinoth, thanks for the reply!
> >
> > 3.
> > I get that it's not the main focus of this KIP, but if it's ok, it
> > would be nice to hash out this point right now. It only came up
> > because this KIP-535 is substantially extending the pattern in
> > question. If we push it off until later, then the reviewers are going
> > to have to suspend their concerns not just while voting for the KIP,
> > but also while reviewing the code. Plus, Navinder is going to
> > implement a bunch of protocol code that we might just want to change
> > when the discussion actually does take place, if ever. Finally, it'll
> > just be a mental burden for everyone to remember that we want to have
> > this follow-up discussion.
> >
> > It makes sense what you say... the specific assignment is already
> > encoded in the "main" portion of the assignment, not in the "userdata"
> > part. It also makes sense that it's simpler to reason about races if
> > you simply get all the information about the topics and partitions
> > directly from the assignor, rather than get the partition number from
> > the assignor and the topic name from your own a priori knowledge of
> > the topology. On the other hand, I'm having some trouble wrapping my
> > head around what race conditions might occur, other than the
> > fundamentally broken state in which different instances are running

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-11-04 Thread Vinoth Chandar
3. Right now, we still get the topic partitions assigned as part of the
top-level Assignment object (the one that wraps AssignmentInfo) and use
that to convert taskIds back. This list only contains assignments for
that particular instance. Attempting to also reverse-map "all" the
taskIds in the Streams cluster, i.e. all the topic partitions in these
global assignment maps, was what was problematic. By explicitly sending the
global assignment maps as actual topic partitions, the group coordinator
(i.e. the leader that computes the assignments) is able to consistently
enforce its view of the topic metadata. I still don't think a change that
forces us to reconsider semantics is needed just to save bits on the wire.
Maybe we can discuss this separately from this KIP?

4. There needs to be some caching/interval somewhere though, since we don't
want to potentially make one Kafka read per IQ. But I think it's a valid
suggestion to make this call just synchronous and leave the caching, or how
often you want to call it, to the application. Would it be good to then
break up the APIs for time- and offset-based lag? We can obtain offset-based
lag for free, and only incur the overhead of reading Kafka if we want
time-based lags.
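
To make that per-call cost concrete, here is a minimal sketch of reading the
tail record of a changelog partition with a plain KafkaConsumer to obtain its
timestamp. It is illustrative only (it ignores edge cases such as transaction
markers or a compacted/empty tail) and is not the proposed implementation:

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ChangelogTailTimestamp {

    // Returns the timestamp of the last record in the changelog partition,
    // or -1 if the partition is empty.
    static long tailTimestamp(final String bootstrapServers, final TopicPartition changelog) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(changelog));
            final Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singletonList(changelog));
            final long endOffset = end.get(changelog);
            if (endOffset == 0) {
                return -1L;
            }
            // Seek to the last record and read it to obtain its timestamp.
            consumer.seek(changelog, endOffset - 1);
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            long timestamp = -1L;
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                timestamp = record.timestamp();
            }
            return timestamp;
        }
    }
}

Doing this synchronously on every time-lag call is exactly the overhead the
application would be opting into, which is why leaving the caching policy to
the application seems reasonable.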

On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman 
wrote:

> Adding on to John's response to 3), can you clarify when and why exactly we
> cannot
> convert between taskIds and partitions? If that's really the case I don't
> feel confident
> that the StreamsPartitionAssignor is not full of bugs...
>
> It seems like it currently just encodes a list of all partitions (the
> assignment) and also
> a list of the corresponding task ids, duplicated to ensure each partition
> has the corresponding
> taskId at the same offset into the list. Why is that problematic?
>
>
> On Fri, Nov 1, 2019 at 12:39 PM John Roesler  wrote:
>
> > Thanks, all, for considering the points!
> >
> > 3. Interesting. I have a vague recollection of that... Still, though,
> > it seems a little fishy. After all, we return the assignments
> > themselves as task ids, and the members have to map these to topic
> > partitions in order to configure themselves properly. If it's too
> > complicated to get this right, then how do we know that Streams is
> > computing the correct partitions at all?
> >
> > 4. How about just checking the log-end timestamp when you call the
> > method? Then, when you get an answer, it's as fresh as it could
> > possibly be. And as a user you have just one, obvious, "knob" to
> > configure how much overhead you want to devote to checking... If you
> > want to call the broker API less frequently, you just call the Streams
> > API less frequently. And you don't have to worry about the
> > relationship between your invocations of that method and the config
> > setting (e.g., you'll never get a negative number, which you could if
> > you check the log-end timestamp less frequently than you check the
> > lag).
> >
> > Thanks,
> > -John
> >
> > On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar
> >  wrote:
> > >
> > > Thanks John for going through this.
> > >
> > >- +1, makes sense
> > >- +1, no issues there
> > >- Yeah the initial patch I had submitted for K-7149(
> > https://github.com/apache/kafka/pull/6935) to reduce assignmentInfo
> > object had taskIds but the merged PR had similar size according to Vinoth
> > and it was simpler so if the end result is of same size, it would not
> make
> > sense to pivot from dictionary and again move to taskIDs.
> > >- Not sure about what a good default would be if we don't have a
> > configurable setting. This gives the users the flexibility to the users
> to
> > serve their requirements as at the end of the day it would take CPU
> cycles.
> > I am ok with starting it with a default and see how it goes based upon
> > feedback.
> > >
> > > Thanks,
> > > Navinder
> > > On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth Chandar <
> > vchan...@confluent.io> wrote:
> > >
> > >  1. Was trying to spell them out separately. but makes sense for
> > > readability. done
> > >
> > > 2. No I immediately agree :) .. makes sense. @navinder?
> > >
> > > 3. I actually attempted only sending taskIds while working on
> KAFKA-7149.
> > > Its non-trivial to handle edges cases resulting from newly added topic
> > > partitions and wildcarded topic entries. I ended up simplifying it to
> > just
> > > dictionary encoding the topic names

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-31 Thread Vinoth Chandar
1. Was trying to spell them out separately, but makes sense for
readability. Done.

2. No, I immediately agree :) ... makes sense. @navinder?

3. I actually attempted only sending taskIds while working on KAFKA-7149.
It's non-trivial to handle edge cases resulting from newly added topic
partitions and wildcarded topic entries. I ended up simplifying it to just
dictionary-encoding the topic names to reduce size. We can apply the same
technique here for this map. Additionally, we could also dictionary-encode
HostInfo, given it's now repeated twice. I think this would save more space
than having a flag per topic-partition entry. Let me know if you are okay
with this.
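
To sketch the idea (a toy illustration of dictionary encoding, not the actual
AssignmentInfo wire format): every distinct topic name (and, similarly,
HostInfo) is written once and then referred to by a small integer id, so a
partition serializes as two ints instead of a repeated string:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopicDictionary {

    private final Map<String, Integer> topicToId = new HashMap<>();
    private final List<String> idToTopic = new ArrayList<>();

    // Returns a stable small integer id for the topic, assigning one on first use.
    public int encode(final String topic) {
        return topicToId.computeIfAbsent(topic, t -> {
            idToTopic.add(t);
            return idToTopic.size() - 1;
        });
    }

    public String decode(final int id) {
        return idToTopic.get(id);
    }
}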

4. This opens up a good discussion. Given we also support time lag
estimates, we need to read the tail record of the changelog periodically
(unlike offset lag, which we can potentially piggyback on metadata in
ConsumerRecord, IIUC). We thought we should have a config that controls how
often this read happens. Let me know if there is a simple way to get the
timestamp value of the tail record that we are missing.

On Thu, Oct 31, 2019 at 12:58 PM John Roesler  wrote:

> Hey Navinder,
>
> Thanks for updating the KIP, it's a lot easier to see the current
> state of the proposal now.
>
> A few remarks:
> 1. I'm sure it was just an artifact of revisions, but you have two
> separate sections where you list additions to the KafkaStreams
> interface. Can you consolidate those so we can see all the additions
> at once?
>
> 2. For messageLagEstimate, can I suggest "offsetLagEstimate" instead,
> to be clearer that we're specifically measuring a number of offsets?
> If you don't immediately agree, then I'd at least point out that we
> usually refer to elements of Kafka topics as "records", not
> "messages", so "recordLagEstimate" might be more appropriate.
>
> 3. The proposal mentions adding a map of the standby _partitions_ for
> each host to AssignmentInfo. I assume this is designed to mirror the
> existing "partitionsByHost" map. To keep the size of these metadata
> messages down, maybe we can consider making two changes:
> (a) for both actives and standbys, encode the _task ids_ instead of
> _partitions_. Every member of the cluster has a copy of the topology,
> so they can convert task ids into specific partitions on their own,
> and task ids are only (usually) three characters.
> (b) instead of encoding two maps (hostinfo -> actives AND hostinfo ->
> standbys), which requires serializing all the hostinfos twice, maybe
> we can pack them together in one map with a structured value (hostinfo
> -> [actives,standbys]).
> Both of these ideas still require bumping the protocol version to 6,
> and they basically mean we drop the existing `PartitionsByHost` field
> and add a new `TasksByHost` field with the structured value I
> mentioned.
>
> 4. Can we avoid adding the new "lag refresh" config? The lags would
> necessarily be approximate anyway, so adding the config seems to
> increase the operational complexity of the system for little actual
> benefit.
>
> Thanks for the pseudocode, by the way, it really helps visualize how
> these new interfaces would play together. And thanks again for the
> update!
> -John
>
> On Thu, Oct 31, 2019 at 2:41 PM John Roesler  wrote:
> >
> > Hey Vinoth,
> >
> > I started going over the KIP again yesterday. There are a lot of
> > updates, and I didn't finish my feedback in one day. I'm working on it
> > now.
> >
> > Thanks,
> > John
> >
> > On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar 
> wrote:
> > >
> > > Wondering if anyone has thoughts on these changes? I liked that the new
> > > metadata fetch APIs provide all the information at once with consistent
> > > naming..
> > >
> > > Any guidance on what you would like to be discussed or fleshed out more
> > > before we call a VOTE?
> > >
> > > On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
> > >  wrote:
> > >
> > > > Hi,
> > > > We have made some edits in the KIP(
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> )
> > > > after due deliberation on the agreed design to support the new query
> > > > design. This includes the new public API to query offset/time lag
> > > > information and other details related to querying standby tasks
> which have
> > > > come up after thinking of thorough details.
> > > >
> > > >
> > > >
> > > >- Addition of new config, “lag.fetch.interval

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-31 Thread Vinoth Chandar
Wondering if anyone has thoughts on these changes? I liked that the new
metadata fetch APIs provide all the information at once with consistent
naming.

Any guidance on what you would like to be discussed or fleshed out more
before we call a VOTE?
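
For anyone catching up on the thread, this is roughly the shape of the
additions described in Navinder's update quoted below. The field and method
shapes here are only my reading of the proposal, not the final API:

import java.util.List;
import org.apache.kafka.common.TopicPartition;

public final class ProposedApiSketch {

    // Periodically refreshed lag snapshot for one store partition.
    public static class StoreLagInfo {
        public final String storeName;
        public final TopicPartition changelogPartition;
        public final long offsetLag;   // offsets behind the changelog tail
        public final long timeLagMs;   // lag relative to the tail record timestamp

        public StoreLagInfo(final String storeName,
                            final TopicPartition changelogPartition,
                            final long offsetLag,
                            final long timeLagMs) {
            this.storeName = storeName;
            this.changelogPartition = changelogPartition;
            this.offsetLag = offsetLag;
            this.timeLagMs = timeLagMs;
        }
    }

    // New KafkaStreams-level queries, per the proposal quoted below.
    public interface LagQueries {
        List<StoreLagInfo> allLagInfo();
        List<StoreLagInfo> lagInfoForStore(String storeName);
    }
}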

On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
 wrote:

> Hi,
> We have made some edits in the KIP(
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance)
> after due deliberation on the agreed design to support the new query
> design. This includes the new public API to query offset/time lag
> information and other details related to querying standby tasks which have
> come up after thinking of thorough details.
>
>
>
>- Addition of new config, “lag.fetch.interval.ms” to configure the
> interval of time/offset lag
>- Addition of new class StoreLagInfo to store the periodically obtained
> time/offset lag
>- Addition of two new functions in KafkaStreams, List
> allLagInfo() and List lagInfoForStore(String storeName) to
> return the lag information for an instance and a store respectively
>- Addition of new class KeyQueryMetadata. We need topicPartition for
> each key to be matched with the lag API for the topic partition. One way is
> to add new functions and fetch topicPartition from StreamsMetadataState but
> we thought having one call and fetching StreamsMetadata and topicPartition
> is more cleaner.
>-
> Renaming partitionsForHost to activePartitionsForHost in StreamsMetadataState
> and partitionsByHostState to activePartitionsByHostState
> in StreamsPartitionAssignor
>- We have also added the pseudo code of how all the changes will exist
> together and support the new querying APIs
>
> Please let me know if anything is pending now, before a vote can be
> started on this.   On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder
> Brar  wrote:
>
>  >> Since there are two soft votes for separate active/standby API
> methods, I also change my position on that. Fine with 2 separate
> methods. Once we remove the lag information from these APIs, returning a
> List is less attractive, since the ordering has no special meaning now.
> Agreed, now that we are not returning lag, I am also sold on having two
> separate functions. We already have one which returns streamsMetadata for
> active tasks, and now we can add another one for standbys.
>
>
>
> On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar <
> vchan...@confluent.io> wrote:
>
>  +1 to Sophie's suggestion. Having both lag in terms of time and offsets is
> good and makes for a more complete API.
>
> Since there are two soft votes for separate active/standby API methods, I
> also change my position on that. Fine with 2 separate methods.
> Once we remove the lag information from these APIs, returning a List is
> less attractive, since the ordering has no special meaning now.
>
> >> lag in offsets vs time: Having both, as suggested by Sophie would of
> course be best. What is a little unclear to me is, how in details are we
> going to compute both?
> @navinder may be next step is to flesh out these details and surface any
> larger changes we need to make if need be.
>
> Any other details we need to cover, before a VOTE can be called on this?
>
>
> On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck  wrote:
>
> > I am jumping in a little late here.
> >
> > Overall I agree with the proposal to push decision making on what/how to
> > query in the query layer.
> >
> > For point 5 from above, I'm slightly in favor of having a new method,
> > "standbyMetadataForKey()" or something similar.
> > Because even if we return all tasks in one list, the user will still have
> > to perform some filtering to separate the different tasks, so I don't
> feel
> > making two calls is a burden, and IMHO makes things more transparent for
> > the user.
> > If the final vote is for using an "isActive" field, I'm good with that as
> > well.
> >
> > Just my 2 cents.
> >
> > On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
> >  wrote:
> >
> > > I think now we are aligned on almost all the design parts. Summarising
> > > below what has been discussed above and we have a general consensus on.
> > >
> > >
> > >- Rather than broadcasting lag across all nodes at rebalancing/with
> > the
> > > heartbeat, we will just return a list of all available standby’s in the
> > > system and the user can make IQ query any of those nodes which will
> > return
> > > the response, and the lag and offset time. Based 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-25 Thread Vinoth Chandar
+1 to Sophie's suggestion. Having both lag in terms of time and offsets is
good and makes for a more complete API.

Since there are two soft votes for separate active/standby API methods, I
also change my position on that. Fine with 2 separate methods.
Once we remove the lag information from these APIs, returning a List is
less attractive, since the ordering has no special meaning now.

>> lag in offsets vs time: Having both, as suggested by Sophie would of
course be best. What is a little unclear to me is, how in details are we
going to compute both?
@navinder, maybe the next step is to flesh out these details and surface any
larger changes we need to make, if need be.

Any other details we need to cover, before a VOTE can be called on this?


On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck  wrote:

> I am jumping in a little late here.
>
> Overall I agree with the proposal to push decision making on what/how to
> query in the query layer.
>
> For point 5 from above, I'm slightly in favor of having a new method,
> "standbyMetadataForKey()" or something similar.
> Because even if we return all tasks in one list, the user will still have
> to perform some filtering to separate the different tasks, so I don't feel
> making two calls is a burden, and IMHO makes things more transparent for
> the user.
> If the final vote is for using an "isActive" field, I'm good with that as
> well.
>
> Just my 2 cents.
>
> On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
>  wrote:
>
> > I think now we are aligned on almost all the design parts. Summarising
> > below what has been discussed above and we have a general consensus on.
> >
> >
> >- Rather than broadcasting lag across all nodes at rebalancing/with
> the
> > heartbeat, we will just return a list of all available standby’s in the
> > system and the user can make IQ query any of those nodes which will
> return
> > the response, and the lag and offset time. Based on which user can decide
> > if he wants to return the response back or call another standby.
> >-  The current metadata query frequency will not change. It will be
> the
> > same as it does now, i.e. before each query.
> >
> >-  For fetching list in StreamsMetadataState.java and
> > List in StreamThreadStateStoreProvider.java
> (which
> > will return all active stores which are running/restoring and replica
> > stores which are running), we will add new functions and not disturb the
> > existing functions
> >
> >- There is no need to add new StreamsConfig for implementing this KIP
> >
> >- We will add standbyPartitionsByHost in AssignmentInfo and
> > StreamsMetadataState which would change the existing rebuildMetadata()
> and
> > setPartitionsByHostState()
> >
> >
> >
> > If anyone has any more concerns please feel free to add. Post this I will
> > be initiating a vote.
> > ~Navinder
> >
> > On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax <
> > matth...@confluent.io> wrote:
> >
> >  Just to close the loop @Vinoth:
> >
> > > 1. IIUC John intends to add (or we can do this in this KIP) lag
> > information
> > > to AssignmentInfo, which gets sent to every participant.
> >
> > As explained by John, currently KIP-441 plans to only report the
> > information to the leader. But I guess, with the new proposal to not
> > broadcast this information anyway, this concern is invalidated anyway
> >
> > > 2. At-least I was under the assumption that it can be called per query,
> > > since the API docs don't seem to suggest otherwise. Do you see any
> > > potential issues if we call this every query? (we should benchmark this
> > > nonetheless)
> >
> > I did not see a real issue if people refresh the metadata frequently,
> > because it would be a local call. My main point was, that this would
> > change the current usage pattern of the API, and we would clearly need
> > to communicate this change. Similar to (1), this concern in invalidated
> > anyway.
> >
> >
> > @John: I think it's a great idea to get rid of reporting lag, and
> > pushing the decision making process about "what to query" into the query
> > serving layer itself. This simplifies the overall design of this KIP
> > significantly, and actually aligns very well with the idea that Kafka
> > Streams (as it is a library) should only provide the basic building
> > block. Many of my raised questions are invalided by this.
> >
> >
> >
> > Some questions are still open though:
> >
> > > 

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-24 Thread Vinoth Chandar
+1. As someone implementing a query routing layer, there is already a need
to have mechanisms in place to do health checks/failure detection for
queries, while Streams rebalancing eventually kicks in in the background.
So, pushing this complexity to the IQ client app keeps Streams simpler as
well. IQs will potentially be issued an order of magnitude more frequently,
and that can achieve good freshness for the lag information.

I would like to add, however, that we would also need to introduce APIs in
the KafkaStreams class for obtaining lag information for all stores local to
that host. This is for the IQ server to relay back with the response/its own
heartbeat mechanism.
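
As a purely illustrative example of what that relay could look like from the
serving side -- the response shape and the lag source are placeholders, not
an existing API:

import java.util.Optional;

public class IqResponse<V> {
    public final Optional<V> value;
    public final long offsetLag;   // lag reported by the serving instance for this store

    public IqResponse(final Optional<V> value, final long offsetLag) {
        this.value = value;
        this.offsetLag = offsetLag;
    }

    // The routing layer applies its own policy, e.g. accept a standby's answer
    // only when its reported lag is below an application-chosen threshold.
    public boolean freshEnough(final long maxAcceptableLag) {
        return offsetLag <= maxAcceptableLag;
    }
}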

On Thu, Oct 24, 2019 at 3:12 PM John Roesler  wrote:

> Hi all,
>
> I've been mulling about this KIP, and I think I was on the wrong track
> earlier with regard to task lags. Tl;dr: I don't think we should add
> lags at all to the metadata API (and also not to the AssignmentInfo
> protocol message).
>
> Like I mentioned early on, reporting lag via
> SubscriptionInfo/AssignmentInfo would only work while rebalances are
> happening. Once the group stabilizes, no members would be notified of
> each others' lags anymore. I had been thinking that the solution would
> be the heartbeat proposal I mentioned earlier, but that proposal would
> have reported the heartbeats of the members only to the leader member
> (the one who makes assignments). To be useful in the context of _this_
> KIP, we would also have to report the lags in the heartbeat responses
> to of _all_ members. This is a concern to be because now _all_ the
> lags get reported to _all_ the members on _every_ heartbeat... a lot
> of chatter.
>
> Plus, the proposal for KIP-441 is only to report the lags of each
> _task_. This is the sum of the lags of all the stores in the tasks.
> But this would be insufficient for KIP-535. For this kip, we would
> want the lag specifically of the store we want to query. So this
> means, we have to report the lags of all the stores of all the members
> to every member... even more chatter!
>
> The final nail in the coffin to me is that IQ clients would have to
> start refreshing their metadata quite frequently to stay up to date on
> the lags, which adds even more overhead to the system.
>
> Consider a strawman alternative: we bring KIP-535 back to extending
> the metadata API to tell the client the active and standby replicas
> for the key in question (not including and "staleness/lag"
> restriction, just returning all the replicas). Then, the client picks
> a replica and sends the query. The server returns the current lag
> along with the response (maybe in an HTML header or something). Then,
> the client keeps a map of its last observed lags for each replica, and
> uses this information to prefer fresher replicas.
>
> OR, if it wants only to query the active replica, it would throw an
> error on any lag response greater than zero, refreshes its metadata by
> re-querying the metadata API, and tries again with the current active
> replica.
>
> This way, the lag information will be super fresh for the client, and
> we keep the Metadata API / Assignment,Subscription / and Heartbeat as
> slim as possible.
>
> Side note: I do think that some time soon, we'll have to add a library
> for IQ server/clients. I think that this logic will start to get
> pretty complex.
>
> I hope this thinking is reasonably clear!
> Thanks again,
> -John
>
> Does that
>
> On Wed, Oct 23, 2019 at 10:16 AM Vinoth Chandar 
> wrote:
> >
> > Responding to the points raised by Matthias
> >
> > 1. IIUC John intends to add (or we can do this in this KIP) lag
> information
> > to AssignmentInfo, which gets sent to every participant.
> >
> > 2. At-least I was under the assumption that it can be called per query,
> > since the API docs don't seem to suggest otherwise. Do you see any
> > potential issues if we call this every query? (we should benchmark this
> > nonetheless)
> >
> > 4. Agree. metadataForKey() implicitly would return the active host
> metadata
> > (as it was before). We should also document this in that APIs javadoc,
> > given we have another method(s) that returns more host metadata now.
> >
> > 5.  While I see the point, the app/caller has to make two different APIs
> > calls to obtain active/standby and potentially do the same set of
> operation
> > to query the state. I personally still like a method like isActive()
> > better, but don't have strong opinions.
> >
> > 9. If we do expose the lag information, could we just leave it upto to
> the
> > caller to decide whether it errors out or not and not make t

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-23 Thread Vinoth Chandar
 tasks. How can a user actually get a handle
> >> to an store of an active(restoring) or standby task for querying? Seems
> >> we should add a new method to get standby handles? Changing the
> >> semantics to existing `state()` would be possible, but I think adding a
> >> new method is preferable?
> >>
> >> 9) How does the user actually specify the acceptable lag? A global
> >> config via StreamsConfig (this would be a public API change that needs
> >> to be covered in the KIP)? Or on a per-store or even per-query basis for
> >> more flexibility? We could also have a global setting that is used as
> >> default and allow to overwrite it on a per-query basis.
> >>
> >> 10) Do we need to distinguish between active(restoring) and standby
> >> tasks? Or could be treat both as the same?
> >>
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 10/21/19 5:40 PM, Vinoth Chandar wrote:
> >>>>> I'm wondering, rather than putting "acceptable lag" into the
> >>> configuration at all, or even making it a parameter on
> `allMetadataForKey`,
> >>> why not just _always_ return all available metadata (including
> >>> active/standby or lag) and let the caller decide to which node they
> want to
> >>> route the query?
> >>> +1 on exposing lag information via the APIs. IMO without having
> >>> continuously updated/fresh lag information, its true value as a signal
> for
> >>> query routing decisions is much limited. But we can design the API
> around
> >>> this model and iterate? Longer term, we should have continuously
> shared lag
> >>> information.
> >>>
> >>>>> more general to refactor it to "allMetadataForKey(long
> >>> tolerableDataStaleness, ...)", and when it's set to 0 it means "active
> task
> >>> only".
> >>> +1 IMO if we plan on having `enableReplicaServing`, it makes sense to
> >>> generalize based on dataStaleness. This seems complementary to
> exposing the
> >>> lag information itself.
> >>>
> >>>>> This is actually not a public api change at all, and I'm planning to
> >>> implement it asap as a precursor to the rest of KIP-441
> >>> +1 again. Do we have a concrete timeline for when this change will
> land on
> >>> master? I would like to get the implementation wrapped up (as much as
> >>> possible) by end of the month. :). But I agree this sequencing makes
> >>> sense..
> >>>
> >>>
> >>> On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang 
> wrote:
> >>>
> >>>> Hi Navinder,
> >>>>
> >>>> Thanks for the KIP, I have a high level question about the proposed
> API
> >>>> regarding:
> >>>>
> >>>> "StreamsMetadataState::allMetadataForKey(boolean
> enableReplicaServing...)"
> >>>>
> >>>> I'm wondering if it's more general to refactor it to
> >>>> "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's
> set to
> >>>> 0 it means "active task only". Behind the scene, we can have the
> committed
> >>>> offsets to encode the stream time as well, so that when processing
> standby
> >>>> tasks the stream process knows not long the lag in terms of offsets
> >>>> comparing to the committed offset (internally we call it offset
> limit), but
> >>>> also the lag in terms of timestamp diff comparing the committed
> offset.
> >>>>
> >>>> Also encoding the timestamp as part of offset have other benefits for
> >>>> improving Kafka Streams time semantics as well, but for KIP-535
> itself I
> >>>> think it can help giving users a more intuitive interface to reason
> about.
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>> On Mon, Oct 21, 2019 at 12:30 PM John Roesler 
> wrote:
> >>>>
> >>>>> Hey Navinder,
> >>>>>
> >>>>> Thanks for the KIP! I've been reading over the discussion thus far,
> >>>>> and I have a couple of thoughts to pile on as well:
> >>>>>
> >>>>> It seems confusing to propose the API in terms of the current system
> >>>>> state, but also propose how the API w

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-21 Thread Vinoth Chandar
ould stop being propagated when the cluster
> > achieves a balanced task distribution. There was a follow-on idea I
> > POCed to continuously share lag information in the heartbeat protocol,
> > which you might be interested in, if you want to make sure that nodes
> > are basically _always_ aware of each others' lag on different
> > partitions: https://github.com/apache/kafka/pull/7096
> >
> > Thanks again!
> > -John
> >
> >
> > On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar
> >  wrote:
> > >
> > > Thanks, Vinoth. Looks like we are on the same page. I will add some of
> > these explanations to the KIP as well. Have assigned the KAFKA-6144 to
> > myself and KAFKA-8994 is closed(by you). As suggested, we will replace
> > "replica" with "standby".
> > >
> > > In the new API, "StreamsMetadataState::allMetadataForKey(boolean
> > enableReplicaServing, String storeName, K key, Serializer
> > keySerializer)" Do we really need a per key configuration? or a new
> > StreamsConfig is good enough?>> Coming from experience, when teams are
> > building a platform with Kafka Streams and these API's serve data to
> > multiple teams, we can't have a generalized config that says as a
> platform
> > we will support stale reads or not. It should be the choice of someone
> who
> > is calling the API's to choose whether they are ok with stale reads or
> not.
> > Makes sense?
> > > On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar <
> > vchan...@confluent.io> wrote:
> > >
> > >  Looks like we are covering ground :)
> > >
> > > >>Only if it is within a permissible  range(say 1) we will serve
> from
> > > Restoring state of active.
> > > +1 on having a knob like this.. My reasoning is as follows.
> > >
> > > Looking at the Streams state as a read-only distributed kv store. With
> > > num_standby = f , we should be able to tolerate f failures and if there
> > is
> > > a f+1' failure, the system should be unavailable.
> > >
> > > A) So with num_standby=0, the system should be unavailable even if
> there
> > is
> > > 1 failure and thats my argument for not allowing querying in
> restoration
> > > state, esp in this case it will be a total rebuild of the state (which
> > IMO
> > > cannot be considered a normal fault free operational state).
> > >
> > > B) Even there are standby's, say num_standby=2, if the user decides to
> > shut
> > > down all 3 instances, then only outcome should be unavailability until
> > all
> > > of them come back or state is rebuilt on other nodes in the cluster. In
> > > normal operations, f <= 2 and when a failure does happen we can then
> > either
> > > choose to be C over A and fail IQs until replication is fully caught up
> > or
> > > choose A over C by serving in restoring state as long as lag is
> minimal.
> > If
> > > even with f=1 say, all the standbys are lagging a lot due to some
> issue,
> > > then that should be considered a failure since that is different from
> > > normal/expected operational mode. Serving reads with unbounded
> > replication
> > > lag and calling it "available" may not be very usable or even desirable
> > :)
> > > IMHO, since it gives the user no way to reason about the app that is
> > going
> > > to query this store.
> > >
> > > So there is definitely a need to distinguish between :  Replication
> > catchup
> > > while being in fault free state vs Restoration of state when we lose
> more
> > > than f standbys. This knob is a great starting point towards this.
> > >
> > > If you agree with some of the explanation above, please feel free to
> > > include it in the KIP as well since this is sort of our design
> principle
> > > here..
> > >
> > > Small nits :
> > >
> > > - let's standardize on "standby" instead of "replica", KIP or code,  to
> > be
> > > consistent with rest of Streams code/docs?
> > > - Can we merge KAFKA-8994 into KAFKA-6144 now and close the former?
> > > Eventually need to consolidate KAFKA-6555 as well
> > > - In the new API, "StreamsMetadataState::allMetadataForKey(boolean
> > > enableReplicaServing, String storeName, K key, Serializer
> > keySerializer)" Do
> > > we really need a per key configuration? or a new S

[jira] [Resolved] (KAFKA-8994) Streams should expose standby replication information & allow stale reads of state store

2019-10-18 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar resolved KAFKA-8994.
---
Resolution: Duplicate

> Streams should expose standby replication information & allow stale reads of 
> state store
> 
>
> Key: KAFKA-8994
> URL: https://issues.apache.org/jira/browse/KAFKA-8994
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
>  Labels: needs-kip
>
> Currently Streams interactive queries (IQ) fail during the time period where 
> there is a rebalance in progress. 
> Consider the following scenario in a three-node Streams cluster with node A, 
> node B and node R, executing a stateful sub-topology/topic group with 1 
> partition and `_num.standby.replicas=1_`  
>  * *t0*: A is the active instance owning the partition, B is the standby that 
> keeps replicating the A's state into its local disk, R just routes streams 
> IQs to active instance using StreamsMetadata
>  * *t1*: IQs pick node R as router, R forwards query to A, A responds back to 
> R which reverse forwards back the results.
>  * *t2:* Active A instance is killed and rebalance begins. IQs start failing 
> to A
>  * *t3*: Rebalance assignment happens and standby B is now promoted as active 
> instance. IQs continue to fail
>  * *t4*: B fully catches up to changelog tail and rewinds offsets to A's last 
> commit position, IQs continue to fail
>  * *t5*: IQs to R, get routed to B, which is now ready to serve results. IQs 
> start succeeding again
>  
> Depending on Kafka consumer group session/heartbeat timeouts, step t2,t3 can 
> take few seconds (~10 seconds based on defaults values). Depending on how 
> laggy the standby B was prior to A being killed, t4 can take few 
> seconds-minutes. 
> While this behavior favors consistency over availability at all times, the 
> long unavailability window might be undesirable for certain classes of 
> applications (e.g simple caches or dashboards). 
> This issue aims to also expose information about standby B to R, during each 
> rebalance such that the queries can be routed by an application to a standby 
> to serve stale reads, choosing availability over consistency. 
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-8994) Streams should expose standby replication information & allow stale reads of state store

2019-10-18 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar reopened KAFKA-8994:
---

> Streams should expose standby replication information & allow stale reads of 
> state store
> 
>
> Key: KAFKA-8994
> URL: https://issues.apache.org/jira/browse/KAFKA-8994
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
>  Labels: needs-kip
>
> Currently Streams interactive queries (IQ) fail during the time period where 
> there is a rebalance in progress. 
> Consider the following scenario in a three-node Streams cluster with node A, 
> node B and node R, executing a stateful sub-topology/topic group with 1 
> partition and `_num.standby.replicas=1_`  
>  * *t0*: A is the active instance owning the partition, B is the standby that 
> keeps replicating the A's state into its local disk, R just routes streams 
> IQs to active instance using StreamsMetadata
>  * *t1*: IQs pick node R as router, R forwards query to A, A responds back to 
> R which reverse forwards back the results.
>  * *t2:* Active A instance is killed and rebalance begins. IQs start failing 
> to A
>  * *t3*: Rebalance assignment happens and standby B is now promoted as active 
> instance. IQs continue to fail
>  * *t4*: B fully catches up to changelog tail and rewinds offsets to A's last 
> commit position, IQs continue to fail
>  * *t5*: IQs to R, get routed to B, which is now ready to serve results. IQs 
> start succeeding again
>  
> Depending on Kafka consumer group session/heartbeat timeouts, step t2,t3 can 
> take few seconds (~10 seconds based on defaults values). Depending on how 
> laggy the standby B was prior to A being killed, t4 can take few 
> seconds-minutes. 
> While this behavior favors consistency over availability at all times, the 
> long unavailability window might be undesirable for certain classes of 
> applications (e.g simple caches or dashboards). 
> This issue aims to also expose information about standby B to R, during each 
> rebalance such that the queries can be routed by an application to a standby 
> to serve stale reads, choosing availability over consistency. 
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8994) Streams should expose standby replication information & allow stale reads of state store

2019-10-18 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar resolved KAFKA-8994.
---
Resolution: Fixed

> Streams should expose standby replication information & allow stale reads of 
> state store
> 
>
> Key: KAFKA-8994
> URL: https://issues.apache.org/jira/browse/KAFKA-8994
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
>  Labels: needs-kip
>
> Currently Streams interactive queries (IQ) fail during the time period where 
> there is a rebalance in progress. 
> Consider the following scenario in a three-node Streams cluster with node A, 
> node B and node R, executing a stateful sub-topology/topic group with 1 
> partition and `_num.standby.replicas=1_`  
>  * *t0*: A is the active instance owning the partition, B is the standby that 
> keeps replicating the A's state into its local disk, R just routes streams 
> IQs to active instance using StreamsMetadata
>  * *t1*: IQs pick node R as router, R forwards query to A, A responds back to 
> R which reverse forwards back the results.
>  * *t2:* Active A instance is killed and rebalance begins. IQs start failing 
> to A
>  * *t3*: Rebalance assignment happens and standby B is now promoted as active 
> instance. IQs continue to fail
>  * *t4*: B fully catches up to changelog tail and rewinds offsets to A's last 
> commit position, IQs continue to fail
>  * *t5*: IQs to R, get routed to B, which is now ready to serve results. IQs 
> start succeeding again
>  
> Depending on Kafka consumer group session/heartbeat timeouts, step t2,t3 can 
> take few seconds (~10 seconds based on defaults values). Depending on how 
> laggy the standby B was prior to A being killed, t4 can take few 
> seconds-minutes. 
> While this behavior favors consistency over availability at all times, the 
> long unavailability window might be undesirable for certain classes of 
> applications (e.g simple caches or dashboards). 
> This issue aims to also expose information about standby B to R, during each 
> rebalance such that the queries can be routed by an application to a standby 
> to serve stale reads, choosing availability over consistency. 
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-17 Thread Vinoth Chandar
Looks like we are covering ground :)

>>Only if it is within a permissible  range(say 1) we will serve from
Restoring state of active.
+1 on having a knob like this. My reasoning is as follows.

Looking at the Streams state as a read-only distributed KV store: with
num_standby = f, we should be able to tolerate f failures, and if there is
an (f+1)th failure, the system should be unavailable.

A) So with num_standby=0, the system should be unavailable even if there is
1 failure, and that's my argument for not allowing querying in restoration
state, especially since in this case it will be a total rebuild of the state
(which IMO cannot be considered a normal, fault-free operational state).

B) Even if there are standbys, say num_standby=2, if the user decides to
shut down all 3 instances, then the only outcome should be unavailability
until all of them come back or the state is rebuilt on other nodes in the
cluster. In normal operations, f <= 2, and when a failure does happen we can
then either choose C over A and fail IQs until replication is fully caught
up, or choose A over C by serving in restoring state as long as lag is
minimal. If, even with f=1 say, all the standbys are lagging a lot due to
some issue, then that should be considered a failure, since that is
different from the normal/expected operational mode. Serving reads with
unbounded replication lag and calling it "available" may not be very usable
or even desirable :) IMHO, since it gives the user no way to reason about
the app that is going to query this store.

So there is definitely a need to distinguish between replication catch-up
while in a fault-free state vs. restoration of state when we lose more than
f standbys. This knob is a great starting point towards this.

If you agree with some of the explanation above, please feel free to
include it in the KIP as well, since this is sort of our design principle
here.
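
For illustration, the knob boils down to a check like the following before
serving from a restoring active (the offsets would have to come from the
app's restore listener / the restore consumer; this is not an existing
Streams API):

public class RestoringServingPolicy {

    // Serve IQs from a restoring active only when it is within a permissible
    // distance of the last committed changelog position.
    public static boolean canServe(final long committedEndOffset,
                                   final long restoredOffset,
                                   final long permissibleLag) {
        return committedEndOffset - restoredOffset <= permissibleLag;
    }
}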

Small nits:

- Let's standardize on "standby" instead of "replica", in the KIP and code,
to be consistent with the rest of the Streams code/docs?
- Can we merge KAFKA-8994 into KAFKA-6144 now and close the former?
Eventually we need to consolidate KAFKA-6555 as well.
- In the new API, "StreamsMetadataState::allMetadataForKey(boolean
enableReplicaServing, String storeName, K key, Serializer keySerializer)",
do we really need a per-key configuration, or is a new StreamsConfig good
enough?

On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar
 wrote:

> @Vinoth, I have incorporated a few of the discussions we have had in the
> KIP.
>
> In the current code, t0 and t1 serve queries from Active(Running)
> partition. For case t2, we are planning to return List
> such that it returns  so that if IQ
> fails on A, the replica on B can serve the data by enabling serving from
> replicas. This still does not solve case t3 and t4 since B has been
> promoted to active but it is in Restoring state to catchup till A’s last
> committed position as we don’t serve from Restoring state in Active and new
> Replica on R is building itself from scratch. Both these cases can be
> solved if we start serving from Restoring state of active as well since it
> is almost equivalent to previous Active.
>
> There could be a case where all replicas of a partition become unavailable
> and active and all replicas of that partition are building themselves from
> scratch, in this case, the state in Active is far behind even though it is
> in Restoring state. To cater to such cases that we don’t serve from this
> state we can either add another state before Restoring or check the
> difference between last committed offset and current position. Only if it
> is within a permissible range (say 1) we will serve from Restoring the
> state of Active.
>
>
> On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar <
> vchan...@confluent.io> wrote:
>
>  Thanks for the updates on the KIP, Navinder!
>
> Few comments
>
> - AssignmentInfo is not public API?. But we will change it and thus need to
> increment the version and test for version_probing etc. Good to separate
> that from StreamsMetadata changes (which is public API)
> - From what I see, there is going to be choice between the following
>
>   A) introducing a new *KafkaStreams::allMetadataForKey() *API that
> potentially returns List ordered from most upto date to
> least upto date replicas. Today we cannot fully implement this ordering,
> since all we know is which hosts are active and which are standbys.
> However, this aligns well with the future. KIP-441 adds the lag information
> to the rebalancing protocol. We could also sort replicas based on the
> report lags eventually. This is fully backwards compatible with existing
> clients. Only drawback I see is the naming of the existing method
> KafkaStreams::metadataForKey, not conveying the d

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-16 Thread Vinoth Chandar
Thanks for the updates on the KIP, Navinder!

A few comments:

- AssignmentInfo is not a public API, right? But we will change it and thus
need to increment the version and test for version_probing etc. Good to
separate that from the StreamsMetadata changes (which are public API).
- From what I see, there is going to be a choice between the following:

  A) Introducing a new *KafkaStreams::allMetadataForKey()* API that
potentially returns a List of StreamsMetadata ordered from most up-to-date
to least up-to-date replicas. Today we cannot fully implement this ordering,
since all we know is which hosts are active and which are standbys.
However, this aligns well with the future. KIP-441 adds the lag information
to the rebalancing protocol. We could also sort replicas based on the
reported lags eventually. This is fully backwards compatible with existing
clients. The only drawback I see is the naming of the existing method
KafkaStreams::metadataForKey, not conveying the distinction that it simply
returns the active replica, i.e. allMetadataForKey.get(0).
  B) Changing KafkaStreams::metadataForKey() to return a List. It's a
breaking change.

I prefer A, since none of the semantics/behavior changes for existing
users. Love to hear more thoughts. Can we also work this into the KIP?
I already implemented A to unblock myself for now. Seems feasible to do.
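
To show how a caller would use option A, a small sketch (the
StreamsMetadataLike interface stands in for the real StreamsMetadata, and the
remote lookup is application code, e.g. an HTTP call to the host):

import java.util.List;

public class OptionAUsage {

    interface StreamsMetadataLike {
        String host();
        int port();
    }

    // Try the active replica first (index 0), then fall back to standbys,
    // which would eventually be ordered by lag once KIP-441 lands.
    static <V> V query(final List<StreamsMetadataLike> replicas, final String key) {
        for (final StreamsMetadataLike replica : replicas) {
            final V result = remoteLookup(replica, key);
            if (result != null) {
                return result;
            }
        }
        return null;   // no replica could serve the key
    }

    private static <V> V remoteLookup(final StreamsMetadataLike replica, final String key) {
        // Placeholder for the application's RPC to replica.host():replica.port().
        return null;
    }
}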


On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar 
wrote:

> >>I get your point. But suppose there is a replica which has just become
> active, so in that case replica will still be building itself from scratch
> and this active will go to restoring state till it catches up with previous
> active, wouldn't serving from a restoring active make more sense than a
> replica in such case.
>
> KIP-441 will change this behavior such that promotion to active happens
> based on how caught up a replica is. So, once we have that (work underway
> already for 2.5 IIUC) and user sets num.standby.replicas > 0, then the
> staleness window should not be that long as you describe. IMO if user wants
> availability for state, then should configure num.standby.replicas > 0. If
> not, then on a node loss, few partitions would be unavailable for a while
> (there are other ways to bring this window down, which I won't bring in
> here). We could argue for querying a restoring active (say a new node added
> to replace a faulty old node) based on AP vs CP principles. But not sure
> reading really really old values for the sake of availability is useful. No
> AP data system would be inconsistent for such a long time in practice.
>
> So, I still feel just limiting this to standby reads provides best
> semantics.
>
> Just my 2c. Would love to see what others think as well.
>
> On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar
>  wrote:
>
>> Hi Vinoth,
>> Thanks for the feedback.
>>  Can we link the JIRA, discussion thread also to the KIP.>> Added.
>> Based on the discussion on KAFKA-6144, I was under the impression that
>> this KIP is also going to cover exposing of the standby information in
>> StreamsMetadata and thus subsume KAFKA-8994 . That would require a public
>> API change?>> Sure, I can add changes for 8994 in this KIP and link
>> KAFKA-6144 to KAFKA-8994 as well.
>>   KIP seems to be focussing on restoration when a new node is added.
>> KIP-441 is underway and has some major changes proposed for this. It would
>> be good to clarify dependencies if any. Without KIP-441, I am not very sure
>> if we should allow reads from nodes in RESTORING state, which could amount
>> to many minutes/few hours of stale reads?  This is different from allowing
>> querying standby replicas, which could be mostly caught up and the
>> staleness window could be much smaller/tolerable. (once again the focus on
>> KAFKA-8994).>> I get your point. But suppose there is a replica which has
>> just become active, so in that case replica will still be building itself
>> from scratch and this active will go to restoring state till it catches up
>> with previous active, wouldn't serving from a restoring active make more
>> sense than a replica in such case.
>>
>> Finally, we may need to introduce a configuration to control this. Some
>> users may prefer errors to stale data. Can we also add it to the KIP?>>
>> Will add this.
>>
>> Regards,
>> Navinder
>>
>>
>> On2019/10/14 16:56:49, Vinoth Chandar wrote:
>>
>> >Hi Navinder,>
>>
>> >
>>
>> >Thanks for sharing the KIP! Few thoughts>
>>
>> >
>>
>> >- Can we link the JIRA, discussion thread also to the KIP>
>>
>> >- Based on the discussion on KAFKA-6144, I was under the impression
>> that>
>>
>> >this

Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-15 Thread Vinoth Chandar
>>I get your point. But suppose there is a replica which has just become
active, so in that case replica will still be building itself from scratch
and this active will go to restoring state till it catches up with previous
active, wouldn't serving from a restoring active make more sense than a
replica in such case.

KIP-441 will change this behavior such that promotion to active happens
based on how caught up a replica is. So, once we have that (work underway
already for 2.5, IIUC) and the user sets num.standby.replicas > 0, the
staleness window should not be as long as you describe. IMO if the user
wants availability for state, then they should configure
num.standby.replicas > 0. If not, then on a node loss, a few partitions
would be unavailable for a while (there are other ways to bring this window
down, which I won't go into here). We could argue for querying a restoring
active (say a new node added to replace a faulty old node) based on AP vs.
CP principles. But I am not sure reading really old values for the sake of
availability is useful. No AP data system would be inconsistent for such a
long time in practice.

So, I still feel just limiting this to standby reads provides the best
semantics.

Just my 2c. Would love to see what others think as well.

On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar
 wrote:

> Hi Vinoth,
> Thanks for the feedback.
>  Can we link the JIRA, discussion thread also to the KIP.>> Added.
> Based on the discussion on KAFKA-6144, I was under the impression that
> this KIP is also going to cover exposing of the standby information in
> StreamsMetadata and thus subsume KAFKA-8994 . That would require a public
> API change?>> Sure, I can add changes for 8994 in this KIP and link
> KAFKA-6144 to KAFKA-8994 as well.
>   KIP seems to be focussing on restoration when a new node is added.
> KIP-441 is underway and has some major changes proposed for this. It would
> be good to clarify dependencies if any. Without KIP-441, I am not very sure
> if we should allow reads from nodes in RESTORING state, which could amount
> to many minutes/few hours of stale reads?  This is different from allowing
> querying standby replicas, which could be mostly caught up and the
> staleness window could be much smaller/tolerable. (once again the focus on
> KAFKA-8994).>> I get your point. But suppose there is a replica which has
> just become active, so in that case replica will still be building itself
> from scratch and this active will go to restoring state till it catches up
> with previous active, wouldn't serving from a restoring active make more
> sense than a replica in such case.
>
> Finally, we may need to introduce a configuration to control this. Some
> users may prefer errors to stale data. Can we also add it to the KIP?>>
> Will add this.
>
> Regards,
> Navinder
>
>
> On 2019/10/14 16:56:49, Vinoth Chandar wrote:
>
> >Hi Navinder,>
>
> >
>
> >Thanks for sharing the KIP! Few thoughts>
>
> >
>
> >- Can we link the JIRA, discussion thread also to the KIP>
>
> >- Based on the discussion on KAFKA-6144, I was under the impression that>
>
> >this KIP is also going to cover exposing of the standby information in>
>
> >StreamsMetadata and thus subsume KAFKA-8994 . That would require a
> public>
>
> >API change?>
>
> >- KIP seems to be focussing on restoration when a new node is added.>
>
> >KIP-441 is underway and has some major changes proposed for this. It
> would>
>
> >be good to clarify dependencies if any. Without KIP-441, I am not very
> sure>
>
> >if we should allow reads from nodes in RESTORING state, which could
> amount>
>
> >to many minutes/few hours of stale reads?  This is different
> from allowing>
>
> >querying standby replicas, which could be mostly caught up and the>
>
> >staleness window could be much smaller/tolerable. (once again the focus
> on>
>
> >KAFKA-8994)>
>
> >- Finally, we may need to introduce a configuration to control this.
> Some>
>
> >users may prefer errors to stale data. Can we also add it to the KIP?>
>
> >
>
> >Thanks>
>
> >Vinoth>
>
> >
>
> >
>
> >
>
> >
>
> >On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar>
>
> >wrote:>
>
> >
>
> >> Hi,>
>
> >> Starting a discussion on the KIP to Allow state stores to serve stale>
>
> >> reads during rebalance(>
>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> >
>
> >> ).>
>
> >> Thanks & Regards,Navinder>
>
> >> LinkedIn>
>
> >>>
> >


Re: [DISCUSS] KIP-535: Allow state stores to serve stale reads during rebalance

2019-10-14 Thread Vinoth Chandar
Hi Navinder,

Thanks for sharing the KIP! Few thoughts

- Can we link the JIRA, discussion thread also to the KIP
- Based on the discussion on KAFKA-6144, I was under the impression that
this KIP is also going to cover exposing of the standby information in
StreamsMetadata and thus subsume KAFKA-8994 . That would require a public
API change?
- KIP seems to be focussing on restoration when a new node is added.
KIP-441 is underway and has some major changes proposed for this. It would
be good to clarify dependencies if any. Without KIP-441, I am not very sure
if we should allow reads from nodes in RESTORING state, which could amount
to many minutes/few hours of stale reads?  This is different from allowing
querying standby replicas, which could be mostly caught up and the
staleness window could be much smaller/tolerable. (once again the focus on
KAFKA-8994)
- Finally, we may need to introduce a configuration to control this. Some
users may prefer errors to stale data. Can we also add it to the KIP?

Thanks
Vinoth




On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar
 wrote:

> Hi,
> Starting a discussion on the KIP to Allow state stores to serve stale
> reads during rebalance(
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> ).
> Thanks & Regards,Navinder
> LinkedIn
>


[jira] [Created] (KAFKA-8994) Streams should expose standby replication information & allow stale reads of state store

2019-10-07 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-8994:
-

 Summary: Streams should expose standby replication information & 
allow stale reads of state store
 Key: KAFKA-8994
 URL: https://issues.apache.org/jira/browse/KAFKA-8994
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Vinoth Chandar


Currently, Streams interactive queries (IQ) fail while a rebalance is in progress. 

Consider the following scenario in a three node Streams cluster with node A, 
node B and node R, executing a stateful sub-topology/topic group with 1 
partition and `_num.standby.replicas=1_`:
 * *t0*: A is the active instance owning the partition, B is the standby that 
keeps replicating A's state onto its local disk, R just routes streams IQs 
to the active instance using StreamsMetadata
 * *t1*: IQs pick node R as router, R forwards the query to A, A responds back to R, 
which reverse-forwards the results.
 * *t2*: Active instance A is killed and a rebalance begins. IQs to A start failing
 * *t3*: Rebalance assignment happens and standby B is promoted to active 
instance. IQs continue to fail
 * *t4*: B fully catches up to the changelog tail and rewinds offsets to A's last 
commit position. IQs continue to fail
 * *t5*: IQs to R get routed to B, which is now ready to serve results. IQs 
start succeeding again

 

Depending on Kafka consumer group session/heartbeat timeouts, steps t2-t3 can 
take a few seconds (~10 seconds based on default values). Depending on how laggy 
the standby B was prior to A being killed, t4 can take anywhere from a few seconds to minutes. 

 

While this behavior favors consistency over availability at all times, the long 
unavailability window might be undesirable for certain classes of applications 
(e.g. simple caches or dashboards). 

 

This issue aims to also expose information about standby B to R during each 
rebalance, so that an application can route queries to a standby to 
serve stale reads, choosing availability over consistency. 
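
For illustration, here is a rough sketch of the router-side lookup that is possible 
today with the existing metadata API (active instance only); the store name, key type 
and the HTTP forwarding helper are placeholders, and imports/plumbing are omitted:

{code}
// Step t1 today: R can only discover the active host for the key, so during
// t2-t4 there is nothing to route to and the IQ fails.
StreamsMetadata active =
    streams.metadataForKey("word-counts", key, Serdes.String().serializer());
if (active == null || StreamsMetadata.NOT_AVAILABLE.equals(active)) {
    throw new IllegalStateException("No active host available (rebalance in progress)");
}
forwardQueryOverHttp(active.host(), active.port(), key);   // hypothetical helper on R

// What this issue proposes: also expose the standby hosts for the key, so that R
// can fall back to a (possibly stale) standby instead of failing.
{code}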



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8839) Improve logging in Kafka Streams around debugging task lifecycle

2019-10-07 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar resolved KAFKA-8839.
---
Resolution: Fixed

Closing since the PR has been merged

> Improve logging in Kafka Streams around debugging task lifecycle 
> -
>
> Key: KAFKA-8839
> URL: https://issues.apache.org/jira/browse/KAFKA-8839
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>    Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 2.4.0
>
>
> As a follow up to KAFKA-8831, this Jira will track efforts around improving 
> logging/docs around 
>  
>  * Being able to follow state of tasks from assignment to restoration 
>  * Better detection of misconfigured state store dir 
>  * Docs giving guidance for rebalance time and state store config



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8913) Document topic based configs & ISR settings for Streams apps

2019-09-18 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar resolved KAFKA-8913.
---
Resolution: Fixed

> Document topic based configs & ISR settings for Streams apps
> 
>
> Key: KAFKA-8913
> URL: https://issues.apache.org/jira/browse/KAFKA-8913
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
>
> Noticed that it was not clear how to configure the internal topics. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-8913) Document topic based configs & ISR settings for Streams apps

2019-09-16 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-8913:
-

 Summary: Document topic based configs & ISR settings for Streams 
apps
 Key: KAFKA-8913
 URL: https://issues.apache.org/jira/browse/KAFKA-8913
 Project: Kafka
  Issue Type: Improvement
  Components: documentation, streams
Reporter: Vinoth Chandar
Assignee: Vinoth Chandar


Noticed that it was not clear how to configure the internal topics. 
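
For example, the docs could spell out something like the following (standard config 
names; the values are illustrative only):

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Replication factor Streams uses when it creates internal changelog/repartition topics.
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
// Any topic-level config can be pushed down to the internal topics via topicPrefix().
props.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), "2");
// Durability of writes into those topics is a producer-level setting.
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
{code}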



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [VOTE] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-11 Thread Vinoth Chandar
+1 (non-binding).

On Fri, Sep 6, 2019 at 12:46 AM Bruno Cadonna  wrote:

> +1 (non-binding)
>
> On Fri, Sep 6, 2019 at 12:32 AM Guozhang Wang  wrote:
> >
> > +1 (binding).
> >
> > On Thu, Sep 5, 2019 at 2:47 PM John Roesler  wrote:
> >
> > > Hello, all,
> > >
> > > After a great discussion, I'd like to open voting on KIP-441,
> > > to avoid long restore times in Streams after rebalancing.
> > > Please cast your votes!
> > >
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
> > >
> > > Thanks,
> > > -John
> > >
> >
> >
> > --
> > -- Guozhang
>


[jira] [Created] (KAFKA-8870) Prevent dirty reads of Streams state store from Interactive queries

2019-09-04 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-8870:
-

 Summary: Prevent dirty reads of Streams state store from 
Interactive queries
 Key: KAFKA-8870
 URL: https://issues.apache.org/jira/browse/KAFKA-8870
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Vinoth Chandar


Today, Interactive Queries (IQ) against a Streams state store could see 
uncommitted data, even with EOS processing guarantees (these are actually 
orthogonal, but clarifying since EOS may give the impression that everything is 
dandy). This is caused primarily because state updates in RocksDB are visible 
even before the Kafka transaction is committed. Thus, if the instance fails, 
the failed-over instance will redo the uncommitted old transaction, and the 
following could be possible during recovery.

The value for key K can go from *V0 → V1 → V2* on active instance A, and IQ reads V1; 
instance A then fails, and the failover/rebalancing will leave the standby instance B 
rewinding offsets and reprocessing, during which time IQ can again see V0 or V1 
or any number of previous values for the same key.

In this issue, we will plan work towards providing consistency for IQ for a 
single row in a single state store, i.e. once a query sees V1, it can only see 
either V1 or V2.
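
To make the target guarantee concrete, here is an illustrative client-side reading of 
it (not the proposed fix); it assumes a timestamped store and uses the record timestamp 
as a stand-in for the version:

{code}
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

// Tracks the newest timestamp served per key and refuses to go backwards, which
// is the "once a query sees V1, it can only see V1 or V2" behaviour for one row.
class MonotonicReader {
    private final Map<String, Long> lastServedTs = new ConcurrentHashMap<>();

    Optional<String> get(ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> store,
                         String key) {
        ValueAndTimestamp<String> vt = store.get(key);
        if (vt == null) {
            return Optional.empty();
        }
        long floor = lastServedTs.getOrDefault(key, Long.MIN_VALUE);
        if (vt.timestamp() < floor) {
            // a rewound value, e.g. a standby still replaying after failover
            return Optional.empty();
        }
        lastServedTs.merge(key, vt.timestamp(), Math::max);
        return Optional.of(vt.value());
    }
}
{code}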



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8839) Improve logging in Kafka Streams around debugging task lifecycle

2019-08-27 Thread Vinoth Chandar (Jira)
Vinoth Chandar created KAFKA-8839:
-

 Summary: Improve logging in Kafka Streams around debugging task 
lifecycle 
 Key: KAFKA-8839
 URL: https://issues.apache.org/jira/browse/KAFKA-8839
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Vinoth Chandar


As a follow up to KAFKA-8831, this Jira will track efforts around improving 
logging/docs around 

 
 * Being able to follow state of tasks from assignment to restoration 
 * Better detection of misconfigured state store dir 
 * Docs giving guidance for rebalance time and state store config



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Resolved] (KAFKA-8831) Joining a new instance sometimes does not cause rebalancing

2019-08-26 Thread Vinoth Chandar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Chandar resolved KAFKA-8831.
---
Resolution: Not A Problem

I will think about better logging and put up a patch. Closing this issue

> Joining a new instance sometimes does not cause rebalancing
> ---
>
> Key: KAFKA-8831
> URL: https://issues.apache.org/jira/browse/KAFKA-8831
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Chris Pettitt
>Assignee: Chris Pettitt
>Priority: Major
> Attachments: StandbyTaskTest.java, fail.log
>
>
> See attached log. The application is in a REBALANCING state. The second 
> instance joins a bit after the first instance (~250ms). The group coordinator 
> says it is going to rebalance but nothing happens. The first instance gets 
> all partitions (2). The application transitions to RUNNING.
> See attached test, which starts one client and then starts another about 
> 250ms later. This seems to consistently repro the issue for me.
> This is blocking my work on KAFKA-8755, so I'm inclined to pick it up



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8810) Add mechanism to detect topology mismatch between streams instances

2019-08-16 Thread Vinoth Chandar (JIRA)
Vinoth Chandar created KAFKA-8810:
-

 Summary: Add mechanism to detect topology mismatch between streams 
instances
 Key: KAFKA-8810
 URL: https://issues.apache.org/jira/browse/KAFKA-8810
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Vinoth Chandar


Noticed this while reading through the StreamsPartitionAssignor related code. 
If a user accidentally deploys a different topology on one of the instances, 
there is no mechanism to detect this and refuse assignment/take action. Given 
Kafka Streams is designed as an embeddable library, I feel this is rather an 
important scenario to handle. For example, Kafka Streams is embedded into a web 
front-end tier, and operators deploy a hot fix for a site issue to a few 
instances that are leaking memory, which accidentally also deploys some 
topology changes with it. 
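
As a strawman for "detect and take action", an application can already fingerprint its 
topology and have deployment tooling or a health check compare fingerprints across 
instances; buildTopology() and the logger below are placeholders:

{code}
Topology topology = buildTopology();                                 // placeholder
String description = topology.describe().toString();                // stable textual form
String fingerprint = Integer.toHexString(description.hashCode());   // or a proper digest
log.info("Streams topology fingerprint: {}", fingerprint);          // compare out of band
{code}

A built-in version of this check, enforced during assignment, is what this ticket is 
really after.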


Please feel free to close the issue if it's a duplicate. (Could not find a 
ticket for this.) 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8799) Support ability to pass global user data to consumers during Assignment

2019-08-13 Thread Vinoth Chandar (JIRA)
Vinoth Chandar created KAFKA-8799:
-

 Summary: Support ability to pass global user data to consumers 
during Assignment
 Key: KAFKA-8799
 URL: https://issues.apache.org/jira/browse/KAFKA-8799
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Vinoth Chandar


This is a follow up from KAFKA-7149 

*Background :* 

Although we reduced the size of the AssignmentInfo object sent during each 
rebalance from the leader to all followers in KAFKA-7149, we still repeat the same 
_partitionsByHost_ map for each host (all this when interactive queries are 
enabled) and thus still end up sending redundant bytes to the broker and 
logging a large Kafka message.

With hundreds of Streams instances, this overhead can easily grow into tens of 
megabytes.

*Proposal :*

Extend the group assignment protocol to be able to support passing of an 
additional byte[], which can now contain the HostInfo -> 
partitions/partitionsByHost data just one time. 

{code}
final class GroupAssignment {
    // per-member assignments, keyed by member/consumer id (type parameters assumed)
    private final Map<String, Assignment> assignments;

    // opaque bytes sent once from the leader to every consumer
    private final byte[] globalUserData;
    ...
}
{code}
 
This can generally be handy for any other application that, like Streams, does 
some stateful processing or lightweight cluster management.
 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


Add to the contributor list

2019-08-02 Thread Vinoth Chandar
I am interested in picking up KAFKA-7149.
Can I be added to the list? My Jira id: vinoth


[jira] [Created] (KAFKA-7820) distinct count kafka streams api

2019-01-14 Thread Vinoth Rajasekar (JIRA)
Vinoth Rajasekar created KAFKA-7820:
---

 Summary: distinct count kafka streams api
 Key: KAFKA-7820
 URL: https://issues.apache.org/jira/browse/KAFKA-7820
 Project: Kafka
  Issue Type: New Feature
  Components: core
Reporter: Vinoth Rajasekar


We are using Kafka Streams for our real-time analytics use cases. Most of our 
use cases involve doing a distinct count on certain fields.

Currently we do the distinct count by storing the hashed value of the data in a 
set and counting as events flow in. There are a lot of challenges doing this 
in application memory, because storing the hashed values and counting them 
is limited by the allotted memory size. When we get high volume or a spike in 
traffic, the hash set for the distinct-count fields grows beyond the allotted memory 
size, leading to issues.

The other issue is that when we scale the app, we need to use global KTables so we get 
all the values for the distinct count, which adds back pressure in the 
cluster, or we have to re-partition the topic and count on the key.

Can we have a feature where distinct count is supported through the Streams 
API at the framework level, rather than dealing with it at the application level?
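
For reference, this is roughly how a distinct count gets hand-rolled with the DSL today, 
which is where the memory pressure described above comes from (the topic name, grouping 
and the set serde are assumptions):

{code}
import java.util.HashSet;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();
// key = the field we group by (e.g. ip), value = the field we count distinct values of
KStream<String, String> events = builder.stream("events");
KTable<String, Long> distinctCounts = events
    .groupByKey()
    .aggregate(
        HashSet::new,
        (key, value, set) -> { set.add(value); return set; },          // unbounded per-key set
        Materialized.<String, HashSet<String>, KeyValueStore<Bytes, byte[]>>as("distinct-sets")
            .withValueSerde(hashSetSerde))                             // assumed custom serde
    .mapValues(set -> (long) set.size());
{code}

The ask here is essentially for the framework to offer this (possibly approximate, e.g. 
sketch-based) so the application does not have to hold the full set per key in memory.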



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2017-05-09 Thread Vinoth Rajasekar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16002747#comment-16002747
 ] 

Vinoth Rajasekar commented on KAFKA-4273:
-

We have a use case where we need to do a distinct count on the user-agent 
strings by IP. In a day, on average we have to do this type of aggregation 
on 50 million unique IPs for many fields. At times, as some IPs get more than 
5 unique user agents, it's tough to fit these records in memory and do a 
KTable aggregation.

For distinct counts, the original values need to be retained in a hash set per 
record for the unique count. To avoid memory issues, I'm making use of the 
low-level processor API and writing the aggregated output to a RocksDB store. 
I want to keep these records active only for a 24-hour period and delete/expire them 
later from the RocksDB store. There is no option now to set custom TTL 
values to expire the records. 

This is a nice-to-have feature. Any idea when this will be available in 
production?
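
For anyone landing here later, the workaround sketched above looks roughly like this 
(written against the newer Processor/Punctuator API, which did not exist when this 
thread started; the store name, types and the 1h/24h intervals are illustrative):

{code}
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Stores the last-update timestamp per key and periodically deletes entries older
// than 24 hours, i.e. a hand-rolled TTL on top of the (non-TTL) RocksDB store.
public class ExpiringStoreProcessor implements Processor<String, Long, Void, Void> {
    private KeyValueStore<String, Long> store;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        store = context.getStateStore("agg-store");    // store registered elsewhere
        context.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, now -> {
            final long cutoff = now - Duration.ofHours(24).toMillis();
            try (KeyValueIterator<String, Long> it = store.all()) {
                while (it.hasNext()) {
                    final KeyValue<String, Long> entry = it.next();
                    if (entry.value != null && entry.value < cutoff) {
                        store.delete(entry.key);        // manual expiry
                    }
                }
            }
        });
    }

    @Override
    public void process(final Record<String, Long> record) {
        store.put(record.key(), record.timestamp());    // remember when the key was last touched
    }
}
{code}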

> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has TTL / retention period. It's 48 hours. After that 
> - data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have schedulled a job to 
> periodically stop Kafka Streams instances - one at the time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think would be reasonable to add support in the DSL api to define 
> TTL for local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could also this be added to 
> the DSL api? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question, maybe some intermediate topics / 
> state stores need different TTL, so a it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somehow similar issue: KAFKA-4212



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-2580) Kafka Broker keeps file handles open for all log files (even if its not written to/read from)

2015-11-02 Thread Vinoth Chandar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14985707#comment-14985707
 ] 

Vinoth Chandar commented on KAFKA-2580:
---

[~jkreps] ah ok. point taken :) 

> Kafka Broker keeps file handles open for all log files (even if its not 
> written to/read from)
> -
>
> Key: KAFKA-2580
> URL: https://issues.apache.org/jira/browse/KAFKA-2580
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.1
>Reporter: Vinoth Chandar
>Assignee: Grant Henke
>
> We noticed this in one of our clusters where we stage logs for a longer 
> amount of time. It appears that the Kafka broker keeps file handles open even 
> for non active (not written to or read from) files. (in fact, there are some 
> threads going back to 2013 
> http://grokbase.com/t/kafka/users/132p65qwcn/keeping-logs-forever) 
> Needless to say, this is a problem and forces us to either artificially bump 
> up ulimit (its already at 100K) or expand the cluster (even if we have 
> sufficient IO and everything). 
> Filing this ticket, since I could not find anything similar. Very interested to 
> know if there are plans to address this (given how Samza's changelog topic is 
> meant to be a persistent large state use case).  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2580) Kafka Broker keeps file handles open for all log files (even if its not written to/read from)

2015-10-31 Thread Vinoth Chandar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14984243#comment-14984243
 ] 

Vinoth Chandar commented on KAFKA-2580:
---

Based on this, looks like we can close this? 

>> So a lot of this comes down to the implementation. A naive 10k item LRU 
>> cache could easily be far more memory hungry than having 50k open FDs, plus 
>> being in heap this would add a huge number of objects to manage.

[~jkreps] I am a little confused. What I meant by LRU cache was simply limiting 
the number of "java.io.File" objects (or equivalent in Kafka codebase) that 
represent the handle to the segment. So, if there are 10K such objects in a 
(properly sized) ConcurrentHashMap, how would that add to the memory overhead 
so much, compared to holding 50K/200K objects anyway?

> Kafka Broker keeps file handles open for all log files (even if its not 
> written to/read from)
> -
>
> Key: KAFKA-2580
> URL: https://issues.apache.org/jira/browse/KAFKA-2580
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.1
>Reporter: Vinoth Chandar
>Assignee: Grant Henke
>
> We noticed this in one of our clusters where we stage logs for a longer 
> amount of time. It appears that the Kafka broker keeps file handles open even 
> for non active (not written to or read from) files. (in fact, there are some 
> threads going back to 2013 
> http://grokbase.com/t/kafka/users/132p65qwcn/keeping-logs-forever) 
> Needless to say, this is a problem and forces us to either artificially bump 
> up ulimit (its already at 100K) or expand the cluster (even if we have 
> sufficient IO and everything). 
> Filing this ticket, since I could not find anything similar. Very interested to 
> know if there are plans to address this (given how Samza's changelog topic is 
> meant to be a persistent large state use case).  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2580) Kafka Broker keeps file handles open for all log files (even if its not written to/read from)

2015-10-19 Thread Vinoth Chandar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14963574#comment-14963574
 ] 

Vinoth Chandar commented on KAFKA-2580:
---

[~jjkoshy] Good point. If I understand correctly, even if, say, all consumers 
start bootstrapping with startTime=earliest, which can force opening of 
all file handles, an LRU based scheme would keep closing the file handles 
internally from oldest to latest file, which is still good behaviour. The impact 
of fs.close() on an old file could be lessened by delegating it to a background 
thread, and the cache could take a config that caps the number of items in the 
file handle cache. 

I like the cache approach better since it will be one place through which all 
access goes, so future features transparently play nicely with overall system 
limits. 
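
Purely to illustrate that scheme (this is not Kafka code): an access-ordered LRU of 
open segment channels, capped by a config, with evicted handles closed on a background 
thread so reads never block on close():

{code}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FileHandleCache {
    private final ExecutorService closer = Executors.newSingleThreadExecutor();
    private final Map<String, FileChannel> lru;

    public FileHandleCache(final int maxOpenHandles) {
        // Access-ordered LinkedHashMap: the least recently used segment is evicted first.
        this.lru = Collections.synchronizedMap(
            new LinkedHashMap<String, FileChannel>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(final Map.Entry<String, FileChannel> eldest) {
                    if (size() > maxOpenHandles) {
                        // async close; a real implementation would also need to guard
                        // in-flight reads against the channel being closed underneath them
                        closer.submit(() -> closeQuietly(eldest.getValue()));
                        return true;
                    }
                    return false;
                }
            });
    }

    public FileChannel channelFor(final Path segment) throws IOException {
        synchronized (lru) {                                // synchronizedMap locks on the wrapper
            FileChannel ch = lru.get(segment.toString());   // get() refreshes LRU order
            if (ch == null || !ch.isOpen()) {
                ch = FileChannel.open(segment, StandardOpenOption.READ);
                lru.put(segment.toString(), ch);
            }
            return ch;
        }
    }

    private static void closeQuietly(final FileChannel ch) {
        try { ch.close(); } catch (final IOException ignored) { }
    }
}
{code}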

> Kafka Broker keeps file handles open for all log files (even if its not 
> written to/read from)
> -
>
> Key: KAFKA-2580
> URL: https://issues.apache.org/jira/browse/KAFKA-2580
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.1
>Reporter: Vinoth Chandar
>Assignee: Grant Henke
>
> We noticed this in one of our clusters where we stage logs for a longer 
> amount of time. It appears that the Kafka broker keeps file handles open even 
> for non active (not written to or read from) files. (in fact, there are some 
> threads going back to 2013 
> http://grokbase.com/t/kafka/users/132p65qwcn/keeping-logs-forever) 
> Needless to say, this is a problem and forces us to either artificially bump 
> up ulimit (its already at 100K) or expand the cluster (even if we have 
> sufficient IO and everything). 
> Filing this ticket, since I could not find anything similar. Very interested to 
> know if there are plans to address this (given how Samza's changelog topic is 
> meant to be a persistent large state use case).  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2580) Kafka Broker keeps file handles open for all log files (even if its not written to/read from)

2015-10-02 Thread Vinoth Chandar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14941910#comment-14941910
 ] 

Vinoth Chandar commented on KAFKA-2580:
---

An LRU file handle cache is something very commonly employed in databases, and it works 
pretty well in practice (considering that it involves random access). So +1 
on that path. 

[~granthenke] would you have cycles for this? If no one is working on this 
currently, we (Uber) can take a stab at it later this quarter. 


> Kafka Broker keeps file handles open for all log files (even if its not 
> written to/read from)
> -
>
> Key: KAFKA-2580
> URL: https://issues.apache.org/jira/browse/KAFKA-2580
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.1
>Reporter: Vinoth Chandar
>
> We noticed this in one of our clusters where we stage logs for a longer 
> amount of time. It appears that the Kafka broker keeps file handles open even 
> for non active (not written to or read from) files. (in fact, there are some 
> threads going back to 2013 
> http://grokbase.com/t/kafka/users/132p65qwcn/keeping-logs-forever) 
> Needless to say, this is a problem and forces us to either artificially bump 
> up ulimit (its already at 100K) or expand the cluster (even if we have 
> sufficient IO and everything). 
> Filing this ticket, since I could not find anything similar. Very interested to 
> know if there are plans to address this (given how Samza's changelog topic is 
> meant to be a persistent large state use case).  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2580) Kafka Broker keeps file handles open for all log files (even if its not written to/read from)

2015-09-25 Thread Vinoth Chandar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14908910#comment-14908910
 ] 

Vinoth Chandar commented on KAFKA-2580:
---

Thanks for jumping in, [~guozhang]. We have 256MB segment sizes and 100K 
descriptors. 

>> Adding a feature that closes inactive segments' open file handlers and 
>> re-open them upon being read / written again is possible, but would be 
>> tricky.

Can you please elaborate? Looks straightforward to me from the outside :) 


> Kafka Broker keeps file handles open for all log files (even if its not 
> written to/read from)
> -
>
> Key: KAFKA-2580
> URL: https://issues.apache.org/jira/browse/KAFKA-2580
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>    Affects Versions: 0.8.2.1
>Reporter: Vinoth Chandar
>
> We noticed this in one of our clusters where we stage logs for a longer 
> amount of time. It appears that the Kafka broker keeps file handles open even 
> for non active (not written to or read from) files. (in fact, there are some 
> threads going back to 2013 
> http://grokbase.com/t/kafka/users/132p65qwcn/keeping-logs-forever) 
> Needless to say, this is a problem and forces us to either artificially bump 
> up ulimit (its already at 100K) or expand the cluster (even if we have 
> sufficient IO and everything). 
> Filing this ticket, since I could not find anything similar. Very interested to 
> know if there are plans to address this (given how Samza's changelog topic is 
> meant to be a persistent large state use case).  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2580) Kafka Broker keeps file handles open for all log files (even if its not written to/read from)

2015-09-25 Thread Vinoth Chandar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14908298#comment-14908298
 ] 

Vinoth Chandar commented on KAFKA-2580:
---

More context on how we determined this

{code}
vinoth@kafka-agg:~$ sudo ls -l /proc//fd | wc -l
50820
vinoth@kafka-agg::~$ ls -R /var/kafka-spool/data | grep -e ".log" -e ".index" | 
wc -l
97242
vinoth@kafka-agg::~$ ls -R /var/kafka-spool/data | grep -e ".index" | wc -l
48456
vinoth@kafka-agg::~$ ls -R /var/kafka-spool/data | grep -e ".log"  | wc -l
48788


vinoth@kafka-changelog-cluster:~$ sudo ls -l /proc//fd | wc -l
59128
vinoth@kafka-changelog-cluster:~$ ls -R /var/kafka-spool/data | grep -e ".log" 
-e ".index" | wc -l
117548
vinoth@kafka-changelog-cluster:~$ ls -R /var/kafka-spool/data | grep  -e 
".index" | wc -l 
58774
vinoth@kafka-changelog-cluster:~$ ls -R /var/kafka-spool/data | grep  -e ".log" 
| wc -l
58774
{code}

> Kafka Broker keeps file handles open for all log files (even if its not 
> written to/read from)
> -
>
> Key: KAFKA-2580
> URL: https://issues.apache.org/jira/browse/KAFKA-2580
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.1
>Reporter: Vinoth Chandar
>
> We noticed this in one of our clusters where we stage logs for a longer 
> amount of time. It appears that the Kafka broker keeps file handles open even 
> for non active (not written to or read from) files. (in fact, there are some 
> threads going back to 2013 
> http://grokbase.com/t/kafka/users/132p65qwcn/keeping-logs-forever) 
> Needless to say, this is a problem and forces us to either artificially bump 
> up ulimit (its already at 100K) or expand the cluster (even if we have 
> sufficient IO and everything). 
> Filing this ticket, since I could not find anything similar. Very interested to 
> know if there are plans to address this (given how Samza's changelog topic is 
> meant to be a persistent large state use case).  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2580) Kafka Broker keeps file handles open for all log files (even if its not written to/read from)

2015-09-24 Thread Vinoth Chandar (JIRA)
Vinoth Chandar created KAFKA-2580:
-

 Summary: Kafka Broker keeps file handles open for all log files 
(even if its not written to/read from)
 Key: KAFKA-2580
 URL: https://issues.apache.org/jira/browse/KAFKA-2580
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2.1
Reporter: Vinoth Chandar


We noticed this in one of our clusters where we stage logs for a longer amount 
of time. It appears that the Kafka broker keeps file handles open even for 
non-active (not written to or read from) files. (In fact, there are some threads 
going back to 2013: 
http://grokbase.com/t/kafka/users/132p65qwcn/keeping-logs-forever) 

Needless to say, this is a problem and forces us to either artificially bump up 
ulimit (it's already at 100K) or expand the cluster (even if we have sufficient 
IO and everything). 

Filing this ticket, since I could not find anything similar. Very interested to 
know if there are plans to address this (given how Samza's changelog topic is 
meant to be a persistent large-state use case).  




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)