[ 
https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112567#comment-16112567
 ] 

Paolo Patierno commented on KAFKA-5684:
---------------------------------------

The {{KStreamPrint}} constructor receives a {{ForeachAction}} as parameter 
(even if today is a concrete {{PrintForeachAction}}) so it has no idea about 
the mapper being provided or not by the user with the {{print}} call. There is 
no way to access it using a {{ForeachAction}}.
Today, this mapper  is always not null because it's filled with a default 
mapper if not provided by the user, so {{KStreamPrint}} doesn't have such 
information.

A possible solution could be :

- from the {{print}} method allowing to pass a null mapper to the 
{{PrintForeachAction}}
- having the {{KStreamPrint}} not a {{ForeachAction}} as parameter but the 
today used concrete implementation {{PrintForeachAction}}
- in this way we could expose the mapper from the {{PrintForeachAction}} (with 
get/set) having the {{KStreamPrint}} providing a default one with serdes 
(default from context or provided by user)
- in this case the check "instaceof byte[]" will be only in the default mapper

This solution breaks some "internal" interfaces like the {{KStreamPrint}} 
constructor having a concrete {{PrintForeachAction}} but the question is ...
... do we want to use the {{KStreamPrint}} with other {{ForeachAction}} 
implementations or only {{PrintForeachAction}} ? Maybe not because in that case 
the {{KStreamPeek}} should fit well ?

Does it sound reasonable to you ?

> KStreamPrintProcessor as customized KStreamPeekProcessor
> --------------------------------------------------------
>
>                 Key: KAFKA-5684
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5684
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Paolo Patierno
>            Assignee: Paolo Patierno
>            Priority: Minor
>
> Hi,
> the {{KStreamPrintProcessor}} is implemented from scratch (from the 
> {{AbstractProcessor}}) and the same for the related supplier.
> It looks to me that it's just a special {{KStreamPeekProcessor}} with 
> forwardDownStream to false and that allows the possibility to specify Serdes 
> instances used if key/values are bytes.
> At same time used by a {{print()}} method it provides a fast way to print 
> data flowing through the pipeline (while using just {{peek()}} you need to 
> write the code).
> I think that it could be useful to refactoring the {{KStreamPrintProcessor}} 
> as derived from the {{KStreamPeekProcessor}} customizing its behavior.
> Thanks,
> Paolo.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to