[ 
https://issues.apache.org/jira/browse/BEAM-9822?focusedWorklogId=434673&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-434673
 ]

ASF GitHub Bot logged work on BEAM-9822:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/May/20 23:33
            Start Date: 18/May/20 23:33
    Worklog Time Spent: 10m 
      Work Description: nielm commented on a change in pull request #11570:
URL: https://github.com/apache/beam/pull/11570#discussion_r426947630



##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
##########
@@ -1171,67 +1145,127 @@ public void processElement(ProcessContext c) {
    * occur, Therefore this DoFn has to be tested in isolation.
    */
   @VisibleForTesting
-  static class GatherBundleAndSortFn extends DoFn<MutationGroup, 
Iterable<KV<byte[], byte[]>>> {
-    private final long maxBatchSizeBytes;
-    private final long maxNumMutations;
-    private final long maxNumRows;
-
-    // total size of the current batch.
-    private long batchSizeBytes;
-    // total number of mutated cells.
-    private long batchCells;
-    // total number of rows mutated.
-    private long batchRows;
+  static class GatherSortCreateBatchesFn extends DoFn<MutationGroup, 
Iterable<MutationGroup>> {
 
+    private final long maxBatchSizeBytes;
+    private final long maxBatchNumMutations;
+    private final long maxBatchNumRows;
+    private final long maxSortableSizeBytes;
+    private final long maxSortableNumMutations;
+    private final long maxSortableNumRows;
     private final PCollectionView<SpannerSchema> schemaView;
+    private final ArrayList<MutationGroupContainer> mutationsToSort = new 
ArrayList<>();
 
-    private transient ArrayList<KV<byte[], byte[]>> mutationsToSort = null;
+    // total size of MutationGroups in mutationsToSort.
+    private long sortableSizeBytes;
+    // total number of mutated cells in mutationsToSort
+    private long sortableNumCells;
+    // total number of rows mutated in mutationsToSort
+    private long sortableNumRows;
 
-    GatherBundleAndSortFn(
+    GatherSortCreateBatchesFn(
         long maxBatchSizeBytes,
         long maxNumMutations,
         long maxNumRows,
         long groupingFactor,
         PCollectionView<SpannerSchema> schemaView) {
-      this.maxBatchSizeBytes = maxBatchSizeBytes * groupingFactor;
-      this.maxNumMutations = maxNumMutations * groupingFactor;
-      this.maxNumRows = maxNumRows * groupingFactor;
+      this.maxBatchSizeBytes = maxBatchSizeBytes;
+      this.maxBatchNumMutations = maxNumMutations;
+      this.maxBatchNumRows = maxNumRows;
+
+      if (groupingFactor <= 0) {
+        groupingFactor = 1;
+      }
+
+      this.maxSortableSizeBytes = maxBatchSizeBytes * groupingFactor;
+      this.maxSortableNumMutations = maxNumMutations * groupingFactor;
+      this.maxSortableNumRows = maxNumRows * groupingFactor;
       this.schemaView = schemaView;
     }
 
     @StartBundle
     public synchronized void startBundle() throws Exception {
-      if (mutationsToSort == null) {
-        initSorter();
-      } else {
-        throw new IllegalStateException("Sorter should be null here");
-      }
+      initSorter();
     }
 
-    private void initSorter() {
-      mutationsToSort = new ArrayList<KV<byte[], byte[]>>((int) 
maxNumMutations);
-      batchSizeBytes = 0;
-      batchCells = 0;
-      batchRows = 0;
+    private synchronized void initSorter() {

Review comment:
       > Do we need to mark this as synchronized. Looks like all the callers 
are synchronized themselves.
   
   Probably not, but it does not harm.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 434673)
    Time Spent: 2h 10m  (was: 2h)

> SpannerIO: Reduce memory usage - especially when streaming
> ----------------------------------------------------------
>
>                 Key: BEAM-9822
>                 URL: https://issues.apache.org/jira/browse/BEAM-9822
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.20.0, 2.21.0
>            Reporter: Niel Markwick
>            Assignee: Niel Markwick
>            Priority: P2
>              Labels: google-cloud-spanner
>             Fix For: 2.22.0
>
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> SpannerIO uses a lot of memory. 
> In Streaming Dataflow, it uses many times as much (because dataflow creates 
> many worker threads)
> Lower the memory use, and change default parameters during streaming to use 
> smaller batches and disable grouping.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to