This is an automated email from the ASF dual-hosted git repository. drazzib pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/gora.git
The following commit(s) were added to refs/heads/master by this push: new cc62555 GORA-649 Replace usage of deprecated API of MongoDB driver (#207) cc62555 is described below commit cc6255585dfffd51e3003c46f07b96b7082aa79f Author: Damien Raude-Morvan <draz...@drazzib.com> AuthorDate: Sun Mar 29 14:47:48 2020 +0200 GORA-649 Replace usage of deprecated API of MongoDB driver (#207) * GORA-649 MongoDBResult: pass `cursor` and `size` as constructor args * GORA-649 MongoDBQuery: Use Filters and Projections helper * GORA-649 MongoStoreParameters: improve Javadoc * GORA-649: Use org.bson.Document as container * GORA-649 MongoFilterUtil: Avoid changing query passed as reference Return an Optional<> with subfilter to apply * GORA-649 Use Codec interface instead of DefaultDBEncoder * GORA-649 Use new MongoDatabase and MongoCollection API * GORA-649 Use com.mongodb.client.MongoClient interface This interface, while similar to the existing com.mongodb.MongoClient class in that it is a factory for com.mongodb.client.MongoDatabase instances, does not support the legacy com.mongodb.DBCollection-based API : http://mongodb.github.io/mongo-java-driver/3.12/whats-new/#new-entry-point * GORA-649 MongoStore#flush is now no-op Remove fsync handling since its deprecated upstream and should be replaced by proper WriteConcern --- .../gora/mongodb/filters/DefaultFactory.java | 83 +++-- .../apache/gora/mongodb/filters/FilterFactory.java | 7 +- .../gora/mongodb/filters/MongoFilterUtil.java | 28 +- .../apache/gora/mongodb/query/MongoDBQuery.java | 55 ++-- .../apache/gora/mongodb/query/MongoDBResult.java | 35 +-- .../org/apache/gora/mongodb/store/MongoStore.java | 333 ++++++++++----------- .../gora/mongodb/store/MongoStoreParameters.java | 11 +- .../apache/gora/mongodb/utils/BSONDecorator.java | 85 +++--- .../utils/{GoraDBEncoder.java => Utf8Codec.java} | 42 +-- .../gora/mongodb/filters/DefaultFactoryTest.java | 41 +-- .../apache/gora/mongodb/store/TestMongoStore.java | 14 +- .../gora/mongodb/utils/TestBSONDecorator.java | 44 ++- 12 files changed, 365 insertions(+), 413 deletions(-) diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/DefaultFactory.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/DefaultFactory.java index 597f7e9..bd26209 100644 --- a/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/DefaultFactory.java +++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/DefaultFactory.java @@ -17,19 +17,21 @@ */ package org.apache.gora.mongodb.filters; -import java.util.ArrayList; -import java.util.List; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.gora.filter.*; import org.apache.gora.mongodb.store.MongoMapping; import org.apache.gora.mongodb.store.MongoStore; import org.apache.gora.persistency.impl.PersistentBase; +import org.bson.Document; +import org.bson.conversions.Bson; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; -import com.mongodb.BasicDBObject; -import com.mongodb.DBObject; -import com.mongodb.QueryBuilder; +import static com.mongodb.client.model.Filters.*; public class DefaultFactory<K, T extends PersistentBase> extends BaseFactory<K, T> { @@ -45,8 +47,8 @@ public class DefaultFactory<K, T extends PersistentBase> extends } @Override - public DBObject createFilter(final Filter<K, T> filter, - final MongoStore<K, T> store) { + public Bson createFilter(final Filter<K, T> filter, + final MongoStore<K, T> store) { if (filter instanceof FilterList) { FilterList<K, T> filterList = (FilterList<K, T>) filter; @@ -64,19 +66,17 @@ public class DefaultFactory<K, T extends PersistentBase> extends } } - protected DBObject transformListFilter(final FilterList<K, T> filterList, - final MongoStore<K, T> store) { - BasicDBObject query = new BasicDBObject(); - for (Filter<K, T> filter : filterList.getFilters()) { - boolean succeeded = getFilterUtil().setFilter(query, filter, store); - if (!succeeded) { - return null; - } - } - return query; + protected Bson transformListFilter(final FilterList<K, T> filterList, + final MongoStore<K, T> store) { + List<Bson> filters = filterList.getFilters().stream() + .map(filter -> getFilterUtil().setFilter(filter, store)) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toList()); + return filters.isEmpty() ? new Document() : and(filters); } - protected DBObject transformFieldFilter( + protected Bson transformFieldFilter( final SingleFieldValueFilter<K, T> fieldFilter, final MongoStore<K, T> store) { MongoMapping mapping = store.getMapping(); @@ -85,17 +85,16 @@ public class DefaultFactory<K, T extends PersistentBase> extends FilterOp filterOp = fieldFilter.getFilterOp(); List<Object> operands = fieldFilter.getOperands(); - QueryBuilder builder = QueryBuilder.start(dbFieldName); - builder = appendToBuilder(builder, filterOp, operands); + Bson filter = appendToBuilder(dbFieldName, filterOp, operands); if (!fieldFilter.isFilterIfMissing()) { // If false, the find query will pass if the column is not found. - DBObject notExist = QueryBuilder.start(dbFieldName).exists(false).get(); - builder = QueryBuilder.start().or(notExist, builder.get()); + Bson notExist = exists(dbFieldName, false); + filter = or(notExist, filter); } - return builder.get(); + return filter; } - protected DBObject transformMapFilter( + protected Bson transformMapFilter( final MapFieldValueFilter<K, T> mapFilter, final MongoStore<K, T> store) { MongoMapping mapping = store.getMapping(); String dbFieldName = mapping.getDocumentField(mapFilter.getFieldName()) @@ -104,51 +103,43 @@ public class DefaultFactory<K, T extends PersistentBase> extends FilterOp filterOp = mapFilter.getFilterOp(); List<Object> operands = mapFilter.getOperands(); - QueryBuilder builder = QueryBuilder.start(dbFieldName); - builder = appendToBuilder(builder, filterOp, operands); + Bson filter = appendToBuilder(dbFieldName, filterOp, operands); if (!mapFilter.isFilterIfMissing()) { // If false, the find query will pass if the column is not found. - DBObject notExist = QueryBuilder.start(dbFieldName).exists(false).get(); - builder = QueryBuilder.start().or(notExist, builder.get()); + Bson notExist = exists(dbFieldName, false); + filter = or(notExist, filter); } - return builder.get(); + return filter; } - protected QueryBuilder appendToBuilder(final QueryBuilder builder, + protected Bson appendToBuilder(final String dbFieldName, final FilterOp filterOp, final List<Object> rawOperands) { List<String> operands = convertOperandsToString(rawOperands); switch (filterOp) { case EQUALS: if (operands.size() == 1) { - builder.is(operands.iterator().next()); + return eq(dbFieldName, operands.iterator().next()); } else { - builder.in(operands); + return in(dbFieldName, operands); } - break; case NOT_EQUALS: if (operands.size() == 1) { - builder.notEquals(operands.iterator().next()); + return ne(dbFieldName, operands.iterator().next()); } else { - builder.notIn(operands); + return nin(dbFieldName, operands); } - break; case LESS: - builder.lessThan(operands); - break; + return lt(dbFieldName, operands); case LESS_OR_EQUAL: - builder.lessThanEquals(operands); - break; + return lte(dbFieldName, operands); case GREATER: - builder.greaterThan(operands); - break; + return gt(dbFieldName, operands); case GREATER_OR_EQUAL: - builder.greaterThanEquals(operands); - break; + return gte(dbFieldName, operands); default: throw new IllegalArgumentException(filterOp + " no MongoDB equivalent yet"); } - return builder; } /** diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/FilterFactory.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/FilterFactory.java index c68364f..5276852 100644 --- a/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/FilterFactory.java +++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/FilterFactory.java @@ -17,13 +17,12 @@ */ package org.apache.gora.mongodb.filters; -import java.util.List; - import org.apache.gora.filter.Filter; import org.apache.gora.mongodb.store.MongoStore; import org.apache.gora.persistency.impl.PersistentBase; +import org.bson.conversions.Bson; -import com.mongodb.DBObject; +import java.util.List; /** * Describe factory which create remote filter for MongoDB. @@ -38,5 +37,5 @@ public interface FilterFactory<K, T extends PersistentBase> { List<String> getSupportedFilters(); - DBObject createFilter(Filter<K, T> filter, MongoStore<K, T> store); + Bson createFilter(Filter<K, T> filter, MongoStore<K, T> store); } diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/MongoFilterUtil.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/MongoFilterUtil.java index 8779af4..a7ccdc0 100644 --- a/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/MongoFilterUtil.java +++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/filters/MongoFilterUtil.java @@ -17,9 +17,6 @@ */ package org.apache.gora.mongodb.filters; -import java.util.LinkedHashMap; -import java.util.Map; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.gora.filter.Filter; @@ -28,8 +25,11 @@ import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.util.GoraException; import org.apache.gora.util.ReflectionUtils; import org.apache.hadoop.conf.Configuration; +import org.bson.conversions.Bson; -import com.mongodb.DBObject; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; /** * Manage creation of filtering {@link org.apache.gora.query.Query} using @@ -40,8 +40,7 @@ import com.mongodb.DBObject; * </p> * * @author Damien Raude-Morvan draudemor...@dictanova.com - * @see #setFilter(com.mongodb.DBObject, org.apache.gora.filter.Filter, - * org.apache.gora.mongodb.store.MongoStore) + * @see #setFilter(Filter, MongoStore) */ public class MongoFilterUtil<K, T extends PersistentBase> { @@ -87,32 +86,29 @@ public class MongoFilterUtil<K, T extends PersistentBase> { /** * Set a filter on the <tt>query</tt>. It translates a Gora filter to a * MongoDB filter. - * - * @param query - * The Mongo Query + * * @param filter * The Gora filter. * @param store * The MongoStore. * @return if remote filter is successfully applied. */ - public boolean setFilter(final DBObject query, final Filter<K, T> filter, - final MongoStore<K, T> store) { + public Optional<Bson> setFilter(final Filter<K, T> filter, + final MongoStore<K, T> store) { FilterFactory<K, T> factory = getFactory(filter); if (factory == null) { LOG.warn("MongoDB remote filter factory not yet implemented for " + filter.getClass().getCanonicalName()); - return false; + return Optional.empty(); } else { - DBObject mongoFilter = factory.createFilter(filter, store); + Bson mongoFilter = factory.createFilter(filter, store); if (mongoFilter == null) { LOG.warn("MongoDB remote filter not yet implemented for " + filter.getClass().getCanonicalName()); - return false; + return Optional.empty(); } else { - query.putAll(mongoFilter); - return true; + return Optional.of(mongoFilter); } } } diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/query/MongoDBQuery.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/query/MongoDBQuery.java index 0c2f22f..762d782 100644 --- a/gora-mongodb/src/main/java/org/apache/gora/mongodb/query/MongoDBQuery.java +++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/query/MongoDBQuery.java @@ -17,14 +17,24 @@ */ package org.apache.gora.mongodb.query; +import com.mongodb.client.model.Projections; import org.apache.gora.mongodb.store.MongoMapping; import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.query.Query; import org.apache.gora.query.impl.QueryBase; import org.apache.gora.store.DataStore; +import org.bson.Document; +import org.bson.conversions.Bson; -import com.mongodb.BasicDBObject; -import com.mongodb.DBObject; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static com.mongodb.client.model.Filters.and; +import static com.mongodb.client.model.Filters.eq; +import static com.mongodb.client.model.Filters.gte; +import static com.mongodb.client.model.Filters.lte; /** * MongoDB specific implementation of the {@link Query} interface. @@ -44,40 +54,37 @@ public class MongoDBQuery<K, T extends PersistentBase> extends QueryBase<K, T> { /** * Compute the query itself. Only make use of the keys for querying. * - * @return a {@link DBObject} corresponding to the query + * @return a {@link Document} corresponding to the query */ - public static DBObject toDBQuery(Query<?, ?> query) { - BasicDBObject q = new BasicDBObject(); + public static Bson toDBQuery(Query<?, ?> query) { + if ((query.getStartKey() != null) && (query.getEndKey() != null) && query.getStartKey().equals(query.getEndKey())) { - q.put("_id", query.getStartKey()); + return eq("_id", query.getStartKey()); } else { - if (query.getStartKey() != null) - q.put("_id", new BasicDBObject("$gte", query.getStartKey())); - if (query.getEndKey() != null) - q.put("_id", new BasicDBObject("$lte", query.getEndKey())); + List<Bson> filters = new ArrayList<>(); + if (query.getStartKey() != null) { + filters.add(gte("_id", query.getStartKey())); + } + if (query.getEndKey() != null) { + filters.add(lte("_id", query.getEndKey())); + } + return filters.isEmpty() ? new Document() : and(filters); } - - return q; } /** * Compute the projection of the query, that is the fields that will be * retrieved from the database. * - * @return a {@link DBObject} corresponding to the list of field to be + * @return a {@link Document} corresponding to the list of field to be * retrieved with the associated boolean */ - public static DBObject toProjection(String[] fields, MongoMapping mapping) { - BasicDBObject proj = new BasicDBObject(); - - for (String k : fields) { - String dbFieldName = mapping.getDocumentField(k); - if (dbFieldName != null && dbFieldName.length() > 0) { - proj.put(dbFieldName, true); - } - } - - return proj; + public static Bson toProjection(String[] fields, MongoMapping mapping) { + List<String> dbFields = Stream.of(fields) + .map(mapping::getDocumentField) + .filter(dbField -> dbField != null && !dbField.isEmpty()) + .collect(Collectors.toList()); + return Projections.include(dbFields); } } diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/query/MongoDBResult.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/query/MongoDBResult.java index 3965333..c2ba04e 100644 --- a/gora-mongodb/src/main/java/org/apache/gora/mongodb/query/MongoDBResult.java +++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/query/MongoDBResult.java @@ -17,16 +17,15 @@ */ package org.apache.gora.mongodb.query; -import java.io.IOException; - +import com.mongodb.client.MongoCursor; import org.apache.gora.mongodb.store.MongoStore; import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.query.Query; import org.apache.gora.query.impl.ResultBase; import org.apache.gora.store.DataStore; +import org.bson.Document; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; +import java.io.IOException; /** * MongoDB specific implementation of the {@link org.apache.gora.query.Result} @@ -41,15 +40,17 @@ public class MongoDBResult<K, T extends PersistentBase> extends /** * Reference to the cursor pointing to the results */ - private DBCursor cursor; - private int size; + private MongoCursor<Document> cursor; + private long size; - public MongoDBResult(DataStore<K, T> dataStore, Query<K, T> query) { + public MongoDBResult(DataStore<K, T> dataStore, Query<K, T> query, MongoCursor<Document> cursor, long size) { super(dataStore, query); + this.cursor = cursor; + this.size = size; } @Override - public float getProgress() throws IOException { + public float getProgress() { if (cursor == null) { return 0; } else if (size == 0) { @@ -60,7 +61,7 @@ public class MongoDBResult<K, T extends PersistentBase> extends } @Override - public void close() throws IOException { + public void close() { if (cursor != null) { cursor.close(); } @@ -72,27 +73,15 @@ public class MongoDBResult<K, T extends PersistentBase> extends return false; } - DBObject obj = cursor.next(); + Document obj = cursor.next(); key = (K) obj.get("_id"); persistent = ((MongoStore<K, T>) getDataStore()).newInstance(obj, getQuery().getFields()); return persistent != null; } - /** - * Save the reference to the cursor that holds the actual results. - * - * @param cursor - * {@link DBCursor} obtained from a query execution and that holds - * the actual results - */ - public void setCursor(DBCursor cursor) { - this.cursor = cursor; - this.size = cursor.size(); - } - @Override public int size() { - return size; + return (int) size; } } diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java index 82657e3..295a675 100644 --- a/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java +++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStore.java @@ -17,31 +17,13 @@ */ package org.apache.gora.mongodb.store; -import static com.mongodb.AuthenticationMechanism.GSSAPI; -import static com.mongodb.AuthenticationMechanism.MONGODB_CR; -import static com.mongodb.AuthenticationMechanism.MONGODB_X509; -import static com.mongodb.AuthenticationMechanism.PLAIN; -import static com.mongodb.AuthenticationMechanism.SCRAM_SHA_1; - -import java.io.IOException; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.TimeZone; -import java.util.concurrent.ConcurrentHashMap; - -import javax.xml.bind.DatatypeConverter; - +import com.google.common.base.Splitter; +import com.mongodb.*; +import com.mongodb.client.*; +import com.mongodb.client.model.CountOptions; +import com.mongodb.client.model.CreateCollectionOptions; +import com.mongodb.client.model.UpdateOptions; +import com.mongodb.client.result.DeleteResult; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; @@ -52,7 +34,7 @@ import org.apache.gora.mongodb.query.MongoDBQuery; import org.apache.gora.mongodb.query.MongoDBResult; import org.apache.gora.mongodb.store.MongoMapping.DocumentFieldType; import org.apache.gora.mongodb.utils.BSONDecorator; -import org.apache.gora.mongodb.utils.GoraDBEncoder; +import org.apache.gora.mongodb.utils.Utf8Codec; import org.apache.gora.persistency.impl.BeanFactoryImpl; import org.apache.gora.persistency.impl.DirtyListWrapper; import org.apache.gora.persistency.impl.DirtyMapWrapper; @@ -65,26 +47,25 @@ import org.apache.gora.store.impl.DataStoreBase; import org.apache.gora.util.AvroUtils; import org.apache.gora.util.ClassLoadingUtils; import org.apache.gora.util.GoraException; +import org.bson.Document; +import org.bson.codecs.configuration.CodecRegistries; +import org.bson.codecs.configuration.CodecRegistry; +import org.bson.conversions.Bson; import org.bson.types.ObjectId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Splitter; -import com.mongodb.BasicDBList; -import com.mongodb.BasicDBObject; -import com.mongodb.Bytes; -import com.mongodb.DB; -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; -import com.mongodb.Mongo; -import com.mongodb.MongoClient; -import com.mongodb.MongoClientOptions; -import com.mongodb.MongoCredential; -import com.mongodb.ReadPreference; -import com.mongodb.ServerAddress; -import com.mongodb.WriteConcern; -import com.mongodb.WriteResult; +import javax.xml.bind.DatatypeConverter; +import java.io.IOException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.StreamSupport; + +import static com.mongodb.AuthenticationMechanism.*; +import static com.mongodb.client.model.Filters.and; /** * Implementation of a MongoDB data store to be used by gora. @@ -109,11 +90,11 @@ DataStoreBase<K, T> { /** * MongoDB client */ - private static ConcurrentHashMap<String, MongoClient> mapsOfClients = new ConcurrentHashMap<>(); + private static ConcurrentHashMap<String, com.mongodb.client.MongoClient> mapsOfClients = new ConcurrentHashMap<>(); - private DB mongoClientDB; + private MongoDatabase mongoClientDB; - private DBCollection mongoClientColl; + private MongoCollection<Document> mongoClientColl; /** * Mapping definition for MongoDB @@ -167,47 +148,53 @@ DataStoreBase<K, T> { * * @param params This value should specify the host:port (at least one) for * connecting to remote MongoDB. - * @return a {@link Mongo} instance connected to the server - * @throws UnknownHostException + * @return a {@link com.mongodb.client.MongoClient} instance connected to the server */ - private MongoClient getClient(MongoStoreParameters params) - throws UnknownHostException { + private com.mongodb.client.MongoClient getClient(MongoStoreParameters params) { + + // Utf8 serialization! + CodecRegistry codecRegistry = CodecRegistries.fromRegistries( + MongoClientSettings.getDefaultCodecRegistry(), + CodecRegistries.fromCodecs(new Utf8Codec()) + ); // Configure options - MongoClientOptions.Builder optBuilder = new MongoClientOptions.Builder() - .dbEncoderFactory(GoraDBEncoder.FACTORY); // Utf8 serialization! + MongoClientSettings.Builder settings = MongoClientSettings.builder(); + settings.codecRegistry(codecRegistry); if (params.getReadPreference() != null) { - optBuilder.readPreference(ReadPreference.valueOf(params.getReadPreference())); + settings.readPreference(ReadPreference.valueOf(params.getReadPreference())); } if (params.getWriteConcern() != null) { - optBuilder.writeConcern(WriteConcern.valueOf(params.getWriteConcern())); - } - // If configuration contains a login + secret, try to authenticated with DB - List<MongoCredential> credentials = new ArrayList<>(); - if (params.getLogin() != null && params.getSecret() != null) { - credentials.add(createCredential(params.getAuthenticationType(), params.getLogin(), params.getDbname(), params.getSecret())); + settings.writeConcern(WriteConcern.valueOf(params.getWriteConcern())); } + + // Build server address - List<ServerAddress> addrs = new ArrayList<>(); + List<ServerAddress> seeds = new ArrayList<>(); Iterable<String> serversArray = Splitter.on(",").split(params.getServers()); - if (serversArray != null) { - for (String server : serversArray) { - Iterator<String> paramsIterator = Splitter.on(":").trimResults().split(server).iterator(); - if (!paramsIterator.hasNext()) { - // No server, use default - addrs.add(new ServerAddress()); + for (String server : serversArray) { + Iterator<String> paramsIterator = Splitter.on(":").trimResults().split(server).iterator(); + if (!paramsIterator.hasNext()) { + // No server, use default + seeds.add(new ServerAddress()); + } else { + String host = paramsIterator.next(); + if (paramsIterator.hasNext()) { + String port = paramsIterator.next(); + seeds.add(new ServerAddress(host, Integer.parseInt(port))); } else { - String host = paramsIterator.next(); - if (paramsIterator.hasNext()) { - String port = paramsIterator.next(); - addrs.add(new ServerAddress(host, Integer.parseInt(port))); - } else { - addrs.add(new ServerAddress(host)); - } + seeds.add(new ServerAddress(host)); } } } - // Connect to the Mongo server - return new MongoClient(addrs, credentials, optBuilder.build()); + settings.applyToClusterSettings(builder -> builder.hosts(seeds)); + + // If configuration contains a login + secret, try to authenticated with DB + if (params.getLogin() != null && params.getSecret() != null) { + MongoCredential credential = createCredential(params.getAuthenticationType(), params.getLogin(), params.getDbname(), params.getSecret()); + settings.credential(credential); + } + + return MongoClients.create(settings.build()); } /** @@ -226,8 +213,8 @@ DataStoreBase<K, T> { credential = MongoCredential.createPlainCredential(username, database, password.toCharArray()); } else if (SCRAM_SHA_1.getMechanismName().equals(authenticationType)) { credential = MongoCredential.createScramSha1Credential(username, database, password.toCharArray()); - } else if (MONGODB_CR.getMechanismName().equals(authenticationType)) { - credential = MongoCredential.createMongoCRCredential(username, database, password.toCharArray()); + } else if (SCRAM_SHA_256.getMechanismName().equals(authenticationType)) { + credential = MongoCredential.createScramSha256Credential(username, database, password.toCharArray()); } else if (GSSAPI.getMechanismName().equals(authenticationType)) { credential = MongoCredential.createGSSAPICredential(username); } else if (MONGODB_X509.getMechanismName().equals(authenticationType)) { @@ -241,13 +228,12 @@ DataStoreBase<K, T> { /** * Get reference to Mongo DB, using credentials if not null. */ - private DB getDB(MongoStoreParameters parameters) throws UnknownHostException { + private MongoDatabase getDB(MongoStoreParameters parameters) throws UnknownHostException { // Get reference to Mongo DB if (!mapsOfClients.containsKey(parameters.getServers())) mapsOfClients.put(parameters.getServers(), getClient(parameters)); - DB db = mapsOfClients.get(parameters.getServers()).getDB(parameters.getDbname()); - return db; + return mapsOfClients.get(parameters.getServers()).getDatabase(parameters.getDbname()); } public MongoMapping getMapping() { @@ -282,14 +268,13 @@ DataStoreBase<K, T> { try { // If initialized create the collection - mongoClientColl = mongoClientDB.createCollection( - mapping.getCollectionName(), new BasicDBObject()); // send a DBObject to - // force creation - // otherwise creation is deferred - mongoClientColl.setDBEncoderFactory(GoraDBEncoder.FACTORY); + CreateCollectionOptions opts = new CreateCollectionOptions(); + String name = mapping.getCollectionName(); + mongoClientDB.createCollection(name, opts); + mongoClientColl = mongoClientDB.getCollection(name); - LOG.debug("Collection {} has been created for Mongo instance {}.", - new Object[] { mapping.getCollectionName(), mongoClientDB.getMongo() }); + LOG.debug("Collection {} has been created for Mongo database {}.", + new Object[] {name, mongoClientDB.getName() }); } catch (Exception e) { throw new GoraException(e); } @@ -309,8 +294,8 @@ DataStoreBase<K, T> { mongoClientColl.drop(); LOG.debug( - "Collection {} has been dropped for Mongo instance {}.", - new Object[] { mongoClientColl.getFullName(), mongoClientDB.getMongo() }); + "Collection {} has been dropped.", + new Object[] { mongoClientColl.getNamespace().getFullName() }); } catch (Exception e) { throw new GoraException(e); } @@ -322,7 +307,10 @@ DataStoreBase<K, T> { @Override public boolean schemaExists() throws GoraException { try { - return mongoClientDB.collectionExists(mapping.getCollectionName()); + MongoIterable<String> names = mongoClientDB.listCollectionNames(); + String name = mapping.getCollectionName(); + return StreamSupport.stream(names.spliterator(), false) + .anyMatch(name::equals); } catch (Exception e) { throw new GoraException(e); } @@ -333,15 +321,7 @@ DataStoreBase<K, T> { */ @Override public void flush() throws GoraException { - try { - for (MongoClient client : mapsOfClients.values()) { - client.fsync(false); - LOG.debug("Forced synced of database for Mongo instance {}.", - new Object[] { client }); - } - } catch (Exception e) { - throw new GoraException(e); - } + // no-op } /** @@ -364,8 +344,8 @@ DataStoreBase<K, T> { try { String[] dbFields = getFieldsToQuery(fields); // Prepare the MongoDB query - BasicDBObject q = new BasicDBObject("_id", key); - BasicDBObject proj = new BasicDBObject(); + Document q = new Document("_id", key); + Document proj = new Document(); for (String field : dbFields) { String docf = mapping.getDocumentField(field); if (docf != null) { @@ -373,9 +353,9 @@ DataStoreBase<K, T> { } } // Execute the query - DBObject res = mongoClientColl.findOne(q, proj); + FindIterable<Document> res = mongoClientColl.find(q).projection(proj); // Build the corresponding persistent - return newInstance(res, dbFields); + return newInstance(res.first(), dbFields); } catch (Exception e) { throw new GoraException(e); } @@ -385,11 +365,10 @@ DataStoreBase<K, T> { public boolean exists(final K key) throws GoraException { try { // Prepare the MongoDB query - BasicDBObject q = new BasicDBObject("_id", key); - BasicDBObject proj = new BasicDBObject(); + Document q = new Document("_id", key); // Execute the query - DBObject res = mongoClientColl.findOne(q, proj); - return res != null; + long res = mongoClientColl.countDocuments(q); + return res > 0; } catch (Exception e) { throw new GoraException(e); } @@ -429,24 +408,24 @@ DataStoreBase<K, T> { */ private void performPut(final K key, final T obj) { // Build the query to select the object to be updated - DBObject qSel = new BasicDBObject("_id", key); + Document qSel = new Document("_id", key); // Build the update query - BasicDBObject qUpdate = new BasicDBObject(); + Document qUpdate = new Document(); - BasicDBObject qUpdateSet = newUpdateSetInstance(obj); + Document qUpdateSet = newUpdateSetInstance(obj); if (qUpdateSet.size() > 0) { qUpdate.put("$set", qUpdateSet); } - BasicDBObject qUpdateUnset = newUpdateUnsetInstance(obj); + Document qUpdateUnset = newUpdateUnsetInstance(obj); if (qUpdateUnset.size() > 0) { qUpdate.put("$unset", qUpdateUnset); } // Execute the update (if there is at least one $set ot $unset if (!qUpdate.isEmpty()) { - mongoClientColl.update(qSel, qUpdate, true, false); + mongoClientColl.updateOne(qSel, qUpdate, new UpdateOptions().upsert(true)); obj.clearDirty(); } else { LOG.debug("No update to perform, skip {}", key); @@ -456,9 +435,9 @@ DataStoreBase<K, T> { @Override public boolean delete(final K key) throws GoraException { try { - DBObject removeKey = new BasicDBObject("_id", key); - WriteResult writeResult = mongoClientColl.remove(removeKey); - return writeResult != null && writeResult.getN() > 0; + Document removeKey = new Document("_id", key); + DeleteResult writeResult = mongoClientColl.deleteOne(removeKey); + return writeResult.getDeletedCount() > 0; } catch (Exception e) { throw new GoraException(e); } @@ -468,12 +447,9 @@ DataStoreBase<K, T> { public long deleteByQuery(final Query<K, T> query) throws GoraException { try { // Build the actual MongoDB query - DBObject q = MongoDBQuery.toDBQuery(query); - WriteResult writeResult = mongoClientColl.remove(q); - if (writeResult != null) { - return writeResult.getN(); - } - return 0; + Bson q = MongoDBQuery.toDBQuery(query); + DeleteResult writeResult = mongoClientColl.deleteMany(q); + return writeResult.getDeletedCount(); } catch (Exception e) { throw new GoraException(e); } @@ -487,29 +463,32 @@ DataStoreBase<K, T> { try { String[] fields = getFieldsToQuery(query.getFields()); // Build the actual MongoDB query - DBObject q = MongoDBQuery.toDBQuery(query); - DBObject p = MongoDBQuery.toProjection(fields, mapping); + Bson q = MongoDBQuery.toDBQuery(query); + Bson p = MongoDBQuery.toProjection(fields, mapping); if (query.getFilter() != null) { - boolean succeeded = filterUtil.setFilter(q, query.getFilter(), this); - if (succeeded) { + Optional<Bson> filter = filterUtil.setFilter(query.getFilter(), this); + if (!filter.isPresent()) { // don't need local filter query.setLocalFilterEnabled(false); + } else { + q = and(q, filter.get()); } } // Execute the query on the collection - DBCursor cursor = mongoClientColl.find(q, p); - if (query.getLimit() > 0) - cursor = cursor.limit((int) query.getLimit()); - cursor.batchSize(100); - cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT); - + FindIterable<Document> iterable = mongoClientColl.find(q).projection(p); + CountOptions countOptions = new CountOptions(); + if (query.getLimit() > 0) { + iterable.limit((int) query.getLimit()); + countOptions.limit((int) query.getLimit()); + } + iterable.batchSize(100); + iterable.noCursorTimeout(true); + // Build the result - MongoDBResult<K, T> mongoResult = new MongoDBResult<>(this, query); - mongoResult.setCursor(cursor); - - return mongoResult; + long size = mongoClientColl.countDocuments(q, countOptions); + return new MongoDBResult<>(this, query, iterable.cursor(), size); } catch(Exception e) { throw new GoraException(e); } @@ -545,18 +524,18 @@ DataStoreBase<K, T> { // //////////////////////////////////////////////////////// DESERIALIZATION /** - * Build a new instance of the persisted class from the {@link DBObject} + * Build a new instance of the persisted class from the {@link Document} * retrieved from the database. * * @param obj - * the {@link DBObject} that results from the query to the database + * the {@link Document} that results from the query to the database * @param fields * the list of fields to be mapped to the persistence class instance * @return a persistence class instance which content was deserialized from - * the {@link DBObject} + * the {@link Document} * @throws GoraException */ - public T newInstance(final DBObject obj, final String[] fields) throws GoraException { + public T newInstance(final Document obj, final String[] fields) throws GoraException { if (obj == null) return null; BSONDecorator easybson = new BSONDecorator(obj); @@ -578,7 +557,7 @@ DataStoreBase<K, T> { LOG.debug( "Load from DBObject (MAIN), field:{}, schemaType:{}, docField:{}, storeType:{}", new Object[] { field.name(), fieldSchema.getType(), docf, storeType }); - Object result = fromDBObject(fieldSchema, storeType, field, docf, + Object result = fromDocument(fieldSchema, storeType, field, docf, easybson); persistent.put(field.pos(), result); } @@ -586,9 +565,9 @@ DataStoreBase<K, T> { return persistent; } - private Object fromDBObject(final Schema fieldSchema, - final DocumentFieldType storeType, final Field field, final String docf, - final BSONDecorator easybson) throws GoraException { + private Object fromDocument(final Schema fieldSchema, + final DocumentFieldType storeType, final Field field, final String docf, + final BSONDecorator easybson) throws GoraException { Object result = null; switch (fieldSchema.getType()) { case MAP: @@ -598,7 +577,7 @@ DataStoreBase<K, T> { result = fromMongoList(docf, fieldSchema, easybson, field); break; case RECORD: - DBObject rec = easybson.getDBObject(docf); + Document rec = easybson.getDBObject(docf); if (rec == null) { result = null; break; @@ -660,7 +639,7 @@ DataStoreBase<K, T> { "Load from DBObject (UNION), schemaType:{}, docField:{}, storeType:{}", new Object[] { innerSchema.getType(), docf, storeType }); // Deserialize as if schema was ["type"] - result = fromDBObject(innerSchema, storeType, field, docf, easybson); + result = fromDocument(innerSchema, storeType, field, docf, easybson); } else { throw new IllegalStateException( "MongoStore doesn't support 3 types union field yet. Please update your mapping"); @@ -670,7 +649,7 @@ DataStoreBase<K, T> { @SuppressWarnings({ "unchecked", "rawtypes" }) private Object fromMongoRecord(final Schema fieldSchema, final String docf, - final DBObject rec) throws GoraException { + final Document rec) throws GoraException { Object result; BSONDecorator innerBson = new BSONDecorator(rec); Class<?> clazz = null; @@ -692,7 +671,7 @@ DataStoreBase<K, T> { innerStoreType }); record.put( recField.pos(), - fromDBObject(innerSchema, innerStoreType, recField, innerDocField, + fromDocument(innerSchema, innerStoreType, recField, innerDocField, innerBson)); } result = record; @@ -701,7 +680,7 @@ DataStoreBase<K, T> { /* pp */ Object fromMongoList(final String docf, final Schema fieldSchema, final BSONDecorator easybson, final Field f) throws GoraException { - List<Object> list = easybson.getDBList(docf); + List<Document> list = easybson.getDBList(docf); List<Object> rlist = new ArrayList<>(); if (list == null) { return new DirtyListWrapper(rlist); @@ -710,8 +689,8 @@ DataStoreBase<K, T> { for (Object item : list) { DocumentFieldType storeType = mapping.getDocumentFieldType(docf); - Object o = fromDBObject(fieldSchema.getElementType(), storeType, f, - "item", new BSONDecorator(new BasicDBObject("item", item))); + Object o = fromDocument(fieldSchema.getElementType(), storeType, f, + "item", new BSONDecorator(new Document("item", item))); rlist.add(o); } return new DirtyListWrapper<>(rlist); @@ -719,7 +698,7 @@ DataStoreBase<K, T> { /* pp */ Object fromMongoMap(final String docf, final Schema fieldSchema, final BSONDecorator easybson, final Field f) throws GoraException { - BasicDBObject map = easybson.getDBObject(docf); + Document map = easybson.getDBObject(docf); Map<Utf8, Object> rmap = new HashMap<>(); if (map == null) { return new DirtyMapWrapper(rmap); @@ -729,7 +708,7 @@ DataStoreBase<K, T> { String decodedMapKey = decodeFieldKey(mapKey); DocumentFieldType storeType = mapping.getDocumentFieldType(docf); - Object o = fromDBObject(fieldSchema.getValueType(), storeType, f, mapKey, + Object o = fromDocument(fieldSchema.getValueType(), storeType, f, mapKey, new BSONDecorator(map)); rmap.put(new Utf8(decodedMapKey), o); } @@ -767,20 +746,20 @@ DataStoreBase<K, T> { // ////////////////////////////////////////////////////////// SERIALIZATION /** - * Build a new instance of {@link DBObject} from the persistence class - * instance in parameter. Limit the {@link DBObject} to the fields that are + * Build a new instance of {@link Document} from the persistence class + * instance in parameter. Limit the {@link Document} to the fields that are * dirty and not null, that is the fields that will need to be updated in the * store. * * @param persistent * a persistence class instance which content is to be serialized as - * a {@link DBObject} for use as parameter of a $set operator - * @return a {@link DBObject} which content corresponds to the fields that + * a {@link Document} for use as parameter of a $set operator + * @return a {@link Document} which content corresponds to the fields that * have to be updated... and formatted to be passed in parameter of a * $set operator */ - private BasicDBObject newUpdateSetInstance(final T persistent) { - BasicDBObject result = new BasicDBObject(); + private Document newUpdateSetInstance(final T persistent) { + Document result = new Document(); for (Field f : persistent.getSchema().getFields()) { if (persistent.isDirty(f.pos()) && (persistent.get(f.pos()) != null)) { String docf = mapping.getDocumentField(f.name()); @@ -789,7 +768,7 @@ DataStoreBase<K, T> { LOG.debug( "Transform value to DBObject (MAIN), docField:{}, schemaType:{}, storeType:{}", new Object[] { docf, f.schema().getType(), storeType }); - Object o = toDBObject(docf, f.schema(), f.schema().getType(), + Object o = toDocument(docf, f.schema(), f.schema().getType(), storeType, value); result.put(docf, o); } @@ -798,20 +777,20 @@ DataStoreBase<K, T> { } /** - * Build a new instance of {@link DBObject} from the persistence class - * instance in parameter. Limit the {@link DBObject} to the fields that are + * Build a new instance of {@link Document} from the persistence class + * instance in parameter. Limit the {@link Document} to the fields that are * dirty and null, that is the fields that will need to be updated in the * store by being removed. * * @param persistent * a persistence class instance which content is to be serialized as - * a {@link DBObject} for use as parameter of a $set operator - * @return a {@link DBObject} which content corresponds to the fields that + * a {@link Document} for use as parameter of a $set operator + * @return a {@link Document} which content corresponds to the fields that * have to be updated... and formated to be passed in parameter of a * $unset operator */ - private BasicDBObject newUpdateUnsetInstance(final T persistent) { - BasicDBObject result = new BasicDBObject(); + private Document newUpdateUnsetInstance(final T persistent) { + Document result = new Document(); for (Field f : persistent.getSchema().getFields()) { if (persistent.isDirty(f.pos()) && (persistent.get(f.pos()) == null)) { String docf = mapping.getDocumentField(f.name()); @@ -820,7 +799,7 @@ DataStoreBase<K, T> { LOG.debug( "Transform value to DBObject (MAIN), docField:{}, schemaType:{}, storeType:{}", new Object[] { docf, f.schema().getType(), storeType }); - Object o = toDBObject(docf, f.schema(), f.schema().getType(), + Object o = toDocument(docf, f.schema(), f.schema().getType(), storeType, value); result.put(docf, o); } @@ -829,9 +808,9 @@ DataStoreBase<K, T> { } @SuppressWarnings("unchecked") - private Object toDBObject(final String docf, final Schema fieldSchema, - final Type fieldType, final DocumentFieldType storeType, - final Object value) { + private Object toDocument(final String docf, final Schema fieldSchema, + final Type fieldType, final DocumentFieldType storeType, + final Object value) { Object result = null; switch (fieldType) { case MAP: @@ -910,7 +889,7 @@ DataStoreBase<K, T> { "Transform value to DBObject (UNION), schemaType:{}, type1:{}, storeType:{}", new Object[] { innerSchema.getType(), type1, storeType }); // Deserialize as if schema was ["type"] - result = toDBObject(docf, innerSchema, type1, storeType, value); + result = toDocument(docf, innerSchema, type1, storeType, value); } else { throw new IllegalStateException( "MongoStore doesn't support 3 types union field yet. Please update your mapping"); @@ -918,9 +897,9 @@ DataStoreBase<K, T> { return result; } - private BasicDBObject recordToMongo(final String docf, + private Document recordToMongo(final String docf, final Schema fieldSchema, final Object value) { - BasicDBObject record = new BasicDBObject(); + Document record = new Document(); for (Field member : fieldSchema.getFields()) { Object innerValue = ((PersistentBase) value).get(member.pos()); String innerDoc = mapping.getDocumentField(member.name()); @@ -932,7 +911,7 @@ DataStoreBase<K, T> { innerStoreType }); record.put( member.name(), - toDBObject(docf, member.schema(), innerType, innerStoreType, + toDocument(docf, member.schema(), innerType, innerStoreType, innerValue)); } return record; @@ -991,13 +970,13 @@ DataStoreBase<K, T> { * the Java Map that must be serialized into a MongoDB object * @param fieldType * type of the values within the map - * @return a {@link BasicDBObject} version of the {@link Map} that can be + * @return a {@link Document} version of the {@link Map} that can be * safely serialized into MongoDB. */ - private BasicDBObject mapToMongo(final String docf, + private Document mapToMongo(final String docf, final Map<CharSequence, ?> value, final Schema fieldSchema, final Type fieldType) { - BasicDBObject map = new BasicDBObject(); + Document map = new Document(); // Handle null case if (value == null) return map; @@ -1009,7 +988,7 @@ DataStoreBase<K, T> { Object mapValue = e.getValue(); DocumentFieldType storeType = mapping.getDocumentFieldType(docf); - Object result = toDBObject(docf, fieldSchema, fieldType, storeType, + Object result = toDocument(docf, fieldSchema, fieldType, storeType, mapValue); map.put(encodedMapKey, result); } @@ -1038,7 +1017,7 @@ DataStoreBase<K, T> { // Handle regular cases for (Object item : array) { DocumentFieldType storeType = mapping.getDocumentFieldType(docf); - Object result = toDBObject(docf, fieldSchema, fieldType, storeType, item); + Object result = toDocument(docf, fieldSchema, fieldType, storeType, item); list.add(result); } diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStoreParameters.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStoreParameters.java index b278b89..e939785 100644 --- a/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStoreParameters.java +++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/store/MongoStoreParameters.java @@ -17,7 +17,6 @@ */ package org.apache.gora.mongodb.store; -import com.mongodb.DB; import org.apache.hadoop.conf.Configuration; import java.util.Properties; @@ -92,14 +91,14 @@ public class MongoStoreParameters { private final String writeConcern; /** - * @param mappingFile - * @param servers + * @param mappingFile Configuration file for mapping. + * @param servers Collection of seeds servers. * @param dbname Name of database to connect to. * @param authenticationType Authentication type to login - * @param login Optionnal login for remote database. + * @param login Optional login for remote database. * @param secret Optional secret for remote database. - * @param readPreference - * @param writeConcern @return a {@link DB} instance from <tt>mongoClient</tt> or null if + * @param readPreference Optional {@link com.mongodb.ReadPreference}. + * @param writeConcern Optional {@link com.mongodb.WriteConcern}. */ private MongoStoreParameters(String mappingFile, String servers, String dbname, String authenticationType, String login, String secret, String readPreference, String writeConcern) { this.mappingFile = mappingFile; diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/utils/BSONDecorator.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/utils/BSONDecorator.java index ac98096..56680a9 100644 --- a/gora-mongodb/src/main/java/org/apache/gora/mongodb/utils/BSONDecorator.java +++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/utils/BSONDecorator.java @@ -17,35 +17,34 @@ */ package org.apache.gora.mongodb.utils; -import com.mongodb.BasicDBList; -import com.mongodb.BasicDBObject; -import com.mongodb.DBObject; import org.apache.avro.util.Utf8; -import org.bson.BSONObject; +import org.bson.Document; +import org.bson.types.Binary; import java.nio.ByteBuffer; import java.util.Date; +import java.util.List; /** - * Utility class to build {@link DBObject} used by MongoDB in an easy way by + * Utility class to build {@link Document} used by MongoDB in an easy way by * directly specifying the fully qualified names of fields. * * @author Fabien Poulard fpoul...@dictanova.com */ public class BSONDecorator { - final private DBObject myBson; + final private Document myBson; - public BSONDecorator(final DBObject obj) { + public BSONDecorator(final Document obj) { myBson = obj; } /** - * Access the decorated {@link BSONObject}. + * Access the decorated {@link Document}. * - * @return the decorated {@link DBObject} in its actual state + * @return the decorated {@link Document} in its actual state */ - public DBObject asDBObject() { + public Document asDocument() { return myBson; } @@ -56,45 +55,45 @@ public class BSONDecorator { * * @param fieldName fully qualified name of the field * @return true if the field and all its parents exists in the decorated - * {@link DBObject}, false otherwise + * {@link Document}, false otherwise */ public boolean containsField(String fieldName) { // Prepare for in depth setting String[] fields = fieldName.split("\\."); int i = 0; - DBObject intermediate = myBson; + Document intermediate = myBson; // Set intermediate parents while (i < (fields.length - 1)) { - if (!intermediate.containsField(fields[i])) + if (!intermediate.containsKey(fields[i])) return false; - intermediate = (DBObject) intermediate.get(fields[i]); + intermediate = (Document) intermediate.get(fields[i]); i++; } // Check final field - return intermediate.containsField(fields[fields.length - 1]); + return intermediate.containsKey(fields[fields.length - 1]); } /** - * Access field as a {@link BasicDBObject}. + * Access field as a {@link Document}. * * @param fieldName fully qualified name of the field to be accessed - * @return value of the field as a {@link BasicDBObject} + * @return value of the field as a {@link Document} */ - public BasicDBObject getDBObject(String fieldName) { - return (BasicDBObject) getFieldParent(fieldName) + public Document getDBObject(String fieldName) { + return (Document) getFieldParent(fieldName) .get(getLeafName(fieldName)); } /** - * Access field as a {@link BasicDBList}. + * Access field as a {@link List<Document>}. * * @param fieldName fully qualified name of the field to be accessed - * @return value of the field as a {@link BasicDBList} + * @return value of the field as a {@link List<Document>} */ - public BasicDBList getDBList(String fieldName) { - return (BasicDBList) getFieldParent(fieldName).get(getLeafName(fieldName)); + public List<Document> getDBList(String fieldName) { + return (List<Document>) getFieldParent(fieldName).get(getLeafName(fieldName)); } /** @@ -104,9 +103,9 @@ public class BSONDecorator { * @return value of the field as a boolean */ public Boolean getBoolean(String fieldName) { - BasicDBObject parent = getFieldParent(fieldName); + Document parent = getFieldParent(fieldName); String lf = getLeafName(fieldName); - return parent.containsField(lf) ? parent.getBoolean(lf) : null; + return parent.containsKey(lf) ? parent.getBoolean(lf) : null; } /** @@ -116,9 +115,9 @@ public class BSONDecorator { * @return value of the field as a double */ public Double getDouble(String fieldName) { - BasicDBObject parent = getFieldParent(fieldName); + Document parent = getFieldParent(fieldName); String lf = getLeafName(fieldName); - return parent.containsField(lf) ? parent.getDouble(lf) : null; + return parent.containsKey(lf) ? parent.getDouble(lf) : null; } /** @@ -128,9 +127,9 @@ public class BSONDecorator { * @return value of the field as a double */ public Integer getInt(String fieldName) { - BasicDBObject parent = getFieldParent(fieldName); + Document parent = getFieldParent(fieldName); String lf = getLeafName(fieldName); - return parent.containsField(lf) && parent.get(lf) != null ? parent.getInt(lf) : null; + return parent.containsKey(lf) && parent.get(lf) != null ? parent.getInteger(lf) : null; } /** @@ -140,9 +139,9 @@ public class BSONDecorator { * @return value of the field as a double */ public Long getLong(String fieldName) { - BasicDBObject parent = getFieldParent(fieldName); + Document parent = getFieldParent(fieldName); String lf = getLeafName(fieldName); - return parent.containsField(lf) ? parent.getLong(lf) : null; + return parent.containsKey(lf) ? parent.getLong(lf) : null; } /** @@ -152,7 +151,7 @@ public class BSONDecorator { * @return value of the field as a date */ public Date getDate(String fieldName) { - BasicDBObject parent = getFieldParent(fieldName); + Document parent = getFieldParent(fieldName); String lf = getLeafName(fieldName); return parent.getDate(lf); } @@ -164,7 +163,7 @@ public class BSONDecorator { * @return value of the field as a {@link Utf8} string */ public Utf8 getUtf8String(String fieldName) { - BasicDBObject parent = getFieldParent(fieldName); + Document parent = getFieldParent(fieldName); String value = parent.getString(getLeafName(fieldName)); return (value != null) ? new Utf8(value) : null; } @@ -179,6 +178,8 @@ public class BSONDecorator { Object o = get(fieldName); if (o == null) return null; + else if (o instanceof Binary) + return ByteBuffer.wrap(((Binary) o).getData()); else if (o instanceof byte[]) return ByteBuffer.wrap((byte[]) o); else @@ -192,19 +193,19 @@ public class BSONDecorator { * @return value of the field */ public Object get(String fieldName) { - BasicDBObject parent = getFieldParent(fieldName); + Document parent = getFieldParent(fieldName); return parent.get(getLeafName(fieldName)); } /** * Set field. Create the intermediate levels if necessary as - * {@link BasicDBObject} fields. + * {@link Document} fields. * * @param fieldName fully qualified name of the field to be accessed * @param value value of the field */ public void put(String fieldName, Object value) { - BasicDBObject parent = getFieldParent(fieldName, true); + Document parent = getFieldParent(fieldName, true); parent.put(getLeafName(fieldName), value); } @@ -218,27 +219,27 @@ public class BSONDecorator { * @return the parent of the field * @throws IllegalAccessError if the field does not exist */ - private BasicDBObject getFieldParent(String fieldName, boolean createIfMissing) { + private Document getFieldParent(String fieldName, boolean createIfMissing) { String[] fields = fieldName.split("\\."); int i = 0; - BasicDBObject intermediate = (BasicDBObject) myBson; + Document intermediate = myBson; // Set intermediate parents while (i < (fields.length - 1)) { - if (!intermediate.containsField(fields[i])) + if (!intermediate.containsKey(fields[i])) if (createIfMissing) - intermediate.put(fields[i], new BasicDBObject()); + intermediate.put(fields[i], new Document()); else throw new IllegalAccessError("The field '" + fieldName + "' does not exist: '" + fields[i] + "' is missing."); - intermediate = (BasicDBObject) intermediate.get(fields[i]); + intermediate = (Document) intermediate.get(fields[i]); i++; } return intermediate; } - private BasicDBObject getFieldParent(String fieldName) { + private Document getFieldParent(String fieldName) { return getFieldParent(fieldName, false); } diff --git a/gora-mongodb/src/main/java/org/apache/gora/mongodb/utils/GoraDBEncoder.java b/gora-mongodb/src/main/java/org/apache/gora/mongodb/utils/Utf8Codec.java similarity index 54% rename from gora-mongodb/src/main/java/org/apache/gora/mongodb/utils/GoraDBEncoder.java rename to gora-mongodb/src/main/java/org/apache/gora/mongodb/utils/Utf8Codec.java index 587968a..60d5b31 100644 --- a/gora-mongodb/src/main/java/org/apache/gora/mongodb/utils/GoraDBEncoder.java +++ b/gora-mongodb/src/main/java/org/apache/gora/mongodb/utils/Utf8Codec.java @@ -17,40 +17,30 @@ */ package org.apache.gora.mongodb.utils; -import java.nio.ByteBuffer; - import org.apache.avro.util.Utf8; - -import com.mongodb.DBEncoder; -import com.mongodb.DBEncoderFactory; -import com.mongodb.DefaultDBEncoder; +import org.bson.BsonReader; +import org.bson.BsonWriter; +import org.bson.codecs.Codec; +import org.bson.codecs.DecoderContext; +import org.bson.codecs.EncoderContext; /** - * BSON encoder for BSONObject instances. + * BSON encoder for {@link Utf8} instances. */ -public class GoraDBEncoder extends DefaultDBEncoder { +public class Utf8Codec implements Codec<Utf8> { - public static DBEncoderFactory FACTORY = new DefaultFactory(); - - @Override - protected boolean putSpecial(String name, Object val) { - if (val instanceof Utf8) { - putString(name, val.toString()); - return true; - } else if (val instanceof ByteBuffer) { - putBinary(name, ((ByteBuffer) val).array()); - return true; - } else { - return super.putSpecial(name, val); + @Override + public Utf8 decode(BsonReader reader, DecoderContext decoderContext) { + return new Utf8(reader.readString()); } - } - - static class DefaultFactory implements DBEncoderFactory { @Override - public DBEncoder create() { - return new GoraDBEncoder(); + public void encode(BsonWriter writer, Utf8 value, EncoderContext encoderContext) { + writer.writeString(value.toString()); } - } + @Override + public Class<Utf8> getEncoderClass() { + return Utf8.class; + } } diff --git a/gora-mongodb/src/test/java/org/apache/gora/mongodb/filters/DefaultFactoryTest.java b/gora-mongodb/src/test/java/org/apache/gora/mongodb/filters/DefaultFactoryTest.java index 3658e42..966f9be 100644 --- a/gora-mongodb/src/test/java/org/apache/gora/mongodb/filters/DefaultFactoryTest.java +++ b/gora-mongodb/src/test/java/org/apache/gora/mongodb/filters/DefaultFactoryTest.java @@ -17,8 +17,7 @@ */ package org.apache.gora.mongodb.filters; -import static org.junit.Assert.assertEquals; - +import com.mongodb.MongoClient; import org.apache.avro.util.Utf8; import org.apache.gora.examples.generated.WebPage; import org.apache.gora.filter.FilterList; @@ -27,11 +26,13 @@ import org.apache.gora.filter.MapFieldValueFilter; import org.apache.gora.filter.SingleFieldValueFilter; import org.apache.gora.mongodb.store.MongoStore; import org.apache.hadoop.conf.Configuration; +import org.bson.BsonDocument; +import org.bson.conversions.Bson; import org.json.JSONObject; import org.junit.Before; import org.junit.Test; -import com.mongodb.DBObject; +import static org.junit.Assert.assertEquals; public class DefaultFactoryTest { @@ -56,9 +57,9 @@ public class DefaultFactoryTest { filter.setFilterOp(FilterOp.NOT_EQUALS); filter.setFilterIfMissing(true); - DBObject dbObject = filterFactory.createFilter(filter, store); + Bson dbObject = filterFactory.createFilter(filter, store); assertEquals(new JSONObject("{ \"url\" : { \"$ne\" : \"http://www.example.com\"}}").toString(), - new JSONObject(dbObject.toString()).toString()); + new JSONObject(asJson(dbObject)).toString()); } @Test @@ -67,9 +68,9 @@ public class DefaultFactoryTest { filter.setFilterOp(FilterOp.EQUALS); filter.setFilterIfMissing(false); // include doc with missing field - DBObject dbObject = filterFactory.createFilter(filter, store); + Bson dbObject = filterFactory.createFilter(filter, store); assertEquals(new JSONObject("{ \"$or\" : [ { \"url\" : { \"$exists\" : false}} , " + - "{ \"url\" : \"http://www.example.com\"}]}").toString(), new JSONObject(dbObject.toString()).toString()); + "{ \"url\" : \"http://www.example.com\"}]}").toString(), new JSONObject(asJson(dbObject)).toString()); } @Test @@ -78,9 +79,9 @@ public class DefaultFactoryTest { filter.setFilterOp(FilterOp.NOT_EQUALS); filter.setFilterIfMissing(true); - DBObject dbObject = filterFactory.createFilter(filter, store); + Bson dbObject = filterFactory.createFilter(filter, store); assertEquals(new JSONObject("{ \"h.C·T\" : { \"$ne\" : \"text/html\"}}").toString(), - new JSONObject(dbObject.toString()).toString()); + new JSONObject(asJson(dbObject)).toString()); } @Test @@ -89,17 +90,17 @@ public class DefaultFactoryTest { filter.setFilterOp(FilterOp.EQUALS); filter.setFilterIfMissing(false); // include doc with missing field - DBObject dbObject = filterFactory.createFilter(filter, store); + Bson dbObject = filterFactory.createFilter(filter, store); assertEquals(new JSONObject("{ \"$or\" : [ { \"h.C·T\" : { \"$exists\" : false}} , " + - "{ \"h.C·T\" : \"text/html\"}]}").toString(), new JSONObject(dbObject.toString()).toString()); + "{ \"h.C·T\" : \"text/html\"}]}").toString(), new JSONObject(asJson(dbObject)).toString()); } @Test public void testCreateFilter_list_empty() throws Exception { FilterList<String, WebPage> filter = new FilterList<>(); - DBObject dbObject = filterFactory.createFilter(filter, store); - assertEquals(new JSONObject("{ }").toString(), new JSONObject(dbObject.toString()).toString()); + Bson dbObject = filterFactory.createFilter(filter, store); + assertEquals(new JSONObject("{ }").toString(), new JSONObject(asJson(dbObject)).toString()); } @Test @@ -114,9 +115,9 @@ public class DefaultFactoryTest { urlFilter.setFilterOp(FilterOp.EQUALS); filter.addFilter(urlFilter); - DBObject dbObject = filterFactory.createFilter(filter, store); + Bson dbObject = filterFactory.createFilter(filter, store); assertEquals(new JSONObject("{ \"h.C·T\" : \"text/html\" , \"url\" : \"http://www.example.com\"}").toString(), - new JSONObject(dbObject.toString()).toString()); + new JSONObject(asJson(dbObject)).toString()); } /** @@ -131,9 +132,9 @@ public class DefaultFactoryTest { filter.getOperands().add(new Utf8("http://www.example.com")); filter.setFilterIfMissing(true); - DBObject dbObject = filterFactory.createFilter(filter, store); + Bson dbObject = filterFactory.createFilter(filter, store); assertEquals(new JSONObject("{ \"url\" : \"http://www.example.com\"}").toString(), - new JSONObject(dbObject.toString()).toString()); + new JSONObject(asJson(dbObject)).toString()); } private MapFieldValueFilter<String, WebPage> createHeadersFilter() { @@ -150,4 +151,10 @@ public class DefaultFactoryTest { filter.getOperands().add("http://www.example.com"); return filter; } + + private static String asJson(Bson bson) { + BsonDocument bsonDocument = bson.toBsonDocument(BsonDocument.class, MongoClient.getDefaultCodecRegistry()); + return bsonDocument.toString(); + } + } diff --git a/gora-mongodb/src/test/java/org/apache/gora/mongodb/store/TestMongoStore.java b/gora-mongodb/src/test/java/org/apache/gora/mongodb/store/TestMongoStore.java index 824cc9a..7b9f6eb 100644 --- a/gora-mongodb/src/test/java/org/apache/gora/mongodb/store/TestMongoStore.java +++ b/gora-mongodb/src/test/java/org/apache/gora/mongodb/store/TestMongoStore.java @@ -17,23 +17,21 @@ */ package org.apache.gora.mongodb.store; -import com.mongodb.BasicDBList; -import com.mongodb.BasicDBObject; import org.apache.avro.util.Utf8; -import org.apache.gora.examples.generated.Employee; import org.apache.gora.examples.generated.WebPage; import org.apache.gora.mongodb.GoraMongodbTestDriver; import org.apache.gora.mongodb.utils.BSONDecorator; import org.apache.gora.query.Query; import org.apache.gora.query.Result; -import org.apache.gora.store.DataStore; import org.apache.gora.store.DataStoreTestBase; import org.apache.gora.util.GoraException; +import org.bson.Document; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import static org.junit.Assert.assertEquals; @@ -87,7 +85,7 @@ public abstract class TestMongoStore extends DataStoreTestBase { @Test public void testFromMongoList_null() throws Exception { MongoStore store = new MongoStore(); - BasicDBObject noField = new BasicDBObject(); + Document noField = new Document(); String field = "myField"; Object item = store.fromMongoList(field, null, new BSONDecorator(noField), null); @@ -98,7 +96,7 @@ public abstract class TestMongoStore extends DataStoreTestBase { public void testFromMongoList_empty() throws Exception { MongoStore store = new MongoStore(); String field = "myField"; - BasicDBObject emptyField = new BasicDBObject(field, new BasicDBList()); + Document emptyField = new Document(field, new ArrayList<Document>()); Object item = store.fromMongoList(field, null, new BSONDecorator(emptyField), null); assertNotNull(item); @@ -107,7 +105,7 @@ public abstract class TestMongoStore extends DataStoreTestBase { @Test public void testFromMongoMap_null() throws Exception { MongoStore store = new MongoStore(); - BasicDBObject noField = new BasicDBObject(); + Document noField = new Document(); String field = "myField"; Object item = store.fromMongoMap(field, null, new BSONDecorator(noField), null); @@ -118,7 +116,7 @@ public abstract class TestMongoStore extends DataStoreTestBase { public void testFromMongoMap_empty() throws Exception { MongoStore store = new MongoStore(); String field = "myField"; - BasicDBObject emptyField = new BasicDBObject(field, new BasicDBObject()); + Document emptyField = new Document(field, new Document()); Object item = store.fromMongoMap(field, null, new BSONDecorator(emptyField), null); assertNotNull(item); diff --git a/gora-mongodb/src/test/java/org/apache/gora/mongodb/utils/TestBSONDecorator.java b/gora-mongodb/src/test/java/org/apache/gora/mongodb/utils/TestBSONDecorator.java index 70ca56f..9874a7f 100644 --- a/gora-mongodb/src/test/java/org/apache/gora/mongodb/utils/TestBSONDecorator.java +++ b/gora-mongodb/src/test/java/org/apache/gora/mongodb/utils/TestBSONDecorator.java @@ -17,28 +17,27 @@ */ package org.apache.gora.mongodb.utils; -import com.mongodb.BasicDBObject; -import com.mongodb.BasicDBObjectBuilder; -import com.mongodb.DBObject; +import org.bson.Document; import org.junit.Test; import java.nio.ByteBuffer; import java.nio.charset.Charset; -import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestBSONDecorator { @Test public void testContainsField() { // Init the object used for testing - DBObject dbo1 = BasicDBObjectBuilder - .start() - .add("root0", "value") - .add("root1", new BasicDBObject("leaf1", 1)) - .add("root2", - new BasicDBObject("parent1", new BasicDBObject("leaf2", "test"))) - .get(); + Document dbo1 = new Document(); + dbo1.put("root0", "value"); + dbo1.put("root1", new Document("leaf1", 1)); + dbo1.put("root2", + new Document("parent1", new Document("leaf2", "test"))); BSONDecorator dboc = new BSONDecorator(dbo1); // Root level field, does exist @@ -66,15 +65,14 @@ public class TestBSONDecorator { @Test public void testBinaryField() { // Init the object used for testing - DBObject dbo1 = BasicDBObjectBuilder - .start() - .add("root0", "value") - .add("root1", new BasicDBObject("leaf1", "abcdefgh".getBytes(Charset.defaultCharset()))) - .add( + Document dbo1 = new Document(); + dbo1.put("root0", "value"); + dbo1.put("root1", new Document("leaf1", "abcdefgh".getBytes(Charset.defaultCharset()))); + dbo1.put( "root2", - new BasicDBObject("parent1", new BasicDBObject("leaf2", "test" - .getBytes(Charset.defaultCharset())))) - .add("root3", ByteBuffer.wrap("test2".getBytes(Charset.defaultCharset()))).get(); + new Document("parent1", new Document("leaf2", "test" + .getBytes(Charset.defaultCharset())))); + dbo1.put("root3", ByteBuffer.wrap("test2".getBytes(Charset.defaultCharset()))); BSONDecorator dboc = new BSONDecorator(dbo1); // Access first bytes field @@ -95,10 +93,8 @@ public class TestBSONDecorator { @Test public void testNullStringField() { // Init the object used for testing - DBObject dbo1 = BasicDBObjectBuilder - .start() - .add("key1", null) - .get(); + Document dbo1 = new Document(); + dbo1.put("key1", null); BSONDecorator dboc = new BSONDecorator(dbo1); assertTrue(dboc.containsField("key1")); @@ -109,7 +105,7 @@ public class TestBSONDecorator { @Test public void testNullFields() { - BSONDecorator dboc = new BSONDecorator(new BasicDBObject()); + BSONDecorator dboc = new BSONDecorator(new Document()); assertNull(dboc.getInt("key1")); assertNull(dboc.getLong("key1"));