[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor
[ https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16128423#comment-16128423 ]

Paolo Patierno commented on KAFKA-5684:
---

I'm going to close this JIRA (and the related PR). This issue will be addressed through KIP-182.

> KStreamPrintProcessor as customized KStreamPeekProcessor
>
> Key: KAFKA-5684
> URL: https://issues.apache.org/jira/browse/KAFKA-5684
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Paolo Patierno
> Assignee: Paolo Patierno
> Priority: Minor
>
> Hi,
> the {{KStreamPrintProcessor}} is implemented from scratch (from the {{AbstractProcessor}}), and the same goes for the related supplier.
> It looks to me like it's just a special {{KStreamPeekProcessor}} with forwardDownStream set to false, which also lets you specify the Serdes instances to use when keys/values are bytes.
> At the same time, exposed through the {{print()}} method, it provides a fast way to print data flowing through the pipeline (with just {{peek()}} you need to write the code yourself).
> I think it could be useful to refactor the {{KStreamPrintProcessor}} so that it derives from the {{KStreamPeekProcessor}} and customizes its behavior.
> Thanks,
> Paolo.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
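The relationship the issue describes (print as a peek that never forwards, plus a flush on close) can be sketched in plain Java. This is a minimal model under stated assumptions, not Kafka's internal code: `PeekAction`, `ForwardingContext`, `PeekProcessor`, `PrintAction` and `PrintProcessor` are hypothetical stand-ins for `ForeachAction`, `ProcessorContext`, `KStreamPeekProcessor`, `PrintForeachAction` and `KStreamPrintProcessor`, and serdes are left out entirely.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a per-record callback (modeled on ForeachAction<K, V>).
interface PeekAction<K, V> {
    void apply(K key, V value);
}

// Hypothetical stand-in for the processor context: it only records forwarded records.
class ForwardingContext<K, V> {
    final List<String> forwarded = new ArrayList<>();

    void forward(K key, V value) {
        forwarded.add(key + "=" + value);
    }
}

// Simplified peek processor: run the action, then optionally forward downstream.
class PeekProcessor<K, V> {
    private final PeekAction<K, V> action;
    private final boolean forwardDownStream;
    private ForwardingContext<K, V> context;

    PeekProcessor(PeekAction<K, V> action, boolean forwardDownStream) {
        this.action = action;
        this.forwardDownStream = forwardDownStream;
    }

    void init(ForwardingContext<K, V> context) {
        this.context = context;
    }

    void process(K key, V value) {
        action.apply(key, value);
        if (forwardDownStream) {
            context.forward(key, value);
        }
    }

    void close() {
        // Nothing to release in the plain peek case.
    }
}

// Print action: writes "[label]: key, value" lines and can be flushed.
class PrintAction<K, V> implements PeekAction<K, V> {
    private final PrintWriter out;
    private final String label;

    PrintAction(PrintWriter out, String label) {
        this.out = out;
        this.label = label;
    }

    @Override
    public void apply(K key, V value) {
        out.println("[" + label + "]: " + key + ", " + value);
    }

    void flush() {
        out.flush();
    }
}

// Print processor: a peek processor that never forwards and flushes on close.
class PrintProcessor<K, V> extends PeekProcessor<K, V> {
    private final PrintAction<K, V> printAction;

    PrintProcessor(PrintAction<K, V> printAction) {
        super(printAction, false);
        this.printAction = printAction;
    }

    @Override
    void close() {
        printAction.flush();
    }
}
```

With this shape the print processor only adds what actually differs from peek: the `false` forwarding flag and the flush in `close()`. In Kafka itself the processors are built on `AbstractProcessor` (as the issue notes) and forward through the processor context; the stand-in context here just records forwarded records so the behavior can be checked.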
[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor
[ https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16128422#comment-16128422 ]

ASF GitHub Bot commented on KAFKA-5684:
---

GitHub user ppatierno closed the pull request at: https://github.com/apache/kafka/pull/3636
[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor
[ https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116540#comment-16116540 ]

ASF GitHub Bot commented on KAFKA-5684:
---

GitHub user ppatierno opened a pull request:

https://github.com/apache/kafka/pull/3636

KAFKA-5684: KStreamPrintProcessor as customized KStreamPeekProcessor

This PR makes KStreamPrint derive from KStreamPeek and avoids the "instanceof" check on byte[] in every process call.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ppatierno/kafka kafka-5684

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3636.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3636

commit a516a08bf22cdd53feb4ada50c73dc5a08715b74
Author: Paolo Patierno
Date: 2017-08-07T12:55:16Z

Refactored KStreamPrint as derived from KStreamPeek
Removed the "instance of" check for byte[] in every KStreamPrint process, it's up to a default mapper now
Updated KStreamPrint tests adapting to the new internal structure for KStreamPrint and PrintForeachAction
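One way to picture what the PR changes: instead of running a `maybeDeserialize`-style `instanceof byte[]` check in every `process` call, the mapper is resolved once, and the byte[] handling lives only inside a default mapper that is selected when no user mapper was supplied. This sketch assumes deserializers can be modeled as `Function<byte[], Object>` (Kafka's real `Deserializer` also takes the topic name); `PrintMappers` and `MappedPrinter` are hypothetical names, not Kafka classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical helpers: the mapper is chosen once, byte[] handling lives only in the default.
final class PrintMappers {
    private PrintMappers() { }

    // Default mapper: decode byte[] keys/values with the given deserializers,
    // and print any other object as-is.
    static BiFunction<Object, Object, String> defaultMapper(
            Function<byte[], Object> keyDeserializer,
            Function<byte[], Object> valueDeserializer) {
        return (key, value) -> {
            Object k = key instanceof byte[] ? keyDeserializer.apply((byte[]) key) : key;
            Object v = value instanceof byte[] ? valueDeserializer.apply((byte[]) value) : value;
            return k + ", " + v;
        };
    }

    // Called once (e.g. at init time): a user mapper wins, otherwise use the default.
    static BiFunction<Object, Object, String> resolve(
            BiFunction<Object, Object, String> userMapper,
            Function<byte[], Object> keyDeserializer,
            Function<byte[], Object> valueDeserializer) {
        return userMapper != null ? userMapper : defaultMapper(keyDeserializer, valueDeserializer);
    }
}

// Per-record work is a single mapper call; the printer itself never inspects types.
class MappedPrinter {
    private final BiFunction<Object, Object, String> mapper;

    MappedPrinter(BiFunction<Object, Object, String> mapper) {
        this.mapper = mapper;
    }

    String format(Object key, Object value) {
        return mapper.apply(key, value);
    }
}
```

When the user supplies a mapper, each record costs one `apply` call with no type inspection; the check only runs on the default path, where records may actually be raw bytes.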
[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor
[ https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114563#comment-16114563 ]

Paolo Patierno commented on KAFKA-5684:
---

Just to add: {{KStreamPrint}} needs to be aware of the mapper (null or not). So even if we remove the mapper from the {{PrintForeachAction}} constructor, we need to add it to the {{KStreamPrint}} constructor, so that in the {{init}} method we can either set it on the {{PrintForeachAction}} or create and set the default one.
[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor
[ https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16113499#comment-16113499 ]

Guozhang Wang commented on KAFKA-5684:
---

I have a slightly different idea:

1. Remove the `defaultKeyValueMapper` from {{KStreamImpl}}, and let the `print` and `writeAsText` overloads without a mapper parameter pass in null values (note that in general we do NOT prefer to pass nulls, but rather replace them with the default mapper as early as possible in the call trace; this is an exception because in {{KStreamImpl}} we cannot access the {{context}} object to get the default serdes).
2. Remove the `KeyValueMapper` from the constructor of {{PrintForeachAction}}, but add an API to set the {{KeyValueMapper}} after the object is constructed.
3. Still keep the {{KStreamPrint}} class, but let {{KStreamPrint}} extend {{KStreamPeek}} and let {{KStreamPrintProcessor}} extend {{KStreamPeekProcessor}}; let {{KStreamPrint}} pass the user-specified serdes to the constructor of {{KStreamPrintProcessor}}.
4. Override the `init` function of {{KStreamPrintProcessor}} to construct the default mapper (potentially based on the default serdes from the context) if none is passed in, and then call `setMapper` on the action field.
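The four steps above can be sketched with hypothetical stand-ins rather than Kafka's API: `InitContext` plays the role of the `ProcessorContext` (the only place default serdes are reachable), `SettablePrintAction` models a `PrintForeachAction` whose mapper is set after construction, and deserializers are plain `Function<byte[], Object>` values with the topic argument omitted. The null-mapper hand-off from the DSL layer (step 1) shows up as a nullable constructor argument.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical stand-in for what a processor can read from its context at init time.
class InitContext {
    final Function<byte[], Object> defaultKeyDeserializer;
    final Function<byte[], Object> defaultValueDeserializer;

    InitContext(Function<byte[], Object> keyDeser, Function<byte[], Object> valueDeser) {
        this.defaultKeyDeserializer = keyDeser;
        this.defaultValueDeserializer = valueDeser;
    }
}

// Step 2: the action no longer takes the mapper in its constructor; it is set later.
class SettablePrintAction {
    final List<String> lines = new ArrayList<>();
    private BiFunction<Object, Object, String> mapper;

    void setMapper(BiFunction<Object, Object, String> mapper) {
        this.mapper = mapper;
    }

    void apply(Object key, Object value) {
        lines.add(mapper.apply(key, value));
    }
}

// Steps 1, 3 and 4: a null mapper arrives from the DSL layer and is replaced
// by a default in init(), where the context's default deserializers are reachable.
class SettablePrintProcessor {
    private final SettablePrintAction action;
    private final BiFunction<Object, Object, String> userMapper; // may be null
    private final Function<byte[], Object> keyDeser;             // user-specified, may be null
    private final Function<byte[], Object> valueDeser;           // user-specified, may be null

    SettablePrintProcessor(SettablePrintAction action,
                           BiFunction<Object, Object, String> userMapper,
                           Function<byte[], Object> keyDeser,
                           Function<byte[], Object> valueDeser) {
        this.action = action;
        this.userMapper = userMapper;
        this.keyDeser = keyDeser;
        this.valueDeser = valueDeser;
    }

    void init(InitContext context) {
        if (userMapper != null) {
            action.setMapper(userMapper);
            return;
        }
        // Fall back to the context defaults only for deserializers the user did not give.
        Function<byte[], Object> k = keyDeser != null ? keyDeser : context.defaultKeyDeserializer;
        Function<byte[], Object> v = valueDeser != null ? valueDeser : context.defaultValueDeserializer;
        action.setMapper((key, value) -> {
            Object dk = key instanceof byte[] ? k.apply((byte[]) key) : key;
            Object dv = value instanceof byte[] ? v.apply((byte[]) value) : value;
            return dk + ", " + dv;
        });
    }

    void process(Object key, Object value) {
        action.apply(key, value);
    }
}
```

The branch on whether a mapper was provided runs once in `init()`; each `process()` call simply delegates to the action.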
[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor
[ https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111779#comment-16111779 ]

Guozhang Wang commented on KAFKA-5684:
---

1. Regarding using `peek` to implement `print`: yes, we have been trying to do so in https://issues.apache.org/jira/browse/KAFKA-4772, but we have not yet collapsed {{KStreamPeek}} and {{KStreamPrint}} into the same class, mainly because we were treating serdes unnecessarily specially.
2. As Paolo mentioned, since we know at construction time whether the mapper is provided, we can wrap the serdes into the default mapper ONCE in the {{init}} call with a single condition check; by doing this we get rid of the per-call {{instanceof}} condition check.
[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor
[ https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110849#comment-16110849 ]

Paolo Patierno commented on KAFKA-5684:
---

Maybe I made a mistake here, because the default serdes are available only from the {{ProcessorContext}} (using the Streams configuration). The same is true for the topic, which is needed for deserialization. So in any case the checks for a) and b) should happen inside the processor node.
[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor
[ https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110636#comment-16110636 ]

Paolo Patierno commented on KAFKA-5684:
---

So it seems to me that the serdes parameters aren't related to the processor node anymore (the {{KStreamPrintProcessor}} in this case) but to the mapper. Referring to the two cases above, from the {{print}} method's point of view:

a) the mapper is just provided to the {{PrintForeachAction}} and the serdes are ignored;
b) we have to build a mapper that uses the provided serdes (or the default ones) and then pass it to the {{PrintForeachAction}}. Today there is a {{defaultKeyValueMapper}}, but it doesn't fit well in this case because it's not aware of serdes.

At the same time, if the serdes parameters are not related to the processor node, the {{KStreamPrintProcessor}} is really like the {{KStreamPeekProcessor}} with "forwardDownStream = false". The only difference is the {{close}} method, which is needed to flush the {{PrintForeachAction}}.

In any case, I think that {{KStreamPrintProcessor}} can be refactored to derive from {{KStreamPeekProcessor}}. What do you think, [~guozhang] [~james.c]? I'm starting to develop a proposal PR for that.

Thanks,
Paolo.
[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor
[ https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110481#comment-16110481 ]

james chien commented on KAFKA-5684:
---

[~guozhang] I agree with your point: we must check whether K/V are byte[] only if the mapper is not provided (that means when a mapper is provided we do not need to execute {{KStreamPrint#maybeDeserialize}}).
[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor
[ https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16108992#comment-16108992 ]

Damian Guy commented on KAFKA-5684:
---

Yep, sorry, I missed that. You are right: this JIRA doesn't need a KIP.
[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor
[ https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16108984#comment-16108984 ]

Paolo Patierno commented on KAFKA-5684:
---

This JIRA won't change the API. It's just a different way to implement the print processor on top of the peek processor; everything external will remain the same. The question (on the dev list) about why print() is not fluent does need a KIP, but I'd like to know why it was changed from fluent to non-fluent. I think the people who made this change had a good reason for it.

> KStreamPrintProcessor as customized KStreamPeekProcessor
>
> Key: KAFKA-5684
> URL: https://issues.apache.org/jira/browse/KAFKA-5684
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Paolo Patierno
> Assignee: Paolo Patierno
> Priority: Minor
> Labels: needs-kip
[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor
[ https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16108903#comment-16108903 ]

Paolo Patierno commented on KAFKA-5684:
---

[~damianguy] [~mjsax] [~bbejeck] what do you think?