[ 
https://issues.apache.org/jira/browse/SPARK-4960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14260144#comment-14260144
 ] 

Cody Koeninger commented on SPARK-4960:
---------------------------------------

You're saying for an implementation that currently extends e.g.

Receiver[T]

that a user can provide a function

T => Iterable[M]

But in the case of Kafka, T is currently fixed to (K, V), so for Kafka we don't
need a user-provided function of type

(K, V) => Iterable[M] 

We need a user-provided function of type

MessageAndMetadata => Iterable[M]  

In other words, a third type parameter.
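To make the typing argument concrete, here is a minimal sketch. It assumes a
hypothetical, simplified stand-in for Kafka's MessageAndMetadata (topic,
partition, offset, key, message); it is not the real Kafka class. The point is
that once T is fixed to (K, V), the metadata is already gone, so an interceptor
that needs, say, the offset cannot be expressed as (K, V) => Iterable[M].

```scala
// Hypothetical, simplified stand-in for Kafka's MessageAndMetadata:
// the decoded key/value plus where the message came from.
final case class MessageAndMetadata[K, V](
    topic: String,
    partition: Int,
    offset: Long,
    key: K,
    message: V)

object InterceptorTypes {
  // What a parent-class interceptor would see: T is fixed to (K, V),
  // so topic, partition and offset are no longer available.
  type PairInterceptor[K, V, M] = ((K, V)) => Iterable[M]

  // What a Kafka-specific interceptor needs: the full message with metadata.
  // M is the extra, third type parameter.
  type KafkaInterceptor[K, V, M] = MessageAndMetadata[K, V] => Iterable[M]

  // Example: tag each value with its source offset. This cannot be written
  // as a PairInterceptor, because a (K, V) pair carries no offset.
  def withOffset[K, V]: KafkaInterceptor[K, V, (Long, V)] =
    mmd => List((mmd.offset, mmd.message))
}
```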

I'm also not clear on how your proposed solution deals with the 9 different 
overloads of store(), including the one that takes raw serialized bytes.  

At that point I'm not sure that having an interceptor setter defined on a 
parent class makes a lot of sense, because it's the particular subclass that 
knows what its intermediate third type is (MessageAndMetadata in this case), as 
well as which store method(s) it cares about.

That's part of why I think constructor arguments are actually a cleaner way to
handle this: Kafka can have an "interceptor" argument that defaults to a
function MessageAndMetadata => Iterable[(K, V)], and other implementations can
have a type signature for the interceptor that makes sense for them.
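A rough sketch of the constructor-argument approach, under stated assumptions:
SketchKafkaReceiver, onMessage, storedItems and withDefaultInterceptor are
hypothetical names, MessageAndMetadata is again a simplified stand-in, and
store() here just buffers items so the example runs without Spark. It is not
Spark's Receiver API. The subclass owns the interceptor's type, and the default
reproduces today's (K, V) behaviour.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for Kafka's MessageAndMetadata.
final case class MessageAndMetadata[K, V](
    topic: String, partition: Int, offset: Long, key: K, message: V)

// Hypothetical Kafka-style receiver: the interceptor is a constructor
// argument typed in terms of MessageAndMetadata, and M is the third type.
class SketchKafkaReceiver[K, V, M](
    interceptor: MessageAndMetadata[K, V] => Iterable[M]) {

  private val stored = ArrayBuffer.empty[M]

  // Stand-in for a single-item store(): records the item in memory.
  def store(item: M): Unit = stored += item

  // Called once per fetched message; the interceptor decides what,
  // if anything, gets stored.
  def onMessage(mmd: MessageAndMetadata[K, V]): Unit =
    interceptor(mmd).foreach(item => store(item))

  def storedItems: Seq[M] = stored.toList
}

object SketchKafkaReceiver {
  // Default interceptor: store the (key, value) pair, as the receiver does now.
  def defaultInterceptor[K, V]: MessageAndMetadata[K, V] => Iterable[(K, V)] =
    mmd => List((mmd.key, mmd.message))

  // Convenience constructor for the common case, where M is (K, V).
  def withDefaultInterceptor[K, V]: SketchKafkaReceiver[K, V, (K, V)] =
    new SketchKafkaReceiver[K, V, (K, V)](defaultInterceptor[K, V])
}
```

A custom interceptor is just a different constructor argument, e.g.
`new SketchKafkaReceiver[String, String, String](mmd => List(mmd.topic + ":" + mmd.message))`,
and no parent class needs to know about MessageAndMetadata.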


As an aside, I think it should actually be TraversableOnce, not Iterable. All
we care about is being able to call foreach on it once, and the classes that
implement TraversableOnce are a superset of those that implement Iterable.
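A small illustration of why the weaker bound helps. This sketch assumes Scala
2.13 or later, where TraversableOnce was renamed IterableOnce (TraversableOnce
remains only as a deprecated alias); on the 2.10/2.11 of this discussion the
same idea applies to TraversableOnce. storeAll, splitLines and dropEmpty are
hypothetical helpers. An Iterator is traversable once but is not an Iterable,
so an interceptor typed `... => Iterable[M]` could not return one directly.

```scala
import scala.collection.IterableOnce

object TraversableOnceSketch {
  // The only requirement on the interceptor's output: traverse it once.
  def storeAll[M](items: IterableOnce[M])(store: M => Unit): Unit =
    items.iterator.foreach(store)

  // Returns an Iterator: IterableOnce, but NOT an Iterable.
  def splitLines(raw: String): IterableOnce[String] =
    raw.split("\n").iterator

  // In 2.13, Option is also IterableOnce: Some keeps one item, None drops it.
  def dropEmpty(raw: String): IterableOnce[String] =
    if (raw.isEmpty) None else Some(raw)
}
```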

> Interceptor pattern in receivers
> --------------------------------
>
>                 Key: SPARK-4960
>                 URL: https://issues.apache.org/jira/browse/SPARK-4960
>             Project: Spark
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Tathagata Das
>
> Sometimes it is good to intercept a message received through a receiver and 
> modify / do something with the message before it is stored into Spark. This 
> is often referred to as the interceptor pattern. There should be a general way
> to specify an interceptor function that gets applied to all receivers. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
