I just realized that my earlier email did not make clear which comments were 
mine and which were Stig’s. I have put mine in [] below.

On 6/9/17, 5:42 PM, "Priyank Shah" <ps...@hortonworks.com> wrote:

    I want to add a few things beyond the issues raised by Sriharsha and 
Hugo. I am pasting one of the other emails that I sent some time back about 
cleaning up KafkaSpoutConfig. Stig responded to that email. I am trying to 
answer that in this email so that it is all in one place. Answers are inline.
    
    
    
    Hi Priyank,
    
    For question 1 I can think of a couple of reasons (not sure how important
    they are though): Using FQCN makes it impossible to check the generic type
    of the deserializer, so you'd be able to pass the wrong type of
    deserializer to the spout (e.g. a spout that otherwise expects String but
    is passed an Integer deserializer). Passing class instances instead of
    using FQCNs makes it possible to set up some configuration for the
    deserializer (e.g. call a constructor with a parameter). I would assume
    that's why the KafkaConsumer API also supports passing instances instead of
    FQCNs. We have SerializableDeserializer as a bit of a service to the user.
    Kafka's Deserializer isn't inherently serializable, and if you configure
    your spout to use a deserializer that isn't serializable, the topology
    submission will fail when Nimbus tries to serialize the spout.
    
  Priyank [ I am not sure where and how we check the type of the Deserializer 
to make sure that it’s a String and not an Integer by mistake. Can you 
elaborate on that? A mismatch will throw a RuntimeException in the worker 
anyway. Developers are already aware that a message needs to be deserialized 
with the correct deserializer, and if that’s not the case then it should be 
fixed. As far as configuration state for the deserializer object is concerned, 
the API already has a configure method here 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L37
 Do you see any strong reason why one would use an instance of Deserializer 
with constructor args and setter methods to create the state, rather than just 
the string value for the FQCN? The state can be passed through consumer 
properties in the configure method, as mentioned before. Getting rid of the 4 
instance variables keyDes, keyDesClazz, valueDes, valueDesClazz, the related 
interface SerializableDeserializer, and the different combinations of 
overloaded constructors that exist because of them will be a big cleanup. 
Instead of doing a service to the user, we are making the interface of 
KafkaSpoutConfig, and hence KafkaSpout, more complicated. Along the same lines, 
I still don’t see a reason for creating a deserializer object on the client 
side and serializing it to send it over to Nimbus. Let me know if you have a 
concrete example of where one would need this.]
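
To make the configure-based alternative concrete, here is a minimal sketch, not the actual Kafka or Storm code. The nested Deserializer interface is a cut-down stand-in for Kafka's (only configure and deserialize), and CharsetStringDeserializer, createFromProps, and the "example.deserializer.charset" key are hypothetical names made up for illustration. It shows a deserializer being created from its FQCN and picking up its state from the same properties map, so no instance has to be built on the client and serialized.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class FqcnDeserializerSketch {

    /** Cut-down stand-in for Kafka's Deserializer interface (assumption: only these two methods). */
    public interface Deserializer<T> {
        void configure(Map<String, ?> configs, boolean isKey);
        T deserialize(String topic, byte[] data);
    }

    /** Hypothetical deserializer whose state comes from the config map, not from a constructor. */
    public static class CharsetStringDeserializer implements Deserializer<String> {
        private Charset charset = StandardCharsets.UTF_8;

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // "example.deserializer.charset" is a made-up key used only in this sketch.
            Object name = configs.get("example.deserializer.charset");
            if (name != null) {
                charset = Charset.forName(name.toString());
            }
        }

        @Override
        public String deserialize(String topic, byte[] data) {
            return data == null ? null : new String(data, charset);
        }
    }

    /** Instantiates the class named under classKey and hands it the same properties map. */
    @SuppressWarnings("unchecked")
    public static <T> Deserializer<T> createFromProps(Map<String, ?> props, String classKey, boolean isKey) {
        String className = String.valueOf(props.get(classKey));
        try {
            Deserializer<T> d =
                (Deserializer<T>) Class.forName(className).getDeclaredConstructor().newInstance();
            d.configure(props, isKey);
            return d;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("value.deserializer", CharsetStringDeserializer.class.getName());
        props.put("example.deserializer.charset", "ISO-8859-1");

        Deserializer<String> valueDes = createFromProps(props, "value.deserializer", false);
        System.out.println(valueDes.deserialize("some-topic", new byte[] {(byte) 0xE9}));
    }
}
```

Everything in the map is plain strings, so the spout config stays trivially serializable. The trade-off Stig mentions still holds: a wrong generic type is only caught at runtime in the worker.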
    
    For question 2, I think we didn't want to require people to subclass
    KafkaTuple. If they aren't emitting to multiple streams, it's unnecessary
    for the tuple to subclass KafkaTuple. I'm almost certain we don't want to
    change it to List<KafkaTuple>. Splitting a Kafka message into multiple
    tuples can already be done by adding a splitter bolt that does that after
    the KafkaSpout in the topology. I don't really see a good reason for
    putting this functionality in the spout, especially since it complicates
    ack/commit management a bit more if we have to keep track of multiple
    tuples per Kafka message. Is there a reason you'd like the message split in
    the spout, rather than in a downstream bolt?
    
Priyank [   I agree we don’t want the return type to be List<KafkaTuple> 
because of the offset management you mentioned. I think making it return 
KafkaTuple still makes sense. I would prefer forcing the user to subclass 
KafkaTuple. Are there any strong reasons for not requiring users to subclass 
it? The way it is currently, it can mislead the user. List<Object> can be 
anything, including List<KafkaTuple>. I think the instanceof check in 
KafkaSpout on the object returned by the apply method is unnecessary. We should 
change the apply method of the interface to return KafkaTuple and have 
DefaultRecordTranslator handle that, i.e. return a KafkaTuple with getStream 
returning the default stream. The only contract the user needs to honor is that 
the stream in the KafkaTuple has to be one of the streams returned by the 
RecordTranslator that is used in declareOutputFields.
    
    Talking about manual assignment, I remember already doing something like 
that in KinesisSpout. If everyone is okay with the above changes, I want to 
take up the task of making the changes as discussed in this email thread 
(whatever conclusion we reach) and even switching the spout to manual 
assignment. I know it’s going to be backward incompatible, but I prefer that 
since we will be cleaning up a lot of stuff. We can decide which release to 
pick if we vote for these backward incompatible changes at all. ]
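
The RecordTranslator change proposed above could look roughly like the following. This is only a sketch under assumptions: Record, KafkaTuple, RecordTranslator, and DefaultRecordTranslator here are simplified stand-ins written for this email, not the real storm-kafka-client classes, and the "default" stream name and the translate helper are illustrative. The point is that once apply returns KafkaTuple, the spout needs no instanceof check and only has to verify the stream contract.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class KafkaTupleTranslatorSketch {

    static final String DEFAULT_STREAM = "default"; // assumed default stream id

    /** Simplified stand-in for a consumed Kafka record. */
    public static class Record<K, V> {
        public final String topic;
        public final K key;
        public final V value;

        public Record(String topic, K key, V value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in for KafkaTuple: the tuple values plus the stream they should be emitted on. */
    public static class KafkaTuple extends ArrayList<Object> {
        private static final long serialVersionUID = 1L;
        private final String stream;

        public KafkaTuple(String stream, Object... values) {
            super(Arrays.asList(values));
            this.stream = stream;
        }

        public String getStream() {
            return stream;
        }
    }

    /** Proposed shape: apply returns KafkaTuple instead of List<Object>. */
    public interface RecordTranslator<K, V> {
        KafkaTuple apply(Record<K, V> record);
        List<String> streams();
    }

    /** Default translator: always routes to the default stream. */
    public static class DefaultRecordTranslator<K, V> implements RecordTranslator<K, V> {
        @Override
        public KafkaTuple apply(Record<K, V> record) {
            return new KafkaTuple(DEFAULT_STREAM, record.topic, record.key, record.value);
        }

        @Override
        public List<String> streams() {
            return Collections.singletonList(DEFAULT_STREAM);
        }
    }

    /** What the spout side could do: no instanceof, just enforce the declared-streams contract. */
    public static <K, V> KafkaTuple translate(RecordTranslator<K, V> translator, Record<K, V> record) {
        KafkaTuple tuple = translator.apply(record);
        if (!translator.streams().contains(tuple.getStream())) {
            throw new IllegalStateException("Undeclared stream: " + tuple.getStream());
        }
        return tuple;
    }
}
```

A user who emits to several streams would implement RecordTranslator directly and list every stream in streams(), which is the same list declareOutputFields would iterate over.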
    
    2017-05-10 2:47 GMT+02:00 Priyank Shah <ps...@hortonworks.com>:
    
    I was going through new kafka spout code and had a couple of questions.
    
    
    1.       https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L98
    The instance variable at that line and the following 3 lines. Why do we
    need them? Because of them we have Builder constructors with different
    parameters for key and value deserializers. We even have
    https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
    Not sure if we really need it.
    https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L555
    already has a constructor that takes in Properties or a Map, and if the
    key.deserializer and value.deserializer keys are set to FQCNs then it will
    instantiate them and take care of them at
    https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L642 .
    And we already have a setProp method on the builder to set different Kafka
    configs that will be passed to the KafkaConsumer constructor. We can get
    rid of the SerializableDeserializer interface and a bunch of constructors
    and instance variables related to the key and value deserializers.
    
    2.       We have a RecordTranslator interface that is used to
    declareOutputFields at
    https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L486
    and then we have this special instanceof check here
    https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L333
    Why is the return type of apply a List<Object>? Should we change it to
    List<KafkaTuple>? That way we can get rid of the instanceof check and
    support multiple tuples emitted for one Kafka message.
    
    
    Fixes for the above two might not be backward compatible, but if everyone
    is okay with the above changes then I can create a patch.
    
    
    On 6/9/17, 4:09 PM, "Hugo Da Cruz Louro" <hlo...@hortonworks.com> wrote:
    
        +1 for simplifying KafkaSpoutConfig. Too many constructors and too many 
methods. I am not sure it’s justifiable to have any methods that simply set 
KafkaConsumer properties. All of these properties should just go in a 
Map<String, Object>, which is what KafkaConsumer receives, and what was 
supported in the initial implementation. The names of the properties can be 
retrieved from org.apache.kafka.clients.consumer.ConsumerConfig. At this point 
we may have to keep in mind backwards compatibility.
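
As a rough illustration of the map-based approach, here is a sketch, assuming a hypothetical SpoutConfig holder that is not the real KafkaSpoutConfig. The keys are the standard Kafka consumer property names (the same strings that the ConsumerConfig constants resolve to); the broker address and group id are placeholders.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class PassThroughConfigSketch {

    /** Hypothetical minimal config: it only carries the consumer properties through untouched. */
    public static final class SpoutConfig {
        private final Map<String, Object> kafkaProps;

        public SpoutConfig(Map<String, Object> kafkaProps) {
            // Defensive copy so later changes to the caller's map do not leak in.
            this.kafkaProps = Collections.unmodifiableMap(new HashMap<>(kafkaProps));
        }

        /** This map is what would be handed to the KafkaConsumer constructor. */
        public Map<String, Object> getKafkaProps() {
            return kafkaProps;
        }
    }

    /** Example properties; the values are placeholders. */
    public static Map<String, Object> exampleProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("group.id", "example-group");           // ConsumerConfig.GROUP_ID_CONFIG
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.records", 200);               // ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        return props;
    }

    public static void main(String[] args) {
        SpoutConfig config = new SpoutConfig(exampleProps());
        System.out.println(config.getKafkaProps());
    }
}
```

With this shape, a new consumer property in a later Kafka release needs no new builder method on the Storm side.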
        
        Not sure we should completely discontinue dynamic partition assignment, 
as it is one of the primary features of the new Storm Kafka Client API. With this 
said, manual partition assignment should be supported and would solve a lot of 
potential problems arising from dynamic partition assignment.
        
        Hugo
        
        > On Jun 9, 2017, at 3:33 PM, Harsha <st...@harsha.io> wrote:
        > 
        > I think the question is why we need all those settings when a user can
        > pass them via Properties with the consumer properties defined, or via a
        > Map conf object. Having the methods on top of the consumer config means
        > that every time a Kafka consumer property is added or changed, one needs
        > to add a builder method. We need to get out of the way and let the user
        > configure it like they do for a typical Kafka consumer. Instead we have
        > tens of methods that set properties for ConsumerConfig.
        > 
        > Examples:
        > 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L317
        > 
        > 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L309
        > etc. All of these are specific to the KafkaConsumer config; users should
        > be able to pass all of them via Properties.
        > 
        > 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L327
        > 
        > What's the benefit of adding that method? We are also forcing the
        > protocol to "SSL" in this method
        > 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L318
        > 
        > Users can set the SSL properties and then select the securityProtocol
        > "SASL_SSL", which requires both Kerberos and SSL configs to be set. In
        > the above case, calling setSSLTruststore changes security.protocol to
        > "SSL". This could easily cause issues if the user first sets
        > securityProtocol to "SASL_SSL" and later calls setSSLTruststore, which
        > changes it to "SSL".
        > 
        > We are taking over these settings instead of letting the user figure them
        > out from the Kafka consumer config page.
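
The ordering problem described above can be reproduced with a small sketch. ConvenienceBuilder is a hypothetical class written only to mimic the behavior described in this email (a truststore setter that also forces security.protocol to "SSL"); it is not the real KafkaSpoutConfig.Builder. The property names are the standard Kafka ones, and the path and password are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

public class SecurityProtocolOverrideSketch {

    /** Hypothetical builder with a convenience setter that has a hidden side effect. */
    public static class ConvenienceBuilder {
        private final Map<String, Object> props = new HashMap<>();

        public ConvenienceBuilder setSecurityProtocol(String protocol) {
            props.put("security.protocol", protocol);
            return this;
        }

        public ConvenienceBuilder setSSLTruststore(String location, String password) {
            props.put("security.protocol", "SSL"); // silently overwrites an earlier choice
            props.put("ssl.truststore.location", location);
            props.put("ssl.truststore.password", password);
            return this;
        }

        public Map<String, Object> build() {
            return new HashMap<>(props);
        }
    }

    /** Plain properties: the user states each setting once and nothing is rewritten. */
    public static Map<String, Object> plainProps() {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", "SASL_SSL");
        props.put("ssl.truststore.location", "/path/to/truststore.jks"); // placeholder
        props.put("ssl.truststore.password", "changeit");                // placeholder
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> viaBuilder = new ConvenienceBuilder()
            .setSecurityProtocol("SASL_SSL")
            .setSSLTruststore("/path/to/truststore.jks", "changeit")
            .build();
        System.out.println("builder: " + viaBuilder.get("security.protocol"));
        System.out.println("plain:   " + plainProps().get("security.protocol"));
    }
}
```

The builder result depends on call order, while the plain map does not.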
        > 
        > In contrast, we have KafkaProducer, which does this
        > 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java#L121
        > . I would add a Properties object instead of deriving it from
        > topologyConf, but this is much easier for users to understand. The
        > contract here is: put whatever producer configs the user wants in the
        > conf object, and we create the producer out of that config. 
        > 
        > Honestly, these interfaces need to be simple and let the user have
        > control instead of adding our interpretation. 
        > 
        > 
        > 
        > Thanks,
        > Harsha
        > On Jun 9, 2017, 2:08 PM -0700, Stig Døssing 
<generalbas....@gmail.com>,
        > wrote:
        > I'd be happy with a simpler KafkaSpoutConfig, but I think most of the
        > configuration parameters have good reasons for being there. Any examples
        > of parameters you think we should remove?
        > 
        > 2017-06-09 21:34 GMT+02:00 Harsha <st...@harsha.io>:
        > 
        > +1 on using manual assignment for the reasons specified below. We
        > will see duplicates even in stable conditions, which is not good. I
        > don’t see any reason not to switch to manual assignment. While we are
        > at it, we should refactor the KafkaConfig part. It should be as simple
        > as accepting the Kafka consumer config or properties file and
        > forwarding it to KafkaConsumer. We made it overly and unnecessarily
        > complex.
        > 
        > Thanks,
        > Harsha
        > 
        
        
    
    
