Repository: beam Updated Branches: refs/heads/master 1476f3412 -> b400f4a6f
[BEAM-975] Improve default connection options, javadoc and style in MongoDbIO Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/87be64e9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/87be64e9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/87be64e9 Branch: refs/heads/master Commit: 87be64e9817da5e5c86a243471021268d6281b33 Parents: 1476f34 Author: Jean-Baptiste Onofré <jbono...@apache.org> Authored: Fri May 12 15:21:49 2017 +0200 Committer: Jean-Baptiste Onofré <jbono...@apache.org> Committed: Mon Jun 19 21:23:11 2017 +0200 ---------------------------------------------------------------------- .../apache/beam/sdk/io/mongodb/MongoDbIO.java | 315 +++++++++++++++---- .../beam/sdk/io/mongodb/MongoDbIOTest.java | 37 +++ 2 files changed, 283 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/87be64e9/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java index 620df74..04d9975 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java @@ -18,12 +18,13 @@ package org.apache.beam.sdk.io.mongodb; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.mongodb.BasicDBObject; import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; import 
com.mongodb.MongoClientURI; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; @@ -100,12 +101,20 @@ public class MongoDbIO { /** Read data from MongoDB. */ public static Read read() { - return new AutoValue_MongoDbIO_Read.Builder().setNumSplits(0).build(); + return new AutoValue_MongoDbIO_Read.Builder() + .setKeepAlive(true) + .setMaxConnectionIdleTime(60000) + .setNumSplits(0) + .build(); } /** Write data to MongoDB. */ public static Write write() { - return new AutoValue_MongoDbIO_Write.Builder().setBatchSize(1024L).build(); + return new AutoValue_MongoDbIO_Write.Builder() + .setKeepAlive(true) + .setMaxConnectionIdleTime(60000) + .setBatchSize(1024L) + .build(); } private MongoDbIO() { @@ -117,16 +126,20 @@ public class MongoDbIO { @AutoValue public abstract static class Read extends PTransform<PBegin, PCollection<Document>> { @Nullable abstract String uri(); + abstract boolean keepAlive(); + abstract int maxConnectionIdleTime(); @Nullable abstract String database(); @Nullable abstract String collection(); @Nullable abstract String filter(); abstract int numSplits(); - abstract Builder toBuilder(); + abstract Builder builder(); @AutoValue.Builder abstract static class Builder { abstract Builder setUri(String uri); + abstract Builder setKeepAlive(boolean keepAlive); + abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime); abstract Builder setDatabase(String database); abstract Builder setCollection(String collection); abstract Builder setFilter(String filter); @@ -135,31 +148,94 @@ public class MongoDbIO { } /** - * Example documentation for withUri. + * Define the location of the MongoDB instances using an URI. The URI describes the hosts to + * be used and some options. 
+ * + * <p>The format of the URI is: + * + * <pre>{@code + * mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]] + * }</pre> + * + * <p>Where: + * <ul> + * <li>{@code mongodb://} is a required prefix to identify that this is a string in the + * standard connection format.</li> + * <li>{@code username:password@} are optional. If given, the driver will attempt to + * login to a database after connecting to a database server. For some authentication + * mechanisms, only the username is specified and the password is not, in which case + * the ":" after the username is left off as well.</li> + * <li>{@code host1} is the only required part of the URI. It identifies a server + * address to connect to.</li> + * <li>{@code :portX} is optional and defaults to {@code :27017} if not provided.</li> + * <li>{@code /database} is the name of the database to login to and thus is only + * relevant if the {@code username:password@} syntax is used. If not specified, the + * "admin" database will be used by default. It has to be equivalent to the database + * you specify with {@link Read#withDatabase(String)}.</li> + * <li>{@code ?options} are connection options. Note that if {@code database} is absent + * there is still a {@code /} required between the last {@code host} and the {@code ?} + * introducing the options. Options are name=value pairs and the pairs are separated by + * "{@code &}". The {@code KeepAlive} connection option can't be passed via the URI, + * instead you have to use {@link Read#withKeepAlive(boolean)}. Same for the + * {@code MaxConnectionIdleTime} connection option via + * {@link Read#withMaxConnectionIdleTime(int)}. + * </li> + * </ul> */ public Read withUri(String uri) { - checkNotNull(uri); - return toBuilder().setUri(uri).build(); + checkArgument(uri != null, "MongoDbIO.read().withUri(uri) called with null uri"); + return builder().setUri(uri).build(); + } + + /** + * Sets whether socket keep alive is enabled. 
+ */ + public Read withKeepAlive(boolean keepAlive) { + return builder().setKeepAlive(keepAlive).build(); + } + + /** + * Sets the maximum idle time for a pooled connection. + */ + public Read withMaxConnectionIdleTime(int maxConnectionIdleTime) { + return builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build(); } + /** + * Sets the database to use. + */ public Read withDatabase(String database) { - checkNotNull(database); - return toBuilder().setDatabase(database).build(); + checkArgument(database != null, "MongoDbIO.read().withDatabase(database) called with null" + + " database"); + return builder().setDatabase(database).build(); } + /** + * Sets the collection to consider in the database. + */ public Read withCollection(String collection) { - checkNotNull(collection); - return toBuilder().setCollection(collection).build(); + checkArgument(collection != null, "MongoDbIO.read().withCollection(collection) called " + + "with null collection"); + return builder().setCollection(collection).build(); } + /** + * Sets a filter on the documents in a collection. + */ public Read withFilter(String filter) { - checkNotNull(filter); - return toBuilder().setFilter(filter).build(); + checkArgument(filter != null, "MongoDbIO.read().withFilter(filter) called with null " + + "filter"); + return builder().setFilter(filter).build(); } + /** + * Sets the user defined number of splits. + */ public Read withNumSplits(int numSplits) { - checkArgument(numSplits >= 0); - return toBuilder().setNumSplits(numSplits).build(); + checkArgument(numSplits >= 0, "MongoDbIO.read().withNumSplits(numSplits) called with " + + "invalid number. 
The number of splits has to be a positive value (currently %d)", + numSplits); + return builder().setNumSplits(numSplits).build(); } @Override @@ -169,15 +245,19 @@ public class MongoDbIO { @Override public void validate(PipelineOptions options) { - checkNotNull(uri(), "uri"); - checkNotNull(database(), "database"); - checkNotNull(collection(), "collection"); + checkState(uri() != null, "MongoDbIO.read() requires an URI to be set via withUri(uri)"); + checkState(database() != null, "MongoDbIO.read() requires a database to be set via " + + "withDatabase(database)"); + checkState(collection() != null, "MongoDbIO.read() requires a collection to be set via " + + "withCollection(collection)"); } @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder.add(DisplayData.item("uri", uri())); + builder.add(DisplayData.item("keepAlive", keepAlive())); + builder.add(DisplayData.item("maxConnectionIdleTime", maxConnectionIdleTime())); builder.add(DisplayData.item("database", database())); builder.add(DisplayData.item("collection", collection())); builder.addIfNotNull(DisplayData.item("filter", filter())); @@ -218,61 +298,71 @@ public class MongoDbIO { @Override public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) { - MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri())); - MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database()); + try (MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()))) { + return getEstimatedSizeBytes(mongoClient, spec.database(), spec.collection()); + } + } + + private long getEstimatedSizeBytes(MongoClient mongoClient, + String database, + String collection) { + MongoDatabase mongoDatabase = mongoClient.getDatabase(database); // get the Mongo collStats object // it gives the size for the entire collection BasicDBObject stat = new BasicDBObject(); - stat.append("collStats", spec.collection()); + stat.append("collStats", 
collection); Document stats = mongoDatabase.runCommand(stat); + return stats.get("size", Number.class).longValue(); } @Override public List<BoundedSource<Document>> split(long desiredBundleSizeBytes, PipelineOptions options) { - MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri())); - MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database()); - - List<Document> splitKeys; - if (spec.numSplits() > 0) { - // the user defines his desired number of splits - // calculate the batch size - long estimatedSizeBytes = getEstimatedSizeBytes(options); - desiredBundleSizeBytes = estimatedSizeBytes / spec.numSplits(); - } + try (MongoClient mongoClient = new MongoClient(new MongoClientURI(spec.uri()))) { + MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database()); + + List<Document> splitKeys; + if (spec.numSplits() > 0) { + // the user defines his desired number of splits + // calculate the batch size + long estimatedSizeBytes = getEstimatedSizeBytes(mongoClient, + spec.database(), spec.collection()); + desiredBundleSizeBytes = estimatedSizeBytes / spec.numSplits(); + } - // the desired batch size is small, using default chunk size of 1MB - if (desiredBundleSizeBytes < 1024 * 1024) { - desiredBundleSizeBytes = 1 * 1024 * 1024; - } + // the desired batch size is small, using default chunk size of 1MB + if (desiredBundleSizeBytes < 1024 * 1024) { + desiredBundleSizeBytes = 1 * 1024 * 1024; + } - // now we have the batch size (provided by user or provided by the runner) - // we use Mongo splitVector command to get the split keys - BasicDBObject splitVectorCommand = new BasicDBObject(); - splitVectorCommand.append("splitVector", spec.database() + "." 
+ spec.collection()); - splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1)); - splitVectorCommand.append("force", false); - // maxChunkSize is the Mongo partition size in MB - LOG.debug("Splitting in chunk of {} MB", desiredBundleSizeBytes / 1024 / 1024); - splitVectorCommand.append("maxChunkSize", desiredBundleSizeBytes / 1024 / 1024); - Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand); - splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys"); - - List<BoundedSource<Document>> sources = new ArrayList<>(); - if (splitKeys.size() < 1) { - LOG.debug("Split keys is low, using an unique source"); - sources.add(this); - return sources; - } + // now we have the batch size (provided by user or provided by the runner) + // we use Mongo splitVector command to get the split keys + BasicDBObject splitVectorCommand = new BasicDBObject(); + splitVectorCommand.append("splitVector", spec.database() + "." + spec.collection()); + splitVectorCommand.append("keyPattern", new BasicDBObject().append("_id", 1)); + splitVectorCommand.append("force", false); + // maxChunkSize is the Mongo partition size in MB + LOG.debug("Splitting in chunk of {} MB", desiredBundleSizeBytes / 1024 / 1024); + splitVectorCommand.append("maxChunkSize", desiredBundleSizeBytes / 1024 / 1024); + Document splitVectorCommandResult = mongoDatabase.runCommand(splitVectorCommand); + splitKeys = (List<Document>) splitVectorCommandResult.get("splitKeys"); + + List<BoundedSource<Document>> sources = new ArrayList<>(); + if (splitKeys.size() < 1) { + LOG.debug("Split keys is low, using an unique source"); + sources.add(this); + return sources; + } - LOG.debug("Number of splits is {}", splitKeys.size()); - for (String shardFilter : splitKeysToFilters(splitKeys, spec.filter())) { - sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter))); - } + LOG.debug("Number of splits is {}", splitKeys.size()); + for (String shardFilter : 
splitKeysToFilters(splitKeys, spec.filter())) { + sources.add(new BoundedMongoDbSource(spec.withFilter(shardFilter))); + } - return sources; + return sources; + } } /** @@ -367,7 +457,10 @@ public class MongoDbIO { @Override public boolean start() { Read spec = source.spec; - client = new MongoClient(new MongoClientURI(spec.uri())); + MongoClientOptions.Builder optionsBuilder = new MongoClientOptions.Builder(); + optionsBuilder.maxConnectionIdleTime(spec.maxConnectionIdleTime()); + optionsBuilder.socketKeepAlive(spec.keepAlive()); + client = new MongoClient(new MongoClientURI(spec.uri(), optionsBuilder)); MongoDatabase mongoDatabase = client.getDatabase(spec.database()); @@ -426,36 +519,106 @@ public class MongoDbIO { */ @AutoValue public abstract static class Write extends PTransform<PCollection<Document>, PDone> { + @Nullable abstract String uri(); + abstract boolean keepAlive(); + abstract int maxConnectionIdleTime(); @Nullable abstract String database(); @Nullable abstract String collection(); abstract long batchSize(); - abstract Builder toBuilder(); + abstract Builder builder(); @AutoValue.Builder abstract static class Builder { abstract Builder setUri(String uri); + abstract Builder setKeepAlive(boolean keepAlive); + abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime); abstract Builder setDatabase(String database); abstract Builder setCollection(String collection); abstract Builder setBatchSize(long batchSize); abstract Write build(); } + /** + * Define the location of the MongoDB instances using an URI. The URI describes the hosts to + * be used and some options. + * + * <p>The format of the URI is: + * + * <pre>{@code + * mongodb://[username:password@]host1[:port1],...[,hostN[:portN]]][/[database][?options]] + * }</pre> + * + * <p>Where: + * <ul> + * <li>{@code mongodb://} is a required prefix to identify that this is a string in the + * standard connection format.</li> + * <li>{@code username:password@} are optional. 
If given, the driver will attempt to + * login to a database after connecting to a database server. For some authentication + * mechanisms, only the username is specified and the password is not, in which case + * the ":" after the username is left off as well.</li> + * <li>{@code host1} is the only required part of the URI. It identifies a server + * address to connect to.</li> + * <li>{@code :portX} is optional and defaults to {@code :27017} if not provided.</li> + * <li>{@code /database} is the name of the database to login to and thus is only + * relevant if the {@code username:password@} syntax is used. If not specified, the + * "admin" database will be used by default. It has to be equivalent to the database + * you specify with {@link Write#withDatabase(String)}.</li> + * <li>{@code ?options} are connection options. Note that if {@code database} is absent + * there is still a {@code /} required between the last {@code host} and the {@code ?} + * introducing the options. Options are name=value pairs and the pairs are separated by + * "{@code &}". The {@code KeepAlive} connection option can't be passed via the URI, instead + * you have to use {@link Write#withKeepAlive(boolean)}. Same for the + * {@code MaxConnectionIdleTime} connection option via + * {@link Write#withMaxConnectionIdleTime(int)}. + * </li> + * </ul> + */ public Write withUri(String uri) { - return toBuilder().setUri(uri).build(); + checkArgument(uri != null, "MongoDbIO.write().withUri(uri) called with null uri"); + return builder().setUri(uri).build(); + } + + /** + * Sets whether socket keep alive is enabled. + */ + public Write withKeepAlive(boolean keepAlive) { + return builder().setKeepAlive(keepAlive).build(); + } + + /** + * Sets the maximum idle time for a pooled connection. + */ + public Write withMaxConnectionIdleTime(int maxConnectionIdleTime) { + return builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build(); } + /** + * Sets the database to use. 
+ */ public Write withDatabase(String database) { - return toBuilder().setDatabase(database).build(); + checkArgument(database != null, "MongoDbIO.write().withDatabase(database) called with " + + "null database"); + return builder().setDatabase(database).build(); } + /** + * Sets the collection where to write data in the database. + */ public Write withCollection(String collection) { - return toBuilder().setCollection(collection).build(); + checkArgument(collection != null, "MongoDbIO.write().withCollection(collection) called " + + "with null collection"); + return builder().setCollection(collection).build(); } + /** + * Define the size of the batch to group write operations. + */ public Write withBatchSize(long batchSize) { - return toBuilder().setBatchSize(batchSize).build(); + checkArgument(batchSize >= 0, "MongoDbIO.write().withBatchSize(batchSize) called with " + + "invalid batch size. Batch size has to be >= 0 (currently %d)", batchSize); + return builder().setBatchSize(batchSize).build(); } @Override @@ -466,10 +629,21 @@ public class MongoDbIO { @Override public void validate(PipelineOptions options) { - checkNotNull(uri(), "uri"); - checkNotNull(database(), "database"); - checkNotNull(collection(), "collection"); - checkNotNull(batchSize(), "batchSize"); + checkState(uri() != null, "MongoDbIO.write() requires an URI to be set via withUri(uri)"); + checkState(database() != null, "MongoDbIO.write() requires a database to be set via " + + "withDatabase(database)"); + checkState(collection() != null, "MongoDbIO.write() requires a collection to be set via " + + "withCollection(collection)"); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("uri", uri())); + builder.add(DisplayData.item("keepAlive", keepAlive())); + builder.add(DisplayData.item("maxConnectionIdleTime", maxConnectionIdleTime())); + builder.add(DisplayData.item("database", database())); + builder.add(DisplayData.item("collection", 
collection())); + builder.add(DisplayData.item("batchSize", batchSize())); } private static class WriteFn extends DoFn<Document, Void> { @@ -483,7 +657,10 @@ public class MongoDbIO { @Setup public void createMongoClient() throws Exception { - client = new MongoClient(new MongoClientURI(spec.uri())); + MongoClientOptions.Builder builder = new MongoClientOptions.Builder(); + builder.socketKeepAlive(spec.keepAlive()); + builder.maxConnectionIdleTime(spec.maxConnectionIdleTime()); + client = new MongoClient(new MongoClientURI(spec.uri(), builder)); } @StartBundle http://git-wip-us.apache.org/repos/asf/beam/blob/87be64e9/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java index cd26b48..67dbca4 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.mongodb; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import com.mongodb.MongoClient; import com.mongodb.client.MongoCollection; @@ -189,6 +190,42 @@ public class MongoDbIOTest implements Serializable { } @Test + public void testReadWithCustomConnectionOptions() throws Exception { + MongoDbIO.Read read = MongoDbIO.read() + .withUri("mongodb://localhost:" + port) + .withKeepAlive(false) + .withMaxConnectionIdleTime(10) + .withDatabase(DATABASE) + .withCollection(COLLECTION); + assertFalse(read.keepAlive()); + assertEquals(10, read.maxConnectionIdleTime()); + + PCollection<Document> documents = pipeline.apply(read); + + PAssert.thatSingleton(documents.apply("Count All", Count.<Document>globally())) + .isEqualTo(1000L); + 
+ PAssert.that(documents + .apply("Map Scientist", MapElements.via(new SimpleFunction<Document, KV<String, Void>>() { + public KV<String, Void> apply(Document input) { + return KV.of(input.getString("scientist"), null); + } + })) + .apply("Count Scientist", Count.<String, Void>perKey()) + ).satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() { + @Override + public Void apply(Iterable<KV<String, Long>> input) { + for (KV<String, Long> element : input) { + assertEquals(100L, element.getValue().longValue()); + } + return null; + } + }); + + pipeline.run(); + } + + @Test public void testReadWithFilter() throws Exception { PCollection<Document> output = pipeline.apply(