[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-16 Thread Paolo Patierno (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16128423#comment-16128423
 ] 

Paolo Patierno commented on KAFKA-5684:
---

I'm going to close this JIRA (and the related PR). This issue will be addressed 
through KIP-182.

> KStreamPrintProcessor as customized KStreamPeekProcessor
> 
>
> Key: KAFKA-5684
> URL: https://issues.apache.org/jira/browse/KAFKA-5684
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Paolo Patierno
> Assignee: Paolo Patierno
> Priority: Minor
>
> Hi,
> the {{KStreamPrintProcessor}} is implemented from scratch (on top of 
> {{AbstractProcessor}}), and the same goes for the related supplier.
> It looks to me like it's just a special {{KStreamPeekProcessor}} with 
> forwardDownStream set to false that also allows specifying the Serdes 
> instances to use when keys/values are bytes.
> At the same time, being used by the {{print()}} method, it provides a fast 
> way to print data flowing through the pipeline (with plain {{peek()}} you 
> need to write the code yourself).
> I think it could be useful to refactor the {{KStreamPrintProcessor}} so that 
> it derives from the {{KStreamPeekProcessor}} and customizes its behavior.
> Thanks,
> Paolo.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16128422#comment-16128422
 ] 

ASF GitHub Bot commented on KAFKA-5684:
---

Github user ppatierno closed the pull request at:

https://github.com/apache/kafka/pull/3636




[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116540#comment-16116540
 ] 

ASF GitHub Bot commented on KAFKA-5684:
---

GitHub user ppatierno opened a pull request:

https://github.com/apache/kafka/pull/3636

KAFKA-5684: KStreamPrintProcessor as customized KStreamPeekProcessor

This PR makes KStreamPrint derive from KStreamPeek and avoids the 
"instanceof" check on byte[] in every process call.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ppatierno/kafka kafka-5684

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3636.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3636


commit a516a08bf22cdd53feb4ada50c73dc5a08715b74
Author: Paolo Patierno 
Date:   2017-08-07T12:55:16Z

Refactored KStreamPrint as derived from KStreamPeek
Removed the "instance of" check for byte[] in every KStreamPrint process, 
it's up to a default mapper now
Updated KStreamPrint tests adapting to the new internal structure for 
KStreamPrint and PrintForeachAction






[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-04 Thread Paolo Patierno (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16114563#comment-16114563
 ] 

Paolo Patierno commented on KAFKA-5684:
---

Just to add that {{KStreamPrint}} needs to be aware of the mapper (null or 
not). This means that even if we remove the mapper from the 
{{PrintForeachAction}} constructor, we need to add it to the {{KStreamPrint}} 
constructor, so that in the {{init}} method we can either set it on the 
{{PrintForeachAction}} or create the default one and set that instead.



[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-03 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16113499#comment-16113499
 ] 

Guozhang Wang commented on KAFKA-5684:
--

I have a slightly different idea:

1. Remove the `defaultKeyValueMapper` from {{KStreamImpl}}, and let the 
overloaded `print` and `writeAsText` functions without the mapper parameter 
pass in null (note that in general we do NOT prefer to pass nulls, but rather 
replace them with the default mapper as early as possible in the call trace; 
this is an exception, since in {{KStreamImpl}} we cannot access the 
{{context}} object to get the default serdes).

2. Remove the `KeyValueMapper` from the constructor of {{PrintForeachAction}}, 
but add an API to set the {{KeyValueMapper}} after the object is constructed.

3. Still keep the {{KStreamPrint}} class, but let {{KStreamPrint}} extend 
{{KStreamPeek}} and let {{KStreamPrintProcessor}} extend 
{{KStreamPeekProcessor}}; let {{KStreamPrint}} pass the 
user-specified serdes to the constructor of {{KStreamPrintProcessor}}.

4. Override {{KStreamPrintProcessor}}'s `init` function to construct the 
default mapper (potentially based on the default serdes from the context) 
if none is passed in, and then call `setMapper` on the action field.
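
The four steps above could look roughly like the following sketch. It is 
self-contained and does not use the real Kafka Streams classes: `Mapper`, 
`Context`, `PrintAction`, `PeekProcessor` and `PrintProcessor` are simplified, 
hypothetical stand-ins, `defaultDecoder` stands in for the default serdes held 
by the context, and the user-specified serdes are left out for brevity.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-ins for the Kafka Streams types discussed above;
// names and signatures are assumptions made for illustration only.
interface Mapper {
    String apply(Object key, Object value);
}

// Stand-in for the processor context, the only place where defaults live.
final class Context {
    final Function<byte[], Object> defaultDecoder;

    Context(Function<byte[], Object> defaultDecoder) {
        this.defaultDecoder = defaultDecoder;
    }
}

// Step 2: no mapper in the constructor; it is injected later via setMapper.
final class PrintAction {
    private final StringBuilder out;
    private Mapper mapper;

    PrintAction(StringBuilder out) {
        this.out = out;
    }

    void setMapper(Mapper mapper) {
        this.mapper = mapper;
    }

    void apply(Object key, Object value) {
        out.append(mapper.apply(key, value)).append('\n');
    }
}

// Simplified peek processor: applies the action to every record.
class PeekProcessor {
    protected final PrintAction action;

    PeekProcessor(PrintAction action) {
        this.action = action;
    }

    void init(Context context) {
        // nothing to resolve for a plain peek
    }

    void process(Object key, Object value) {
        action.apply(key, value);
    }
}

// Steps 3 and 4: extend the peek processor and resolve the mapper once in init().
final class PrintProcessor extends PeekProcessor {
    private final Mapper userMapper; // null when the caller gave none (step 1)

    PrintProcessor(PrintAction action, Mapper userMapper) {
        super(action);
        this.userMapper = userMapper;
    }

    @Override
    void init(Context context) {
        // Single null check, done once instead of on every record.
        action.setMapper(userMapper != null ? userMapper : defaultMapper(context));
    }

    // Default mapper: decodes raw bytes with the context's default decoder.
    private static Mapper defaultMapper(Context context) {
        return (key, value) -> decode(context, key) + ", " + decode(context, value);
    }

    private static Object decode(Context context, Object o) {
        return o instanceof byte[] ? context.defaultDecoder.apply((byte[]) o) : o;
    }

    public static void main(String[] args) {
        StringBuilder out = new StringBuilder();
        PrintProcessor processor = new PrintProcessor(new PrintAction(out), null);
        processor.init(new Context(bytes -> new String(bytes, StandardCharsets.UTF_8)));
        processor.process("key".getBytes(StandardCharsets.UTF_8), 1);
        System.out.print(out);
    }
}
```

In this sketch the byte[] check only survives inside the default mapper, so a 
caller who supplies a mapper never pays for it.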



[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-02 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16111779#comment-16111779
 ] 

Guozhang Wang commented on KAFKA-5684:
--

1. Regarding using `peek` to implement `print`: yes, we have been trying to do 
so in https://issues.apache.org/jira/browse/KAFKA-4772, but we have not yet 
collapsed {{KStreamPeek}} and {{KStreamPrint}} into the same class, mainly 
because we were treating serdes unnecessarily specially.

2. As Paolo mentioned, since we know at construction time whether the mapper 
is provided or not, we can just wrap the serdes into the default mapper ONCE 
in the {{init}} call with a single condition check; by doing this we can get 
rid of the per-call {{instanceof}} condition check.



[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-02 Thread Paolo Patierno (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110849#comment-16110849
 ] 

Paolo Patierno commented on KAFKA-5684:
---

Maybe I made a mistake here, because the default serdes are available only from 
the {{ProcessorContext}} (using the Streams configuration). The same goes for 
the topic, which is needed for deserialization. So in any case the checks for 
a) and b) have to happen inside the processor node.
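
A minimal sketch of why the fallback has to be resolved inside the processor 
node: both the default deserializer and the topic name are only known once a 
context exists. `SketchDeserializer`, `SketchContext` and 
`DeserializeInProcessorSketch` are hypothetical stand-ins, loosely modeled on 
a `deserialize(topic, bytes)` style call; they are not the Kafka Streams 
classes.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a deserializer that needs the topic name.
interface SketchDeserializer {
    Object deserialize(String topic, byte[] data);
}

// Hypothetical stand-in for the processor context: the only place that
// knows the configured default deserializer and the current topic.
final class SketchContext {
    private final SketchDeserializer defaultDeserializer;
    private final String topic;

    SketchContext(SketchDeserializer defaultDeserializer, String topic) {
        this.defaultDeserializer = defaultDeserializer;
        this.topic = topic;
    }

    SketchDeserializer defaultDeserializer() {
        return defaultDeserializer;
    }

    String topic() {
        return topic;
    }
}

final class DeserializeInProcessorSketch {
    // A user-specified deserializer wins; otherwise fall back to the context default.
    // Non-byte[] inputs are returned unchanged.
    static Object maybeDeserialize(Object keyOrValue,
                                   SketchDeserializer userDeserializer,
                                   SketchContext context) {
        if (!(keyOrValue instanceof byte[])) {
            return keyOrValue;
        }
        SketchDeserializer chosen =
                userDeserializer != null ? userDeserializer : context.defaultDeserializer();
        return chosen.deserialize(context.topic(), (byte[]) keyOrValue);
    }

    public static void main(String[] args) {
        SketchDeserializer utf8 = (topic, data) -> new String(data, StandardCharsets.UTF_8);
        SketchContext context = new SketchContext(utf8, "input-topic");
        byte[] raw = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(maybeDeserialize(raw, null, context));
        System.out.println(maybeDeserialize(42, null, context));
    }
}
```

Neither argument of `chosen.deserialize(...)` could be supplied from 
{{KStreamImpl}} in this sketch, since no context exists there yet.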



[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-02 Thread Paolo Patierno (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110636#comment-16110636
 ] 

Paolo Patierno commented on KAFKA-5684:
---

So it seems to me that the serdes parameters aren't related to the processor 
node anymore (the {{KStreamPrintProcessor}} in this case) but are related to 
the mapper.
Referring to the two cases above from the {{print}} method's point of view:

a) the mapper is just provided to the {{PrintForeachAction}} and the serdes 
are ignored
b) we have to build a mapper which uses the provided serdes (or the default 
ones) and then pass it to the {{PrintForeachAction}}. Today there is a 
{{defaultKeyValueMapper}}, but it doesn't fit well in this case because it's 
not aware of serdes

At the same time, if the serdes parameters are not related to the processor 
node, the {{KStreamPrintProcessor}} is really like the 
{{KStreamPeekProcessor}} with "forwardDownStream = false".
The only difference is in the {{close}} method, which is needed to flush the 
{{PrintForeachAction}}.
In any case I think that {{KStreamPrintProcessor}} can be refactored as derived 
from {{KStreamPeekProcessor}}.
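
As a rough illustration of that last point, the sketch below models a print 
processor as a peek processor that never forwards and that flushes its action 
on close. `PeekLikeProcessor`, `BufferingPrintAction` and `PrintLikeProcessor` 
are hypothetical, simplified stand-ins (the `forwarded` and `sink` lists 
replace the real downstream forwarding and output stream); they are not the 
Kafka Streams classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical, simplified peek processor: run an action, optionally forward.
class PeekLikeProcessor {
    private final BiConsumer<Object, Object> action;
    private final boolean forwardDownStream;
    // Stands in for forwarding the record to downstream processor nodes.
    final List<String> forwarded = new ArrayList<>();

    PeekLikeProcessor(BiConsumer<Object, Object> action, boolean forwardDownStream) {
        this.action = action;
        this.forwardDownStream = forwardDownStream;
    }

    void process(Object key, Object value) {
        action.accept(key, value);
        if (forwardDownStream) {
            forwarded.add(key + "=" + value);
        }
    }

    void close() {
        // a plain peek has nothing to release
    }
}

// Buffers formatted lines and hands them to the sink only when flushed.
final class BufferingPrintAction implements BiConsumer<Object, Object> {
    private final List<String> buffer = new ArrayList<>();
    final List<String> sink = new ArrayList<>();

    @Override
    public void accept(Object key, Object value) {
        buffer.add(key + ", " + value);
    }

    void flush() {
        sink.addAll(buffer);
        buffer.clear();
    }
}

// Print = peek with forwardDownStream fixed to false, plus a flush on close.
final class PrintLikeProcessor extends PeekLikeProcessor {
    private final BufferingPrintAction printAction;

    PrintLikeProcessor(BufferingPrintAction printAction) {
        super(printAction, false);
        this.printAction = printAction;
    }

    @Override
    void close() {
        printAction.flush();
    }

    public static void main(String[] args) {
        BufferingPrintAction action = new BufferingPrintAction();
        PrintLikeProcessor processor = new PrintLikeProcessor(action);
        processor.process("key", "value");
        processor.close();
        System.out.println(action.sink);
    }
}
```

The subclass adds only the constructor argument and the `close` override, 
which is the whole difference described above.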

What do you think, [~guozhang] [~james.c]?
I'm starting to develop a proposal PR for that.

Thanks,
Paolo.



[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-02 Thread james chien (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110481#comment-16110481
 ] 

james chien commented on KAFKA-5684:


[~guozhang] I agree with your point: we only need to check whether K/V are 
byte[] if the mapper is not provided (which means that otherwise we do not 
need to execute {{KStreamPrint#maybeDeserialize}}).



[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-01 Thread Damian Guy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16108992#comment-16108992
 ] 

Damian Guy commented on KAFKA-5684:
---

Yep - sorry, I missed that. You are right, this JIRA doesn't need a KIP.



[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-01 Thread Paolo Patierno (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16108984#comment-16108984
 ] 

Paolo Patierno commented on KAFKA-5684:
---

This JIRA won't change the API. It's just a different way to implement the 
print processor on top of the peek processor; everything visible externally 
will remain the same.
The question (on the dev list) about why print() is not fluent does need a 
KIP, but I'd like to know why it was changed from fluent to non-fluent. 
I think the people who made this change had a good reason for it.

> KStreamPrintProcessor as customized KStreamPeekProcessor
> 
>
> Key: KAFKA-5684
> URL: https://issues.apache.org/jira/browse/KAFKA-5684
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Minor
>  Labels: needs-kip
>


[jira] [Commented] (KAFKA-5684) KStreamPrintProcessor as customized KStreamPeekProcessor

2017-08-01 Thread Paolo Patierno (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16108903#comment-16108903
 ] 

Paolo Patierno commented on KAFKA-5684:
---

[~damianguy] [~mjsax] [~bbejeck] what do you think?
