[jira] [Commented] (KAFKA-3522) Consider adding version information into rocksDB storage format
[ https://issues.apache.org/jira/browse/KAFKA-3522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747388#comment-16747388 ]

ASF GitHub Bot commented on KAFKA-3522:
---------------------------------------

mjsax commented on pull request #6150: KAFKA-3522: Add internal RecordConverter interface
URL: https://github.com/apache/kafka/pull/6150

> Consider adding version information into rocksDB storage format
> ----------------------------------------------------------------
>
> Key: KAFKA-3522
> URL: https://issues.apache.org/jira/browse/KAFKA-3522
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Matthias J. Sax
> Priority: Major
> Labels: architecture
>
> Kafka Streams does not introduce any modifications to the data format in the underlying Kafka protocol, but it does use RocksDB for persistent state storage, and currently its data format is fixed and hard-coded. We want to consider the evolution path for when we change the data format in the future, so having some version info stored along with the storage file / directory would be useful.
> This information could even live outside the storage file; for example, we could use a small "version indicator" file in the RocksDB directory for this purpose. Thoughts? [~enothereska] [~jkreps]
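The "version indicator" file discussed above is easy to sketch. The snippet below is only an illustration of the general idea, not the format actually adopted for KIP-258; the file name, the plain-integer contents, and the "missing file means unversioned" convention are all assumptions made for this example.

{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class StateStoreVersionFile {
    // Hypothetical file name; the real name/format would be defined by the KIP.
    private static final String VERSION_FILE = ".storage-format-version";

    /** Write the format version next to the RocksDB files so future releases can detect it. */
    public static void write(Path stateDir, int version) throws IOException {
        Files.write(stateDir.resolve(VERSION_FILE),
                    Integer.toString(version).getBytes(StandardCharsets.UTF_8));
    }

    /** Read the format version; absence of the file implies the original, unversioned format. */
    public static int read(Path stateDir) throws IOException {
        Path file = stateDir.resolve(VERSION_FILE);
        if (!Files.exists(file)) {
            return 0; // pre-versioning format
        }
        return Integer.parseInt(new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim());
    }
}
{code}

On upgrade, a newer release could read this file and decide whether an in-place migration of the store contents is required before opening RocksDB.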
[jira] [Commented] (KAFKA-7018) persist memberId for consumer restart
[ https://issues.apache.org/jira/browse/KAFKA-7018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747366#comment-16747366 ]

ASF GitHub Bot commented on KAFKA-7018:
---------------------------------------

abbccdda commented on pull request #6177: KAFKA-7018 & KIP-345 part-one: Add static membership logic to JoinGroup protocol
URL: https://github.com/apache/kafka/pull/6177

This is the first diff for the implementation of the JoinGroup logic for static membership. The goals of this diff are:
1. Add `group.instance.id` as a unique identifier for consumer instances, provided by the end user.
2. Modify the group coordinator to accept JoinGroupRequests with or without static membership, and refactor the logic for readability and code reusability.
3. Add client-side support for incorporating static membership changes, including a new config for `group.instance.id`, applying the stream thread client id by default, and new join-group exception handling.
4. Remove the `internal.leave.on.close` config by checking whether `group.instance.id` is defined. Effectively, only dynamic members will send a LeaveGroupRequest, while static membership expiration is controlled only through the session timeout.
5. Increase the max session timeout to 30 min for more user flexibility, for users who would rather tolerate partial unavailability than pay the cost of a rebalance.
6. Unit tests for each module change, especially on the group coordinator logic, crossing the possibilities:
   6.1 Dynamic/static member
   6.2 Known/unknown member id
   6.3 Group stable/unstable

The hope here is to merge this logic before the 2.2 code freeze so that we (as Pinterest) could start experimenting on the core logic ASAP. The rest of the KIP-345 changes will be broken down into 4 separate diffs:
1. Avoid kicking out members through rebalance.timeout; only do the kick-out through the session timeout.
2. Changes around the LeaveGroup logic, including version bumping, broker logic, client logic, etc.
3. Admin client changes to add the ability to batch-remove static members.
4. Deprecate group.initial.rebalance.delay.

Let me know your thoughts @guozhangwang @hachikuji @stanislavkozlovski @MayureshGharat @kkonstantine @lindong28 @Ishiihara @shawnsnguyen, thanks!

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

> persist memberId for consumer restart
> --------------------------------------
>
> Key: KAFKA-7018
> URL: https://issues.apache.org/jira/browse/KAFKA-7018
> Project: Kafka
> Issue Type: Improvement
> Components: consumer, streams
> Reporter: Boyang Chen
> Assignee: Boyang Chen
> Priority: Major
>
> In the group coordinator, there is logic that neglects join group requests from existing follower consumers:
> {code:java}
> case Empty | Stable =>
>   if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
>     // if the member id is unknown, register the member to the group
>     addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId,
>       clientHost, protocolType, protocols, group, responseCallback)
>   } else {
>     val member = group.get(memberId)
>     if (group.isLeader(memberId) || !member.matches(protocols)) {
>       // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
>       // The latter allows the leader to trigger rebalances for changes affecting assignment
>       // which do not affect the member metadata (such as topic metadata changes for the consumer)
>       updateMemberAndRebalance(group, member, protocols, responseCallback)
>     } else {
>       // for followers with no actual change to their metadata, just return group information
>       // for the current generation which will allow them to issue SyncGroup
>       responseCallback(JoinGroupResult(
>         members = Map.empty,
>         memberId = memberId,
>         generationId = group.generationId,
>         subProtocol = group.protocolOrNull,
>         leaderId = group.leaderOrNull,
>         error = Errors.NONE))
>     }
>   }
> {code}
> While looking at the AbstractCoordinator, I found that the generation was hard-coded as NO_GENERATION on restart, which means we will send UNKNOWN_MEMBER_ID in the first join group request. This means we will treat the restarted consumer as a new member, so the rebalance will be triggered until session timeout. I'm trying to c
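To make the client-side part of the PR above concrete, here is a minimal consumer configuration sketch using the `group.instance.id` setting proposed in KIP-345. The group name, instance id, timeout value, and bootstrap address are placeholders, and the exact config constant and broker behaviour depend on how the KIP finally lands, so treat this as an illustration rather than a reference.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StaticMemberExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // Proposed by KIP-345: a user-provided, stable identifier for this consumer instance.
        // With a static id set, a restart within the session timeout should not trigger a rebalance.
        props.put("group.instance.id", "payments-consumer-1");
        // Static members rely on the session timeout (not LeaveGroup) for expiration,
        // so a generous value is typically chosen.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // ... subscribe and poll as usual ...
        consumer.close();
    }
}
{code}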
[jira] [Updated] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-7641:
-----------------------------------
    Component/s: core

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> ----------------------------------------------------------------------
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Boyang Chen
> Assignee: Stanislav Kozlovski
> Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason described an edge case in the current consumer protocol which could cause a memory burst on the broker side:
> ```the case we observed in practice was caused by a consumer that was slow to rejoin the group after a rebalance had begun. At the same time, there were new members that were trying to join the group for the first time. The request timeout was significantly lower than the rebalance timeout, so the JoinGroup of the new members kept timing out. The timeout caused a retry and the group size eventually become quite large because we could not detect the fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, we should define a cap on the broker side to keep one consumer group from growing too large. So far I feel it's appropriate to introduce this as a server config: most of the time this value only matters in error scenarios, so client users shouldn't have to worry about it.
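The proposed cap amounts to a broker-side size check at JoinGroup time. The Java sketch below only illustrates the idea; the real group coordinator is Scala code on the broker, and the class, method, and config names here are assumptions based on the KIP-389 proposal, not the final implementation.

{code:java}
// Illustrative sketch of a JoinGroup-time group-size cap (KIP-389); names are assumptions.
import java.util.HashSet;
import java.util.Set;

public class GroupSizeCap {
    private final int maxSize;                       // e.g. broker config `consumer.group.max.size`
    private final Set<String> members = new HashSet<>();

    public GroupSizeCap(int maxSize) {
        this.maxSize = maxSize;
    }

    /** Returns true if the unknown member was admitted, false if the group is already full. */
    public boolean tryAddUnknownMember(String newMemberId) {
        if (members.size() >= maxSize) {
            // In the real protocol the broker would answer the JoinGroup request with an error
            // instead of silently letting the group grow without bound.
            return false;
        }
        return members.add(newMemberId);
    }
}
{code}

Known members rejoining would not be counted against the cap; only brand-new member ids can grow the group, which is exactly the path that exploded in the KAFKA-7610 incident quoted above.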
[jira] [Updated] (KAFKA-4453) add request prioritization
[ https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-4453:
-----------------------------------
    Component/s: core

> add request prioritization
> ---------------------------
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Onur Karaman
> Assignee: Mayuresh Gharat
> Priority: Major
> Fix For: 2.2.0
>
> Today all requests (client requests, broker requests, controller requests) to a broker are put into the same queue and have the same priority, so a backlog of requests ahead of a controller request will delay the processing of that controller request. This causes the requests in front of the controller request to be processed based on stale state.
> Side effects may include giving clients stale metadata [1], rejecting ProduceRequests and FetchRequests [2], and data loss (for some unofficial [3] definition of data loss in terms of messages beyond the high watermark) [4].
> We'd like to minimize the number of requests processed based on stale state. With request prioritization, controller requests get processed before regular queued-up requests, so requests can be processed with up-to-date state.
> [1] Say a client's MetadataRequest is sitting in front of a controller's UpdateMetadataRequest in a given broker's request queue. Suppose the MetadataRequest is for a topic whose partitions have recently undergone leadership changes, and that these leadership changes are being broadcast from the controller in the later UpdateMetadataRequest. Today the broker processes the MetadataRequest before processing the UpdateMetadataRequest, so the metadata returned to the client will be stale. The client will waste a round trip sending requests to the stale partition leader, get a NOT_LEADER_FOR_PARTITION error, and will have to start over and query the topic metadata again.
> [2] Clients can issue ProduceRequests to the wrong broker based on stale metadata, causing rejected ProduceRequests. Depending on how long the client acts on the stale metadata, the impact may or may not be visible to a producer application. If the number of rejected ProduceRequests does not exceed the max number of retries, the producer application is not impacted. If the retries are exhausted, however, the failed produce will be visible to the producer application.
> [3] The official definition of data loss in Kafka is losing a "committed" message. A message is considered "committed" when all in-sync replicas for that partition have applied it to their log.
> [4] Say a number of ProduceRequests are sitting in front of a controller's LeaderAndIsrRequest in a given broker's request queue. Suppose the ProduceRequests are for partitions whose leadership has recently shifted from the current broker to another broker in the replica set. Today the broker processes the ProduceRequests before the LeaderAndIsrRequest, so the ProduceRequests are processed on the former partition leader. As part of becoming a follower for a partition, the broker truncates the log to the high watermark. With weaker ack settings such as acks=1, the leader may successfully write to its own log, respond to the user with a success, process the LeaderAndIsrRequest (making the broker a follower of the partition), and truncate the log to a point before the user's produced messages. Users then have a false sense that their produce attempt succeeded while in reality their messages got erased. While technically part of what they signed up for with acks=1, it can still come as a surprise.
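Footnote [4] hinges on the producer's acks setting. The sketch below is a minimal producer configuration showing the tradeoff; the topic name and bootstrap address are placeholders, and the choice of acks=1 versus acks=all is shown only to mirror the scenario described above.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // acks=1: the leader acknowledges after writing to its own log only.
        // If leadership then moves and the old leader truncates to the high watermark,
        // acknowledged messages can disappear, as described in footnote [4].
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        // acks=all (together with min.insync.replicas > 1) closes that window
        // at the cost of higher produce latency:
        // props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
        }
    }
}
{code}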
[jira] [Updated] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-7641:
-----------------------------------
    Description:

In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason described an edge case in the current consumer protocol which could cause a memory burst on the broker side:

```the case we observed in practice was caused by a consumer that was slow to rejoin the group after a rebalance had begun. At the same time, there were new members that were trying to join the group for the first time. The request timeout was significantly lower than the rebalance timeout, so the JoinGroup of the new members kept timing out. The timeout caused a retry and the group size eventually become quite large because we could not detect the fact that the new members were no longer there.```

Since many disorganized join group requests are spamming the group metadata, we should define a cap on the broker side to keep one consumer group from growing too large. So far I feel it's appropriate to introduce this as a server config: most of the time this value only matters in error scenarios, so client users shouldn't have to worry about it.

KIP-389: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Introduce+a+configurable+consumer+group+size+limit]

    was:

In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason described an edge case in the current consumer protocol which could cause a memory burst on the broker side:

```the case we observed in practice was caused by a consumer that was slow to rejoin the group after a rebalance had begun. At the same time, there were new members that were trying to join the group for the first time. The request timeout was significantly lower than the rebalance timeout, so the JoinGroup of the new members kept timing out. The timeout caused a retry and the group size eventually become quite large because we could not detect the fact that the new members were no longer there.```

Since many disorganized join group requests are spamming the group metadata, we should define a cap on the broker side to keep one consumer group from growing too large. So far I feel it's appropriate to introduce this as a server config: most of the time this value only matters in error scenarios, so client users shouldn't have to worry about it.

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> ----------------------------------------------------------------------
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Boyang Chen
> Assignee: Stanislav Kozlovski
> Priority: Major
> Labels: kip
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason described an edge case in the current consumer protocol which could cause a memory burst on the broker side:
> ```the case we observed in practice was caused by a consumer that was slow to rejoin the group after a rebalance had begun. At the same time, there were new members that were trying to join the group for the first time. The request timeout was significantly lower than the rebalance timeout, so the JoinGroup of the new members kept timing out. The timeout caused a retry and the group size eventually become quite large because we could not detect the fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, we should define a cap on the broker side to keep one consumer group from growing too large. So far I feel it's appropriate to introduce this as a server config: most of the time this value only matters in error scenarios, so client users shouldn't have to worry about it.
> KIP-389: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Introduce+a+configurable+consumer+group+size+limit]
[jira] [Updated] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker
[ https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-7641:
-----------------------------------
    Labels: kip  (was: )

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> ----------------------------------------------------------------------
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Boyang Chen
> Assignee: Stanislav Kozlovski
> Priority: Major
> Labels: kip
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, Jason described an edge case in the current consumer protocol which could cause a memory burst on the broker side:
> ```the case we observed in practice was caused by a consumer that was slow to rejoin the group after a rebalance had begun. At the same time, there were new members that were trying to join the group for the first time. The request timeout was significantly lower than the rebalance timeout, so the JoinGroup of the new members kept timing out. The timeout caused a retry and the group size eventually become quite large because we could not detect the fact that the new members were no longer there.```
> Since many disorganized join group requests are spamming the group metadata, we should define a cap on the broker side to keep one consumer group from growing too large. So far I feel it's appropriate to introduce this as a server config: most of the time this value only matters in error scenarios, so client users shouldn't have to worry about it.
[jira] [Updated] (KAFKA-4453) add request prioritization
[ https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-4453:
-----------------------------------
    Description:

Today all requests (client requests, broker requests, controller requests) to a broker are put into the same queue and have the same priority, so a backlog of requests ahead of a controller request will delay the processing of that controller request. This causes the requests in front of the controller request to be processed based on stale state.

Side effects may include giving clients stale metadata [1], rejecting ProduceRequests and FetchRequests [2], and data loss (for some unofficial [3] definition of data loss in terms of messages beyond the high watermark) [4].

We'd like to minimize the number of requests processed based on stale state. With request prioritization, controller requests get processed before regular queued-up requests, so requests can be processed with up-to-date state.

[1] Say a client's MetadataRequest is sitting in front of a controller's UpdateMetadataRequest in a given broker's request queue. Suppose the MetadataRequest is for a topic whose partitions have recently undergone leadership changes, and that these leadership changes are being broadcast from the controller in the later UpdateMetadataRequest. Today the broker processes the MetadataRequest before processing the UpdateMetadataRequest, so the metadata returned to the client will be stale. The client will waste a round trip sending requests to the stale partition leader, get a NOT_LEADER_FOR_PARTITION error, and will have to start over and query the topic metadata again.

[2] Clients can issue ProduceRequests to the wrong broker based on stale metadata, causing rejected ProduceRequests. Depending on how long the client acts on the stale metadata, the impact may or may not be visible to a producer application. If the number of rejected ProduceRequests does not exceed the max number of retries, the producer application is not impacted. If the retries are exhausted, however, the failed produce will be visible to the producer application.

[3] The official definition of data loss in Kafka is losing a "committed" message. A message is considered "committed" when all in-sync replicas for that partition have applied it to their log.

[4] Say a number of ProduceRequests are sitting in front of a controller's LeaderAndIsrRequest in a given broker's request queue. Suppose the ProduceRequests are for partitions whose leadership has recently shifted from the current broker to another broker in the replica set. Today the broker processes the ProduceRequests before the LeaderAndIsrRequest, so the ProduceRequests are processed on the former partition leader. As part of becoming a follower for a partition, the broker truncates the log to the high watermark. With weaker ack settings such as acks=1, the leader may successfully write to its own log, respond to the user with a success, process the LeaderAndIsrRequest (making the broker a follower of the partition), and truncate the log to a point before the user's produced messages. Users then have a false sense that their produce attempt succeeded while in reality their messages got erased. While technically part of what they signed up for with acks=1, it can still come as a surprise.

KIP-291: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-291%3A+Separating+controller+connections+and+requests+from+the+data+plane]

    was:

Today all requests (client requests, broker requests, controller requests) to a broker are put into the same queue and have the same priority, so a backlog of requests ahead of a controller request will delay the processing of that controller request. This causes the requests in front of the controller request to be processed based on stale state.

Side effects may include giving clients stale metadata [1], rejecting ProduceRequests and FetchRequests [2], and data loss (for some unofficial [3] definition of data loss in terms of messages beyond the high watermark) [4].

We'd like to minimize the number of requests processed based on stale state. With request prioritization, controller requests get processed before regular queued-up requests, so requests can be processed with up-to-date state.

[1] Say a client's MetadataRequest is sitting in front of a controller's UpdateMetadataRequest in a given broker's request queue. Suppose the MetadataRequest is for a topic whose partitions have recently undergone leadership changes, and that these leadership changes are being broadcast from the controller in the later UpdateMetadataRequest. Today the broker processes the MetadataRequest before processing the UpdateMetadataRequest, so the metadata returned to the client will be stale. The client will waste a round trip sending requests to the stale partition leader, get a NOT_LEADER_FOR_PARTITION error, and will have to start over and query the topic metadata again.
[jira] [Updated] (KAFKA-4453) add request prioritization
[ https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-4453:
-----------------------------------
    Labels: kip  (was: )

> add request prioritization
> ---------------------------
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Onur Karaman
> Assignee: Mayuresh Gharat
> Priority: Major
> Labels: kip
> Fix For: 2.2.0
>
> Today all requests (client requests, broker requests, controller requests) to a broker are put into the same queue and have the same priority, so a backlog of requests ahead of a controller request will delay the processing of that controller request. This causes the requests in front of the controller request to be processed based on stale state.
> Side effects may include giving clients stale metadata [1], rejecting ProduceRequests and FetchRequests [2], and data loss (for some unofficial [3] definition of data loss in terms of messages beyond the high watermark) [4].
> We'd like to minimize the number of requests processed based on stale state. With request prioritization, controller requests get processed before regular queued-up requests, so requests can be processed with up-to-date state.
> [1] Say a client's MetadataRequest is sitting in front of a controller's UpdateMetadataRequest in a given broker's request queue. Suppose the MetadataRequest is for a topic whose partitions have recently undergone leadership changes, and that these leadership changes are being broadcast from the controller in the later UpdateMetadataRequest. Today the broker processes the MetadataRequest before processing the UpdateMetadataRequest, so the metadata returned to the client will be stale. The client will waste a round trip sending requests to the stale partition leader, get a NOT_LEADER_FOR_PARTITION error, and will have to start over and query the topic metadata again.
> [2] Clients can issue ProduceRequests to the wrong broker based on stale metadata, causing rejected ProduceRequests. Depending on how long the client acts on the stale metadata, the impact may or may not be visible to a producer application. If the number of rejected ProduceRequests does not exceed the max number of retries, the producer application is not impacted. If the retries are exhausted, however, the failed produce will be visible to the producer application.
> [3] The official definition of data loss in Kafka is losing a "committed" message. A message is considered "committed" when all in-sync replicas for that partition have applied it to their log.
> [4] Say a number of ProduceRequests are sitting in front of a controller's LeaderAndIsrRequest in a given broker's request queue. Suppose the ProduceRequests are for partitions whose leadership has recently shifted from the current broker to another broker in the replica set. Today the broker processes the ProduceRequests before the LeaderAndIsrRequest, so the ProduceRequests are processed on the former partition leader. As part of becoming a follower for a partition, the broker truncates the log to the high watermark. With weaker ack settings such as acks=1, the leader may successfully write to its own log, respond to the user with a success, process the LeaderAndIsrRequest (making the broker a follower of the partition), and truncate the log to a point before the user's produced messages. Users then have a false sense that their produce attempt succeeded while in reality their messages got erased. While technically part of what they signed up for with acks=1, it can still come as a surprise.
[jira] [Commented] (KAFKA-3522) Consider adding version information into rocksDB storage format
[ https://issues.apache.org/jira/browse/KAFKA-3522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747233#comment-16747233 ]

ASF GitHub Bot commented on KAFKA-3522:
---------------------------------------

mjsax commented on pull request #6175: KAFKA-3522: Add public interfaces for timestamped stores
URL: https://github.com/apache/kafka/pull/6175

Add public interfaces for KIP-258.
- `RecordConverter` is omitted because it is added via #6150
- `Stores` is omitted because we need to add the corresponding store builder classes first

Additionally, this adds some internal "facade" classes to allow accessing TimestampedStores with non-timestamped interfaces. This is part of the backward compatibility story of the KIP.

> Consider adding version information into rocksDB storage format
> ----------------------------------------------------------------
>
> Key: KAFKA-3522
> URL: https://issues.apache.org/jira/browse/KAFKA-3522
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Matthias J. Sax
> Priority: Major
> Labels: architecture
>
> Kafka Streams does not introduce any modifications to the data format in the underlying Kafka protocol, but it does use RocksDB for persistent state storage, and currently its data format is fixed and hard-coded. We want to consider the evolution path for when we change the data format in the future, so having some version info stored along with the storage file / directory would be useful.
> This information could even live outside the storage file; for example, we could use a small "version indicator" file in the RocksDB directory for this purpose. Thoughts? [~enothereska] [~jkreps]
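The "facade" classes mentioned in the PR comment wrap a store whose values carry timestamps and expose it through the existing value-only interface, so old callers keep working. The sketch below is a generic illustration of that pattern, not the actual KIP-258 interfaces; `ReadOnlyValueStore`, `ValueWithTimestamp`, and `ValueOnlyFacade` are simplified stand-ins invented for this example.

{code:java}
// Simplified stand-ins for the real Kafka Streams interfaces; illustration only.
interface ReadOnlyValueStore<K, V> {
    V get(K key);
}

// A store entry that keeps a timestamp alongside each value.
class ValueWithTimestamp<V> {
    final V value;
    final long timestamp;
    ValueWithTimestamp(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

/**
 * Facade: presents a timestamped store through the old value-only interface,
 * which is the backward-compatibility trick described in the comment above.
 */
class ValueOnlyFacade<K, V> implements ReadOnlyValueStore<K, V> {
    private final ReadOnlyValueStore<K, ValueWithTimestamp<V>> inner;

    ValueOnlyFacade(ReadOnlyValueStore<K, ValueWithTimestamp<V>> inner) {
        this.inner = inner;
    }

    @Override
    public V get(K key) {
        ValueWithTimestamp<V> stored = inner.get(key);
        return stored == null ? null : stored.value; // drop the timestamp for legacy callers
    }
}
{code}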
[jira] [Commented] (KAFKA-6455) Improve timestamp propagation at DSL level
[ https://issues.apache.org/jira/browse/KAFKA-6455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747217#comment-16747217 ]

ASF GitHub Bot commented on KAFKA-6455:
---------------------------------------

mjsax commented on pull request #6147: KAFKA-6455: Extend CacheFlushListener to forward timestamp
URL: https://github.com/apache/kafka/pull/6147

> Improve timestamp propagation at DSL level
> -------------------------------------------
>
> Key: KAFKA-6455
> URL: https://issues.apache.org/jira/browse/KAFKA-6455
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Matthias J. Sax
> Assignee: Matthias J. Sax
> Priority: Major
> Labels: needs-kip
>
> At the DSL level, we inherit the timestamp propagation "contract" from the Processor API. This contract is not optimal at the DSL level, and we should define a DSL-level contract that matches the semantics of the corresponding DSL operator.
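The PR title suggests that the cache-flush callback gains access to the record timestamp so downstream processors receive it. `CacheFlushListener` is an internal Kafka Streams interface and its exact signature is not shown here; the snippet below is only a schematic, with made-up names, of what "forwarding a timestamp through a flush listener" can look like.

{code:java}
// Schematic only: not the actual org.apache.kafka.streams CacheFlushListener interface.
interface FlushListener<K, V> {
    // Before: only key, new value, and old value were forwarded on cache flush/eviction.
    // void apply(K key, V newValue, V oldValue);

    // After: the record timestamp travels with the flushed entry, so DSL operators
    // downstream can propagate a meaningful timestamp instead of a default.
    void apply(K key, V newValue, V oldValue, long timestamp);
}

class ForwardingListener implements FlushListener<String, Long> {
    @Override
    public void apply(String key, Long newValue, Long oldValue, long timestamp) {
        // In Streams this step would forward the record downstream with the given timestamp.
        System.out.printf("forward %s=%d at ts=%d (old=%s)%n", key, newValue, timestamp, oldValue);
    }
}
{code}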
[jira] [Commented] (KAFKA-7502) Cleanup KTable materialization logic in a single place
[ https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16747025#comment-16747025 ]

ASF GitHub Bot commented on KAFKA-7502:
---------------------------------------

dongjinleekr commented on pull request #6174: KAFKA-7502: Cleanup KTable materialization logic in a single place
URL: https://github.com/apache/kafka/pull/6174

This is a draft cleanup for KAFKA-7502. Here are the details:
1. Make `KTableKTableJoinNode` abstract, and define its child classes (`[NonMaterialized,Materialized]KTableKTableJoinNode`) instead: now, all materialization-related routines are separated into those classes.
2. `KTableKTableJoinNodeBuilder#build` now instantiates the `[NonMaterialized,Materialized]KTableKTableJoinNode` classes instead of `KTableKTableJoinNode`.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

> Cleanup KTable materialization logic in a single place
> --------------------------------------------------------
>
> Key: KAFKA-7502
> URL: https://issues.apache.org/jira/browse/KAFKA-7502
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Priority: Minor
>
> Today, since we pre-create all the `KTableXXX` operators along with the logical nodes, we effectively duplicate the logic that determines whether the resulting KTable should be materialized, for example in `KTableKTableJoinNode` and in `KTableImpl#doJoin`. This is bug-prone, since we may update the logic in one class but forget to update the other class.
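The refactoring described in the PR is a classic "push the decision into subclasses, make the builder choose" pattern. The sketch below is a heavily simplified illustration with invented class names; the real classes live in Kafka Streams' internal graph package and carry far more state than shown here.

{code:java}
// Simplified illustration of the refactoring described above; names are stand-ins.
abstract class JoinNode {
    final String nodeName;
    JoinNode(String nodeName) { this.nodeName = nodeName; }

    // Subclasses own everything related to materialization.
    abstract boolean isMaterialized();
    abstract String queryableStoreName(); // null when not materialized
}

class NonMaterializedJoinNode extends JoinNode {
    NonMaterializedJoinNode(String nodeName) { super(nodeName); }
    @Override boolean isMaterialized() { return false; }
    @Override String queryableStoreName() { return null; }
}

class MaterializedJoinNode extends JoinNode {
    private final String storeName;
    MaterializedJoinNode(String nodeName, String storeName) {
        super(nodeName);
        this.storeName = storeName;
    }
    @Override boolean isMaterialized() { return true; }
    @Override String queryableStoreName() { return storeName; }
}

class JoinNodeBuilder {
    // The builder, not the callers, decides which concrete node to create, so the
    // "should this KTable be materialized?" logic lives in exactly one place.
    static JoinNode build(String nodeName, String storeNameOrNull) {
        return storeNameOrNull == null
                ? new NonMaterializedJoinNode(nodeName)
                : new MaterializedJoinNode(nodeName, storeNameOrNull);
    }
}
{code}

Concentrating the decision in the builder removes the risk the JIRA description mentions: updating the materialization check in one class while forgetting the duplicate copy in another.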