[jira] [Commented] (KAFKA-3522) Consider adding version information into rocksDB storage format

2019-01-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747388#comment-16747388
 ] 

ASF GitHub Bot commented on KAFKA-3522:
---

mjsax commented on pull request #6150: KAFKA-3522: Add internal RecordConverter 
interface
URL: https://github.com/apache/kafka/pull/6150
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Consider adding version information into rocksDB storage format
> ---
>
> Key: KAFKA-3522
> URL: https://issues.apache.org/jira/browse/KAFKA-3522
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: architecture
>
> Kafka Streams does not introduce any modifications to the data format in the 
> underlying Kafka protocol, but it does use RocksDB for persistent state 
> storage, and currently its data format is fixed and hard-coded. We want to 
> consider the evolution path for when we change the data format in the future, 
> and hence having some version info stored along with the storage file / 
> directory would be useful.
> This information could even live outside the storage file itself; for example, 
> we could just use a small "version indicator" file in the RocksDB directory 
> for this purpose. Thoughts? [~enothereska] [~jkreps]
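
A minimal sketch of the "version indicator file" idea, purely for illustration: the file name, version value, and class below are hypothetical and not part of any committed Kafka Streams format.

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class StateStoreVersionMarker {

    // Hypothetical marker file name and format version; not an official Kafka constant.
    private static final String VERSION_FILE = ".rocksdb.format.version";
    private static final int CURRENT_FORMAT_VERSION = 1;

    /** Write the marker when a new store directory is created. */
    public static void writeVersion(final Path storeDir) throws IOException {
        Files.createDirectories(storeDir);
        Files.write(storeDir.resolve(VERSION_FILE),
                    Integer.toString(CURRENT_FORMAT_VERSION).getBytes(StandardCharsets.UTF_8));
    }

    /** Read the marker on open; a missing file is treated as the legacy, pre-versioned format. */
    public static int readVersion(final Path storeDir) throws IOException {
        final Path versionFile = storeDir.resolve(VERSION_FILE);
        if (!Files.exists(versionFile)) {
            return 0; // legacy format without a version marker
        }
        return Integer.parseInt(
            new String(Files.readAllBytes(versionFile), StandardCharsets.UTF_8).trim());
    }
}
{code}

On open, a store could compare the recorded version with the current one and run a migration or fail fast, instead of silently misreading data written in an older format.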



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7018) persist memberId for consumer restart

2019-01-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747366#comment-16747366
 ] 

ASF GitHub Bot commented on KAFKA-7018:
---

abbccdda commented on pull request #6177: KAFKA-7018 & KIP-345 part-one: Add 
static membership logic to JoinGroup protocol
URL: https://github.com/apache/kafka/pull/6177
 
 
   This is the first diff for the implementation of the JoinGroup logic for static 
membership. The goals of this diff are:
   
   1. Add `group.instance.id` as a unique identifier for consumer instances, 
provided by the end user (see the config sketch after this list);
   2. Modify the group coordinator to accept JoinGroupRequests with or without static 
membership, and refactor the logic for readability and code reuse.
   3. Add client-side support for the static membership changes, 
including a new config for `group.instance.id`, applying the stream thread client id by 
default, and new JoinGroup exception handling.
   4. Remove the `internal.leave.on.close` config by checking whether 
`group.instance.id` is defined. Effectively, only dynamic members will 
send a LeaveGroupRequest, while static member expiration is controlled solely 
through the session timeout.
   5. Increase the max session timeout to 30 min, giving users the flexibility to 
tolerate partial unavailability rather than the burden of a rebalance.
   6. Unit tests for each module's changes, especially the group coordinator 
logic, crossing possibilities such as:
 6.1 Dynamic/Static member
 6.2 Known/Unknown member id
 6.3 Group stable/unstable
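   
   As a rough illustration of the client-side configuration described in items 1 and 4, here is a hedged sketch, assuming `group.instance.id` lands as proposed in KIP-345; the broker address, group id, instance id, topic, and timeout values are placeholders:
   
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StaticMemberExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // Static membership (KIP-345): a stable, user-provided instance id that survives
        // restarts, so a bounced consumer rejoins with the same identity instead of as a
        // brand-new member. The raw config name is used here because the constant does not
        // exist before this change.
        props.put("group.instance.id", "consumer-host-1");
        // With static membership, expiration is driven by the session timeout only.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "120000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            consumer.poll(Duration.ofMillis(100));
        }
    }
}
{code}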
   
   The hope here is to merge this logic before the 2.2 code freeze so that we (as 
Pinterest) can start experimenting with the core logic ASAP.
   
   The rest of the KIP-345 change will be broken down into 4 separate diffs:
   
   1. Avoid kicking out members through the rebalance timeout; only kick them out 
through the session timeout.
   2. Changes around the LeaveGroup logic, including version bumping, broker logic, 
client logic, etc.
   3. Admin client changes to add the ability to batch-remove static members
   4. Deprecate group.initial.rebalance.delay
   
   Let me know your thoughts @guozhangwang @hachikuji @stanislavkozlovski 
@MayureshGharat @kkonstantine @lindong28 @Ishiihara @shawnsnguyen , thanks!
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> persist memberId for consumer restart
> -
>
> Key: KAFKA-7018
> URL: https://issues.apache.org/jira/browse/KAFKA-7018
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the group coordinator, there is logic to ignore join group requests from 
> existing follower consumers:
> {code:scala}
> case Empty | Stable =>
>   if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
>     // if the member id is unknown, register the member to the group
>     addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId,
>       clientHost, protocolType, protocols, group, responseCallback)
>   } else {
>     val member = group.get(memberId)
>     if (group.isLeader(memberId) || !member.matches(protocols)) {
>       // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
>       // The latter allows the leader to trigger rebalances for changes affecting assignment
>       // which do not affect the member metadata (such as topic metadata changes for the consumer)
>       updateMemberAndRebalance(group, member, protocols, responseCallback)
>     } else {
>       // for followers with no actual change to their metadata, just return group information
>       // for the current generation which will allow them to issue SyncGroup
>       responseCallback(JoinGroupResult(
>         members = Map.empty,
>         memberId = memberId,
>         generationId = group.generationId,
>         subProtocol = group.protocolOrNull,
>         leaderId = group.leaderOrNull,
>         error = Errors.NONE))
>     }
>   }
> {code}
> While looking at the AbstractCoordinator, I found that the generation was 
> hard-coded as NO_GENERATION on restart, which means we will send 
> UNKNOWN_MEMBER_ID in the first join group request. This means the restarted 
> consumer will be treated as a new member, so a rebalance will be triggered 
> until the session timeout.
> I'm trying to c

[jira] [Updated] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2019-01-19 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7641:
---
Component/s: core

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Boyang Chen
>Assignee: Stanislav Kozlovski
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason described an edge case of the current consumer protocol that could 
> cause a memory burst on the broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually become quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, 
> we should define a cap on the broker side to keep one consumer group from 
> growing too large. So far I feel it's appropriate to introduce this as a 
> server config, since this value mostly deals with error scenarios and client 
> users shouldn't have to worry about it.
>  
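
For context, the timeout mismatch Jason describes comes purely from client configuration; below is a hedged sketch of the problematic combination (values are illustrative only, not recommendations):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class JoinGroupTimeoutMismatch {
    // A hedged reconstruction of the configuration pattern behind the quoted incident.
    public static Properties problematicConfig() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // The rebalance timeout sent in JoinGroup is derived from max.poll.interval.ms.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000"); // 10 minutes
        // A much smaller request timeout makes the client give up on its pending JoinGroup
        // and retry, while the broker still counts each abandoned attempt toward the group,
        // so the group metadata keeps growing; the proposed consumer.group.max.size cap
        // is meant to bound exactly this growth.
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        return props;
    }
}
{code}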



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4453) add request prioritization

2019-01-19 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4453:
---
Component/s: core

> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Onur Karaman
>Assignee: Mayuresh Gharat
>Priority: Major
> Fix For: 2.2.0
>
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority. So a 
> backlog of requests ahead of the controller request will delay the processing 
> of controller requests. This causes requests in front of the controller 
> request to get processed based on stale state.
> Side effects may include giving clients stale metadata\[1\], rejecting 
> ProduceRequests and FetchRequests\[2\], and data loss (for some 
> unofficial\[3\] definition of data loss in terms of messages beyond the high 
> watermark)\[4\].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests get processed before regular 
> queued up requests, so requests can get processed with up-to-date state.
> \[1\] Say a client's MetadataRequest is sitting in front of a controller's 
> UpdateMetadataRequest on a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes and that these leadership changes are being broadcasted 
> from the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a roundtrip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
> topic metadata again.
> \[2\] Clients can issue ProduceRequests to the wrong broker based on stale 
> metadata, causing rejected ProduceRequests. Based on how long the client acts 
> based on the stale metadata, the impact may or may not be visible to a 
> producer application. If the number of rejected ProduceRequests does not 
> exceed the max number of retries, the producer application would not be 
> impacted. On the other hand, if the retries are exhausted, the failed produce 
> will be visible to the producer application.
> \[3\] The official definition of data loss in Kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in sync 
> replicas for that partition have applied it to their log.
> \[4\] Say a number of ProduceRequests are sitting in front of a controller's 
> LeaderAndIsrRequest on a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted out 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests are getting processed on the former partition leader. As 
> part of becoming a follower for a partition, the broker truncates the log to 
> the high-watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, respond to the user with a success, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users have a false sense that their produce attempt succeeded 
> while in reality their messages got erased. While technically part of what 
> they signed up for with acks=1, it can still come as a surprise.
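
To make footnotes [2] and [4] concrete from the client side, here is a hedged producer sketch (broker address and topic are placeholders): with acks=1 a write can be acknowledged by a leader that is about to become a follower and truncate, while acks=all ties the acknowledgement to the "committed" definition in [3], and retries absorb the NOT_LEADER_FOR_PARTITION round trips caused by stale metadata.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        // acks=1: only the leader acknowledges; if it then becomes a follower and truncates
        // to the high watermark (footnote [4]), the acknowledged message can be erased.
        // acks=all waits for the in-sync replicas, matching the "committed" definition in [3].
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retries absorb NOT_LEADER_FOR_PARTITION errors caused by stale metadata (footnote [2]).
        props.put(ProducerConfig.RETRIES_CONFIG, "5");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}
{code}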



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2019-01-19 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7641:
---
Description: 
In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason 
described an edge case of the current consumer protocol that could cause a 
memory burst on the broker side:

```the case we observed in practice was caused by a consumer that was slow to 
rejoin the group after a rebalance had begun. At the same time, there were new 
members that were trying to join the group for the first time. The request 
timeout was significantly lower than the rebalance timeout, so the JoinGroup of 
the new members kept timing out. The timeout caused a retry and the group size 
eventually become quite large because we could not detect the fact that the new 
members were no longer there.```

Since many disorganized join group requests are spamming the group metadata, we 
should define a cap on the broker side to keep one consumer group from growing 
too large. So far I feel it's appropriate to introduce this as a server config, 
since this value mostly deals with error scenarios and client users shouldn't 
have to worry about it.

KIP-389: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Introduce+a+configurable+consumer+group+size+limit]

 

  was:
In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason 
described an edge case of the current consumer protocol that could cause a 
memory burst on the broker side:

```the case we observed in practice was caused by a consumer that was slow to 
rejoin the group after a rebalance had begun. At the same time, there were new 
members that were trying to join the group for the first time. The request 
timeout was significantly lower than the rebalance timeout, so the JoinGroup of 
the new members kept timing out. The timeout caused a retry and the group size 
eventually become quite large because we could not detect the fact that the new 
members were no longer there.```

Since many disorganized join group requests are spamming the group metadata, we 
should define a cap on the broker side to keep one consumer group from growing 
too large. So far I feel it's appropriate to introduce this as a server config, 
since this value mostly deals with error scenarios and client users shouldn't 
have to worry about it.

 


> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Boyang Chen
>Assignee: Stanislav Kozlovski
>Priority: Major
>  Labels: kip
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason described an edge case of the current consumer protocol that could 
> cause a memory burst on the broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually become quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, 
> we should define a cap on the broker side to keep one consumer group from 
> growing too large. So far I feel it's appropriate to introduce this as a 
> server config, since this value mostly deals with error scenarios and client 
> users shouldn't have to worry about it.
> KIP-389: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Introduce+a+configurable+consumer+group+size+limit]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2019-01-19 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7641:
---
Labels: kip  (was: )

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Boyang Chen
>Assignee: Stanislav Kozlovski
>Priority: Major
>  Labels: kip
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason described an edge case of the current consumer protocol that could 
> cause a memory burst on the broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually become quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, 
> we should define a cap on the broker side to keep one consumer group from 
> growing too large. So far I feel it's appropriate to introduce this as a 
> server config, since this value mostly deals with error scenarios and client 
> users shouldn't have to worry about it.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4453) add request prioritization

2019-01-19 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4453:
---
Description: 
Today all requests (client requests, broker requests, controller requests) to a 
broker are put into the same queue. They all have the same priority. So a 
backlog of requests ahead of the controller request will delay the processing 
of controller requests. This causes requests in front of the controller request 
to get processed based on stale state.

Side effects may include giving clients stale metadata[1], rejecting 
ProduceRequests and FetchRequests[2], and data loss (for some unofficial[3] 
definition of data loss in terms of messages beyond the high watermark)[4].

We'd like to minimize the number of requests processed based on stale state. 
With request prioritization, controller requests get processed before regular 
queued up requests, so requests can get processed with up-to-date state.

[1] Say a client's MetadataRequest is sitting in front of a controller's 
UpdateMetadataRequest on a given broker's request queue. Suppose the 
MetadataRequest is for a topic whose partitions have recently undergone 
leadership changes and that these leadership changes are being broadcasted from 
the controller in the later UpdateMetadataRequest. Today the broker processes 
the MetadataRequest before processing the UpdateMetadataRequest, meaning the 
metadata returned to the client will be stale. The client will waste a 
roundtrip sending requests to the stale partition leader, get a 
NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
topic metadata again.
 [2] Clients can issue ProduceRequests to the wrong broker based on stale 
metadata, causing rejected ProduceRequests. Based on how long the client acts 
based on the stale metadata, the impact may or may not be visible to a producer 
application. If the number of rejected ProduceRequests does not exceed the max 
number of retries, the producer application would not be impacted. On the other 
hand, if the retries are exhausted, the failed produce will be visible to the 
producer application.
 [3] The official definition of data loss in Kafka is when we lose a 
"committed" message. A message is considered "committed" when all in sync 
replicas for that partition have applied it to their log.
 [4] Say a number of ProduceRequests are sitting in front of a controller's 
LeaderAndIsrRequest on a given broker's request queue. Suppose the 
ProduceRequests are for partitions whose leadership has recently shifted out 
from the current broker to another broker in the replica set. Today the broker 
processes the ProduceRequests before the LeaderAndIsrRequest, meaning the 
ProduceRequests are getting processed on the former partition leader. As part 
of becoming a follower for a partition, the broker truncates the log to the 
high-watermark. With weaker ack settings such as acks=1, the leader may 
successfully write to its own log, respond to the user with a success, process 
the LeaderAndIsrRequest making the broker a follower of the partition, and 
truncate the log to a point before the user's produced messages. So users have 
a false sense that their produce attempt succeeded while in reality their 
messages got erased. While technically part of what they signed up for with 
acks=1, it can still come as a surprise.

KIP-291: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-291%3A+Separating+controller+connections+and+requests+from+the+data+plane]

  was:
Today all requests (client requests, broker requests, controller requests) to a 
broker are put into the same queue. They all have the same priority. So a 
backlog of requests ahead of the controller request will delay the processing 
of controller requests. This causes requests in front of the controller request 
to get processed based on stale state.

Side effects may include giving clients stale metadata\[1\], rejecting 
ProduceRequests and FetchRequests\[2\], and data loss (for some unofficial\[3\] 
definition of data loss in terms of messages beyond the high watermark)\[4\].

We'd like to minimize the number of requests processed based on stale state. 
With request prioritization, controller requests get processed before regular 
queued up requests, so requests can get processed with up-to-date state.

\[1\] Say a client's MetadataRequest is sitting in front of a controller's 
UpdateMetadataRequest on a given broker's request queue. Suppose the 
MetadataRequest is for a topic whose partitions have recently undergone 
leadership changes and that these leadership changes are being broadcasted from 
the controller in the later UpdateMetadataRequest. Today the broker processes 
the MetadataRequest before processing the UpdateMetadataRequest, meaning the 
metadata returned to the client will be stale. The client will waste a 
roundtrip sending requests to the stale partitio

[jira] [Updated] (KAFKA-4453) add request prioritization

2019-01-19 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4453:
---
Labels: kip  (was: )

> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Onur Karaman
>Assignee: Mayuresh Gharat
>Priority: Major
>  Labels: kip
> Fix For: 2.2.0
>
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority. So a 
> backlog of requests ahead of the controller request will delay the processing 
> of controller requests. This causes requests in front of the controller 
> request to get processed based on stale state.
> Side effects may include giving clients stale metadata\[1\], rejecting 
> ProduceRequests and FetchRequests\[2\], and data loss (for some 
> unofficial\[3\] definition of data loss in terms of messages beyond the high 
> watermark)\[4\].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests get processed before regular 
> queued up requests, so requests can get processed with up-to-date state.
> \[1\] Say a client's MetadataRequest is sitting in front of a controller's 
> UpdateMetadataRequest on a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes and that these leadership changes are being broadcasted 
> from the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a roundtrip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
> topic metadata again.
> \[2\] Clients can issue ProduceRequests to the wrong broker based on stale 
> metadata, causing rejected ProduceRequests. Based on how long the client acts 
> based on the stale metadata, the impact may or may not be visible to a 
> producer application. If the number of rejected ProduceRequests does not 
> exceed the max number of retries, the producer application would not be 
> impacted. On the other hand, if the retries are exhausted, the failed produce 
> will be visible to the producer application.
> \[3\] The official definition of data loss in Kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in sync 
> replicas for that partition have applied it to their log.
> \[4\] Say a number of ProduceRequests are sitting in front of a controller's 
> LeaderAndIsrRequest on a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted out 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests are getting processed on the former partition leader. As 
> part of becoming a follower for a partition, the broker truncates the log to 
> the high-watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, respond to the user with a success, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users have a false sense that their produce attempt succeeded 
> while in reality their messages got erased. While technically part of what 
> they signed up for with acks=1, it can still come as a surprise.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3522) Consider adding version information into rocksDB storage format

2019-01-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747233#comment-16747233
 ] 

ASF GitHub Bot commented on KAFKA-3522:
---

mjsax commented on pull request #6175: KAFKA-3522: Add public interfaces for 
timestamped stores
URL: https://github.com/apache/kafka/pull/6175
 
 
   Add public interfaces for KIP-258.
   
   - `RecordConverter` is omitted because it was added via #6150 
   - `Stores` is omitted, because we need to add the corresponding store builder 
classes first
   
   Additionally, this adds some internal "facade" classes to allow accessing 
TimestampedStores with non-timestamped interfaces. This is part of the backward 
compatibility story of the KIP.
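
A minimal sketch of how a processor might use the proposed public interfaces, assuming the KIP-258 names (`TimestampedKeyValueStore`, `ValueAndTimestamp`) land as in this PR; the store name and value types below are made up:

{code:java}
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public class TimestampedStoreSketch {

    // A timestamped store holds ValueAndTimestamp<V> instead of a bare V, so readers can
    // see when a value was last updated without changing the key/value types they work with.
    @SuppressWarnings("unchecked")
    public static Long lastUpdateTimestamp(final ProcessorContext context,
                                           final String storeName,
                                           final String key) {
        final TimestampedKeyValueStore<String, Long> store =
                (TimestampedKeyValueStore<String, Long>) context.getStateStore(storeName);
        final ValueAndTimestamp<Long> valueAndTimestamp = store.get(key);
        return valueAndTimestamp == null ? null : valueAndTimestamp.timestamp();
    }
}
{code}

The "facade" classes mentioned above would let existing code keep reading a plain value while the store internally tracks the timestamp as well.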
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Consider adding version information into rocksDB storage format
> ---
>
> Key: KAFKA-3522
> URL: https://issues.apache.org/jira/browse/KAFKA-3522
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: architecture
>
> Kafka Streams does not introduce any modifications to the data format in the 
> underlying Kafka protocol, but it does use RocksDB for persistent state 
> storage, and currently its data format is fixed and hard-coded. We want to 
> consider the evolution path for when we change the data format in the future, 
> and hence having some version info stored along with the storage file / 
> directory would be useful.
> This information could even live outside the storage file itself; for example, 
> we could just use a small "version indicator" file in the RocksDB directory 
> for this purpose. Thoughts? [~enothereska] [~jkreps]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6455) Improve timestamp propagation at DSL level

2019-01-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747217#comment-16747217
 ] 

ASF GitHub Bot commented on KAFKA-6455:
---

mjsax commented on pull request #6147: KAFKA-6455: Extend CacheFlushListener to 
forward timestamp
URL: https://github.com/apache/kafka/pull/6147
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve timestamp propagation at DSL level
> --
>
> Key: KAFKA-6455
> URL: https://issues.apache.org/jira/browse/KAFKA-6455
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> At the DSL level, we inherit the timestamp propagation "contract" from the 
> Processor API. This contract is not optimal at the DSL level, and we should 
> define a DSL-level contract that matches the semantics of the corresponding 
> DSL operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7502) Cleanup KTable materialization logic in a single place

2019-01-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747025#comment-16747025
 ] 

ASF GitHub Bot commented on KAFKA-7502:
---

dongjinleekr commented on pull request #6174: KAFKA-7502: Cleanup KTable 
materialization logic in a single place
URL: https://github.com/apache/kafka/pull/6174
 
 
   This is a draft cleanup for KAFKA-7502. Here are the details:
   
   1. Make `KTableKTableJoinNode` abstract, and define its child classes 
(`[NonMaterialized,Materialized]KTableKTableJoinNode`) instead: now, all 
materialization-related routines are separated into the other classes.
   2. `KTableKTableJoinNodeBuilder#build` now instantiates 
`[NonMaterialized,Materialized]KTableKTableJoinNode` classes instead of 
`KTableKTableJoinNode`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup KTable materialization logic in a single place
> --
>
> Key: KAFKA-7502
> URL: https://issues.apache.org/jira/browse/KAFKA-7502
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Minor
>
> Today, since we pre-create all the `KTableXXX` operators along with the logical 
> nodes, we are effectively duplicating the logic that determines whether the 
> resulting KTable should be materialized, for example in 
> `KTableKTableJoinNode` and in `KTableImpl#doJoin`. This is bug-prone, 
> since we may update the logic in one class but forget to update the other 
> class.
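
For reference, the user-facing switch that this duplicated logic interprets is whether the DSL caller passed a `Materialized`; below is a hedged sketch of the two cases for a KTable-KTable join (topic names, serdes, and store name are placeholders):

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class KTableJoinMaterialization {
    public static void buildTopology(final StreamsBuilder builder) {
        final KTable<String, String> left =
                builder.table("left-topic", Consumed.with(Serdes.String(), Serdes.String()));
        final KTable<String, String> right =
                builder.table("right-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Case 1, not materialized: no Materialized argument, so whether the join result
        // gets a physical state store is decided internally by the DSL.
        final KTable<String, String> joined = left.join(right, (l, r) -> l + "," + r);

        // Case 2, materialized: the caller asks for a named, queryable store. This request is
        // what KTableKTableJoinNode and KTableImpl#doJoin both currently have to interpret,
        // i.e. the duplicated decision the ticket wants to consolidate in one place.
        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized =
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("joined-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String());
        final KTable<String, String> joinedAndStored =
                left.join(right, (l, r) -> l + "," + r, materialized);
    }
}
{code}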



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)