This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fd98cf9  Add option to disable reshuffling of JdbcIO
     new 4815d0d  Merge pull request #7956: [BEAM-6670] Add `withOutputParallelization` option to disable reparallelization of JdbcIO.Read
fd98cf9 is described below

commit fd98cf91f16c9bc7fe6e77c2fa4866bf8cc3e056
Author: Mike Pedersen <m...@enversion.dk>
AuthorDate: Wed Feb 27 14:08:11 2019 +0100

    Add option to disable reshuffling of JdbcIO
---
 .../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java   | 59 +++++++++++++++++-----
 1 file changed, 47 insertions(+), 12 deletions(-)

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 354b26c..e6f2699 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -160,7 +160,10 @@ public class JdbcIO {
    * @param <T> Type of the data to be read.
    */
   public static <T> Read<T> read() {
-    return new AutoValue_JdbcIO_Read.Builder<T>().setFetchSize(DEFAULT_FETCH_SIZE).build();
+    return new AutoValue_JdbcIO_Read.Builder<T>()
+        .setFetchSize(DEFAULT_FETCH_SIZE)
+        .setOutputParallelization(true)
+        .build();
   }
 
   /**
@@ -173,6 +176,7 @@ public class JdbcIO {
   public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
     return new AutoValue_JdbcIO_ReadAll.Builder<ParameterT, OutputT>()
         .setFetchSize(DEFAULT_FETCH_SIZE)
+        .setOutputParallelization(true)
         .build();
   }
 
@@ -399,6 +403,8 @@ public class JdbcIO {
 
     abstract int getFetchSize();
 
+    abstract boolean getOutputParallelization();
+
     abstract Builder<T> toBuilder();
 
     @AutoValue.Builder
@@ -415,6 +421,8 @@ public class JdbcIO {
 
       abstract Builder<T> setFetchSize(int fetchSize);
 
+      abstract Builder<T> setOutputParallelization(boolean outputParallelization);
+
       abstract Read<T> build();
     }
 
@@ -457,6 +465,14 @@ public class JdbcIO {
       return toBuilder().setFetchSize(fetchSize).build();
     }
 
+    /**
+     * Whether to reshuffle the resulting PCollection so results are distributed to all workers. The
+     * default is to parallelize and should only be changed if this is known to be unnecessary.
+     */
+    public Read<T> withOutputParallelization(boolean outputParallelization) {
+      return toBuilder().setOutputParallelization(outputParallelization).build();
+    }
+
     @Override
     public PCollection<T> expand(PBegin input) {
       checkArgument(getQuery() != null, "withQuery() is required");
@@ -474,6 +490,7 @@ public class JdbcIO {
                   .withCoder(getCoder())
                   .withRowMapper(getRowMapper())
                   .withFetchSize(getFetchSize())
+                  .withOutputParallelization(getOutputParallelization())
                   .withParameterSetter(
                       (element, preparedStatement) -> {
                         if (getStatementPreparator() != null) {
@@ -515,6 +532,8 @@ public class JdbcIO {
 
     abstract int getFetchSize();
 
+    abstract boolean getOutputParallelization();
+
     abstract Builder<ParameterT, OutputT> toBuilder();
 
     @AutoValue.Builder
@@ -533,6 +552,8 @@ public class JdbcIO {
 
       abstract Builder<ParameterT, OutputT> setFetchSize(int fetchSize);
 
+      abstract Builder<ParameterT, OutputT> setOutputParallelization(boolean outputParallelization);
+
       abstract ReadAll<ParameterT, OutputT> build();
     }
 
@@ -582,19 +603,33 @@ public class JdbcIO {
       return toBuilder().setFetchSize(fetchSize).build();
     }
 
+    /**
+     * Whether to reshuffle the resulting PCollection so results are distributed to all workers. The
+     * default is to parallelize and should only be changed if this is known to be unnecessary.
+     */
+    public ReadAll<ParameterT, OutputT> withOutputParallelization(boolean outputParallelization) {
+      return toBuilder().setOutputParallelization(outputParallelization).build();
+    }
+
     @Override
     public PCollection<OutputT> expand(PCollection<ParameterT> input) {
-      return input
-          .apply(
-              ParDo.of(
-                  new ReadFn<>(
-                      getDataSourceConfiguration(),
-                      getQuery(),
-                      getParameterSetter(),
-                      getRowMapper(),
-                      getFetchSize())))
-          .setCoder(getCoder())
-          .apply(new Reparallelize<>());
+      PCollection<OutputT> output =
+          input
+              .apply(
+                  ParDo.of(
+                      new ReadFn<>(
+                          getDataSourceConfiguration(),
+                          getQuery(),
+                          getParameterSetter(),
+                          getRowMapper(),
+                          getFetchSize())))
+              .setCoder(getCoder());
+
+      if (getOutputParallelization()) {
+        output = output.apply(new Reparallelize<>());
+      }
+
+      return output;
     }
 
     @Override

Reply via email to