[jira] [Updated] (KAFKA-10429) Group Coordinator unavailability leads to missing events

2020-08-24 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-10429:
--
Summary: Group Coordinator unavailability leads to missing events  (was: 
Group Coordinator is unavailable leads to missing events)

> Group Coordinator unavailability leads to missing events
> 
>
> Key: KAFKA-10429
> URL: https://issues.apache.org/jira/browse/KAFKA-10429
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1
>Reporter: Navinder Brar
>Priority: Major
>
> We regularly see the following message in our logs:
> [2020-08-25 03:24:59,214] INFO [Consumer 
> clientId=appId-StreamThread-1-consumer, groupId=dashavatara] Group 
> coordinator ip:9092 (id: 1452096777 rack: null) is *unavailable* or invalid, 
> will attempt rediscovery 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>  
> After some time, the coordinator is discovered again:
> [2020-08-25 03:25:02,218] INFO [Consumer 
> clientId=appId-c3d1d186-e487-4993-ae3d-5fed75887e6b-StreamThread-1-consumer, 
> groupId=appId] Discovered group coordinator ip:9092 (id: 1452096777 rack: 
> null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
>  
> What I don't understand is why this unavailability doesn't trigger a rebalance 
> in the cluster. We have only a few hours of retention on the source Kafka 
> topics, and sometimes the unavailability lasts for more than a few hours. Since 
> it neither triggers a rebalance nor stops processing on the other nodes (which 
> are still connected to the group coordinator), we never find out that something 
> has gone wrong, and by then we have already lost events from our source topics.
>  
> There are some resolutions mentioned on Stack Overflow, but those configs are 
> already set on our Kafka brokers:
> default.replication.factor=3
> offsets.topic.replication.factor=3
>  
> It would be great to understand why this issue happens, why it doesn't 
> trigger a rebalance, and whether there is any known solution for it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10429) Group Coordinator is unavailable leads to missing events

2020-08-24 Thread Navinder Brar (Jira)
Navinder Brar created KAFKA-10429:
-

 Summary: Group Coordinator is unavailable leads to missing events
 Key: KAFKA-10429
 URL: https://issues.apache.org/jira/browse/KAFKA-10429
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.1.1
Reporter: Navinder Brar


We regularly see the following message in our logs:

[2020-08-25 03:24:59,214] INFO [Consumer 
clientId=appId-StreamThread-1-consumer, groupId=dashavatara] Group coordinator 
ip:9092 (id: 1452096777 rack: null) is *unavailable* or invalid, will attempt 
rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

 

After some time, the coordinator is discovered again:

[2020-08-25 03:25:02,218] INFO [Consumer 
clientId=appId-c3d1d186-e487-4993-ae3d-5fed75887e6b-StreamThread-1-consumer, 
groupId=appId] Discovered group coordinator ip:9092 (id: 1452096777 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

 

What I don't understand is why this unavailability doesn't trigger a rebalance 
in the cluster. We have only a few hours of retention on the source Kafka 
topics, and sometimes the unavailability lasts for more than a few hours. Since 
it neither triggers a rebalance nor stops processing on the other nodes (which 
are still connected to the group coordinator), we never find out that something 
has gone wrong, and by then we have already lost events from our source topics.

 

There are some resolutions mentioned on Stack Overflow, but those configs are 
already set on our Kafka brokers:

default.replication.factor=3

offsets.topic.replication.factor=3

 

It would be great to understand why this issue happens, why it doesn't trigger 
a rebalance, and whether there is any known solution for it.
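
Until the root cause is understood, one mitigation on our side is a watchdog 
that flags when an instance stops making consumption progress. A minimal sketch 
below, assuming the consumer metric "records-consumed-total" is exposed through 
KafkaStreams#metrics() (the exact metric name/group can differ across client 
versions):

{code:java}
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

import java.util.Map;

public class ConsumptionWatchdog {

    private double lastTotal = -1;

    /** Returns true if this instance consumed any records since the previous check. */
    public boolean consumedSinceLastCheck(final KafkaStreams streams) {
        double total = 0;
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            // Sum the per-consumer fetch counters; the metric name is an assumption here.
            if ("records-consumed-total".equals(entry.getKey().name())) {
                final Object value = entry.getValue().metricValue();
                if (value instanceof Number) {
                    total += ((Number) value).doubleValue();
                }
            }
        }
        final boolean progressed = total > lastTotal;
        lastTotal = total;
        return progressed;
    }
}
{code}

Calling this periodically and alerting when it returns false would at least 
surface the stalled instance before the source-topic retention expires.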



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9757) Add documentation change for KIP-535

2020-03-31 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17071906#comment-17071906
 ] 

Navinder Brar commented on KAFKA-9757:
--

Opened a PR for this: [https://github.com/apache/kafka/pull/8395]

> Add documentation change for KIP-535
> 
>
> Key: KAFKA-9757
> URL: https://issues.apache.org/jira/browse/KAFKA-9757
> Project: Kafka
>  Issue Type: Sub-task
>  Components: docs, streams
>Affects Versions: 2.5.0
>Reporter: Boyang Chen
>Assignee: Navinder Brar
>Priority: Major
>
> Just a reminder to add documentations for KIP-535 in both the release notes 
> and streams documentation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9757) Add documentation change for KIP-535

2020-03-30 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17071251#comment-17071251
 ] 

Navinder Brar commented on KAFKA-9757:
--

Hey [~vinoth], if you are busy I can take this up. I am also updating the docs 
for KIP-562, so I can do both together.

> Add documentation change for KIP-535
> 
>
> Key: KAFKA-9757
> URL: https://issues.apache.org/jira/browse/KAFKA-9757
> Project: Kafka
>  Issue Type: Sub-task
>  Components: docs, streams
>Affects Versions: 2.5.0
>Reporter: Boyang Chen
>Assignee: Vinoth Chandar
>Priority: Major
>
> Just a reminder to add documentations for KIP-535 in both the release notes 
> and streams documentation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7480) GlobalThread should honor custom auto.offset.reset policy

2020-03-16 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059997#comment-17059997
 ] 

Navinder Brar commented on KAFKA-7480:
--

Sure, thanks Matthias. I will assign this to myself.

> GlobalThread should honor custom auto.offset.reset policy
> -
>
> Key: KAFKA-7480
> URL: https://issues.apache.org/jira/browse/KAFKA-7480
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> With KAFKA-6121 we improved Kafka Streams resilience and correctness with 
> regard to consumer auto.offset.reset and state cleanup.
> Back then, we decided to let GlobalStreamThread die and not handle 
> InvalidOffsetException during regular processing, because this error 
> indicates a fatal issue and the user should be notified about it. However, as 
> reported on the user mailing list, the only thing a user can do is to 
> restart the application (and investigate the root cause). During restart, the 
> state will be cleaned up and bootstrapped correctly.
> Thus, we might want to allow users to specify a more resilient configuration 
> for this case and log an ERROR message if the error occurs. To ensure 
> consistency, we might not allow setting the reset policy to "latest", though 
> (needs discussion). By default, we can still keep "none" and fail.
> Note: `Topology.addGlobalStore` does not allow setting a reset policy. Thus, 
> this might require a KIP to extend `Topology.addGlobalStore` accordingly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-7480) GlobalThread should honor custom auto.offset.reset policy

2020-03-16 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar reassigned KAFKA-7480:


Assignee: Navinder Brar

> GlobalThread should honor custom auto.offset.reset policy
> -
>
> Key: KAFKA-7480
> URL: https://issues.apache.org/jira/browse/KAFKA-7480
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Navinder Brar
>Priority: Major
>  Labels: needs-kip
>
> With KAFKA-6121 we improved Kafka Streams resilience and correctness with 
> regard to consumer auto.offset.reset and state cleanup.
> Back then, we decided to let GlobalStreamThread die and not handle 
> InvalidOffsetException during regular processing, because this error 
> indicates a fatal issue and the user should be notified about it. However, as 
> reported on the user mailing list, the only thing a user can do is to 
> restart the application (and investigate the root cause). During restart, the 
> state will be cleaned up and bootstrapped correctly.
> Thus, we might want to allow users to specify a more resilient configuration 
> for this case and log an ERROR message if the error occurs. To ensure 
> consistency, we might not allow setting the reset policy to "latest", though 
> (needs discussion). By default, we can still keep "none" and fail.
> Note: `Topology.addGlobalStore` does not allow setting a reset policy. Thus, 
> this might require a KIP to extend `Topology.addGlobalStore` accordingly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7480) GlobalThread should honor custom auto.offset.reset policy

2020-03-15 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059905#comment-17059905
 ] 

Navinder Brar commented on KAFKA-7480:
--

Hi [~mjsax], can I take this up?

I didn't understand why this one needs a KIP. Are we planning to introduce a 
new config like "global.auto.offset.reset", or do we want to use the same 
"auto.offset.reset" that is available for the other source topics?

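For context, regular source topics can already pick a per-source reset policy 
through Consumed, while the global-store path has no honoured equivalent today. 
A minimal sketch of the existing per-source API (topic names are placeholders), 
just to frame what a global-store config or API extension would mirror:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ResetPolicyExample {

    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Regular sources can choose their own reset policy per source topic.
        final KStream<String, String> input = builder.stream(
            "input-topic",
            Consumed.with(Serdes.String(), Serdes.String())
                    .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));
        input.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        // Global stores (Topology#addGlobalStore / StreamsBuilder#globalTable) have no
        // honoured equivalent today; on InvalidOffsetException the GlobalStreamThread dies,
        // which is exactly the behaviour this ticket wants to make configurable.
        return builder.build();
    }
}
{code}
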
> GlobalThread should honor custom auto.offset.reset policy
> -
>
> Key: KAFKA-7480
> URL: https://issues.apache.org/jira/browse/KAFKA-7480
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> With KAFKA-6121 we improved Kafka Streams resilience and correctness with 
> regard to consumer auto.offset.reset and state cleanup.
> Back then, we decided to let GlobalStreamThread die and not handle 
> InvalidOffsetException during regular processing, because this error 
> indicates a fatal issue and the user should be notified about it. However, as 
> reported on the user mailing list, the only thing a user can do is to 
> restart the application (and investigate the root cause). During restart, the 
> state will be cleaned up and bootstrapped correctly.
> Thus, we might want to allow users to specify a more resilient configuration 
> for this case and log an ERROR message if the error occurs. To ensure 
> consistency, we might not allow setting the reset policy to "latest", though 
> (needs discussion). By default, we can still keep "none" and fail.
> Note: `Topology.addGlobalStore` does not allow setting a reset policy. Thus, 
> this might require a KIP to extend `Topology.addGlobalStore` accordingly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9588) Add rocksdb event listeners in KS

2020-02-20 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9588:
-
Description: RocksDB is adding support for event listeners (like 
onCompactionCompleted) in JNI, which would be really helpful in KS to trigger 
checkpointing when a flush completes because the memtables filled up, rather 
than doing it periodically. This task is currently blocked on 
https://issues.apache.org/jira/browse/KAFKA-8897.  (was: RocksDB is adding 
support for event listeners (like onCompactionCompleted) in JNI, which would 
be really helpful in KS to trigger checkpointing when a flush completes 
because the memtables filled up, rather than doing it periodically. This task 
is currently blocked on https://issues.apache.org/jira/browse/KAFKA-8897.

 

Linking this task to https://issues.apache.org/jira/browse/KAFKA-9450 as well 
for tracking.)

> Add rocksdb event listeners in KS
> -
>
> Key: KAFKA-9588
> URL: https://issues.apache.org/jira/browse/KAFKA-9588
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Priority: Major
>
> RocksDB is adding support for event listeners (like onCompactionCompleted) 
> in JNI, which would be really helpful in KS to trigger checkpointing when a 
> flush completes because the memtables filled up, rather than doing it 
> periodically. This task is currently blocked on 
> https://issues.apache.org/jira/browse/KAFKA-8897.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9588) Add rocksdb event listeners in KS

2020-02-20 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9588:
-
Description: RocksDB is adding support for event listeners (like 
onCompactionCompleted) in JNI 
([https://github.com/facebook/rocksdb/issues/6343]), which would be really 
helpful in KS to trigger checkpointing when a flush completes because the 
memtables filled up, rather than doing it periodically. This task is currently 
blocked on https://issues.apache.org/jira/browse/KAFKA-8897.  (was: RocksDB is 
adding support for event listeners (like onCompactionCompleted) in JNI, which 
would be really helpful in KS to trigger checkpointing when a flush completes 
because the memtables filled up, rather than doing it periodically. This task 
is currently blocked on 
https://issues.apache.org/jira/browse/KAFKA-8897.)

> Add rocksdb event listeners in KS
> -
>
> Key: KAFKA-9588
> URL: https://issues.apache.org/jira/browse/KAFKA-9588
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Priority: Major
>
> RocksDB is adding support for event listeners (like onCompactionCompleted) 
> in JNI ([https://github.com/facebook/rocksdb/issues/6343]), which would be 
> really helpful in KS to trigger checkpointing when a flush completes because 
> the memtables filled up, rather than doing it periodically. This task is 
> currently blocked on https://issues.apache.org/jira/browse/KAFKA-8897.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9588) Add rocksdb event listeners in KS

2020-02-20 Thread Navinder Brar (Jira)
Navinder Brar created KAFKA-9588:


 Summary: Add rocksdb event listeners in KS
 Key: KAFKA-9588
 URL: https://issues.apache.org/jira/browse/KAFKA-9588
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Navinder Brar


RocksDB is adding support for event listeners (like onCompactionCompleted) in 
JNI, which would be really helpful in KS to trigger checkpointing when a flush 
completes because the memtables filled up, rather than doing it periodically. 
This task is currently blocked on 
https://issues.apache.org/jira/browse/KAFKA-8897.

 

Linking this task to https://issues.apache.org/jira/browse/KAFKA-9450 as well 
for tracking.
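
Once the upgraded rocksdbjni is available, such a listener could in principle 
be wired into a store through a RocksDBConfigSetter. A rough sketch below, 
assuming a RocksDB Java version that exposes AbstractEventListener/setListeners 
(illustrative of the idea only, not the eventual KS integration):

{code:java}
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.AbstractEventListener;
import org.rocksdb.FlushJobInfo;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

import java.util.Collections;
import java.util.Map;

public class FlushListenerConfigSetter implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Requires a rocksdbjni version where event listeners are exposed (see the issue above).
        options.setListeners(Collections.singletonList(new AbstractEventListener() {
            @Override
            public void onFlushCompleted(final RocksDB db, final FlushJobInfo flushJobInfo) {
                // This is where KS could checkpoint the store's changelog offset as soon as the
                // memtable is durable, instead of flushing/checkpointing purely on the commit interval.
                System.out.println("Flush completed for store " + storeName);
            }
        }));
    }

    @Override
    public void close(final String storeName, final Options options) { }
}
{code}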



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing

2020-02-13 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17036093#comment-17036093
 ] 

Navinder Brar commented on KAFKA-9450:
--

Sure, [~ableegoldman]. I will create a ticket. Checked with RocksDB; they will 
not cherry-pick to the 5.x versions. So we will have to wait, I guess.

> Decouple inner state flushing from committing
> -
>
> Key: KAFKA-9450
> URL: https://issues.apache.org/jira/browse/KAFKA-9450
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> When EOS is turned on, the commit interval is set quite low (100ms) and all 
> the store layers are flushed during a commit. This is necessary for 
> forwarding records in the cache to the changelog, but unfortunately also 
> forces rocksdb to flush the current memtable before it's full. The result is 
> a large number of small writes to disk, losing the benefits of batching, and 
> a large number of very small L0 files that are likely to slow compaction.
> Since we have to delete the stores to recreate from scratch anyways during an 
> unclean shutdown with EOS, we may as well skip flushing the innermost 
> StateStore during a commit and only do so during a graceful shutdown, before 
> a rebalance, etc. This is currently blocked on a refactoring of the state 
> store layers to allow decoupling the flush of the caching layer from the 
> actual state store.
> Note that this is especially problematic with EOS due to the necessarily-low 
> commit interval, but still hurts even with at-least-once and a much larger 
> commit interval. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9487) Followup : KAFKA-9445(Allow fetching a key from a single partition); addressing code review comments

2020-02-11 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar resolved KAFKA-9487.
--
Resolution: Fixed

> Followup : KAFKA-9445(Allow fetching a key from a single partition); 
> addressing code review comments
> 
>
> Key: KAFKA-9487
> URL: https://issues.apache.org/jira/browse/KAFKA-9487
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Blocker
> Fix For: 2.5.0
>
>
> A few code review comments are left to be addressed from Kafka 9445, which I 
> will be addressing in this PR.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-02-09 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17025550#comment-17025550
 ] 

Navinder Brar edited comment on KAFKA-9450 at 2/9/20 9:19 AM:
--

Checked the RocksDB code; event listeners are not available in the RocksDB 
version that we are using. I had filed a feature request, and it has now been 
merged: [https://github.com/facebook/rocksdb/issues/6343].


was (Author: navibrar):
Checked the RocksDB code; event listeners are not available in the JNI 
bindings. They are probably planned but not available in any released version yet.

> Decouple inner state flushing from committing with EOS
> --
>
> Key: KAFKA-9450
> URL: https://issues.apache.org/jira/browse/KAFKA-9450
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> When EOS is turned on, the commit interval is set quite low (100ms) and all 
> the store layers are flushed during a commit. This is necessary for 
> forwarding records in the cache to the changelog, but unfortunately also 
> forces rocksdb to flush the current memtable before it's full. The result is 
> a large number of small writes to disk, losing the benefits of batching, and 
> a large number of very small L0 files that are likely to slow compaction.
> Since we have to delete the stores to recreate from scratch anyways during an 
> unclean shutdown with EOS, we may as well skip flushing the innermost 
> StateStore during a commit and only do so during a graceful shutdown, before 
> a rebalance, etc. This is currently blocked on a refactoring of the state 
> store layers to allow decoupling the flush of the caching layer from the 
> actual state store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9487) Followup : KAFKA-9445(Allow fetching a key from a single partition); addressing code review comments

2020-02-02 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9487:
-
Summary: Followup : KAFKA-9445(Allow fetching a key from a single 
partition); addressing code review comments  (was: Followup : KAFKA-9445(Allow 
fetching a key from a single partition), addressing code review comments)

> Followup : KAFKA-9445(Allow fetching a key from a single partition); 
> addressing code review comments
> 
>
> Key: KAFKA-9487
> URL: https://issues.apache.org/jira/browse/KAFKA-9487
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Blocker
> Fix For: 2.5.0
>
>
> A few code review comments are left to be addressed from Kafka 9445, which I 
> will be addressing in this PR.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9487) Followup : KAFKA-9445(Allow fetching a key from a single partition), addressing code review comments

2020-02-02 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9487:
-
Summary: Followup : KAFKA-9445(Allow fetching a key from a single 
partition), addressing code review comments  (was: Followup : KAFKA-9445)

> Followup : KAFKA-9445(Allow fetching a key from a single partition), 
> addressing code review comments
> 
>
> Key: KAFKA-9487
> URL: https://issues.apache.org/jira/browse/KAFKA-9487
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Blocker
> Fix For: 2.5.0
>
>
> A few code review comments are left to be addressed from Kafka 9445, which I 
> will be addressing in this PR.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9487) Followup : KAFKA-9445

2020-01-31 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9487:
-
Description: A few code review comments are left to be addressed from Kafka 
9445, which I will be addressing in this PR.

> Followup : KAFKA-9445
> -
>
> Key: KAFKA-9487
> URL: https://issues.apache.org/jira/browse/KAFKA-9487
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Major
>
> A few code review comments are left to be addressed from Kafka 9445, which I 
> will be addressing in this PR.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-9487) Followup : KAFKA-9445

2020-01-31 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9487:
-
Comment: was deleted

(was: A few code review comments are left to be addressed from Kafka 9445, 
which I will be addressing in this PR.)

> Followup : KAFKA-9445
> -
>
> Key: KAFKA-9487
> URL: https://issues.apache.org/jira/browse/KAFKA-9487
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Major
>
> A few code review comments are left to be addressed from Kafka 9445, which I 
> will be addressing in this PR.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9487) Followup : KAFKA-9445

2020-01-31 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17027318#comment-17027318
 ] 

Navinder Brar commented on KAFKA-9487:
--

A few code review comments are left to be addressed from Kafka 9445, which I 
will be addressing in this PR.

> Followup : KAFKA-9445
> -
>
> Key: KAFKA-9487
> URL: https://issues.apache.org/jira/browse/KAFKA-9487
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9487) Followup : KAFKA-9445

2020-01-31 Thread Navinder Brar (Jira)
Navinder Brar created KAFKA-9487:


 Summary: Followup : KAFKA-9445
 Key: KAFKA-9487
 URL: https://issues.apache.org/jira/browse/KAFKA-9487
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Navinder Brar
Assignee: Navinder Brar






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-28 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17025550#comment-17025550
 ] 

Navinder Brar commented on KAFKA-9450:
--

Checked the RocksDB code; event listeners are not available in the JNI 
bindings. They are probably planned but not available in any released version yet.

> Decouple inner state flushing from committing with EOS
> --
>
> Key: KAFKA-9450
> URL: https://issues.apache.org/jira/browse/KAFKA-9450
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> When EOS is turned on, the commit interval is set quite low (100ms) and all 
> the store layers are flushed during a commit. This is necessary for 
> forwarding records in the cache to the changelog, but unfortunately also 
> forces rocksdb to flush the current memtable before it's full. The result is 
> a large number of small writes to disk, losing the benefits of batching, and 
> a large number of very small L0 files that are likely to slow compaction.
> Since we have to delete the stores to recreate from scratch anyways during an 
> unclean shutdown with EOS, we may as well skip flushing the innermost 
> StateStore during a commit and only do so during a graceful shutdown, before 
> a rebalance, etc. This is currently blocked on a refactoring of the state 
> store layers to allow decoupling the flush of the caching layer from the 
> actual state store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-27 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17024902#comment-17024902
 ] 

Navinder Brar commented on KAFKA-9450:
--

Or do you suggest to never (ie, for EOS and non-EOS case) call 
`innerByteStore#flush()`? This might be possible but would have a negative 
impact on non-EOS as it would make current fault-tolerance mechanism for 
non-EOS less efficient (we would not have a guarantee on commit that data is 
flushed to disk and might need to recover more data from the changelog topic in 
case of failure). 

>>> [~mjsax] do you mean we still write to the checkpoint file (for non-EOS) on 
>>> every commit but remove the flush? That would be dangerous, right? If the 
>>> data from RocksDB has not been flushed for the offsets that were written to 
>>> the checkpoint file, we have lost that data and moved ahead as well.

 

Could we add event listeners on RocksDB (EventListener::OnFlushCompleted()) and, 
whenever a particular store is flushed, commit the checkpoint for that 
particular store's changelog in the checkpoint file? Currently we are 
overriding most performance-related RocksDB configs (memtable size, max write 
buffers) by making the commit purely time-based. If this seems reasonable, I can 
work on it.
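
To make the proposal concrete, a self-contained sketch with a hypothetical 
listener interface standing in for RocksDB's OnFlushCompleted callback and a 
plain map standing in for the checkpoint file (neither is the real 
Streams/RocksDB API):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PerStoreCheckpointSketch {

    // Hypothetical stand-in for RocksDB's flush-completed callback.
    interface FlushListener {
        void onFlushCompleted(String storeName, long changelogOffsetCoveredByFlush);
    }

    // changelog name -> offset that is durable on local disk and therefore safe to checkpoint
    private final Map<String, Long> checkpointFile = new ConcurrentHashMap<>();

    // Instead of flushing every store on the commit interval, each store reports its own
    // flushes and only that store's checkpoint entry is advanced.
    FlushListener listenerFor(final String changelogName) {
        return (storeName, offset) -> checkpointFile.put(changelogName, offset);
    }

    public static void main(final String[] args) {
        final PerStoreCheckpointSketch sketch = new PerStoreCheckpointSketch();
        final FlushListener listener = sketch.listenerFor("app-store-changelog-3");
        listener.onFlushCompleted("store", 42_000L); // simulated memtable flush
        System.out.println(sketch.checkpointFile);   // {app-store-changelog-3=42000}
    }
}
{code}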

> Decouple inner state flushing from committing with EOS
> --
>
> Key: KAFKA-9450
> URL: https://issues.apache.org/jira/browse/KAFKA-9450
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> When EOS is turned on, the commit interval is set quite low (100ms) and all 
> the store layers are flushed during a commit. This is necessary for 
> forwarding records in the cache to the changelog, but unfortunately also 
> forces rocksdb to flush the current memtable before it's full. The result is 
> a large number of small writes to disk, losing the benefits of batching, and 
> a large number of very small L0 files that are likely to slow compaction.
> Since we have to delete the stores to recreate from scratch anyways during an 
> unclean shutdown with EOS, we may as well skip flushing the innermost 
> StateStore during a commit and only do so during a graceful shutdown, before 
> a rebalance, etc. This is currently blocked on a refactoring of the state 
> store layers to allow decoupling the flush of the caching layer from the 
> actual state store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-25 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17023643#comment-17023643
 ] 

Navinder Brar edited comment on KAFKA-9450 at 1/25/20 8:38 PM:
---

Great that I caught hold of this Jira. I had been meaning to start some 
discussion around decoupling flushing from committing. This ticket only deals 
with EOS; is that because the commit interval for non-EOS is 30 seconds? RocksDB 
flushing every 30 seconds is also a pretty big issue for us. I think the default 
"Level0FileNumCompactionTrigger" in RocksDB is 4, and since the default number 
of write buffers in RocksDBStore.java is 3, 2 memtables get flushed every 30 
seconds. When someone has multiple stores in the topology, this means there is 
at least one store undergoing compaction every 30 seconds (at most every 
minute). I was tracking CPU usage a few days ago while facing increased 
95th-percentile latencies every 30 seconds, which overlapped exactly with the 
flush (which in turn leads to compaction), and there is a huge bump in CPU usage 
every 30 seconds. So, as a workaround for now, I have increased the commit 
interval in our system to 30 minutes (and even in 30 minutes our memtables are 
not full), which is also inefficient. Let me know if we can also discuss non-EOS 
here, or whether I should create a separate ticket for it. 


was (Author: navibrar):
Great that I caught hold of this Jira. I had been meaning to start some 
discussion around decoupling flushing from committing. This ticket only deals 
with EOS; is that because the commit interval for non-EOS is 30 seconds? RocksDB 
flushing every 30 seconds is also a pretty big issue for us. I think the default 
"Level0FileNumCompactionTrigger" in RocksDB is 4, and since the default number 
of write buffers in RocksDBStore.java is 3, 2 memtables get flushed every 30 
seconds. When someone has multiple stores in the topology, this means there is 
at least one store undergoing compaction every 30 seconds (at most every 
minute). I was tracking CPU usage a few days ago while facing increased 
99th-percentile latencies every 30 seconds, which overlapped exactly with the 
flush (which in turn leads to compaction), and there is a huge bump in CPU usage 
every 30 seconds. So, as a workaround for now, I have increased the commit 
interval in our system to 30 minutes (and even in 30 minutes our memtables are 
not full), which is also inefficient. Let me know if we can also discuss non-EOS 
here, or whether I should create a separate ticket for it. 

> Decouple inner state flushing from committing with EOS
> --
>
> Key: KAFKA-9450
> URL: https://issues.apache.org/jira/browse/KAFKA-9450
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> When EOS is turned on, the commit interval is set quite low (100ms) and all 
> the store layers are flushed during a commit. This is necessary for 
> forwarding records in the cache to the changelog, but unfortunately also 
> forces rocksdb to flush the current memtable before it's full. The result is 
> a large number of small writes to disk, losing the benefits of batching, and 
> a large number of very small L0 files that are likely to slow compaction.
> Since we have to delete the stores to recreate from scratch anyways during an 
> unclean shutdown with EOS, we may as well skip flushing the innermost 
> StateStore during a commit and only do so during a graceful shutdown, before 
> a rebalance, etc. This is currently blocked on a refactoring of the state 
> store layers to allow decoupling the flush of the caching layer from the 
> actual state store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-25 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17023643#comment-17023643
 ] 

Navinder Brar commented on KAFKA-9450:
--

Great that I caught hold of this Jira. I had been meaning to start some 
discussion around decoupling flushing from committing. This ticket only deals 
with EOS; is that because the commit interval for non-EOS is 30 seconds? RocksDB 
flushing every 30 seconds is also a pretty big issue for us. I think the default 
"Level0FileNumCompactionTrigger" in RocksDB is 4, and since the default number 
of write buffers in RocksDBStore.java is 3, 2 memtables get flushed every 30 
seconds. When someone has multiple stores in the topology, this means there is 
at least one store undergoing compaction every 30 seconds (at most every 
minute). I was tracking CPU usage a few days ago while facing increased 
99th-percentile latencies every 30 seconds, which overlapped exactly with the 
flush (which in turn leads to compaction), and there is a huge bump in CPU usage 
every 30 seconds. So, as a workaround for now, I have increased the commit 
interval in our system to 30 minutes (and even in 30 minutes our memtables are 
not full), which is also inefficient. Let me know if we can also discuss non-EOS 
here, or whether I should create a separate ticket for it. 
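
For anyone hitting this before a proper fix, the two knobs involved are the 
Streams commit interval and the RocksDB memtable settings via a 
RocksDBConfigSetter. A sketch below (the values are only examples from our own 
tuning, not recommendations):

{code:java}
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

import java.util.Map;
import java.util.Properties;

public class FlushTuningExample {

    // Raise the commit interval so stores are flushed less often; the trade-off is how much
    // has to be replayed from the changelog after a crash.
    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 60 * 1000);      // example: 30 minutes
        props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, MemtableTuning.class);
        return props;
    }

    public static class MemtableTuning implements RocksDBConfigSetter {
        @Override
        public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
            options.setMaxWriteBufferNumber(3);              // Streams' default mentioned above
            options.setWriteBufferSize(64 * 1024 * 1024L);   // larger memtables -> fewer tiny L0 files
            options.setLevel0FileNumCompactionTrigger(4);    // RocksDB default mentioned above
        }

        @Override
        public void close(final String storeName, final Options options) { }
    }
}
{code}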

> Decouple inner state flushing from committing with EOS
> --
>
> Key: KAFKA-9450
> URL: https://issues.apache.org/jira/browse/KAFKA-9450
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> When EOS is turned on, the commit interval is set quite low (100ms) and all 
> the store layers are flushed during a commit. This is necessary for 
> forwarding records in the cache to the changelog, but unfortunately also 
> forces rocksdb to flush the current memtable before it's full. The result is 
> a large number of small writes to disk, losing the benefits of batching, and 
> a large number of very small L0 files that are likely to slow compaction.
> Since we have to delete the stores to recreate from scratch anyways during an 
> unclean shutdown with EOS, we may as well skip flushing the innermost 
> StateStore during a commit and only do so during a graceful shutdown, before 
> a rebalance, etc. This is currently blocked on a refactoring of the state 
> store layers to allow decoupling the flush of the caching layer from the 
> actual state store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9445) Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-16 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9445:
-
Labels: KIP-562  (was: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance)

> Allow fetching a key from a single partition rather than iterating over all 
> the stores on an instance
> -
>
> Key: KAFKA-9445
> URL: https://issues.apache.org/jira/browse/KAFKA-9445
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Major
>  Labels: KIP-562
>
> Whenever a call is made to get a particular key from a Kafka Streams 
> instance, it currently returns a queryable store that contains a list of the 
> stores for all the running and restoring/replica tasks (with KIP-535) on the 
> instance via StreamThreadStateStoreProvider#stores(). This list of stores is 
> then provided to CompositeReadOnlyKeyValueStore#get(), which looks into each 
> store one by one. With the changes that went in as part of KIP-535, since we 
> have access to the information about which partition a key belongs to, we 
> should have the capability to fetch the store for that particular partition 
> and look for the key in that partition's store only. It would be a good 
> improvement for latencies in applications that host multiple partitions on a 
> single instance and don't have bloom filters enabled internally for RocksDB.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9445) Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-16 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9445:
-
Labels: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
  (was: )

> Allow fetching a key from a single partition rather than iterating over all 
> the stores on an instance
> -
>
> Key: KAFKA-9445
> URL: https://issues.apache.org/jira/browse/KAFKA-9445
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Major
>  Labels: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
>
> Whenever a call is made to get a particular key from a Kafka Streams 
> instance, it currently returns a queryable store that contains a list of the 
> stores for all the running and restoring/replica tasks (with KIP-535) on the 
> instance via StreamThreadStateStoreProvider#stores(). This list of stores is 
> then provided to CompositeReadOnlyKeyValueStore#get(), which looks into each 
> store one by one. With the changes that went in as part of KIP-535, since we 
> have access to the information about which partition a key belongs to, we 
> should have the capability to fetch the store for that particular partition 
> and look for the key in that partition's store only. It would be a good 
> improvement for latencies in applications that host multiple partitions on a 
> single instance and don't have bloom filters enabled internally for RocksDB.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9445) Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-16 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9445:
-
Description: Whenever a call is made to get a particular key from a Kafka 
Streams instance, it currently returns a queryable store that contains a list 
of the stores for all the running and restoring/replica tasks (with KIP-535) on 
the instance via StreamThreadStateStoreProvider#stores(). This list of stores 
is then provided to CompositeReadOnlyKeyValueStore#get(), which looks into each 
store one by one. With the changes that went in as part of KIP-535, since we 
have access to the information about which partition a key belongs to, we 
should have the capability to fetch the store for that particular partition and 
look for the key in that partition's store only. It would be a good improvement 
for latencies in applications that host multiple partitions on a single 
instance and don't have bloom filters enabled internally for RocksDB.
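
A sketch of the intended caller-side usage once this lands, assuming the 
KIP-562 API shape (StoreQueryParameters#withPartition); the store name and 
types are placeholders:

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class SinglePartitionLookup {

    public static Long lookup(final KafkaStreams streams, final String key, final int partition) {
        // Only the store for the given partition is fetched, instead of iterating over
        // every store hosted on this instance.
        final ReadOnlyKeyValueStore<String, Long> store = streams.store(
            StoreQueryParameters.fromNameAndType("counts-store", QueryableStoreTypes.<String, Long>keyValueStore())
                                .withPartition(partition));
        return store.get(key);
    }
}
{code}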

> Allow fetching a key from a single partition rather than iterating over all 
> the stores on an instance
> -
>
> Key: KAFKA-9445
> URL: https://issues.apache.org/jira/browse/KAFKA-9445
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Major
>
> Whenever a call is made to get a particular key from a Kafka Streams 
> instance, it currently returns a queryable store that contains a list of the 
> stores for all the running and restoring/replica tasks (with KIP-535) on the 
> instance via StreamThreadStateStoreProvider#stores(). This list of stores is 
> then provided to CompositeReadOnlyKeyValueStore#get(), which looks into each 
> store one by one. With the changes that went in as part of KIP-535, since we 
> have access to the information about which partition a key belongs to, we 
> should have the capability to fetch the store for that particular partition 
> and look for the key in that partition's store only. It would be a good 
> improvement for latencies in applications that host multiple partitions on a 
> single instance and don't have bloom filters enabled internally for RocksDB.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9445) Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-16 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9445:
-
Description: (was: Currently, when expanding the KS cluster, the new 
node's partitions will be unavailable during the rebalance, which for large 
states can take a very long time; even for small state stores, more than a few 
ms of unavailability can be a deal-breaker for microservice use cases.

One workaround is to allow stale data to be read from the state stores when use 
case allows. Adding the use case from KAFKA-8994 as it is more descriptive.

"Consider the following scenario in a three node Streams cluster with node A, 
node B and node R, executing a stateful sub-topology/topic group with 1 
partition and `_num.standby.replicas=1_`  
 * *t0*: A is the active instance owning the partition, B is the standby that 
keeps replicating the A's state into its local disk, R just routes streams IQs 
to active instance using StreamsMetadata
 * *t1*: IQs pick node R as router, R forwards query to A, A responds back to R 
which reverse forwards back the results.
 * *t2:* Active A instance is killed and rebalance begins. IQs start failing to 
A
 * *t3*: Rebalance assignment happens and standby B is now promoted as active 
instance. IQs continue to fail
 * *t4*: B fully catches up to changelog tail and rewinds offsets to A's last 
commit position, IQs continue to fail
 * *t5*: IQs to R, get routed to B, which is now ready to serve results. IQs 
start succeeding again

 

Depending on the Kafka consumer group session/heartbeat timeouts, steps t2 and 
t3 can take a few seconds (~10 seconds based on default values). Depending on 
how laggy standby B was prior to A being killed, t4 can take anywhere from a 
few seconds to minutes. 

While this behavior favors consistency over availability at all times, the long 
unavailability window might be undesirable for certain classes of applications 
(e.g simple caches or dashboards). 

This issue aims to also expose information about standby B to R, during each 
rebalance such that the queries can be routed by an application to a standby to 
serve stale reads, choosing availability over consistency.")

> Allow fetching a key from a single partition rather than iterating over all 
> the stores on an instance
> -
>
> Key: KAFKA-9445
> URL: https://issues.apache.org/jira/browse/KAFKA-9445
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Major
>  Labels: kip-535
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9445) Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-16 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-9445:
-
Labels:   (was: kip-535)

> Allow fetching a key from a single partition rather than iterating over all 
> the stores on an instance
> -
>
> Key: KAFKA-9445
> URL: https://issues.apache.org/jira/browse/KAFKA-9445
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Assignee: Navinder Brar
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9445) Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-16 Thread Navinder Brar (Jira)
Navinder Brar created KAFKA-9445:


 Summary: Allow fetching a key from a single partition rather than 
iterating over all the stores on an instance
 Key: KAFKA-9445
 URL: https://issues.apache.org/jira/browse/KAFKA-9445
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Navinder Brar
Assignee: Navinder Brar


Currently, when expanding the KS cluster, the new node's partitions will be 
unavailable during the rebalance, which for large states can take a very long 
time; even for small state stores, more than a few ms of unavailability can be 
a deal-breaker for microservice use cases.

One workaround is to allow stale data to be read from the state stores when use 
case allows. Adding the use case from KAFKA-8994 as it is more descriptive.

"Consider the following scenario in a three node Streams cluster with node A, 
node B and node R, executing a stateful sub-topology/topic group with 1 
partition and `_num.standby.replicas=1_`  
 * *t0*: A is the active instance owning the partition, B is the standby that 
keeps replicating the A's state into its local disk, R just routes streams IQs 
to active instance using StreamsMetadata
 * *t1*: IQs pick node R as router, R forwards query to A, A responds back to R 
which reverse forwards back the results.
 * *t2:* Active A instance is killed and rebalance begins. IQs start failing to 
A
 * *t3*: Rebalance assignment happens and standby B is now promoted as active 
instance. IQs continue to fail
 * *t4*: B fully catches up to changelog tail and rewinds offsets to A's last 
commit position, IQs continue to fail
 * *t5*: IQs to R, get routed to B, which is now ready to serve results. IQs 
start succeeding again

 

Depending on the Kafka consumer group session/heartbeat timeouts, steps t2 and 
t3 can take a few seconds (~10 seconds based on default values). Depending on 
how laggy standby B was prior to A being killed, t4 can take anywhere from a 
few seconds to minutes. 

While this behavior favors consistency over availability at all times, the long 
unavailability window might be undesirable for certain classes of applications 
(e.g simple caches or dashboards). 

This issue aims to also expose information about standby B to R, during each 
rebalance such that the queries can be routed by an application to a standby to 
serve stale reads, choosing availability over consistency."
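
For reference, once KIP-535 is in, the routing node R can discover both the 
active and the standby hosts for a key. A minimal sketch assuming the 
KeyQueryMetadata API (accessor names differ slightly between releases), with 
the store name and serializer as placeholders:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.state.HostInfo;

import java.util.Set;

public class StandbyAwareRouting {

    public static void route(final KafkaStreams streams, final String key) {
        final KeyQueryMetadata metadata =
            streams.queryMetadataForKey("counts-store", key, Serdes.String().serializer());

        final HostInfo active = metadata.activeHost();
        final Set<HostInfo> standbys = metadata.standbyHosts();

        // Prefer the active host for consistent reads; if it is unavailable (e.g. during
        // t2-t4 above), fall back to a standby and knowingly serve a stale read.
        System.out.println("active: " + active + ", standbys: " + standbys);
    }
}
{code}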



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9169) Standby Tasks point ask for incorrect offsets on resuming post suspension

2019-11-16 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16975693#comment-16975693
 ] 

Navinder Brar commented on KAFKA-9169:
--

I can confirm that this has been there since version 1.1.

> Standby Tasks point ask for incorrect offsets on resuming post suspension
> -
>
> Key: KAFKA-9169
> URL: https://issues.apache.org/jira/browse/KAFKA-9169
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Navinder Brar
>Assignee: John Roesler
>Priority: Critical
> Fix For: 2.5.0
>
>
> In versions (e.g. 2.0) where standby tasks are suspended on each rebalance, 
> the checkpoint file is updated after the flush, and the expected behaviour is 
> that if, post assignment, the same standby task is assigned back to the 
> machine, it will start reading data from the changelog from the same offset 
> where it left off.
>  
> But there looks to be a bug in the code: every time, post rebalance, it starts 
> reading from the offset from which it read the first time the task was 
> assigned on this machine. This has 2 repercussions:
>  # After every rebalance the standby tasks start restoring a huge amount of 
> data which they have already restored earlier (verified via a 300x increase in 
> network IO on all Streams instances post rebalance, even with no change in 
> assignment).
>  # If the changelog has time-based retention, those offsets will not be 
> available in the changelog, which leads to offsetOutOfRange exceptions, and 
> the stores get deleted and recreated again.
>  
> I have gone through the code and I think I know the issue.
> In TaskManager#updateNewAndRestoringTasks(), the function 
> assignStandbyPartitions() gets called for all the running standby tasks, where 
> it populates the map checkpointedOffsets from 
> standbyTask.checkpointedOffsets(), which is only updated at the time of 
> initialization of a StandbyTask (i.e. in its constructor).
>  
> This has an easy fix.
> Post resumption, we read standbyTask.checkpointedOffsets() to learn the 
> offset from which the standby task should start running, instead of 
> stateMgr.checkpointed(), which gets updated on every commit to the checkpoint 
> file. In the former case it always reads from the same offset, including data 
> it has already read earlier, and in cases where the changelog topic has a 
> retention time it gives an offsetOutOfRange exception. So 
> standbyTask.checkpointedOffsets() is quite useless, and we should use 
> stateMgr.checkpointed() instead to return offsets to the task manager.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9169) Standby Tasks point ask for incorrect offsets on resuming post suspension

2019-11-10 Thread Navinder Brar (Jira)
Navinder Brar created KAFKA-9169:


 Summary: Standby Tasks point ask for incorrect offsets on resuming 
post suspension
 Key: KAFKA-9169
 URL: https://issues.apache.org/jira/browse/KAFKA-9169
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Navinder Brar


In versions (e.g. 2.0) where standby tasks are suspended on each rebalance, the 
checkpoint file is updated after the flush, and the expected behaviour is that 
if, post assignment, the same standby task is assigned back to the machine, it 
will start reading data from the changelog from the same offset where it left 
off.

 

But there looks to be a bug in the code: every time, post rebalance, it starts 
reading from the offset from which it read the first time the task was assigned 
on this machine. This has 2 repercussions:
 # After every rebalance the standby tasks start restoring a huge amount of data 
which they have already restored earlier (verified via a 300x increase in 
network IO on all Streams instances post rebalance, even with no change in 
assignment).
 # If the changelog has time-based retention, those offsets will not be 
available in the changelog, which leads to offsetOutOfRange exceptions, and the 
stores get deleted and recreated again.

 

I have gone through the code and I think I know the issue.

In TaskManager#updateNewAndRestoringTasks(), the function 
assignStandbyPartitions() gets called for all the running standby tasks, where 
it populates the map checkpointedOffsets from standbyTask.checkpointedOffsets(), 
which is only updated at the time of initialization of a StandbyTask (i.e. in 
its constructor).

 

This has an easy fix.

Post resumption, we read standbyTask.checkpointedOffsets() to learn the offset 
from which the standby task should start running, instead of 
stateMgr.checkpointed(), which gets updated on every commit to the checkpoint 
file. In the former case it always reads from the same offset, including data 
it has already read earlier, and in cases where the changelog topic has a 
retention time it gives an offsetOutOfRange exception. So 
standbyTask.checkpointedOffsets() is quite useless, and we should use 
stateMgr.checkpointed() instead to return offsets to the task manager.
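
To illustrate the difference, a self-contained sketch with simplified stand-in 
classes (not the real Streams internals): the constructor-time snapshot behaves 
like standbyTask.checkpointedOffsets(), while the live map behaves like 
stateMgr.checkpointed().

{code:java}
import java.util.HashMap;
import java.util.Map;

public class StandbyResumeSketch {

    // Stand-in for the state manager, which keeps the live checkpoint map.
    static class StateManager {
        private final Map<String, Long> checkpointed = new HashMap<>();
        Map<String, Long> checkpointed() { return checkpointed; }
        void commit(final String changelog, final long offset) { checkpointed.put(changelog, offset); }
    }

    // Stand-in for a standby task that snapshots offsets only in its constructor.
    static class StandbyTask {
        private final Map<String, Long> constructorSnapshot;
        private final StateManager stateMgr;

        StandbyTask(final StateManager stateMgr) {
            this.stateMgr = stateMgr;
            this.constructorSnapshot = new HashMap<>(stateMgr.checkpointed());
        }

        // What the code uses today: stale, constructor-time offsets.
        Map<String, Long> checkpointedOffsets() { return constructorSnapshot; }

        // What the fix proposes: offsets that are updated on every commit.
        Map<String, Long> currentCheckpointedOffsets() { return stateMgr.checkpointed(); }
    }

    public static void main(final String[] args) {
        final StateManager stateMgr = new StateManager();
        stateMgr.commit("store-changelog-0", 100L);
        final StandbyTask task = new StandbyTask(stateMgr);
        stateMgr.commit("store-changelog-0", 5_000L); // commits keep happening after task creation

        System.out.println("resume from (current code): " + task.checkpointedOffsets());         // {store-changelog-0=100}
        System.out.println("resume from (proposed fix): " + task.currentCheckpointedOffsets());  // {store-changelog-0=5000}
    }
}
{code}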



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-6144) Allow serving interactive queries from in-sync Standbys

2019-10-21 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-6144:
-
Description: 
Currently, when expanding the KS cluster, the new node's partitions will be 
unavailable during the rebalance, which for large states can take a very long 
time; even for small state stores, more than a few ms of unavailability can be 
a deal-breaker for microservice use cases.

One workaround is to allow stale data to be read from the state stores when use 
case allows. Adding the use case from KAFKA-8994 as it is more descriptive.

"Consider the following scenario in a three node Streams cluster with node A, 
node B and node R, executing a stateful sub-topology/topic group with 1 
partition and `_num.standby.replicas=1_`  
 * *t0*: A is the active instance owning the partition, B is the standby that 
keeps replicating the A's state into its local disk, R just routes streams IQs 
to active instance using StreamsMetadata
 * *t1*: IQs pick node R as router, R forwards query to A, A responds back to R 
which reverse forwards back the results.
 * *t2:* Active A instance is killed and rebalance begins. IQs start failing to 
A
 * *t3*: Rebalance assignment happens and standby B is now promoted as active 
instance. IQs continue to fail
 * *t4*: B fully catches up to changelog tail and rewinds offsets to A's last 
commit position, IQs continue to fail
 * *t5*: IQs to R, get routed to B, which is now ready to serve results. IQs 
start succeeding again

 

Depending on the Kafka consumer group session/heartbeat timeouts, steps t2 and 
t3 can take a few seconds (~10 seconds based on default values). Depending on 
how laggy standby B was prior to A being killed, t4 can take anywhere from a 
few seconds to minutes. 

While this behavior favors consistency over availability at all times, the long 
unavailability window might be undesirable for certain classes of applications 
(e.g simple caches or dashboards). 

This issue aims to also expose information about standby B to R, during each 
rebalance such that the queries can be routed by an application to a standby to 
serve stale reads, choosing availability over consistency."

  was:
Currently, when expanding the KS cluster, the new node's partitions will be 
unavailable during the rebalance, which for large states can take a very long 
time; even for small state stores, more than a few ms of unavailability can be 
a deal-breaker for microservice use cases.

One workaround is to allow stale data to be read from the state stores when use 
case allows.

Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
potentially a two phase rebalance


This is the description from KAFKA-6031 (keeping this JIRA as the title is more 
descriptive):

{quote}
Currently reads for a key are served by a single replica, which has 2 drawbacks:
 - if the replica is down there is downtime in serving reads for the keys it was 
responsible for, until a standby replica takes over
 - in case of semantic partitioning some replicas might become hot, and there is 
no easy way to scale the read load

If standby replicas had endpoints exposed in StreamsMetadata, it would enable 
serving reads from several replicas, which would mitigate the above drawbacks. 
Due to the lag between replicas, reading from multiple replicas simultaneously 
would have weaker (eventual) consistency compared to reads from a single 
replica. This, however, should be an acceptable tradeoff in many cases.
{quote}


> Allow serving interactive queries from in-sync Standbys
> ---
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Assignee: Navinder Brar
>Priority: Major
>  Labels: kip-535
> Attachments: image-2019-10-09-20-33-37-423.png, 
> image-2019-10-09-20-47-38-096.png
>
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal-breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows. Adding the use case from KAFKA-8994 as it is more 
> descriptive.
> "Consider the following scenario in a three node Streams cluster with node A, 
> node S and node R, executing a stateful sub-topology/topic group with 1 
> partition and `_num.standby.replicas=1_`  
>  * *t0*: A is the active instance owning the partition, B is the standby that 
> keeps replicating the A's state into its local disk, R just routes streams 
> IQs to active instance using StreamsMetadata
>  * *t1*: IQs pick node R as router, R forwards query to A, A responds 

[jira] [Updated] (KAFKA-6144) Allow serving interactive queries from in-sync Standbys

2019-10-19 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-6144:
-
Summary: Allow serving interactive queries from in-sync Standbys  (was: 
Allow state stores to serve stale reads during rebalance)

> Allow serving interactive queries from in-sync Standbys
> ---
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Assignee: Navinder Brar
>Priority: Major
>  Labels: kip-535
> Attachments: image-2019-10-09-20-33-37-423.png, 
> image-2019-10-09-20-47-38-096.png
>
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by single replica, which has 2 drawbacks:
>  - if replica is down there is a down time in serving reads for keys it was 
> responsible for until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas would have endpoints that are exposed in StreamsMetadata 
> it would enable serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency comparing to reads from single 
> replica. This however should be acceptable tradeoff in many cases.
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-6144) Allow state stores to serve stale reads during rebalance

2019-10-19 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-6144:
-
Summary: Allow state stores to serve stale reads during rebalance  (was: 
Allow serving interactive queries from in-sync Standbys)

> Allow state stores to serve stale reads during rebalance
> 
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Assignee: Navinder Brar
>Priority: Major
>  Labels: kip-535
> Attachments: image-2019-10-09-20-33-37-423.png, 
> image-2019-10-09-20-47-38-096.png
>
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by single replica, which has 2 drawbacks:
>  - if replica is down there is a down time in serving reads for keys it was 
> responsible for until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas would have endpoints that are exposed in StreamsMetadata 
> it would enable serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency comparing to reads from single 
> replica. This however should be acceptable tradeoff in many cases.
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-6144) Allow serving interactive queries from in-sync Standbys

2019-10-19 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-6144:
-
Labels: kip-535  (was: needs-kip)

> Allow serving interactive queries from in-sync Standbys
> ---
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Assignee: Navinder Brar
>Priority: Major
>  Labels: kip-535
> Attachments: image-2019-10-09-20-33-37-423.png, 
> image-2019-10-09-20-47-38-096.png
>
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by single replica, which has 2 drawbacks:
>  - if replica is down there is a down time in serving reads for keys it was 
> responsible for until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas would have endpoints that are exposed in StreamsMetadata 
> it would enable serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency comparing to reads from single 
> replica. This however should be acceptable tradeoff in many cases.
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-6144) Allow serving interactive queries from in-sync Standbys

2019-10-19 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-6144:
-
Summary: Allow serving interactive queries from in-sync Standbys  (was: 
Allow state stores to serve stale reads during rebalance)

> Allow serving interactive queries from in-sync Standbys
> ---
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Assignee: Navinder Brar
>Priority: Major
>  Labels: needs-kip
> Attachments: image-2019-10-09-20-33-37-423.png, 
> image-2019-10-09-20-47-38-096.png
>
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by single replica, which has 2 drawbacks:
>  - if replica is down there is a down time in serving reads for keys it was 
> responsible for until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas would have endpoints that are exposed in StreamsMetadata 
> it would enable serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency comparing to reads from single 
> replica. This however should be acceptable tradeoff in many cases.
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6144) Allow state stores to serve stale reads during rebalance

2019-10-17 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16954259#comment-16954259
 ] 

Navinder Brar commented on KAFKA-6144:
--

Can I assign this to myself, since the KIP has already been created?

> Allow state stores to serve stale reads during rebalance
> 
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Priority: Major
>  Labels: needs-kip
> Attachments: image-2019-10-09-20-33-37-423.png, 
> image-2019-10-09-20-47-38-096.png
>
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by single replica, which has 2 drawbacks:
>  - if replica is down there is a down time in serving reads for keys it was 
> responsible for until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas would have endpoints that are exposed in StreamsMetadata 
> it would enable serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency comparing to reads from single 
> replica. This however should be acceptable tradeoff in many cases.
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6144) Allow state stores to serve stale reads during rebalance

2019-10-13 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16950420#comment-16950420
 ] 

Navinder Brar commented on KAFKA-6144:
--

Thanks, [~mjsax]. Did the needful. 

> Allow state stores to serve stale reads during rebalance
> 
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Priority: Major
>  Labels: needs-kip
> Attachments: image-2019-10-09-20-33-37-423.png, 
> image-2019-10-09-20-47-38-096.png
>
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by single replica, which has 2 drawbacks:
>  - if replica is down there is a down time in serving reads for keys it was 
> responsible for until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas would have endpoints that are exposed in StreamsMetadata 
> it would enable serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency comparing to reads from single 
> replica. This however should be acceptable tradeoff in many cases.
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6144) Allow state stores to serve stale reads during rebalance

2019-10-13 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16950326#comment-16950326
 ] 

Navinder Brar commented on KAFKA-6144:
--

[~vinoth] started a KIP today: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance].
 

> Allow state stores to serve stale reads during rebalance
> 
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Priority: Major
>  Labels: needs-kip
> Attachments: image-2019-10-09-20-33-37-423.png, 
> image-2019-10-09-20-47-38-096.png
>
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by single replica, which has 2 drawbacks:
>  - if replica is down there is a down time in serving reads for keys it was 
> responsible for until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas would have endpoints that are exposed in StreamsMetadata 
> it would enable serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency comparing to reads from single 
> replica. This however should be acceptable tradeoff in many cases.
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-6144) Allow state stores to serve stale reads during rebalance

2019-10-09 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947780#comment-16947780
 ] 

Navinder Brar edited comment on KAFKA-6144 at 10/9/19 3:18 PM:
---

Sure [~vinoth] I can do that. In my current project I have already enabled read 
from replicas, so we can discuss that as well if you want to. For that, I had 
to make this change in metadata.            
!image-2019-10-09-20-47-38-096.png|width=146,height=237!


was (Author: navibrar):
Sure [~vinoth] I can do that. In my current project I have already enabled read 
from replicas, so we can discuss that as well if you want to. For that I had to 
make this change in metadata. 
!image-2019-10-09-20-33-37-423.png|width=161,height=233!

> Allow state stores to serve stale reads during rebalance
> 
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Priority: Major
>  Labels: needs-kip
> Attachments: image-2019-10-09-20-33-37-423.png, 
> image-2019-10-09-20-47-38-096.png
>
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by single replica, which has 2 drawbacks:
>  - if replica is down there is a down time in serving reads for keys it was 
> responsible for until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas would have endpoints that are exposed in StreamsMetadata 
> it would enable serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency comparing to reads from single 
> replica. This however should be acceptable tradeoff in many cases.
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6144) Allow state stores to serve stale reads during rebalance

2019-10-09 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947780#comment-16947780
 ] 

Navinder Brar commented on KAFKA-6144:
--

Sure [~vinoth] I can do that. In my current project I have already enabled read 
from replicas, so we can discuss that as well if you want to. For that I had to 
make this change in metadata. 
!image-2019-10-09-20-33-37-423.png|width=161,height=233!

> Allow state stores to serve stale reads during rebalance
> 
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Priority: Major
>  Labels: needs-kip
> Attachments: image-2019-10-09-20-33-37-423.png
>
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by single replica, which has 2 drawbacks:
>  - if replica is down there is a down time in serving reads for keys it was 
> responsible for until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas would have endpoints that are exposed in StreamsMetadata 
> it would enable serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency comparing to reads from single 
> replica. This however should be acceptable tradeoff in many cases.
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-6144) Allow state stores to serve stale reads during rebalance

2019-10-09 Thread Navinder Brar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-6144:
-
Attachment: image-2019-10-09-20-33-37-423.png

> Allow state stores to serve stale reads during rebalance
> 
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Priority: Major
>  Labels: needs-kip
> Attachments: image-2019-10-09-20-33-37-423.png
>
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by single replica, which has 2 drawbacks:
>  - if replica is down there is a down time in serving reads for keys it was 
> responsible for until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas would have endpoints that are exposed in StreamsMetadata 
> it would enable serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency comparing to reads from single 
> replica. This however should be acceptable tradeoff in many cases.
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-6144) Allow state stores to serve stale reads during rebalance

2019-10-08 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947349#comment-16947349
 ] 

Navinder Brar edited comment on KAFKA-6144 at 10/9/19 4:28 AM:
---

[~vinoth] sorry completely forgot about this one. I had started writing it up 
and lost track. I will complete this weekend, if it's not done by then you can 
take it up.


was (Author: navibrar):
[~vinoth] sorry completely forgot about this one. I had started writing it up 
and lost track. I will complete this weekend?

> Allow state stores to serve stale reads during rebalance
> 
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Priority: Major
>  Labels: needs-kip
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by single replica, which has 2 drawbacks:
>  - if replica is down there is a down time in serving reads for keys it was 
> responsible for until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas would have endpoints that are exposed in StreamsMetadata 
> it would enable serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency comparing to reads from single 
> replica. This however should be acceptable tradeoff in many cases.
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6144) Allow state stores to serve stale reads during rebalance

2019-10-08 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16947349#comment-16947349
 ] 

Navinder Brar commented on KAFKA-6144:
--

[~vinoth] sorry completely forgot about this one. I had started writing it up 
and lost track. I will complete this weekend?

> Allow state stores to serve stale reads during rebalance
> 
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Priority: Major
>  Labels: needs-kip
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by single replica, which has 2 drawbacks:
>  - if replica is down there is a down time in serving reads for keys it was 
> responsible for until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas would have endpoints that are exposed in StreamsMetadata 
> it would enable serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency comparing to reads from single 
> replica. This however should be acceptable tradeoff in many cases.
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5413) Log cleaner fails due to large offset in segment file

2019-10-08 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16946698#comment-16946698
 ] 

Navinder Brar commented on KAFKA-5413:
--

Can we manually delete the .index and .log files for that partition if we don't 
care about data loss? Our consumer group is disabled now, and it was the one served 
by the __consumer_offsets partition that is hitting this issue.

> Log cleaner fails due to large offset in segment file
> -
>
> Key: KAFKA-5413
> URL: https://issues.apache.org/jira/browse/KAFKA-5413
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0, 0.10.2.1
> Environment: Ubuntu 14.04 LTS, Oracle Java 8u92, kafka_2.11-0.10.2.0
>Reporter: Nicholas Ngorok
>Assignee: Kelvin Rutt
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.2, 0.11.0.0
>
> Attachments: .index.cleaned, 
> .log, .log.cleaned, 
> .timeindex.cleaned, 002147422683.log, 
> kafka-5413.patch
>
>
> The log cleaner thread in our brokers is failing with the trace below
> {noformat}
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 0 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 
> 15:48:59 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 2147343575 in log __consumer_offsets-12 (largest timestamp 
> Thu Jun 08 15:49:06 PDT 2017) into 0, retaining deletes. 
> (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,834] ERROR {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: largest offset in 
> message set can not be safely converted to relative offset.
> at scala.Predef$.require(Predef.scala:224)
> at kafka.log.LogSegment.append(LogSegment.scala:109)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:478)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.clean(LogCleaner.scala:362)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-06-08 15:49:54,835] INFO {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
> {noformat}
> This seems to point at the specific line [here| 
> https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/log/LogSegment.scala#L92]
>  in the kafka src where the difference is actually larger than MAXINT as both 
> baseOffset and offset are of type long. It was introduced in this [pr| 
> https://github.com/apache/kafka/pull/2210/files/56d1f8196b77a47b176b7bbd1e4220a3be827631]
> These were the outputs of dumping the first two log segments
> {noformat}
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0.log
> Dumping /kafka-logs/__consumer_offsets-12/.log
> Starting offset: 0
> offset: 1810054758 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 
> -1 magic: 0 compresscodec: NONE crc: 3127861909 keysize: 34
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0002147343575.log
> Dumping /kafka-logs/__consumer_offsets-12/002147343575.log
> Starting offset: 2147343575
> offset: 2147539884 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 
> -1 magic: 0 compresscodec: NONE crc: 2282192097 keysize: 34
> {noformat}
> My guess is that since 2147539884 is larger than MAXINT, we are hitting this 
> exception. Was there a specific reason, this check was added in 0.10.2?
> E.g. if the first offset is a key = "key 0" and then we have MAXINT + 1 of 
> "key 1" following, wouldn't we run into this situation whenever the log 
> cleaner runs?
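
For illustration, the failing requirement boils down to the fact that a segment stores each record's offset as a 4-byte delta from the segment's base offset. A hedged, stand-alone sketch (not Kafka's actual Scala source, though the message mirrors the `require` in LogSegment.append), using the offsets from the dump above:
{code:java}
public class RelativeOffsetCheck {
    public static void main(String[] args) {
        long baseOffset = 0L;              // the cleaned segment starts at offset 0
        long largestOffset = 2147539884L;  // largest offset observed in the dump above

        // Relative offsets must fit in an int; here the delta exceeds Integer.MAX_VALUE,
        // so this throws, just like the log cleaner thread does.
        long delta = largestOffset - baseOffset;
        if (delta < 0 || delta > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                "largest offset in message set can not be safely converted to relative offset.");
        }
    }
}
{code}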



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2019-08-05 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16900634#comment-16900634
 ] 

Navinder Brar commented on KAFKA-7149:
--

[~vinoth] yeah feel free to reassign this to yourself.

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ashish Surana
>Assignee: Navinder Brar
>Priority: Major
>
> We observed that when we have high number of partitions, instances or 
> stream-threads, assignment-data size grows too fast and we start getting 
> below RecordTooLargeException at kafka-broker.
> Workaround of this issue is commented at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still it limits the scalability of kafka streams as moving around 100MBs of 
> assignment data for each rebalancing affects performance & reliability 
> (timeout exceptions starts appearing) as well. Also this limits kafka streams 
> scale even with high max.message.bytes setting as data size increases pretty 
> quickly with number of partitions, instances or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending the compressed 
> assignment-data. We saw assignment-data size reduced by 8X-10X. This improved 
> the kafka streams scalability drastically for us and we could now run it with 
> more than 8,000 partitions.
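
For illustration, the compression described above can be sketched with plain java.util.zip; this is a hedged stand-in, not the actual Kafka Streams patch, which wires compression into the AssignmentInfo encode/decode path:
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class AssignmentCompression {

    // Compress the already-encoded assignment bytes before handing them to the group protocol.
    static byte[] compress(byte[] encodedAssignment) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(encodedAssignment);
        }
        return out.toByteArray();
    }

    // Decompress on the receiving consumer before decoding the AssignmentInfo.
    static byte[] decompress(byte[] compressed) throws IOException {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gzip.readAllBytes();
        }
    }
}
{code}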



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2019-01-16 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744195#comment-16744195
 ] 

Navinder Brar commented on KAFKA-7149:
--

[~mjsax] The PR has passed all the checks and code review comments have been 
implemented. Please take further action.

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ashish Surana
>Assignee: Navinder Brar
>Priority: Major
> Fix For: 2.2.0
>
>
> We observed that when we have high number of partitions, instances or 
> stream-threads, assignment-data size grows too fast and we start getting 
> below RecordTooLargeException at kafka-broker.
> Workaround of this issue is commented at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still it limits the scalability of kafka streams as moving around 100MBs of 
> assignment data for each rebalancing affects performance & reliability 
> (timeout exceptions starts appearing) as well. Also this limits kafka streams 
> scale even with high max.message.bytes setting as data size increases pretty 
> quickly with number of partitions, instances or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending the compressed 
> assignment-data. We saw assignment-data size reduced by 8X-10X. This improved 
> the kafka streams scalability drastically for us and we could now run it with 
> more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2019-01-15 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16742857#comment-16742857
 ] 

Navinder Brar commented on KAFKA-7149:
--

[~mjsax] yeah, I had submitted a PR, but some core test cases were failing, I guess 
because I had not done a rebase. I will rebase, send the PR again, and update here.

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ashish Surana
>Assignee: Navinder Brar
>Priority: Major
> Fix For: 2.2.0
>
>
> We observed that when we have high number of partitions, instances or 
> stream-threads, assignment-data size grows too fast and we start getting 
> below RecordTooLargeException at kafka-broker.
> Workaround of this issue is commented at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still it limits the scalability of kafka streams as moving around 100MBs of 
> assignment data for each rebalancing affects performance & reliability 
> (timeout exceptions starts appearing) as well. Also this limits kafka streams 
> scale even with high max.message.bytes setting as data size increases pretty 
> quickly with number of partitions, instances or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending the compressed 
> assignment-data. We saw assignment-data size reduced by 8X-10X. This improved 
> the kafka streams scalability drastically for us and we could now run it with 
> more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6144) Allow state stores to serve stale reads during rebalance

2018-12-10 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715471#comment-16715471
 ] 

Navinder Brar commented on KAFKA-6144:
--

Sure [~NIzhikov] , I will start writing a KIP.

> Allow state stores to serve stale reads during rebalance
> 
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Priority: Major
>  Labels: needs-kip
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by single replica, which has 2 drawbacks:
>  - if replica is down there is a down time in serving reads for keys it was 
> responsible for until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas would have endpoints that are exposed in StreamsMetadata 
> it would enable serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency comparing to reads from single 
> replica. This however should be acceptable tradeoff in many cases.
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6144) Allow state stores to serve stale reads during rebalance

2018-12-09 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16714346#comment-16714346
 ] 

Navinder Brar commented on KAFKA-6144:
--

I have made an account by id: navinder_b...@yahoo.com. You can add me and I can 
start working on KIP. Thanks in advance.

> Allow state stores to serve stale reads during rebalance
> 
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Assignee: Nikolay Izhikov
>Priority: Major
>  Labels: needs-kip
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by single replica, which has 2 drawbacks:
>  - if replica is down there is a down time in serving reads for keys it was 
> responsible for until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas would have endpoints that are exposed in StreamsMetadata 
> it would enable serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency comparing to reads from single 
> replica. This however should be acceptable tradeoff in many cases.
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6144) Allow state stores to serve stale reads during rebalance

2018-12-04 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16709086#comment-16709086
 ] 

Navinder Brar commented on KAFKA-6144:
--

Is someone working on a patch for this? If I understand correctly, we need to 
add replica topic partitions to StreamsMetadata, which will be used during get 
requests to find the host for a key (currently this looks only at active 
partitions). I have made these changes in my local fork; if you want me to send 
a PR for this, I can do it.

> Allow state stores to serve stale reads during rebalance
> 
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Assignee: Nikolay Izhikov
>Priority: Major
>  Labels: needs-kip
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time, or for small state stores even more than a few ms can be a deal breaker 
> for micro service use cases.
> One workaround is to allow stale data to be read from the state stores when 
> use case allows.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by single replica, which has 2 drawbacks:
>  - if replica is down there is a down time in serving reads for keys it was 
> responsible for until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas would have endpoints that are exposed in StreamsMetadata 
> it would enable serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency comparing to reads from single 
> replica. This however should be acceptable tradeoff in many cases.
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-09-14 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16615354#comment-16615354
 ] 

Navinder Brar commented on KAFKA-7149:
--

Yes, I agree with everything you explained above. But the *tasksByHost* map is 
common to all consumers, so instead of sending it in the encoded *AssignmentInfo* 
for each consumer, I am suggesting we send it once, in a map in the *Assignment* 
object (as I suggested above) to the broker coordinator, and change the broker 
coordinator to send the *tasksByHost* map along with each consumer's individual 
assignment. That way, all consumers receive the global tasksByHost along with 
their own assignments.

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> We observed that when we have high number of partitions, instances or 
> stream-threads, assignment-data size grows too fast and we start getting 
> below RecordTooLargeException at kafka-broker.
> Workaround of this issue is commented at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still it limits the scalability of kafka streams as moving around 100MBs of 
> assignment data for each rebalancing affects performance & reliability 
> (timeout exceptions starts appearing) as well. Also this limits kafka streams 
> scale even with high max.message.bytes setting as data size increases pretty 
> quickly with number of partitions, instances or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending the compressed 
> assignment-data. We saw assignment-data size reduced by 8X-10X. This improved 
> the kafka streams scalability drastically for us and we could now run it with 
> more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-09-14 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16615271#comment-16615271
 ] 

Navinder Brar edited comment on KAFKA-7149 at 9/14/18 7:24 PM:
---

Hi [~guozhang] What I mean is that currently the Assignment which is shared with the 
Group Coordinator looks like this:
{code:java}
[{consumer1: {activePartitions1, assignmentInfo1}}, {consumer2: 
{activePartitions2, assignmentInfo2}},  ]{code}
where
{code:java}
AssignmentInfo=
{List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, 
Map<HostInfo, Set<TopicPartition>> partitionsByHost}
{code}
Now in the first version, I am changing this AssignmentInfo to:

*V1:*
{code:java}
AssignmentInfo=
{List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, 
Map<HostInfo, Set<TaskId>> tasksByHost}
{code}

But my point is that if there are 500 consumers, the tasksByHost map will be the same 
for all of them, since it contains the global assignment. We are unnecessarily sending 
this same map inside the Assignment array for every consumer. Instead, we can send an 
object like the one below, which is shared with the GroupCoordinator.

*V2:*
{code:java}
Assignment= {Map<HostInfo, Set<TaskId>> tasksByHost, [{consumer1: 
{activePartitions1, assignmentInfo1}}, {consumer2: {activePartitions2, 
assignmentInfo2}},  ]}{code}
where
{code:java}
AssignmentInfo= {List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> 
standbyTasks}{code}


was (Author: navibrar):
Hi [~guozhang] What I mean is that currently the Assignment which is shared with the 
Group Coordinator looks like this:
{code:java}
[{consumer1: {activePartitions1, assignmentInfo1}}, {consumer2: 
{activePartitions2, assignmentInfo2}},  ]{code}
where
{code:java}
AssignmentInfo=
{List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, 
Map<HostInfo, Set<TopicPartition>> partitionsByHost}
{code}
Now in the first version, I am changing this AssignmentInfo to:

*V1:*
{code:java}
AssignmentInfo=
{List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, 
Map<HostInfo, Set<TaskId>> tasksByHost}
{code}

But my point is that if there are 500 consumers, the tasksByHost map will be the same 
for all of them, since it contains the global assignment. We are unnecessarily sending 
this same map inside the Assignment array for every consumer. Instead, we can send an 
object like the one below, which is shared with the GroupCoordinator.

*V2:*
{code:java}
Assignment= {Map<HostInfo, Set<TaskId>> tasksByHost, [{consumer1: 
{activePartitions1, assignmentInfo1}}, {consumer2: {activePartitions2, 
assignmentInfo2}},  ]}{code}
where
{code:java}
AssignmentInfo= {List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> 
standbyTasks}{code}

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> We observed that when we have high number of partitions, instances or 
> stream-threads, assignment-data size grows too fast and we start getting 
> below RecordTooLargeException at kafka-broker.
> Workaround of this issue is commented at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still it limits the scalability of kafka streams as moving around 100MBs of 
> assignment data for each rebalancing affects performance & reliability 
> (timeout exceptions starts appearing) as well. Also this limits kafka streams 
> scale even with high max.message.bytes setting as data size increases pretty 
> quickly with number of partitions, instances or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending the compressed 
> assignment-data. We saw assignment-data size reduced by 8X-10X. This improved 
> the kafka streams scalability drastically for us and we could now run it with 
> more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-09-14 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16615271#comment-16615271
 ] 

Navinder Brar commented on KAFKA-7149:
--

Hi [~guozhang] What I mean is that currently the Assignment which is shared with the 
Group Coordinator looks like this:
{code:java}
[{consumer1: {activePartitions1, assignmentInfo1}}, {consumer2: 
{activePartitions2, assignmentInfo2}},  ]{code}
where
{code:java}
AssignmentInfo=
{List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, 
Map<HostInfo, Set<TopicPartition>> partitionsByHost}
{code}
Now in the first version, I am changing this AssignmentInfo to:

*V1:*
{code:java}
AssignmentInfo=
{List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks, 
Map<HostInfo, Set<TaskId>> tasksByHost}
{code}

But my point is that if there are 500 consumers, the tasksByHost map will be the same 
for all of them, since it contains the global assignment. We are unnecessarily sending 
this same map inside the Assignment array for every consumer. Instead, we can send an 
object like the one below, which is shared with the GroupCoordinator.

*V2:*
{code:java}
Assignment= {Map<HostInfo, Set<TaskId>> tasksByHost, [{consumer1: 
{activePartitions1, assignmentInfo1}}, {consumer2: {activePartitions2, 
assignmentInfo2}},  ]}{code}
where
{code:java}
AssignmentInfo= {List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> 
standbyTasks}{code}
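
To make the duplication concrete, a rough back-of-the-envelope sketch; the per-map size below is an assumed figure for illustration only, while the 400-host / 3-thread setup matches the numbers quoted elsewhere in this thread:
{code:java}
public class AssignmentSizeEstimate {
    public static void main(String[] args) {
        int hosts = 400;                 // instances
        int threadsPerHost = 3;          // stream threads (consumers) per instance
        int consumers = hosts * threadsPerHost;
        long tasksByHostBytes = 160_000; // assumed encoded size of the global tasksByHost map

        // V1: the same global map is embedded in every consumer's AssignmentInfo.
        long v1Bytes = (long) consumers * tasksByHostBytes;
        // V2: the global map is sent once at the Assignment level.
        long v2Bytes = tasksByHostBytes;

        System.out.printf("V1 ~ %d MB, V2 ~ %d KB%n", v1Bytes / 1_000_000, v2Bytes / 1_000);
    }
}
{code}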

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> We observed that when we have high number of partitions, instances or 
> stream-threads, assignment-data size grows too fast and we start getting 
> below RecordTooLargeException at kafka-broker.
> Workaround of this issue is commented at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still it limits the scalability of kafka streams as moving around 100MBs of 
> assignment data for each rebalancing affects performance & reliability 
> (timeout exceptions starts appearing) as well. Also this limits kafka streams 
> scale even with high max.message.bytes setting as data size increases pretty 
> quickly with number of partitions, instances or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending the compressed 
> assignment-data. We saw assignment-data size reduced by 8X-10X. This improved 
> the kafka streams scalability drastically for us and we could now run it with 
> more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-09-14 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611610#comment-16611610
 ] 

Navinder Brar edited comment on KAFKA-7149 at 9/14/18 8:01 AM:
---

[~guozhang] [~mjsax] I am working with [~asurana] on raising this PR, and will send 
it in a couple of days. Currently, the changes I have made use taskIds instead of 
topicPartitions in AssignmentInfo. But another thing I observed is that we are 
sending the same assignmentInfo to all consumers, so we are replicating the complete 
assignment (of all hosts and partitions) to every consumer. Maybe we can take 
partitionsByHost (now taskIdsByHost) out of the per-consumer array so that it is not 
replicated for all the hosts and is sent just once. With the current changes (changing 
TopicPartitions to TaskIds and using GZIP compression) I have reduced the assignment 
size (on 400 hosts with 3 threads each, having 512 partitions) from 196 MB to 8 MB. If 
we can stop the replication of partitionsByHost on each consumer, the assignment size 
can be reduced to a few hundred KBs. Please share your thoughts.


was (Author: navibrar):
[~guozhang] I am working with [~asurana] on raising this PR, will send it in a 
couple of days. Currently, the changes I have made is using taskIds instead of 
topicPartitions in AssignmentInfo. But another thing I observed is we are 
sending the same assignmentInfo to all consumers, so we are replicating the 
complete assignment(of all hosts and partitions) to all the consumers. Maybe we 
can take out partitionsByHost(newly TaskIdsByHost) map from the consumers array 
so that it is not replicated for all the hosts and is sent just once. With the 
current changes(changing TopicPartitions to TaskIDs and using GZIP compression) 
I have reduced assignment size(on 400 hosts with 3 threads each, having 512 
partitions) from 196 MBs to 8 MB). If we can stop the replication of 
partitionsByHost on each consumer, the assignment size can be reduced to a few 
hundred kbs). Please share your thoughts.

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> We observed that when we have high number of partitions, instances or 
> stream-threads, assignment-data size grows too fast and we start getting 
> below RecordTooLargeException at kafka-broker.
> Workaround of this issue is commented at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still it limits the scalability of kafka streams as moving around 100MBs of 
> assignment data for each rebalancing affects performance & reliability 
> (timeout exceptions starts appearing) as well. Also this limits kafka streams 
> scale even with high max.message.bytes setting as data size increases pretty 
> quickly with number of partitions, instances or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending the compressed 
> assignment-data. We saw assignment-data size reduced by 8X-10X. This improved 
> the kafka streams scalability drastically for us and we could now run it with 
> more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-09-12 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611610#comment-16611610
 ] 

Navinder Brar edited comment on KAFKA-7149 at 9/12/18 7:00 AM:
---

[~guozhang] I am working with [~asurana] on raising this PR and will send it in 
a couple of days. Currently, the change I have made is to use taskIds instead 
of topicPartitions in AssignmentInfo. But another thing I observed is that we 
are sending the same assignmentInfo to all consumers, so we are replicating the 
complete assignment (of all hosts and partitions) to every consumer. Maybe we 
can take the partitionsByHost (now TaskIdsByHost) map out of the consumers 
array so that it is not replicated for all the hosts and is sent just once. 
With the current changes (changing TopicPartitions to TaskIDs and using GZIP 
compression) I have reduced the assignment size (on 400 hosts with 3 threads 
each, having 512 partitions) from 196 MB to 8 MB. If we can stop the 
replication of partitionsByHost on each consumer, the assignment size can be 
reduced to a few hundred KBs. Please share your thoughts.


was (Author: navibrar):
I am working with [~asurana] on raising this PR and will send it in a couple of 
days. Currently, the change I have made is to use taskIds instead of 
topicPartitions in AssignmentInfo. But another thing I observed is that we are 
sending the same assignmentInfo to all consumers, so we are replicating the 
complete assignment (of all hosts and partitions) to every consumer. Maybe we 
can take the partitionsByHost (now TaskIdsByHost) map out of the consumers 
array so that it is not replicated for all the hosts and is sent just once. 
With the current changes (changing TopicPartitions to TaskIDs and using GZIP 
compression) I have reduced the assignment size (on 400 hosts with 3 threads 
each, having 512 partitions) from 196 MB to 8 MB. If we can stop the 
replication of partitionsByHost on each consumer, the assignment size can be 
reduced to a few hundred KBs. Please share your thoughts.

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> We observed that when we have a high number of partitions, instances or 
> stream-threads, the assignment-data size grows too fast and we start getting 
> a RecordTooLargeException at the kafka broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of kafka streams, as moving around 100 MB of 
> assignment data for each rebalance affects performance & reliability (timeout 
> exceptions start appearing) as well. This also limits kafka streams scale 
> even with a high max.message.bytes setting, as the data size increases pretty 
> quickly with the number of partitions, instances or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending the compressed 
> assignment-data. We saw assignment-data size reduced by 8X-10X. This improved 
> the kafka streams scalability drastically for us and we could now run it with 
> more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-09-11 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16611610#comment-16611610
 ] 

Navinder Brar commented on KAFKA-7149:
--

I am working with [~asurana] on raising this PR and will send it in a couple of 
days. Currently, the change I have made is to use taskIds instead of 
topicPartitions in AssignmentInfo. But another thing I observed is that we are 
sending the same assignmentInfo to all consumers, so we are replicating the 
complete assignment (of all hosts and partitions) to every consumer. Maybe we 
can take the partitionsByHost (now TaskIdsByHost) map out of the consumers 
array so that it is not replicated for all the hosts and is sent just once. 
With the current changes (changing TopicPartitions to TaskIDs and using GZIP 
compression) I have reduced the assignment size (on 400 hosts with 3 threads 
each, having 512 partitions) from 196 MB to 8 MB. If we can stop the 
replication of partitionsByHost on each consumer, the assignment size can be 
reduced to a few hundred KBs. Please share your thoughts.

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ashish Surana
>Assignee: Ashish Surana
>Priority: Major
>
> We observed that when we have a high number of partitions, instances or 
> stream-threads, the assignment-data size grows too fast and we start getting 
> a RecordTooLargeException at the kafka broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of kafka streams, as moving around 100 MB of 
> assignment data for each rebalance affects performance & reliability (timeout 
> exceptions start appearing) as well. This also limits kafka streams scale 
> even with a high max.message.bytes setting, as the data size increases pretty 
> quickly with the number of partitions, instances or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending the compressed 
> assignment-data. We saw assignment-data size reduced by 8X-10X. This improved 
> the kafka streams scalability drastically for us and we could now run it with 
> more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6924) Making state store queryable on replicas

2018-05-22 Thread Navinder Brar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16483611#comment-16483611
 ] 

Navinder Brar commented on KAFKA-6924:
--

Hi [~guozhang], thanks for pointing it out. I believe it is the same. We can 
close this one and track it there.

> Making state store queryable on replicas
> 
>
> Key: KAFKA-6924
> URL: https://issues.apache.org/jira/browse/KAFKA-6924
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Priority: Major
>
> State stores in Kafka Streams are currently only queryable when the 
> StreamTask is in RUNNING state. The idea is to make them queryable even for 
> StandbyTasks, to decrease the downtime if a client is not able to fetch data 
> from the active machine.
> Suppose the coordinator is not able to connect to the machine which had the 
> active partition for some reason. Rather than failing that request, we could 
> serve it from a replica, which could be on some other machine. Although the 
> state on the replica might be a little behind the active, it could still be 
> beneficial in some cases to serve the request from the replica rather than 
> failing it.
> This is a very important improvement as it could significantly improve the 
> availability of microservices developed using kafka streams.
> I am working on a patch for this change. Any feedback or comments are welcome.
> Also, I have gone through https://issues.apache.org/jira/browse/KAFKA-6031, 
> has this been implemented?
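
For readers landing here later: this capability eventually shipped in Kafka 
Streams via KIP-535, which allows serving possibly-stale reads from standby 
tasks. A minimal sketch, assuming a running KafkaStreams instance and a 
key-value store named "counts" (both placeholders):

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StandbyQuerySketch {

    // Reads 'key' from the "counts" store, allowing the read to be served from
    // a standby (or restoring) task. The returned value may lag slightly behind
    // the active task, which is exactly the trade-off described in this ticket.
    static Long readPossiblyStale(KafkaStreams streams, String key) {
        ReadOnlyKeyValueStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                        "counts", QueryableStoreTypes.<String, Long>keyValueStore())
                        .enableStaleStores());
        return store.get(key);
    }
}
{code}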



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6643) Warm up new replicas from scratch when changelog topic has LIMITED retention time

2018-03-13 Thread Navinder Brar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-6643:
-
Description: 
In the current scenario, Kafka Streams has changelog Kafka topics(internal 
topics having all the data for the store) which are used to build the state of 
replicas. So, if we keep the number of standby replicas as 1, we still have 
more availability for persistent state stores as changelog Kafka topics are 
also replicated depending upon broker replication policy but that also means we 
are using at least 4 times the space(1 master store, 1 replica store, 1 
changelog, 1 changelog replica). 

Now if we have an year's data in persistent stores(rocksdb), we don't want the 
changelog topics to have an year's data as it will put an unnecessary burden on 
brokers(in terms of space). If we have to scale our kafka streams 
application(having 200-300 TB's of data) we have to scale the kafka brokers as 
well. We want to reduce this dependency and find out ways to just use changelog 
topic as a queue, having just 2 or 3 days of data and warm up the replicas from 
scratch in some other way.

I have few proposals in that respect.
1. Use a new kafka topic related to each partition which we need to warm up on 
the fly(when node containing that partition crashes. Produce into this topic 
from another replica/active and built new replica through this topic.
2. Use peer to peer file transfer(such as SFTP) as rocksdb can create backups, 
which can be transferred from source node to destination node when a new 
replica has to be built from scratch.
3. Use HDFS in intermediate instead of kafka topic where we can keep scheduled 
backups for each partition and use those to build new replicas.
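
To make proposal 2 concrete, a rough sketch using the RocksDB JNI BackupEngine. 
Class names have shifted across rocksdbjni versions (BackupableDBOptions was 
later renamed BackupEngineOptions), so treat this as an assumption-laden 
illustration; the directories are placeholders and the transport of the backup 
directory between nodes (e.g. over SFTP) is left out:

{code:java}
import org.rocksdb.BackupEngine;
import org.rocksdb.BackupableDBOptions;
import org.rocksdb.Env;
import org.rocksdb.RestoreOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDbWarmupSketch {

    static { RocksDB.loadLibrary(); }

    // Source node: snapshot the open store into a backup directory, which can
    // then be shipped to the node that needs to build the replica from scratch.
    static void createBackup(RocksDB db, String backupDir) throws RocksDBException {
        try (BackupableDBOptions opts = new BackupableDBOptions(backupDir);
             BackupEngine engine = BackupEngine.open(Env.getDefault(), opts)) {
            engine.createNewBackup(db, /* flushBeforeBackup */ true);
        }
    }

    // Destination node: restore the shipped backup into the local store
    // directory before (or instead of) replaying the changelog from the start.
    static void restoreLatestBackup(String backupDir, String dbDir) throws RocksDBException {
        try (BackupableDBOptions opts = new BackupableDBOptions(backupDir);
             BackupEngine engine = BackupEngine.open(Env.getDefault(), opts);
             RestoreOptions restoreOpts = new RestoreOptions(false)) {
            engine.restoreDbFromLatestBackup(dbDir, dbDir, restoreOpts);
        }
    }
}
{code}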

  was:
In the current scenario, Kafka Streams has changelog Kafka topics (internal 
topics holding all the data for the store) which are used to build the state of 
replicas. So, if we keep the number of standby replicas as 1, we still have 
more availability for persistent state stores as the changelog Kafka topics are 
also replicated depending upon the broker replication policy, but that also 
means we are using at least 4 times the space (1 master store, 1 replica store, 
1 changelog, 1 changelog replica). 

Now, if we have a year's data in persistent stores (rocksdb), we don't want the 
changelog topics to hold a year's data as it will put an unnecessary burden on 
the brokers (in terms of space). If we have to scale our kafka streams 
application (having 200-300 TB of data) we have to scale the kafka brokers as 
well. We want to reduce this dependency and find ways to just use the changelog 
topic as a queue, holding just 2 or 3 days of data, and warm up the replicas 
from scratch in some other way.

I have a few proposals in that respect.
1. Use a new kafka topic related to each partition which we need to warm up on 
the fly (when the node containing that partition crashes). Produce into this 
topic from another replica/active and build the new replica from this topic.
2. Use peer-to-peer file transfer, as rocksdb can create backups which can be 
transferred from the source node to the destination node when a new replica 
has to be built from scratch.
3. Use HDFS as an intermediate store instead of a kafka topic, where we can 
keep scheduled backups for each partition and use those to build new replicas.


> Warm up new replicas from scratch when changelog topic has LIMITED retention 
> time
> -
>
> Key: KAFKA-6643
> URL: https://issues.apache.org/jira/browse/KAFKA-6643
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Priority: Major
>
> In the current scenario, Kafka Streams has changelog Kafka topics (internal 
> topics holding all the data for the store) which are used to build the state 
> of replicas. So, if we keep the number of standby replicas as 1, we still 
> have more availability for persistent state stores as the changelog Kafka 
> topics are also replicated depending upon the broker replication policy, but 
> that also means we are using at least 4 times the space (1 master store, 
> 1 replica store, 1 changelog, 1 changelog replica). 
> Now, if we have a year's data in persistent stores (rocksdb), we don't want 
> the changelog topics to hold a year's data as it will put an unnecessary 
> burden on the brokers (in terms of space). If we have to scale our kafka 
> streams application (having 200-300 TB of data) we have to scale the kafka 
> brokers as well. We want to reduce this dependency and find ways to just use 
> the changelog topic as a queue, holding just 2 or 3 days of data, and warm up 
> the replicas from scratch in some other way.
> I have a few proposals in that respect.
> 1. Use a new kafka topic related to each partition which we need to warm up 
> on 

[jira] [Updated] (KAFKA-6643) Warm up new replicas from scratch when changelog topic has LIMITED retention time

2018-03-12 Thread Navinder Brar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-6643:
-
Description: 
In the current scenario, Kafka Streams has changelog Kafka topics(internal 
topics having all the data for the store) which are used to build the state of 
replicas. So, if we keep the number of standby replicas as 1, we still have 
more availability for persistent state stores as changelog Kafka topics are 
also replicated depending upon broker replication policy but that also means we 
are using at least 4 times the space(1 master store, 1 replica store, 1 
changelog, 1 changelog replica). 

Now if we have an year's data in persistent stores(rocksdb), we don't want the 
changelog topics to have an year's data as it will put an unnecessary burden on 
brokers(in terms of space). If we have to scale our kafka streams 
application(having 200-300 TB's of data) we have to scale the kafka brokers as 
well. We want to reduce this dependency and find out ways to just use changelog 
topic as a queue, having just 2 or 3 days of data and warm up the replicas from 
scratch in some other way.

I have few proposals in that respect.
1. Use a new kafka topic related to each partition which we need to warm up on 
the fly(when node containing that partition crashes. Produce into this topic 
from another replica/active and built new replica through this topic.
2. Use peer to peer file transfer as rocksdb can create backups, which can be 
transferred from source node to destination node when a new replica has to be 
built from scratch.
3. Use HDFS in intermediate instead of kafka topic where we can keep scheduled 
backups for each partition and use those to build new replicas.

  was:
In the current scenario, Kafka Streams has changelog Kafka topics (internal 
topics holding all the data for the store) which are used to build the state of 
replicas. So, if we keep the number of standby replicas as 1, we still have 
more availability for persistent state stores as the changelog Kafka topics are 
also replicated depending upon the broker replication policy, but that also 
means we are using at least 4 times the space (1 master store, 1 replica store, 
1 changelog, 1 changelog replica). 

Now, if we have a year's data in persistent stores (rocksdb), we don't want the 
changelog topics to hold a year's data as it will put an unnecessary burden on 
the brokers (in terms of space). If we have to scale our kafka streams 
application (having 200-300 TB of data) we have to scale the kafka brokers as 
well. We want to reduce this dependency and find ways to just use the changelog 
topic as a queue, holding just 2 or 3 days of data, and warm up the replicas 
from scratch in some other way.

I have a few proposals in that respect.
1. Use a new kafka topic related to each partition whi


> Warm up new replicas from scratch when changelog topic has LIMITED retention 
> time
> -
>
> Key: KAFKA-6643
> URL: https://issues.apache.org/jira/browse/KAFKA-6643
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Priority: Major
>
> In the current scenario, Kafka Streams has changelog Kafka topics (internal 
> topics holding all the data for the store) which are used to build the state 
> of replicas. So, if we keep the number of standby replicas as 1, we still 
> have more availability for persistent state stores as the changelog Kafka 
> topics are also replicated depending upon the broker replication policy, but 
> that also means we are using at least 4 times the space (1 master store, 
> 1 replica store, 1 changelog, 1 changelog replica). 
> Now, if we have a year's data in persistent stores (rocksdb), we don't want 
> the changelog topics to hold a year's data as it will put an unnecessary 
> burden on the brokers (in terms of space). If we have to scale our kafka 
> streams application (having 200-300 TB of data) we have to scale the kafka 
> brokers as well. We want to reduce this dependency and find ways to just use 
> the changelog topic as a queue, holding just 2 or 3 days of data, and warm up 
> the replicas from scratch in some other way.
> I have a few proposals in that respect.
> 1. Use a new kafka topic related to each partition which we need to warm up 
> on the fly (when the node containing that partition crashes). Produce into 
> this topic from another replica/active and build the new replica from this 
> topic.
> 2. Use peer-to-peer file transfer, as rocksdb can create backups which can be 
> transferred from the source node to the destination node when a new replica 
> has to be built from scratch.
> 3. Use HDFS as an intermediate store instead of a kafka topic, where we can 
> keep scheduled backups for each partition and use those to build new replicas.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Updated] (KAFKA-6643) Warm up new replicas from scratch when changelog topic has LIMITED retention time

2018-03-12 Thread Navinder Brar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-6643:
-
Summary: Warm up new replicas from scratch when changelog topic has LIMITED 
retention time  (was: Warm up new replicas from scratch when changelog topic 
has retention time)

> Warm up new replicas from scratch when changelog topic has LIMITED retention 
> time
> -
>
> Key: KAFKA-6643
> URL: https://issues.apache.org/jira/browse/KAFKA-6643
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Priority: Major
>
> In the current scenario, Kafka Streams has changelog Kafka topics (internal 
> topics holding all the data for the store) which are used to build the state 
> of replicas. So, if we keep the number of standby replicas as 1, we still 
> have more availability for persistent state stores as the changelog Kafka 
> topics are also replicated depending upon the broker replication policy, but 
> that also means we are using at least 4 times the space (1 master store, 
> 1 replica store, 1 changelog, 1 changelog replica). 
> Now, if we have a year's data in persistent stores (rocksdb), we don't want 
> the changelog topics to hold a year's data as it will put an unnecessary 
> burden on the brokers (in terms of space). If we have to scale our kafka 
> streams application (having 200-300 TB of data) we have to scale the kafka 
> brokers as well. We want to reduce this dependency and find ways to just use 
> the changelog topic as a queue, holding just 2 or 3 days of data, and warm up 
> the replicas from scratch in some other way.
> I have a few proposals in that respect.
> 1. Use a new kafka topic related to each partition whi



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6643) Warm up new replicas from scratch when changelog topic has retention time

2018-03-12 Thread Navinder Brar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Navinder Brar updated KAFKA-6643:
-
Description: 
In the current scenario, Kafka Streams has changelog Kafka topics (internal 
topics holding all the data for the store) which are used to build the state of 
replicas. So, if we keep the number of standby replicas as 1, we still have 
more availability for persistent state stores as the changelog Kafka topics are 
also replicated depending upon the broker replication policy, but that also 
means we are using at least 4 times the space (1 master store, 1 replica store, 
1 changelog, 1 changelog replica). 

Now, if we have a year's data in persistent stores (rocksdb), we don't want the 
changelog topics to hold a year's data as it will put an unnecessary burden on 
the brokers (in terms of space). If we have to scale our kafka streams 
application (having 200-300 TB of data) we have to scale the kafka brokers as 
well. We want to reduce this dependency and find ways to just use the changelog 
topic as a queue, holding just 2 or 3 days of data, and warm up the replicas 
from scratch in some other way.

I have a few proposals in that respect.
1. Use a new kafka topic related to each partition whi

  was:
In the current scenario, Kafka Streams has changelog Kafka topics (internal 
topics holding all the data for the store) which are used to build the state of 
replicas. So, if we keep the number of standby replicas as 1, we still have 
more availability for persistent state stores as the changelog Kafka topics are 
also replicated depending upon the broker replication policy, but that also 
means we are using at least 4 times the space (1 master store, 1 replica store, 
1 changelog, 1 changelog replica). 

Now, if we have a year's data in persistent stores (rocksdb), we don't want the 
changelog topics to hold a year's data as it will put an unnecessary burden on 
the brokers (in terms of space). If we have to scale our kafka streams 
application (having 200-300 TB of data) we have to scale the kafka brokers as 
well. We want to reduce this dependency and find ways to just use the changelog 
topic as a queue, holding just 2 or 3 days of data, and warm up the replicas 
from scratch in some other way.


> Warm up new replicas from scratch when changelog topic has retention time
> -
>
> Key: KAFKA-6643
> URL: https://issues.apache.org/jira/browse/KAFKA-6643
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Navinder Brar
>Priority: Major
>
> In the current scenario, Kafka Streams has changelog Kafka topics (internal 
> topics holding all the data for the store) which are used to build the state 
> of replicas. So, if we keep the number of standby replicas as 1, we still 
> have more availability for persistent state stores as the changelog Kafka 
> topics are also replicated depending upon the broker replication policy, but 
> that also means we are using at least 4 times the space (1 master store, 
> 1 replica store, 1 changelog, 1 changelog replica). 
> Now, if we have a year's data in persistent stores (rocksdb), we don't want 
> the changelog topics to hold a year's data as it will put an unnecessary 
> burden on the brokers (in terms of space). If we have to scale our kafka 
> streams application (having 200-300 TB of data) we have to scale the kafka 
> brokers as well. We want to reduce this dependency and find ways to just use 
> the changelog topic as a queue, holding just 2 or 3 days of data, and warm up 
> the replicas from scratch in some other way.
> I have a few proposals in that respect.
> 1. Use a new kafka topic related to each partition whi



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6643) Warm up new replicas from scratch when changelog topic has retention time

2018-03-12 Thread Navinder Brar (JIRA)
Navinder Brar created KAFKA-6643:


 Summary: Warm up new replicas from scratch when changelog topic 
has retention time
 Key: KAFKA-6643
 URL: https://issues.apache.org/jira/browse/KAFKA-6643
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Navinder Brar


In the current scenario, Kafka Streams has changelog Kafka topics (internal 
topics holding all the data for the store) which are used to build the state of 
replicas. So, if we keep the number of standby replicas as 1, we still have 
more availability for persistent state stores as the changelog Kafka topics are 
also replicated depending upon the broker replication policy, but that also 
means we are using at least 4 times the space (1 master store, 1 replica store, 
1 changelog, 1 changelog replica). 

Now, if we have a year's data in persistent stores (rocksdb), we don't want the 
changelog topics to hold a year's data as it will put an unnecessary burden on 
the brokers (in terms of space). If we have to scale our kafka streams 
application (having 200-300 TB of data) we have to scale the kafka brokers as 
well. We want to reduce this dependency and find ways to just use the changelog 
topic as a queue, holding just 2 or 3 days of data, and warm up the replicas 
from scratch in some other way.
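
For reference, the "changelog topic as a queue with just 2 or 3 days of data" 
part can already be expressed by overriding the changelog topic configs on the 
store; the catch, and the reason for this ticket, is that a fresh replica can 
then no longer be rebuilt from the changelog alone. A minimal sketch (the store 
name, serdes and the 3-day value are illustrative):

{code:java}
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class ChangelogRetentionSketch {

    // Caps the store's changelog at roughly 3 days of data instead of keeping
    // the full (compacted) history, trading restore-from-scratch ability for
    // much less data on the brokers.
    static Materialized<String, Long, KeyValueStore<Bytes, byte[]>> cappedChangelog() {
        return Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("warmup-example-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long())
                .withLoggingEnabled(Map.of(
                        "cleanup.policy", "delete",
                        "retention.ms", String.valueOf(3L * 24 * 60 * 60 * 1000)));
    }
}
{code}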



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)