[jira] [Commented] (KAFKA-8816) RecordCollector offsets updated indirectly by StreamTask

2019-08-26 Thread ASF GitHub Bot (Jira)


[https://issues.apache.org/jira/browse/KAFKA-8816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16916135#comment-16916135]

ASF GitHub Bot commented on KAFKA-8816:
---

guozhangwang commented on pull request #7223: KAFKA-8816: Make offsets 
immutable to users of RecordCollector.offsets
URL: https://github.com/apache/kafka/pull/7223

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RecordCollector offsets updated indirectly by StreamTask
> 
>
> Key: KAFKA-8816
> URL: https://issues.apache.org/jira/browse/KAFKA-8816
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Chris Pettitt
> Assignee: Chris Pettitt
> Priority: Major
>
> Currently, the offsets in RecordCollectorImpl can be updated indirectly
> through the offsets() read accessor:
> {code:java}
> @Override
> public Map<TopicPartition, Long> offsets() {
>     // Returns the internal map itself, not a copy or a read-only view.
>     return offsets;
> }{code}
> Here, offsets is a private final field in RecordCollectorImpl. The intent
> appears to be that this field is updated only when the producer
> acknowledges an offset. However, because the field is returned in mutable
> form, callers can modify offsets through this accessor, which is exactly
> what StreamTask does today:
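> {code:java}
> protected Map<TopicPartition, Long> activeTaskCheckpointableOffsets() {
>     // Aliases RecordCollectorImpl's internal map instead of copying it.
>     final Map<TopicPartition, Long> checkpointableOffsets = recordCollector.offsets();
>     for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
>         // Writes into the collector's map for any partition it has not seen.
>         checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue());
>     }
>     return checkpointableOffsets;
> }{code}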
> Here, a new entry is written into RecordCollectorImpl's offsets map
> whenever the topic partition is not already present, which is the case for
> the input topic when using optimized topologies with a KTable. Because
> that entry now lives in the collector's map, every later call finds the
> key already present and putIfAbsent leaves the stale value in place, so we
> keep checkpointing the first offset seen.
> The correct behavior seems to be for RecordCollectorImpl to return a
> read-only view of its offsets, and for activeTaskCheckpointableOffsets to
> copy the returned map before mutating it.
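The aliasing described above can be reproduced outside Kafka with a small, self-contained Java sketch. `Collector` and `checkpointable` are hypothetical stand-ins for RecordCollectorImpl.offsets() and StreamTask.activeTaskCheckpointableOffsets, String keys stand in for TopicPartition, and the offsets are made-up sample values; Kafka's actual fix may differ in detail. The `fixed` flag switches between leaking the internal map and the suggested combination of a read-only view plus a caller-side copy.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class OffsetsAliasingDemo {

    // Hypothetical stand-in for RecordCollectorImpl: offsets are meant to
    // change only when the producer acknowledges a send (onAck).
    static class Collector {
        private final Map<String, Long> offsets = new HashMap<>();
        private final boolean readOnlyView;

        Collector(boolean readOnlyView) {
            this.readOnlyView = readOnlyView;
        }

        void onAck(String partition, long offset) {
            offsets.put(partition, offset);
        }

        Map<String, Long> offsets() {
            // Buggy variant leaks the internal map; fixed variant returns a read-only view.
            return readOnlyView ? Collections.unmodifiableMap(offsets) : offsets;
        }
    }

    // Hypothetical stand-in for StreamTask.activeTaskCheckpointableOffsets.
    static Map<String, Long> checkpointable(Collector collector,
                                            Map<String, Long> consumedOffsets,
                                            boolean copyFirst) {
        final Map<String, Long> result = copyFirst
                ? new HashMap<>(collector.offsets()) // fixed: mutate a private copy
                : collector.offsets();               // buggy: mutates the collector's map
        for (final Map.Entry<String, Long> entry : consumedOffsets.entrySet()) {
            result.putIfAbsent(entry.getKey(), entry.getValue());
        }
        return result;
    }

    // Simulates two commits while the task advances on the input partition.
    static long checkpointAfterTwoCommits(boolean fixed) {
        final Collector collector = new Collector(fixed);
        collector.onAck("changelog-0", 7L);          // a legitimate producer ack
        final Map<String, Long> consumed = new HashMap<>();
        consumed.put("input-0", 10L);
        checkpointable(collector, consumed, fixed);  // first commit
        consumed.put("input-0", 42L);                // task makes progress
        return checkpointable(collector, consumed, fixed).get("input-0");
    }

    public static void main(String[] args) {
        System.out.println("buggy checkpoint: " + checkpointAfterTwoCommits(false)); // 10 (stale)
        System.out.println("fixed checkpoint: " + checkpointAfterTwoCommits(true));  // 42
    }
}
```

Running main prints a buggy checkpoint of 10 (the first consumed offset, stuck in the collector's map because putIfAbsent never overwrites it) and a fixed checkpoint of 42. Note the design choice: `Collections.unmodifiableMap` is a live view, so later producer acks stay visible to callers, while the caller's `new HashMap<>(...)` is a snapshot it is free to mutate.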



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8816) RecordCollector offsets updated indirectly by StreamTask

2019-08-19 Thread ASF GitHub Bot (Jira)


[https://issues.apache.org/jira/browse/KAFKA-8816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16910707#comment-16910707]

ASF GitHub Bot commented on KAFKA-8816:
---

cpettitt-confluent commented on pull request #7223: KAFKA-8816: Make offsets 
immutable to users of RecordCollector.offsets
URL: https://github.com/apache/kafka/pull/7223
   Make the offsets returned by RecordCollector.offsets() immutable to
   callers. Fix the existing case in StreamTask where offsets were
   modified this way. Add a simple test verifying that offsets cannot be
   changed externally.
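A minimal sketch of what such a test might check, assuming the fix returns a read-only view via `Collections.unmodifiableMap`. The class `ReadOnlyOffsetsCheck`, its nested `Collector`, and the partition names are hypothetical; this is not the test added in #7223.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ReadOnlyOffsetsCheck {

    // Hypothetical minimal collector that exposes its offsets as a read-only view.
    static class Collector {
        private final Map<String, Long> offsets = new HashMap<>();

        void onAck(String partition, long offset) {
            offsets.put(partition, offset);
        }

        Map<String, Long> offsets() {
            return Collections.unmodifiableMap(offsets);
        }
    }

    // Returns true if an external putIfAbsent on the returned map is rejected
    // and the collector's own state is left unchanged.
    static boolean offsetsCannotBeChangedExternally() {
        final Collector collector = new Collector();
        collector.onAck("output-0", 5L);
        try {
            collector.offsets().putIfAbsent("input-0", 10L);
            return false; // the write went through: the internal map leaked
        } catch (final UnsupportedOperationException expected) {
            // The read-only view rejected the write, as intended.
        }
        return !collector.offsets().containsKey("input-0")
                && collector.offsets().get("output-0") == 5L;
    }

    public static void main(String[] args) {
        System.out.println(offsetsCannotBeChangedExternally()); // true
    }
}
```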
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   