It would obviously be ideal if Flux could be made to support object
creation via builders, but if that's not possible, I think leaving the
KafkaSpoutConfig constructor public is a decent workaround. We still get
some of the benefits of the Builder pattern (immutability of
KafkaSpoutConfig, no telescoping constructors) even if Flux doesn't use
builder.build() to construct the KafkaSpoutConfig. I would prefer not to
get rid of the builder just to work around a limitation in Flux.
My knowledge of Flux is very limited, but is there any reason it can't be
changed to support taking the result of a method call as a constructor
parameter? e.g.
components:
  - id: "SpoutConfigBuilder"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig.Builder"
    configMethods:
      - name: "setProp"
        args:
          - "max.poll.records"
          - 250
  - id: "Spout"
    className: "org.apache.storm.kafka.spout.KafkaSpout"
    constructorArgs:
      - ref: "SpoutConfigBuilder"
        call: "build"
2017-06-14 22:09 GMT+02:00 Hugo Da Cruz Louro <[email protected]>:
> Hi,
>
> Flux is simply a mechanism for enabling Java object creation using a
> descriptor file. If Flux does not support creating classes that follow the
> Builder design pattern, that is a Flux limitation and has to be fixed. It
> is not reasonable to impose that no one can write a builder because Flux
> does not support it. My suggested approach was a simple solution to quickly
> get around it. Let’s identify the proper way to fix it.
>
> I do not think it is reasonable not to respect immutability and
> encapsulation unless there is a very strong reason to do so. Not doing so
> makes the code super fragile, hard to debug, and thread-unsafe.
>
> > On Jun 14, 2017, at 12:22 PM, Priyank Shah <[email protected]>
> wrote:
> >
> > Hi Stig/Hugo,
> >
> > That constructor is indeed public. I actually made that change but
> forgot about it:
> https://github.com/apache/storm/commit/5ff7865cf0b86f40e99b54e789fa60b8843191aa
> The reason for making that change was to make it work with Flux.
> >
> > I think changing the Flux code to access a private constructor is a hack
> and I prefer not doing that. On the other hand, having a public constructor
> defeats the purpose of the Builder pattern, since builder.build() is
> supposed to create the object. I personally don’t think immutability of
> KafkaSpoutConfig is that important here. I would rather get rid of the
> builder and have one or two constructors with some setXXX methods.
> Let me know what you guys think.
> >
> >
> > On 6/14/17, 9:46 AM, "Hugo Da Cruz Louro" <[email protected]>
> wrote:
> >
> > @Harsha @Stig, I agree with you. Let’s make the de facto
> implementation manual partition assignment. I have already adjusted the
> KafkaTridentSpout code to reflect @Stig’s changes and things seem to be
> working very well for Trident as well. I am tracking that on
> https://issues.apache.org/jira/browse/STORM-2554 and I will submit a PR
> soon. There were a couple of minor fixes that I had to provide to
> https://github.com/apache/storm/pull/2150 to make it work; I will mention
> them as comments in the PR.
> >
> > @Priyank, the KafkaSpoutConfig class should be immutable: it is a
> configuration class, which should not be modifiable once it is
> passed to the KafkaSpout or KafkaTridentSpout. The constructor that @Stig
> referenced should indeed be private, or at most package protected if needed
> for unit tests, not public. If we have to leave it public for now to make
> Flux work, so be it. However, the right fix for this would be to fix the
> Flux code to work with builders. Flux uses mostly Java reflection, so the
> fix may be as simple as allowing invocation of private constructors, as
> described here:
> https://stackoverflow.com/questions/11282265/how-to-call-a-private-method-from-outside-a-java-class
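> >
> > As a minimal sketch of that reflection approach (illustrative only; it
> > assumes, as is the case today, that the constructor takes the Builder,
> > and that builder is an already configured KafkaSpoutConfig.Builder):
> >
> > import java.lang.reflect.Constructor;
> >
> > Constructor<KafkaSpoutConfig> ctor =
> >     KafkaSpoutConfig.class.getDeclaredConstructor(KafkaSpoutConfig.Builder.class);
> > ctor.setAccessible(true); // allows invoking the constructor even if it is private
> > KafkaSpoutConfig<?, ?> config = ctor.newInstance(builder);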
> >
> > We should try to eliminate as many constructors as possible. There
> should be one or two constructors that enforce the dependencies that are
> absolutely required and for which there are no reasonable defaults. Any
> other optional, or non-default, configuration setting should go in a setter
> method. All the KafkaConsumer properties, as we all seem to agree, should
> be passed in a Map<String, Object>, which is what KafkaConsumer takes in
> its constructor.
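> >
> > As a usage sketch of that shape (illustrative, not the actual API: a
> > single setProp(Map) pass-through is assumed in place of the current
> > per-property setters, and the broker/topic values are placeholders):
> >
> > Map<String, Object> kafkaProps = new HashMap<>();
> > kafkaProps.put("max.poll.records", 250); // any KafkaConsumer property
> >
> > KafkaSpoutConfig<String, String> config =
> >     KafkaSpoutConfig.builder("broker:9092", "topic") // required dependencies only
> >         .setProp(kafkaProps)                         // everything else via the map
> >         .build();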
> >
> > Hugo
> >
> >
> > On Jun 14, 2017, at 8:38 AM, Stig Døssing <[email protected]> wrote:
> >
> > It looks public to me?
> > https://github.com/apache/storm/blob/38e997ed96ce6627cabb4054224d7044fd2b40f9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L461
> >
> > I think it's good to be able to avoid telescoping constructors, while at
> > the same time not having a bunch of setters on the KafkaSpoutConfig.
> > That's the main purpose I think the builder serves: allowing
> > KafkaSpoutConfig to be immutable.
> >
> > I'd be happy to fiddle with it if you have an example to work from?
> >
> > 2017-06-14 1:11 GMT+02:00 Priyank Shah <[email protected]>:
> >
> > Hi Stig,
> >
> > I think the KafkaSpoutConfig constructor is private and it's throwing
> > errors while using the approach that you mentioned. Making it public
> > defeats the purpose of the builder. Can you give it a shot and confirm
> > at your end if it's possible?
> >
> > Thanks
> > Priyank
> >
> > Sent from my iPhone
> >
> > On Jun 13, 2017, at 9:36 AM, Stig Døssing <[email protected]>
> > wrote:
> >
> > Hi Priyank,
> >
> > I changed my mind since those mails were sent. I don't think
> > setKey/Value are very useful. They couldn't be used with the default
> > Kafka deserializers, only with deserializers implemented by the user,
> > and then only if those were declared to implement
> > SerializableDeserializer. I agree that we should remove them, and I'm
> > not going to undo anything currently in the PR (unless there are
> > objections on the PR, of course).
> >
> > With regard to getting rid of the builder pattern, I think it is a
> > pretty nice pattern for Java. It looks to me like it should be possible
> > to declare and configure the builder with "component:", and then pass it
> > to the KafkaSpoutConfig constructor with "ref:" after (which lets you
> > avoid calling build()). Doesn't this work?
> >
> > 2017-06-12 23:32 GMT+02:00 Priyank Shah <[email protected]>:
> >
> > Hi Stig,
> >
> > I think the PR you created, https://github.com/apache/storm/pull/2155/files,
> > gets rid of setKey and setValue. I am fine with it, and in fact that’s
> > what I was suggesting in the first place. However, your last two email
> > replies suggest something else. Just making sure you are not going to
> > undo anything in the PR and that we are on the same page about this
> > change: i.e. no setKey or setValue, either for SerializableDeserializer
> > implementations or for the Kafka Deserializer interface. Only the FQCN
> > as a string property.
> >
> > The other thing I propose is to get rid of the builder class. The
> > reason is that constructing an object with the builder pattern requires
> > builder.build(), and that does not work well with Flux YAML. I think we
> > should be careful about implementing new connectors and make sure they
> > work with YAML as well. I have commented on the PR as well. Unless
> > someone else has a different opinion, can you address that as well?
> >
> > On 6/10/17, 2:05 AM, "Stig Døssing" <[email protected]> wrote:
> >
> > Priyank, I was a bit too hasty in the last response. The setKey/Value
> > functions are necessary when users want to set only the key or the
> > value deserializer. I think we should keep those. It may be possible
> > to deduplicate the functionality in the API by removing the Builder
> > constructors that take deserializers, and by getting rid of the
> > setKey/Value functions that take Class instances, since those seem
> > like a duplication of the consumer config functionality. This should
> > get rid of a lot of the overloads.
> >
> > 2017-06-10 10:20 GMT+02:00 Stig Døssing <[email protected]>:
> >
> > Harsha,
> >
> > +1 for simplifying away those methods that just set consumer
> > config. The properties I think we should keep are all the spout
> > configuration (timeouts, retry handling, tuple construction). Maybe we
> > deprecate the consumer config functions on 1.x and remove them on
> > master?
> >
> > Priyank,
> >
> > When the spout is declared, it takes type parameters to define the
> > key and value types of the consumer. We are able to check at compile
> > time that the deserializers match those expected types.
> > e.g.
> > SerializableDeserializer<Integer> des = ...
> >
> > KafkaSpoutConfig<Integer, String> config =
> >     KafkaSpoutConfig.builder("dummy", "dummy")
> >         .setKey(des)
> >         .build();
> >
> > KafkaSpout<String, String> wrongTypeSpout = new KafkaSpout<>(config);
> >
> > will not compile, while
> >
> > SerializableDeserializer<String> des = ...
> >
> > KafkaSpoutConfig<String, String> config =
> >     KafkaSpoutConfig.builder("dummy", "dummy")
> >         .setKey(des)
> >         .build();
> >
> > KafkaSpout<String, String> spout = new KafkaSpout<>(config);
> >
> > will. If we want to simplify the API, maybe we should just mirror the
> > KafkaConsumer API more closely and remove the Builder setKey/Value
> > methods. I can't think of a reason why a user should need to create a
> > Builder of one type, and then change the type later with setKey/Value.
> > The deserializers can just go in through the Builder constructor.
> >
> > About KafkaTuple, I think it was done this way originally since
> > requiring users to subclass KafkaTuple would be a breaking change. If
> > we want to do it, it should go in 2.x only. I'm not necessarily opposed
> > to doing it, but I don't really have a strong opinion on it.
> >
> > Hugo,
> >
> > I appreciate that the subscribe API is a major new feature of the 0.9
> > consumer, but I can't come up with any reason to use it in Storm. I
> > don't think we should support it just because it is there. As mentioned
> > upthread, the features offered by that API are already covered by
> > Storm, so I'm not seeing the value in having it. If we can't come up
> > with a use case for it, I don't see a reason to allow users to
> > configure it, especially given the non-obvious problems users who
> > choose it are likely to run into.
> >
> >
> > 2017-06-10 6:03 GMT+02:00 Harsha <[email protected]>:
> >
> > Dynamic assignment is what is causing all the issues that we see now:
> > 1. Duplicates at the start of the KafkaSpout and upon any rebalance.
> > 2. The Trident Kafka Spout not holding the transactional batches.
> > Many corner cases can easily produce duplicates.
> >
> > There is no point in keeping dynamic assignment given all the issues
> > that are showing up.
> > Here is the excerpt from the Kafka consumer docs
> > https://www-us.apache.org/dist/kafka/0.10.0.1/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > "If the process itself is highly available and will be restarted if
> > it
> > fails (perhaps using a cluster management framework like YARN,
> > Mesos, or
> > AWS facilities, or as part of a stream processing framework). In
> > this
> > case there is no need for Kafka to detect the failure and reassign
> > the
> > partition since the consuming process will be restarted on another
> > machine."
> >
> > Manual assignment is the right way to go.
> >
> > -Harsha
> >
> > On Jun 9, 2017, 4:09 PM -0700, Hugo Da Cruz Louro
> > <[email protected]>, wrote:
> > +1 for simplifying KafkaSpoutConfig. Too many constructors and too
> > many methods. I am not sure it’s justifiable to have any methods that
> > simply set KafkaConsumer properties. All of these properties should
> > just go in a Map<String, Object>, which is what KafkaConsumer receives,
> > and what was supported in the initial implementation. The names of the
> > properties can be retrieved from
> > org.apache.kafka.clients.consumer.ConsumerConfig.
> > At this point we may have to keep in mind backwards compatibility.
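> >
> > For example (a sketch with placeholder values; this is the plain
> > KafkaConsumer usage the spout configuration would feed into):
> >
> > import org.apache.kafka.clients.consumer.ConsumerConfig;
> > import org.apache.kafka.clients.consumer.KafkaConsumer;
> > import org.apache.kafka.common.serialization.StringDeserializer;
> >
> > Map<String, Object> consumerProps = new HashMap<>();
> > consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
> > consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "storm-kafka-spout");
> > consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 250);
> >
> > KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
> >     consumerProps, new StringDeserializer(), new StringDeserializer());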
> >
> > Not sure we should completely discontinue dynamic partition
> > assignment, as it is one of the primary features of the new Storm Kafka
> > Client API. With this said, manual partition assignment should be
> > supported and would solve a lot of potential problems arising from
> > dynamic partition assignment.
> >
> > Hugo
> >
> > On Jun 9, 2017, at 3:33 PM, Harsha <[email protected]> wrote:
> >
> > I think the question is why we need all those settings when a user can
> > pass them via Properties with the consumer properties defined, or via
> > the Map conf object. Having methods on top of the consumer config means
> > that every time a Kafka consumer property is added or changed, one
> > needs to add a builder method. We need to get out of the way and let
> > the user configure it like they do for a typical Kafka Consumer;
> > instead we've got tens of methods that set properties for
> > ConsumerConfig.
> >
> > Examples:
> > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L317
> >
> > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L309
> > etc. All of these are specific to the KafkaConsumer config; users
> > should be able to pass all of these via Properties.
> >
> > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L327
> >
> > What's the benefit of adding that method? And we are forcing it to set
> > the protocol to "SSL" in this method:
> > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L318
> >
> > Users can set the SSL properties and then select the securityProtocol
> > "SASL_SSL", which requires both Kerberos and SSL configs to be set. In
> > the above case, calling setSSLTruststore changes the security.protocol
> > to "SSL". This could easily run into issues if the user sets
> > securityProtocol first to "SASL_SSL" and then later calls
> > setSSLTruststore, which changes it to "SSL".
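> >
> > i.e., something like this (a sketch of the pitfall only; the exact
> > setter names and signatures are in the KafkaSpoutConfig lines linked
> > above, and the truststore arguments here are made up):
> >
> > builder.setSecurityProtocol("SASL_SSL"); // user wants Kerberos + SSL
> > builder.setSSLTruststore("/path/truststore.jks", "password");
> > // the truststore call silently resets security.protocol to "SSL"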
> >
> > We are taking over these settings instead of letting the user figure
> > them out from the Kafka consumer config page.
> >
> > In contrast, we've got KafkaProducer, which does this:
> > https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java#L121
> > I would add a Properties object instead of deriving it from
> > topologyConf, but this is much easier for users to understand. The
> > contract here is: put whatever producer configs the user wants in the
> > conf object, and we create the producer out of that config.
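> >
> > For comparison, this is the kind of pass-through contract meant here (a
> > sketch with placeholder values; plain KafkaProducer usage rather than
> > Storm-specific code):
> >
> > import java.util.Properties;
> > import org.apache.kafka.clients.producer.KafkaProducer;
> > import org.apache.kafka.clients.producer.ProducerConfig;
> > import org.apache.kafka.common.serialization.StringSerializer;
> >
> > Properties props = new Properties();
> > props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
> > props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> > props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> >
> > KafkaProducer<String, String> producer = new KafkaProducer<>(props); // all config from props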
> >
> > Honestly, these interfaces need to be simple and let the user have
> > control instead of adding our interpretation.
> >
> >
> >
> > Thanks,
> > Harsha
> > On Jun 9, 2017, 2:08 PM -0700, Stig Døssing <[email protected]>, wrote:
> > I'd be happy with a simpler KafkaSpoutConfig, but I think most of the
> > configuration parameters have good reasons for being there. Any
> > examples of parameters you think we should remove?
> >
> > 2017-06-09 21:34 GMT+02:00 Harsha <[email protected]>:
> >
> > +1 on using manual assignment for the reasons specified below. We
> > will see duplicates even in stable conditions, which is not good. I
> > don’t see any reason not to switch to manual assignment. While we are
> > at it, we should refactor the KafkaConfig part. It should be as simple
> > as accepting the Kafka consumer config or a properties file and
> > forwarding it to KafkaConsumer. We made it overly complex, which is
> > unnecessary.
> >
> > Thanks,
> > Harsha