From the perspective of a user who wraps a config-driven product around Storm, the setup of the KafkaSpout and its configuration has been a moving target that is hard to track. In addition, the lack of supporting usage examples across the README and examples/ has made it difficult. My opinion is that Kafka's configs will keep changing. For the strict consumer APIs, it makes sense to pass a map of configuration that can be ported from any Java-based client and easily understood by all from the Apache Kafka config docs. This will protect Storm from that ever-changing config landscape. The builder pattern can be used where Storm specifics are needed. For example, the polling offset configuration, where Kafka exposes only earliest/latest but the builder pattern can say "ignore all of my offsets and start from the beginning of the topic." This is achieved in the Consumer API only by telling the consumer object to seekToBeginning on each partition.
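As a rough sketch of that split (every class and method name below is hypothetical, not the storm-kafka-client API): the Kafka consumer properties go in as a plain map and are passed through untouched, while a small builder carries only the Storm-specific choice of where to start. A real spout would act on the start policy by calling seekToBeginning on its assigned partitions; this sketch only records the decision.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical Storm-specific setting; not the real storm-kafka-client enum. */
enum StartPolicy { COMMITTED, FROM_BEGINNING }

/** Hypothetical config: Kafka properties pass through untouched, Storm settings use a builder. */
final class SketchSpoutConfig {
    private final Map<String, Object> consumerProps;
    private final StartPolicy startPolicy;

    private SketchSpoutConfig(Builder b) {
        // Defensive copy so the config cannot change after it is built.
        this.consumerProps = Collections.unmodifiableMap(new HashMap<>(b.consumerProps));
        this.startPolicy = b.startPolicy;
    }

    Map<String, Object> consumerProps() { return consumerProps; }

    StartPolicy startPolicy() { return startPolicy; }

    /** True when the spout should seek every assigned partition to its beginning. */
    boolean shouldSeekToBeginning() { return startPolicy == StartPolicy.FROM_BEGINNING; }

    static Builder builder(Map<String, Object> consumerProps) { return new Builder(consumerProps); }

    static final class Builder {
        private final Map<String, Object> consumerProps;
        private StartPolicy startPolicy = StartPolicy.COMMITTED;

        private Builder(Map<String, Object> consumerProps) {
            // Copy the caller's map so later changes to it do not leak in.
            this.consumerProps = new HashMap<>(consumerProps);
        }

        Builder startPolicy(StartPolicy policy) { this.startPolicy = policy; return this; }

        SketchSpoutConfig build() { return new SketchSpoutConfig(this); }
    }
}

class SketchSpoutConfigDemo {
    public static void main(String[] args) {
        // Plain Kafka consumer property names, exactly as in the Kafka config docs.
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("security.protocol", "SSL");
        props.put("max.poll.records", 250);

        SketchSpoutConfig cfg = SketchSpoutConfig.builder(props)
                .startPolicy(StartPolicy.FROM_BEGINNING)
                .build();

        System.out.println(cfg.consumerProps().get("security.protocol")
                + " " + cfg.shouldSeekToBeginning());
    }
}
```

With this shape, a new Kafka property needs no change on the Storm side; only settings Kafka itself cannot express get a builder method.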
I have been trying to go to production with an Apache Storm intrinsic spout that supports SSL for a long time now, but there isn't one that can be used. I might have to tell my users 'sorry about the duplicates' (this can be as many as 15k for 1m messages and two spout instances across two workers, just from the initial spout startup lag difference) and use what is available via subscription, but I would like to help with a PR to 1.x that supports at-least-once semantics and the 'assign' API, and prevents the initial duplicates. I don't understand why anyone would not use a 1:1 ratio of tasks to executors in the spout, so to me the risk of a large blocking time for one partition reassignment, and of a potential consumer poll timeout in other threads, is minimal.

There appear to be competing efforts. Perhaps I'm wrong, but the work against this problem is very difficult to track:

STORM-2542 with PR https://github.com/apache/storm/pull/2151 removes subscription (from Stig).
STORM-2541 with PR https://github.com/apache/storm/pull/2150
STORM-2554 which says "Incorporate changes done in STORM-2541 <https://issues.apache.org/jira/browse/STORM-2541> and do some refactoring to internal state partition management to make it cleaner and more properly handle partitions reassignment."
STORM-2538 with PR https://github.com/apache/storm/pull/2147/files - Which appears to add a start-up delay, apparently to align the initial partition assignment more closely across multiple tasks.

I can't say whether a change from the subscribe to the assign API would be considered for 1.x at this point, but the config changes might be. Can only the underlying partition assignment be fixed? Assuming these can't all go in, what is the main effort going to be that I can help test to get over this problem?

Kris

On Sun, Jun 18, 2017 at 1:07 AM, Priyank Shah <ps...@hortonworks.com> wrote:

> Hugo, I agree about the benefits of immutability and encapsulation. But are they so important for the case we are discussing?
> As far as code being super fragile, hard to debug and being thread unsafe, I don’t really think it's applicable here. Can you elaborate on how it makes the code super fragile and hard to debug? As far as it being thread unsafe is concerned, KafkaSpoutConfig is anyway going to be serialized and reconstructed in the worker. And it’s always going to be one instance of it per executor. Correct me if I am misunderstanding anything.
>
> Stig, we could probably modify flux to do something like that. I need to put more thought into whether any better semantics can be applied for supporting the builder pattern. For now, since you both prefer to keep the builder pattern, we can make do with the current workaround of a public constructor like you said.
>
> On 6/14/17, 2:21 PM, "Stig Døssing" <generalbas....@gmail.com> wrote:
>
> It would obviously be ideal if Flux could be made to support object creation via builders, but if that's not possible I think leaving the KafkaSpoutConfig constructor public is a decent workaround. We are still getting some of the benefits of the Builder pattern, even if Flux doesn't use builder.build() to construct the KafkaSpoutConfig (immutability of KafkaSpoutConfig, no telescoping constructors). I would prefer not to get rid of it to work around a limitation in Flux.
>
> My knowledge of Flux is very limited, but is there any reason it can't be changed to support taking the result of a method call as a constructor parameter? e.g.
> components:
>   - id: "SpoutConfigBuilder"
>     className: "org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder"
>     configMethods:
>       - name: "setProp"
>         args:
>           - "max.poll.records"
>           - 250
>
>   - id: "Spout"
>     className: "org.apache.storm.kafka.spout.KafkaSpout"
>     constructorArgs:
>       - ref: "SpoutConfigBuilder"
>         call: "build"
>
> 2017-06-14 22:09 GMT+02:00 Hugo Da Cruz Louro <hlo...@hortonworks.com>:
>
> > Hi,
> >
> > Flux is simply a mechanism to enable Java object creation using a descriptor file. If Flux does not support creating classes that follow the Builder design pattern, that is a Flux limitation and has to be fixed. It is not reasonable to impose that no one can write a builder because Flux does not support it. My suggested approach was a simple solution to quickly get around it. Let’s identify the proper way to fix it.
> >
> > I do not think it is reasonable not to respect immutability and encapsulation unless there is a very strong reason to do so. It makes the code super fragile, hard to debug, and thread unsafe.
> >
> > > On Jun 14, 2017, at 12:22 PM, Priyank Shah <ps...@hortonworks.com> wrote:
> > >
> > > Hi Stig/Hugo,
> > >
> > > That constructor is indeed public. I actually made that change but forgot about it. https://github.com/apache/storm/commit/5ff7865cf0b86f40e99b54e789fa60b8843191aa The reason for making that change is to make it work with flux.
> > >
> > > I think changing flux code to access a private constructor is a hack and I prefer not doing that. On the other hand, having a public constructor defeats the purpose of the Builder pattern since builder.build() is supposed to create the object. I personally don’t think immutability of KafkaSpoutConfig is that important here. I would rather get rid of the builder and have it with one or two constructors with some setXXX methods. Let me know what you guys think.
> > >
> > > On 6/14/17, 9:46 AM, "Hugo Da Cruz Louro" <hlo...@hortonworks.com> wrote:
> > >
> > > @Harsha @Stig, I agree with you. Let’s make the de facto implementation manual partition assignment. I have already adjusted the KafkaTridentSpout code to reflect @Stig’s changes and things seem to be working very well for Trident as well. I am tracking that on https://issues.apache.org/jira/browse/STORM-2554 and I will submit a PR soon. There were a couple of minor fixes that I had to provide to https://github.com/apache/storm/pull/2150 to make it work; I will mention them as a comment in the PR.
> > >
> > > @Priyank, the KafkaSpoutConfig class should be immutable, as it is a configuration class, which should not be possible to change once it is passed onto the KafkaSpout or KafkaTridentSpout. The builder that @Stig referenced should indeed be private or at most package protected if needed for unit tests, not public. If we have to leave it public for now to make Flux work, so be it. However, the right fix for this would be to fix the Flux code to work with builders. Flux uses mostly Java reflection, so the fix may be as simple as allowing invocation of private constructors as described here <https://stackoverflow.com/questions/11282265/how-to-call-a-private-method-from-outside-a-java-class>.
> > >
> > > We should try to eliminate as many constructors as possible. There should be one or two constructors that enforce the dependencies that are absolutely required and for which there are no reasonable defaults. Any other optional, or non default, configuration setting should go in a setter method. All the KafkaConsumer properties, as we seem to all agree, should be passed in a Map<String, Object> which is what KafkaConsumer needs in its constructor.
> > >
> > > Hugo
> > >
> > >
> > > On Jun 14, 2017, at 8:38 AM, Stig Døssing <generalbas....@gmail.com> wrote:
> > >
> > > It looks public to me?
> > > https://github.com/apache/storm/blob/38e997ed96ce6627cabb4054224d7044fd2b40f9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L461
> > >
> > > I think it's good to be able to avoid telescoping constructors, while at the same time not having a bunch of setters on the KafkaSpoutConfig. That's the main purpose I think the builder has, allowing KafkaSpoutConfig to be immutable.
> > >
> > > I'd be happy to fiddle with it if you have an example to work from?
> > >
> > > 2017-06-14 1:11 GMT+02:00 Priyank Shah <ps...@hortonworks.com>:
> > >
> > > Hi Stig,
> > >
> > > I think the KafkaSpoutConfig constructor is private and it's throwing errors while using the approach that you mentioned. Making it public defeats the purpose of the builder. Can you give it a shot and confirm at your end if it's possible?
> > >
> > > Thanks
> > > Priyank
> > >
> > > Sent from my iPhone
> > >
> > > On Jun 13, 2017, at 9:36 AM, Stig Døssing <generalbas....@gmail.com> wrote:
> > >
> > > Hi Priyank,
> > >
> > > I changed my mind since those mails were sent. I don't think setKey/Value are very useful. They couldn't be used with the default Kafka deserializers, only for deserializers implemented by the user, and then only if they were declared to implement SerializableDeserializer. I agree that we should remove them, and I'm not going to undo anything currently in the PR (unless there are objections on the PR of course)
> > >
> > > With regard to getting rid of the builder pattern, I think it is a pretty nice pattern for Java.
> > > It looks to me like it should be possible to declare and configure the builder with "component:", and then pass it to the KafkaSpoutConfig constructor with "ref:" after (which lets you avoid calling build()). Doesn't this work?
> > >
> > > 2017-06-12 23:32 GMT+02:00 Priyank Shah <ps...@hortonworks.com>:
> > >
> > > Hi Stig,
> > >
> > > I think PR https://github.com/apache/storm/pull/2155/files you created gets rid of setKey and setValue. I am fine with it and in fact that’s what I was suggesting in the first place. However, your last two email replies suggest something else. Just making sure you are not going to undo anything in the PR and that we are on the same page about this change, i.e. no setKey or setValue, either for SerializableDeserializer implementations or the Kafka Deserializer interface. Only a string with the fqcn as a property.
> > >
> > > The other thing I propose is to get rid of the builder class. The reason is that constructing an object with the builder pattern requires builder.build and that does not work well with flux yaml. I think we should be careful about implementing new connectors and make sure they work with yaml as well. I have commented on the PR as well. Unless someone else has a different opinion, can you address that as well?
> > >
> > > On 6/10/17, 2:05 AM, "Stig Døssing" <generalbas....@gmail.com> wrote:
> > >
> > > Priyank, I was a bit too hasty in the last response. The setKey/Value functions are necessary when users want to set only the key or the value deserializer. I think we should keep those.
> > > It may be possible to deduplicate the functionality on the API by removing the Builder constructors that take deserializers, and by getting rid of the setKey/Value functions that take Class instances, since those seem like a duplication of the consumer config functionality. This should get rid of a lot of the overloads.
> > >
> > > 2017-06-10 10:20 GMT+02:00 Stig Døssing <generalbas....@gmail.com>:
> > >
> > > Harsha,
> > >
> > > +1 for simplifying away those methods that are just setting consumer config. The properties I think we should keep are all the spout configuration (timeouts, retry handling, tuple construction). Maybe we deprecate the consumer config functions on 1.x and remove them on master?
> > >
> > > Priyank,
> > >
> > > When the spout is declared, it takes type parameters to define the key and value types of the consumer. We are able to check at compile time that the deserializers match those expected types.
> > > e.g.
> > > SerializableDeserializer<Integer> des = ...
> > >
> > > KafkaSpoutConfig<Integer, String> config = KafkaSpoutConfig.builder("dummy", "dummy")
> > >     .setKey(des)
> > >     .build();
> > >
> > > KafkaSpout<String, String> wrongTypeSpout = new KafkaSpout<>(config);
> > >
> > > will not compile, while
> > >
> > > SerializableDeserializer<String> des = ...
> > >
> > > KafkaSpoutConfig<String, String> config = KafkaSpoutConfig.builder("dummy", "dummy")
> > >     .setKey(des)
> > >     .build();
> > >
> > > KafkaSpout<String, String> spout = new KafkaSpout<>(config);
> > >
> > > will. If we want to simplify the API, maybe we should just mirror the KafkaConsumer API more closely and remove the Builder setKey/Value methods. I can't think of a reason why a user should need to create a Builder of one type, and then change the type later with setKey/Value.
> > > The deserializers can just go in through the Builder constructor.
> > >
> > > About KafkaTuple, I think it was done this way originally since requiring users to subclass KafkaTuple would be a breaking change. If we want to do it, it should go in 2.x only. I'm not necessarily opposed to doing it, but I don't really have a strong opinion on it.
> > >
> > > Hugo,
> > >
> > > I appreciate that the subscribe API is a major new feature of the 0.9 consumer, but I can't come up with any reason to use it in Storm. I don't think we should support it just because it is there. As mentioned upthread, the features offered by that API are already covered by Storm, so I'm not seeing the value in having it. If we can't come up with a use case for it I don't see a reason to allow users to configure it, especially given the non-obvious problems users who choose it are likely to run into.
> > >
> > >
> > > 2017-06-10 6:03 GMT+02:00 Harsha <st...@harsha.io>:
> > >
> > > Dynamic assignment is what is causing all the issues that we see now.
> > > 1. Duplicates at the start of the KafkaSpout and upon any rebalance
> > > 2. Trident Kafka Spout not holding the transactional batches.
> > > Many corner cases can easily produce duplicates.
> > >
> > > There is no point in keeping dynamic assignment given all the issues that are showing up.
> > > Here is the excerpt from the Kafka consumer docs
> > > https://www-us.apache.org/dist/kafka/0.10.0.1/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > > "If the process itself is highly available and will be restarted if it fails (perhaps using a cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework).
> > > In this case there is no need for Kafka to detect the failure and reassign the partition since the consuming process will be restarted on another machine."
> > >
> > > Manual assignment is the right way to go.
> > >
> > > -Harsha
> > >
> > > On Jun 9, 2017, 4:09 PM -0700, Hugo Da Cruz Louro <hlo...@hortonworks.com>, wrote:
> > > +1 for simplifying KafkaSpoutConfig. Too many constructors and too many methods. I am not sure it’s justifiable to have any methods that simply set KafkaConsumer properties. All of these properties should just go in a Map<String, Object>, which is what KafkaConsumer receives, and what was supported in the initial implementation. The names of the properties can be retrieved from org.apache.kafka.clients.consumer.ConsumerConfig. At this point we may have to keep in mind backwards compatibility.
> > >
> > > Not sure we should completely discontinue dynamic partition assignment, as it is one of the primary features of the new Storm Kafka Client API. With this said, manual partition assignment should be supported and would solve a lot of potential problems arising from dynamic partition assignment.
> > >
> > > Hugo
> > >
> > > On Jun 9, 2017, at 3:33 PM, Harsha <st...@harsha.io> wrote:
> > >
> > > I think the question is why we need all those settings when a user can pass them via Properties with consumer properties defined, or via a Map conf object. Having the methods on top of consumer config means that every time a Kafka consumer property is added or changed, one needs to add a builder method. We need to get out of the way and let the user configure it like they do for a typical Kafka Consumer; instead we have tens of methods that set properties for ConsumerConfig.
> > >
> > > Examples:
> > > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L317
> > >
> > > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L309
> > > etc. All of these are specific to KafkaConsumer config; users should be able to pass all of these via Properties.
> > >
> > > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L327
> > >
> > > What's the benefit of adding that method? And we are forcing the protocol to be set to "SSL" in this method
> > > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L318
> > >
> > > Users can set the ssl properties and then can select the securityProtocol "SASL_SSL", which requires both kerberos and ssl configs to be set. In the above case, making a call to setSSLTruststore changes the security.protocol to "SSL". This could easily run into issues if the user sets securityProtocol first with "SASL_SSL" and then later calls setSSLTruststore, which changes it to "SSL".
> > >
> > > We are taking over these settings instead of letting the user figure them out from the Kafka consumer config page.
> > >
> > > In contrast we have KafkaProducer which does this
> > > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java#L121
> > > . I would add a Properties object instead of deriving it from topologyConf, but this is much easier for users to understand.
> > > The contract here is: put whatever producer configs the user wants in the conf object and we create the producer out of that config.
> > >
> > > Honestly these interfaces need to be simple and let the user have control instead of adding our interpretation.
> > >
> > >
> > >
> > > Thanks,
> > > Harsha
> > > On Jun 9, 2017, 2:08 PM -0700, Stig Døssing <generalbas....@gmail.com>, wrote:
> > > I'd be happy with a simpler KafkaSpoutConfig, but I think most of the configuration parameters have good reasons for being there. Any examples of parameters you think we should remove?
> > >
> > > 2017-06-09 21:34 GMT+02:00 Harsha <st...@harsha.io>:
> > >
> > > +1 on using the manual assignment for the reasons specified below. We will see duplicates even in stable conditions, which is not good. I don’t see any reason not to switch to manual assignment. While we are at it we should refactor the KafkaConfig part. It should be as simple as accepting the kafka consumer config or properties file and forwarding it to KafkaConsumer. We made it overly complex, unnecessarily.
> > >
> > > Thanks,
> > > Harsha
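
The security.protocol ordering problem raised earlier in the quoted thread (selecting "SASL_SSL" and then calling setSSLTruststore) can be sketched roughly as follows. ClobberingBuilder is a hypothetical stand-in that only mimics the side effect described; it is not the actual KafkaSpoutConfig code, and the truststore path and password are placeholders. The plain map shows the pass-through alternative, where nothing is rewritten behind the user's back.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical builder that mimics the side effect described in the thread. */
class ClobberingBuilder {
    private final Map<String, Object> props = new HashMap<>();

    ClobberingBuilder setSecurityProtocol(String protocol) {
        props.put("security.protocol", protocol);
        return this;
    }

    ClobberingBuilder setSSLTruststore(String location, String password) {
        props.put("ssl.truststore.location", location);
        props.put("ssl.truststore.password", password);
        // Side effect: silently overrides whatever protocol was chosen earlier.
        props.put("security.protocol", "SSL");
        return this;
    }

    Map<String, Object> build() {
        return new HashMap<>(props);
    }
}

class ClobberingDemo {
    public static void main(String[] args) {
        // Call order decides the outcome: the truststore setter wins here.
        Map<String, Object> viaBuilder = new ClobberingBuilder()
                .setSecurityProtocol("SASL_SSL")
                .setSSLTruststore("/path/to/truststore.jks", "changeit")
                .build();

        // Pass-through map: the user's value is kept exactly as written.
        Map<String, Object> viaMap = new HashMap<>();
        viaMap.put("security.protocol", "SASL_SSL");
        viaMap.put("ssl.truststore.location", "/path/to/truststore.jks");
        viaMap.put("ssl.truststore.password", "changeit");

        System.out.println("builder: " + viaBuilder.get("security.protocol")); // builder: SSL
        System.out.println("map: " + viaMap.get("security.protocol"));         // map: SASL_SSL
    }
}
```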