Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2443#discussion_r202356815
--- Diff:
sql/storm-sql-external/storm-sql-mongodb/src/jvm/org/apache/storm/sql/mongodb/MongoDataSourcesProvider.java
---
@@ -47,54 +45,60 @@
* The properties are in JSON format which specifies the name of the
MongoDB collection and etc.
*/
public class MongoDataSourcesProvider implements DataSourcesProvider {
+ public static final String SCHEME_NAME = "mongodb";
+ public static final String VALUE_SERIALIZED_FIELD = "ser.field";
+ public static final String TRIDENT_VALUE_SERIALIZED_FIELD =
"trident.ser.field";
+ public static final String DEFAULT_VALUE_SERIALIZED_FIELD =
"tridentSerField";
+ public static final String COLLECTION_NAME = "collection.name";
- private static class MongoTridentDataSource implements
ISqlTridentDataSource {
+ private static class MongoStreamsDataSource implements
ISqlStreamsDataSource {
private final String url;
private final Properties props;
private final IOutputSerializer serializer;
- private MongoTridentDataSource(String url, Properties props,
IOutputSerializer serializer) {
+ private MongoStreamsDataSource(String url, Properties props,
IOutputSerializer serializer) {
this.url = url;
this.props = props;
this.serializer = serializer;
}
@Override
- public ITridentDataSource getProducer() {
+ public IRichSpout getProducer() {
throw new
UnsupportedOperationException(this.getClass().getName() + " doesn't provide
Producer");
}
@Override
- public SqlTridentConsumer getConsumer() {
+ public IRichBolt getConsumer() {
Preconditions.checkArgument(!props.isEmpty(), "Writable
MongoDB must contain collection config");
- String serField = props.getProperty("trident.ser.field",
"tridentSerField");
- MongoMapper mapper = new TridentMongoMapper(serField,
serializer);
-
- MongoState.Options options = new MongoState.Options()
- .withUrl(url)
-
.withCollectionName(props.getProperty("collection.name"))
- .withMapper(mapper);
-
- StateFactory stateFactory = new MongoStateFactory(options);
- StateUpdater stateUpdater = new MongoStateUpdater();
-
- return new SimpleSqlTridentConsumer(stateFactory,
stateUpdater);
+ String serField;
+ if (props.contains(VALUE_SERIALIZED_FIELD)) {
+ serField = props.getProperty(VALUE_SERIALIZED_FIELD);
+ } else if (props.contains(TRIDENT_VALUE_SERIALIZED_FIELD)) {
+ // backward compatibility
+ serField =
props.getProperty(TRIDENT_VALUE_SERIALIZED_FIELD);
--- End diff --
Nit: Since this is targeting 2.0.0, it might be okay not to provide
backward compatibility for the old `trident.ser.field` property.
---