Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2443#discussion_r202356815 --- Diff: sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java --- @@ -47,54 +45,60 @@ * The properties are in JSON format which specifies the name of the MongoDB collection and etc. */ public class MongoDataSourcesProvider implements DataSourcesProvider { + public static final String SCHEME_NAME = "mongodb"; + public static final String VALUE_SERIALIZED_FIELD = "ser.field"; + public static final String TRIDENT_VALUE_SERIALIZED_FIELD = "trident.ser.field"; + public static final String DEFAULT_VALUE_SERIALIZED_FIELD = "tridentSerField"; + public static final String COLLECTION_NAME = "collection.name"; - private static class MongoTridentDataSource implements ISqlTridentDataSource { + private static class MongoStreamsDataSource implements ISqlStreamsDataSource { private final String url; private final Properties props; private final IOutputSerializer serializer; - private MongoTridentDataSource(String url, Properties props, IOutputSerializer serializer) { + private MongoStreamsDataSource(String url, Properties props, IOutputSerializer serializer) { this.url = url; this.props = props; this.serializer = serializer; } @Override - public ITridentDataSource getProducer() { + public IRichSpout getProducer() { throw new UnsupportedOperationException(this.getClass().getName() + " doesn't provide Producer"); } @Override - public SqlTridentConsumer getConsumer() { + public IRichBolt getConsumer() { Preconditions.checkArgument(!props.isEmpty(), "Writable MongoDB must contain collection config"); - String serField = props.getProperty("trident.ser.field", "tridentSerField"); - MongoMapper mapper = new TridentMongoMapper(serField, serializer); - - MongoState.Options options = new MongoState.Options() - .withUrl(url) - .withCollectionName(props.getProperty("collection.name")) - .withMapper(mapper); - - StateFactory stateFactory = new MongoStateFactory(options); - StateUpdater stateUpdater = new MongoStateUpdater(); - - return new SimpleSqlTridentConsumer(stateFactory, stateUpdater); + String serField; + if (props.contains(VALUE_SERIALIZED_FIELD)) { + serField = props.getProperty(VALUE_SERIALIZED_FIELD); + } else if (props.contains(TRIDENT_VALUE_SERIALIZED_FIELD)) { + // backward compatibility + serField = props.getProperty(TRIDENT_VALUE_SERIALIZED_FIELD); --- End diff -- Nit: Since this is targeting 2.0.0, it might be okay not to provide backward compatibility.
---