[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

2020-09-19 Thread GitBox


guozhangwang commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r491646523



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##
@@ -217,7 +217,7 @@ public StateStore getStateStore(final String name) {
 forward((ProcessorNode) child, key, value);
 }
 } else {
-final ProcessorNode child = currentNode().getChild(sendTo);
+final ProcessorNode child = currentNode().getChild(sendTo);

Review comment:
   This is because I added the typing in line 98 of ProcessorNode below. I 
will revert it and update the line below in `forward`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

2020-09-19 Thread GitBox


guozhangwang commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r491647837



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
##
@@ -118,6 +120,10 @@ public void init(final InternalProcessorContext context) {
 } catch (final Exception e) {
 throw new StreamsException(String.format("failed to initialize processor %s", name), e);
 }
+
+// revived tasks could re-initialize the topology,
+// in which case we should reset the flag
+closed = false;

Review comment:
   SG.
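
A minimal sketch of the idea in the hunk above, assuming a simplified stand-in class rather than the real ProcessorNode: `SketchNode`, its fields, and `processedCount()` are hypothetical names used only for illustration. It shows a node that rejects `process` once closed and clears the flag again in `init`, so a revived task that re-initializes the topology can keep using it.

```java
// Hypothetical, simplified stand-in for a processor node; not the actual Kafka class.
final class SketchNode {
    private final String name;
    private boolean closed = false;
    private int processed = 0;

    SketchNode(final String name) {
        this.name = name;
    }

    void init() {
        // A revived task may re-initialize the topology, so clear the flag here.
        closed = false;
    }

    void process(final String key, final String value) {
        throwIfClosed();
        processed++;
    }

    void close() {
        throwIfClosed();
        closed = true;
    }

    int processedCount() {
        return processed;
    }

    private void throwIfClosed() {
        // Assumption: an IllegalStateException is used here purely for the sketch.
        if (closed) {
            throw new IllegalStateException("The processor " + name + " is closed");
        }
    }
}
```

Without the reset in `init()`, a node that had been closed once would keep throwing after the task is revived.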

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1310,9 +1310,11 @@ public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
 EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes();
 stateManager.checkpoint();
 EasyMock.expectLastCall().once();
+EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition));

Review comment:
   My bad, will revert.









[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

2020-09-22 Thread GitBox


guozhangwang commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r492944504



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##
@@ -217,12 +217,12 @@ public StateStore getStateStore(final String name) {
 forward((ProcessorNode) child, key, value);
 }
 } else {
-final ProcessorNode child = currentNode().getChild(sendTo);

Review comment:
   I made the change in ProcessorNode to add back the template types: 
https://github.com/apache/kafka/pull/9083/files/82b6f6f5d238401097e0906c8135c5c189524666#diff-705bfd0ed3f214048b76d775708cc7d2L96
   
   But since `currentNode()`'s template is ``, so is that of its templated 
`getChild`, which is why I need to weaken the type here. As you can see from 
the `if` branch above, the typing is now consistent.
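
To illustrate the typing point, here is a hedged sketch with hypothetical classes (`GenericNode` and `WildcardForwarding` are invented for this example and are not the Kafka types). It assumes a node whose `getChild` is declared with the node's own type parameters: when the node is only known through a wildcard-typed reference, the child can only be declared with wildcards too, and forwarding needs an unchecked cast.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical node type parameterized on key and value.
final class GenericNode<K, V> {
    private final Map<String, GenericNode<K, V>> children = new HashMap<>();
    final List<String> seen = new ArrayList<>();

    void addChild(final String name, final GenericNode<K, V> child) {
        children.put(name, child);
    }

    GenericNode<K, V> getChild(final String name) {
        return children.get(name);
    }

    void process(final K key, final V value) {
        seen.add(key + "=" + value);
    }
}

final class WildcardForwarding {
    @SuppressWarnings("unchecked")
    static <K, V> void forward(final GenericNode<?, ?> current,
                               final String childName,
                               final K key,
                               final V value) {
        // Through a wildcard-typed reference, the result of getChild can only
        // be declared as GenericNode<?, ?>; declaring GenericNode<K, V> here
        // would not compile without a cast.
        final GenericNode<?, ?> child = current.getChild(childName);
        if (child == null) {
            throw new IllegalArgumentException("Unknown child: " + childName);
        }
        // The compiler cannot verify this cast, hence the suppression above.
        ((GenericNode<K, V>) child).process(key, value);
    }
}
```

The weaker declaration keeps the unchecked cast in one place, at the call into `process`, rather than at the lookup.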









[GitHub] [kafka] guozhangwang commented on a change in pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed

2020-09-22 Thread GitBox


guozhangwang commented on a change in pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#discussion_r492950073



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
##
@@ -44,19 +45,18 @@ public KStreamFlatTransformValues(final 
ValueTransformerWithKeySupplier 
implements Processor {
+public static class KStreamFlatTransformValuesProcessor extends AbstractProcessor {

Review comment:
   As described at the top: `Let all built-in processors to extend from 
AbstractProcessor.` The main reason is that AbstractProcessor provides some 
basic functionality, so it's better to base our own implementations on it.
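
As a rough illustration of what "basic functionality" can mean, the sketch below uses hypothetical, simplified types (`SketchContext`, `SketchProcessor`, `SketchAbstractProcessor`, `UpperCaseProcessor`), loosely modelled on the pattern and not the actual Kafka Streams interfaces. The abstract base stores the context in `init` and supplies a no-op `close`, so a concrete processor only has to implement `process`.

```java
import java.util.Locale;

// Hypothetical, simplified context: only knows how to forward a record.
interface SketchContext {
    void forward(String key, String value);
}

// Hypothetical, simplified processor contract.
interface SketchProcessor {
    void init(SketchContext context);

    void process(String key, String value);

    void close();
}

// The base class owns the boilerplate shared by every implementation.
abstract class SketchAbstractProcessor implements SketchProcessor {
    private SketchContext context;

    @Override
    public void init(final SketchContext context) {
        this.context = context;
    }

    @Override
    public void close() {
        // Nothing to release by default.
    }

    protected final SketchContext context() {
        return context;
    }
}

// A concrete processor only needs to supply process().
final class UpperCaseProcessor extends SketchAbstractProcessor {
    @Override
    public void process(final String key, final String value) {
        context().forward(key, value.toUpperCase(Locale.ROOT));
    }
}
```

Implementing the interface directly would mean repeating the context handling in every built-in processor.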








