[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed
guozhangwang commented on a change in pull request #9083: URL: https://github.com/apache/kafka/pull/9083#discussion_r491646523 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ## @@ -217,7 +217,7 @@ public StateStore getStateStore(final String name) { forward((ProcessorNode) child, key, value); } } else { -final ProcessorNode child = currentNode().getChild(sendTo); +final ProcessorNode child = currentNode().getChild(sendTo); Review comment: This is because I added the typing in line 98 of ProcessorNode below, I will revert it back and update the line below in `forward`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed
guozhangwang commented on a change in pull request #9083: URL: https://github.com/apache/kafka/pull/9083#discussion_r491647837 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java ## @@ -118,6 +120,10 @@ public void init(final InternalProcessorContext context) { } catch (final Exception e) { throw new StreamsException(String.format("failed to initialize processor %s", name), e); } + +// revived tasks could re-initialize the topology, +// in which case we should reset the flag +closed = false; Review comment: SG. ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ## @@ -1310,9 +1310,11 @@ public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() { EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes(); stateManager.checkpoint(); EasyMock.expectLastCall().once(); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition)); Review comment: My bad, will revert. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed
guozhangwang commented on a change in pull request #9083: URL: https://github.com/apache/kafka/pull/9083#discussion_r492944504 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ## @@ -217,12 +217,12 @@ public StateStore getStateStore(final String name) { forward((ProcessorNode) child, key, value); } } else { -final ProcessorNode child = currentNode().getChild(sendTo); Review comment: I made the change in ProcessorNode to add back the template types: https://github.com/apache/kafka/pull/9083/files/82b6f6f5d238401097e0906c8135c5c189524666#diff-705bfd0ed3f214048b76d775708cc7d2L96 But since `currentNode()`'s template is `` its templated `getChild` and that's why I need to weaken it here --- as you can see from the above `if` branch, it now aligns consistently on the typing. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed
guozhangwang commented on a change in pull request #9083: URL: https://github.com/apache/kafka/pull/9083#discussion_r492950073 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java ## @@ -44,19 +45,18 @@ public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier implements Processor { +public static class KStreamFlatTransformValuesProcessor extends AbstractProcessor { Review comment: As described in at the top, `Let all built-in processors to extend from AbstractProcessor.` The main reason is that AbstractProcessor provides some basic functionalities and hence it's better to let our own impl to base on them. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed
guozhangwang commented on a change in pull request #9083: URL: https://github.com/apache/kafka/pull/9083#discussion_r492944504 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ## @@ -217,12 +217,12 @@ public StateStore getStateStore(final String name) { forward((ProcessorNode) child, key, value); } } else { -final ProcessorNode child = currentNode().getChild(sendTo); Review comment: I made the change in ProcessorNode to add back the template types: https://github.com/apache/kafka/pull/9083/files/82b6f6f5d238401097e0906c8135c5c189524666#diff-705bfd0ed3f214048b76d775708cc7d2L96 But since `currentNode()`'s template is `` its templated `getChild` and that's why I need to weaken it here --- as you can see from the above `if` branch, it now aligns consistently on the typing. ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java ## @@ -44,19 +45,18 @@ public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier implements Processor { +public static class KStreamFlatTransformValuesProcessor extends AbstractProcessor { Review comment: As described in at the top, `Let all built-in processors to extend from AbstractProcessor.` The main reason is that AbstractProcessor provides some basic functionalities and hence it's better to let our own impl to base on them. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org