[ https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16112567#comment-16112567 ]
Paolo Patierno commented on KAFKA-5684:
---------------------------------------

The {{KStreamPrint}} constructor receives a {{ForeachAction}} as a parameter (even though today it is always the concrete {{PrintForeachAction}}), so it has no idea whether the user provided a mapper in the {{print}} call. There is no way to access the mapper through a {{ForeachAction}}. Today the mapper is never null, because a default mapper is filled in when the user does not provide one, so {{KStreamPrint}} doesn't have that information.

A possible solution could be:

- allowing the {{print}} method to pass a null mapper to the {{PrintForeachAction}}
- having the {{KStreamPrint}} take the concrete implementation used today, {{PrintForeachAction}}, as its parameter instead of a {{ForeachAction}}
- in this way we could expose the mapper from the {{PrintForeachAction}} (with get/set), with the {{KStreamPrint}} providing a default one that uses the serdes (the defaults from the context or those provided by the user)
- in this case the "instanceof byte[]" check would be only in the default mapper

This solution breaks some "internal" interfaces, such as the {{KStreamPrint}} constructor taking a concrete {{PrintForeachAction}}, but the question is: do we want to use the {{KStreamPrint}} with other {{ForeachAction}} implementations, or only with {{PrintForeachAction}}? Maybe not, because in that case the {{KStreamPeek}} should fit well.

Does that sound reasonable to you?

> KStreamPrintProcessor as customized KStreamPeekProcessor
> --------------------------------------------------------
>
>                 Key: KAFKA-5684
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5684
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Paolo Patierno
>            Assignee: Paolo Patierno
>            Priority: Minor
>
> Hi,
> the {{KStreamPrintProcessor}} is implemented from scratch (from the
> {{AbstractProcessor}}) and the same for the related supplier.
> It looks to me that it's just a special {{KStreamPeekProcessor}} with
> forwardDownStream set to false and that allows the possibility to specify Serdes
> instances used if key/values are bytes.
> At the same time, used by a {{print()}} method, it provides a fast way to print
> data flowing through the pipeline (while using just {{peek()}} you need to
> write the code).
> I think that it could be useful to refactor the {{KStreamPrintProcessor}}
> as derived from the {{KStreamPeekProcessor}}, customizing its behavior.
> Thanks,
> Paolo.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
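
For illustration, the proposal in the comment above (a {{KStreamPrint}} that takes the concrete {{PrintForeachAction}} and installs a default mapper only when the user did not provide one) might look roughly like the sketch below. It is not the Kafka Streams code: the interfaces are simplified stand-ins declared locally, a plain {{Function<byte[], ?>}} is assumed in place of a Serde, and the {{Consumer<String>}} sink and the {{PrintSketchDemo}} class are hypothetical names used only for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical demo entry point: no user mapper, so the default one is used.
final class PrintSketchDemo {
    public static void main(String[] args) {
        Function<byte[], String> utf8 = bytes -> new String(bytes, StandardCharsets.UTF_8);
        PrintForeachAction<Object, Object> action =
                new PrintForeachAction<>(System.out::println, "demo", null);
        new KStreamPrint<>(action, utf8, utf8)
                .process("k".getBytes(StandardCharsets.UTF_8), 42);
    }
}

// Simplified stand-ins for the Kafka Streams interfaces (assumptions, not the real API).
interface ForeachAction<K, V> {
    void apply(K key, V value);
}

interface KeyValueMapper<K, V, R> {
    R apply(K key, V value);
}

// Concrete action that exposes its mapper; null means "not provided by the user".
final class PrintForeachAction<K, V> implements ForeachAction<K, V> {
    private final Consumer<String> sink; // hypothetical output target
    private final String label;
    private KeyValueMapper<K, V, String> mapper;

    PrintForeachAction(Consumer<String> sink, String label,
                       KeyValueMapper<K, V, String> mapper) {
        this.sink = sink;
        this.label = label;
        this.mapper = mapper;
    }

    KeyValueMapper<K, V, String> getMapper() {
        return mapper;
    }

    void setMapper(KeyValueMapper<K, V, String> mapper) {
        this.mapper = mapper;
    }

    @Override
    public void apply(K key, V value) {
        sink.accept("[" + label + "]: " + mapper.apply(key, value));
    }
}

// Takes the concrete PrintForeachAction instead of a generic ForeachAction.
final class KStreamPrint<K, V> {
    private final PrintForeachAction<K, V> action;

    KStreamPrint(PrintForeachAction<K, V> action,
                 Function<byte[], ?> keyDeserializer,
                 Function<byte[], ?> valueDeserializer) {
        this.action = action;
        if (action.getMapper() == null) {
            // Default mapper: the only place where the byte[] check lives.
            action.setMapper((key, value) ->
                    maybeDeserialize(key, keyDeserializer) + ", "
                            + maybeDeserialize(value, valueDeserializer));
        }
    }

    private static Object maybeDeserialize(Object o, Function<byte[], ?> deserializer) {
        return o instanceof byte[] ? deserializer.apply((byte[]) o) : o;
    }

    void process(K key, V value) {
        action.apply(key, value);
    }
}
```

With this shape a user-supplied mapper is left untouched, and the byte[] handling is confined to the default mapper, which is the trade-off the comment describes: the constructor is tied to one concrete action type in exchange for visibility into whether a mapper was given.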