Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2443#discussion_r202356815
  
    --- Diff: 
sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
 ---
    @@ -47,54 +45,60 @@
      * The properties are in JSON format which specifies the name of the 
MongoDB collection and etc.
      */
     public class MongoDataSourcesProvider implements DataSourcesProvider {
    +    public static final String SCHEME_NAME = "mongodb";
    +    public static final String VALUE_SERIALIZED_FIELD = "ser.field";
    +    public static final String TRIDENT_VALUE_SERIALIZED_FIELD = 
"trident.ser.field";
    +    public static final String DEFAULT_VALUE_SERIALIZED_FIELD = 
"tridentSerField";
    +    public static final String COLLECTION_NAME = "collection.name";
     
    -    private static class MongoTridentDataSource implements 
ISqlTridentDataSource {
    +    private static class MongoStreamsDataSource implements 
ISqlStreamsDataSource {
             private final String url;
             private final Properties props;
             private final IOutputSerializer serializer;
     
    -        private MongoTridentDataSource(String url, Properties props, 
IOutputSerializer serializer) {
    +        private MongoStreamsDataSource(String url, Properties props, 
IOutputSerializer serializer) {
                 this.url = url;
                 this.props = props;
                 this.serializer = serializer;
             }
     
             @Override
    -        public ITridentDataSource getProducer() {
    +        public IRichSpout getProducer() {
                 throw new 
UnsupportedOperationException(this.getClass().getName() + " doesn't provide 
Producer");
             }
     
             @Override
    -        public SqlTridentConsumer getConsumer() {
    +        public IRichBolt getConsumer() {
                 Preconditions.checkArgument(!props.isEmpty(), "Writable 
MongoDB must contain collection config");
    -            String serField = props.getProperty("trident.ser.field", 
"tridentSerField");
    -            MongoMapper mapper = new TridentMongoMapper(serField, 
serializer);
    -
    -            MongoState.Options options = new MongoState.Options()
    -                    .withUrl(url)
    -                    
.withCollectionName(props.getProperty("collection.name"))
    -                    .withMapper(mapper);
    -
    -            StateFactory stateFactory = new MongoStateFactory(options);
    -            StateUpdater stateUpdater = new MongoStateUpdater();
    -
    -            return new SimpleSqlTridentConsumer(stateFactory, 
stateUpdater);
    +            String serField;
    +            if (props.contains(VALUE_SERIALIZED_FIELD)) {
    +                serField = props.getProperty(VALUE_SERIALIZED_FIELD);
    +            } else if (props.contains(TRIDENT_VALUE_SERIALIZED_FIELD)) {
    +                // backward compatibility
    +                serField = 
props.getProperty(TRIDENT_VALUE_SERIALIZED_FIELD);
    --- End diff --
    
    Nit: Since this is targeting 2.0.0, it might be okay not to provide 
backward compatibility.


---

Reply via email to