This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0943d85  [BEAM-14129] Restructure PubsubLiteIO Read side to produce smaller bundles (#17125)
0943d85 is described below

commit 0943d85113e77505f2a1291d7b256b9a4b1e5dec
Author: dpcollins-google <40498610+dpcollins-goo...@users.noreply.github.com>
AuthorDate: Wed Mar 23 11:48:53 2022 -0400

    [BEAM-14129] Restructure PubsubLiteIO Read side to produce smaller bundles (#17125)
    
    * [BEAM-14129] Restructure PubsubLiteIO Read side to produce smaller bundles
    
    * [BEAM-14129] Restructure PubsubLiteIO Read side to produce smaller bundles
    
    * fixes
    
    * fixes
    
    * fixes
    
    * fixes
    
    * fixes
    
    * fixes
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   8 +-
 .../internal/MemoryBufferedSubscriber.java         |  55 ++++++
 .../internal/MemoryBufferedSubscriberImpl.java     | 173 ++++++++++++++++++
 ...erverPublisherCache.java => MemoryLimiter.java} |  21 ++-
 .../gcp/pubsublite/internal/MemoryLimiterImpl.java |  92 ++++++++++
 .../internal/OffsetByteRangeTracker.java           | 104 ++---------
 .../internal/PerServerPublisherCache.java          |   7 +-
 ...herCache.java => PerServerSubscriberCache.java} |  14 +-
 .../internal/PerSubscriptionPartitionSdf.java      |  21 ++-
 .../io/gcp/pubsublite/internal/PubsubLiteSink.java |   4 +-
 .../{PublisherCache.java => ServiceCache.java}     |  54 +++---
 .../pubsublite/internal/SubscribeTransform.java    |  50 +++---
 .../pubsublite/internal/SubscriberAssembler.java   |   1 +
 .../SubscriptionPartitionProcessorImpl.java        | 180 +++++++------------
 .../internal/MemoryBufferedSubscriberImplTest.java | 200 +++++++++++++++++++++
 .../internal/OffsetByteRangeTrackerTest.java       |  37 +---
 .../SubscriptionPartitionProcessorImplTest.java    | 180 ++++++++-----------
 17 files changed, 786 insertions(+), 415 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index eb48d48..6b57085 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -573,11 +573,11 @@ class BeamModulePlugin implements Plugin<Project> {
         google_cloud_datastore_v1_proto_client      : "com.google.cloud.datastore:datastore-v1-proto-client:2.1.3",
         google_cloud_firestore                      : "com.google.cloud:google-cloud-firestore", // google_cloud_platform_libraries_bom sets version
         google_cloud_pubsub                         : "com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom sets version
-        google_cloud_pubsublite                     : "com.google.cloud:google-cloud-pubsublite", // google_cloud_platform_libraries_bom sets version
+        google_cloud_pubsublite                     : "com.google.cloud:google-cloud-pubsublite:1.5.0",  // TODO(dpcollins-google): Let google_cloud_platform_libraries_bom set version once high enough
         // The GCP Libraries BOM dashboard shows the versions set by the BOM:
         // https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/24.3.0/artifact_details.html
         // Update libraries-bom version on sdks/java/container/license_scripts/dep_urls_java.yaml
-        google_cloud_platform_libraries_bom         : "com.google.cloud:libraries-bom:24.4.0",
+        google_cloud_platform_libraries_bom         : "com.google.cloud:libraries-bom:24.4.0",  // TODO(updater): Remove the pubsublite pin to 1.5.0 if the BOM includes a higher version
         google_cloud_spanner                        : "com.google.cloud:google-cloud-spanner", // google_cloud_platform_libraries_bom sets version
         google_cloud_spanner_test                   : "com.google.cloud:google-cloud-spanner:$google_cloud_spanner_version:tests",
         google_code_gson                            : "com.google.code.gson:gson:$google_code_gson_version",
@@ -599,7 +599,7 @@ class BeamModulePlugin implements Plugin<Project> {
         grpc_core                                   : "io.grpc:grpc-core", // google_cloud_platform_libraries_bom sets version
         grpc_google_cloud_firestore_v1              : "com.google.api.grpc:grpc-google-cloud-firestore-v1", // google_cloud_platform_libraries_bom sets version
         grpc_google_cloud_pubsub_v1                 : "com.google.api.grpc:grpc-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
-        grpc_google_cloud_pubsublite_v1             : "com.google.api.grpc:grpc-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version
+        grpc_google_cloud_pubsublite_v1             : "com.google.api.grpc:grpc-google-cloud-pubsublite-v1:1.5.0",  // TODO(dpcollins-google): Let google_cloud_platform_libraries_bom set version once high enough
         grpc_google_common_protos                   : "com.google.api.grpc:grpc-google-common-protos", // google_cloud_platform_libraries_bom sets version
         grpc_grpclb                                 : "io.grpc:grpc-grpclb", // google_cloud_platform_libraries_bom sets version
         grpc_protobuf                               : "io.grpc:grpc-protobuf", // google_cloud_platform_libraries_bom sets version
@@ -662,7 +662,7 @@ class BeamModulePlugin implements Plugin<Project> {
         proto_google_cloud_datastore_v1             : "com.google.api.grpc:proto-google-cloud-datastore-v1", // google_cloud_platform_libraries_bom sets version
         proto_google_cloud_firestore_v1             : "com.google.api.grpc:proto-google-cloud-firestore-v1", // google_cloud_platform_libraries_bom sets version
         proto_google_cloud_pubsub_v1                : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
-        proto_google_cloud_pubsublite_v1            : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version
+        proto_google_cloud_pubsublite_v1            : "com.google.api.grpc:proto-google-cloud-pubsublite-v1:1.5.0", // TODO(dpcollins-google): Let google_cloud_platform_libraries_bom set version once high enough
         proto_google_cloud_spanner_v1               : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
         proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version
         proto_google_common_protos                  : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriber.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriber.java
new file mode 100644
index 0000000..27b6d83
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriber.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiService;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import java.util.Optional;
+
+interface MemoryBufferedSubscriber extends ApiService {
+
+  /**
+   * Get the current fetch offset of this subscriber. This offset will be less than or equal to all
+   * future messages returned by this object.
+   */
+  Offset fetchOffset();
+
+  /**
+   * Notify this subscriber that all messages that have been removed with `pop` should no longer be
+   * counted against the memory budget.
+   *
+   * <p>Acquire a new memory buffer and allow any bytes which are now available to be sent to this
+   * job.
+   */
+  void rebuffer() throws ApiException;
+
+  /** Return the head message from the buffer if it exists, or empty() otherwise. */
+  Optional<SequencedMessage> peek();
+
+  /** Remove the head message from the buffer. Throws if no messages exist in the buffer. */
+  void pop();
+
+  /**
+   * Return a future which will be satisfied when data is likely available or the subscriber has
+   * failed.
+   */
+  ApiFuture<Void> onData();
+}
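
The interface above is consumed as a drain loop: peek() and pop() until the buffer is empty, then block on onData() before draining again. A minimal same-package sketch of that pattern (the consume() hook and the bounded wait are illustrative assumptions, not part of this commit):

    package org.apache.beam.sdk.io.gcp.pubsublite.internal;

    import com.google.cloud.pubsublite.proto.SequencedMessage;
    import java.util.Optional;
    import java.util.concurrent.TimeUnit;

    final class BufferedSubscriberUsage {
      // Drain everything currently buffered, then wait (bounded) for more data
      // or subscriber failure. pop() is only called after a message is handled,
      // mirroring how the processor below pops only after a successful claim.
      static long drainOnce(MemoryBufferedSubscriber subscriber) throws Exception {
        long count = 0;
        for (Optional<SequencedMessage> next = subscriber.peek();
            next.isPresent();
            next = subscriber.peek()) {
          consume(next.get());
          subscriber.pop();
          count++;
        }
        subscriber.onData().get(10, TimeUnit.SECONDS); // ApiFuture extends Future
        return count;
      }

      private static void consume(SequencedMessage message) {
        // Application-specific handling would go here.
      }
    }
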
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImpl.java
new file mode 100644
index 0000000..d836797
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImpl.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.ProxyService;
+import com.google.cloud.pubsublite.internal.wire.Subscriber;
+import com.google.cloud.pubsublite.proto.FlowControlRequest;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.gcp.pubsublite.internal.MemoryLimiter.Block;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class MemoryBufferedSubscriberImpl extends ProxyService implements MemoryBufferedSubscriber {
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryBufferedSubscriberImpl.class);
+
+  private final Partition partition;
+  private final MemoryLimiter limiter;
+  private final Subscriber subscriber;
+  private long targetMemory;
+  private Offset fetchOffset;
+  private Block memBlock;
+
+  private long bytesOutstandingToServer = 0;
+  private long bytesOutstanding = 0;
+  private final Queue<SequencedMessage> messages = new ArrayDeque<>();
+  private SettableApiFuture<Void> newData = SettableApiFuture.create();
+  private boolean shutdown = false;
+
+  // onReceive will not be called inline as subscriber is not started.
+  // addServices is intended to be called from the constructor.
+  @SuppressWarnings({"methodref.receiver.bound.invalid", "method.invocation.invalid"})
+  public MemoryBufferedSubscriberImpl(
+      Partition partition,
+      Offset startOffset,
+      MemoryLimiter limiter,
+      Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory) {
+    this.partition = partition;
+    this.fetchOffset = startOffset;
+    this.limiter = limiter;
+    this.targetMemory = limiter.maxBlockSize();
+    this.subscriber = subscriberFactory.apply(this::onReceive);
+    addServices(this.subscriber);
+    memBlock = limiter.claim(targetMemory);
+  }
+
+  @Override
+  protected synchronized void start() throws CheckedApiException {
+    bytesOutstandingToServer += memBlock.claimed();
+    bytesOutstanding += memBlock.claimed();
+    subscriber.allowFlow(
+        FlowControlRequest.newBuilder()
+            .setAllowedBytes(memBlock.claimed())
+            .setAllowedMessages(Long.MAX_VALUE)
+            .build());
+  }
+
+  @Override
+  protected synchronized void stop() {
+    if (shutdown) {
+      return;
+    }
+    shutdown = true;
+    newData.set(null);
+    memBlock.close();
+  }
+
+  @Override
+  protected synchronized void handlePermanentError(CheckedApiException e) {
+    stop();
+  }
+
+  private synchronized void onReceive(List<SequencedMessage> batch) {
+    if (shutdown) {
+      return;
+    }
+    for (SequencedMessage message : batch) {
+      bytesOutstandingToServer -= message.getSizeBytes();
+    }
+    messages.addAll(batch);
+    newData.set(null);
+    newData = SettableApiFuture.create();
+  }
+
+  @Override
+  public synchronized Offset fetchOffset() {
+    return fetchOffset;
+  }
+
+  @Override
+  public synchronized void rebuffer() throws ApiException {
+    if (shutdown) {
+      return;
+    }
+    if (bytesOutstandingToServer < (targetMemory / 3)) {
+      // Server is delivering lots of data, increase the target so that it is not throttled.
+      targetMemory = Math.min(limiter.maxBlockSize(), targetMemory * 2);
+    } else if (bytesOutstandingToServer > (2 * targetMemory / 3)) {
+      // Server is delivering little data, decrease the target so that memory can be used for other
+      // users of the limiter.
+      targetMemory = Math.max(limiter.minBlockSize(), targetMemory / 2);
+    }
+    long claimTarget = Math.max(bytesOutstanding, targetMemory);
+    memBlock.close();
+    memBlock = limiter.claim(claimTarget);
+    long toAllow = Math.max(memBlock.claimed() - bytesOutstanding, 0);
+    if (toAllow > 0) {
+      bytesOutstanding += toAllow;
+      bytesOutstandingToServer += toAllow;
+      try {
+        subscriber.allowFlow(FlowControlRequest.newBuilder().setAllowedBytes(toAllow).build());
+      } catch (CheckedApiException e) {
+        throw e.underlying;
+      }
+    } else {
+      LOG.debug(
+          "Not claiming memory: partition {} outstanding {} to server {} target {} claimed {} messages {}",
+          partition,
+          bytesOutstanding,
+          bytesOutstandingToServer,
+          targetMemory,
+          memBlock.claimed(),
+          messages.size());
+    }
+  }
+
+  @Override
+  public synchronized Optional<SequencedMessage> peek() {
+    return Optional.ofNullable(messages.peek());
+  }
+
+  @Override
+  public synchronized void pop() {
+    SequencedMessage message = messages.remove();
+    bytesOutstanding -= message.getSizeBytes();
+    fetchOffset = Offset.of(message.getCursor().getOffset() + 1);
+  }
+
+  @Override
+  public synchronized ApiFuture<Void> onData() {
+    if (shutdown || !messages.isEmpty()) {
+      return ApiFutures.immediateFuture(null);
+    }
+    return newData;
+  }
+}
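
The sizing rule in rebuffer() above is multiplicative in both directions: when less than a third of the grant is still outstanding to the server, the server is delivering quickly and the target doubles (capped at maxBlockSize); when more than two thirds is still outstanding, it is delivering slowly and the target halves (floored at minBlockSize). The same arithmetic, restated standalone with hypothetical names, for reference:

    // Restatement of rebuffer()'s target adjustment.
    static long nextTarget(long target, long outstandingToServer, long minBlock, long maxBlock) {
      if (outstandingToServer < target / 3) {
        return Math.min(maxBlock, target * 2); // most of the grant was delivered: grow
      }
      if (outstandingToServer > 2 * target / 3) {
        return Math.max(minBlock, target / 2); // most of the grant is unfilled: shrink
      }
      return target; // in band: hold steady
    }
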
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerServerPublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryLimiter.java
similarity index 65%
copy from sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerServerPublisherCache.java
copy to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryLimiter.java
index 07953cb..02cee19 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerServerPublisherCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryLimiter.java
@@ -18,17 +18,20 @@
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
 /**
- * A shared cache per-worker instance of Pub/Sub Lite publishers.
- *
- * <p>Pub/Sub Lite publishers connect to all available partitions: it would be a pessimization for
- * all instances of the PubsubLiteSink to do this.
+ * A class which tracks blocks of memory which have been given out, and tries to limit total memory
+ * size.
  */
-final class PerServerPublisherCache {
-  private PerServerPublisherCache() {}
+interface MemoryLimiter {
+  Block claim(long toAcquire);
+
+  long minBlockSize();
+
+  long maxBlockSize();
 
-  static final PublisherCache PUBLISHER_CACHE = new PublisherCache();
+  interface Block extends AutoCloseable {
+    long claimed();
 
-  static {
-    Runtime.getRuntime().addShutdownHook(new Thread(PUBLISHER_CACHE::close));
+    @Override
+    void close();
   }
 }
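
Because Block extends AutoCloseable with a no-throw close(), a short-lived claim fits try-with-resources. A same-package usage sketch (the requested size is an arbitrary assumption; the buffered subscriber above instead holds its block across calls):

    // A grant may differ from the request, so callers should read claimed()
    // rather than assume the requested size; close() returns the bytes.
    static long withBudget(MemoryLimiter limiter) {
      try (MemoryLimiter.Block block = limiter.claim(64L << 20)) {
        return block.claimed();
      }
    }
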
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryLimiterImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryLimiterImpl.java
new file mode 100644
index 0000000..3e54cc5
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryLimiterImpl.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.concurrent.GuardedBy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class MemoryLimiterImpl implements MemoryLimiter {
+  private static final Logger LOG = LoggerFactory.getLogger(MemoryLimiterImpl.class);
+  private final long minBlockSize;
+  private final long maxBlockSize;
+  private final long maxAvailable;
+
+  @GuardedBy("this")
+  private long available;
+
+  public MemoryLimiterImpl(long minBlockSize, long maxBlockSize, long maxAvailable) {
+    this.minBlockSize = minBlockSize;
+    this.maxBlockSize = maxBlockSize;
+    this.maxAvailable = maxAvailable;
+    this.available = maxAvailable;
+  }
+
+  @Override
+  public synchronized Block claim(long toAcquire) {
+    toAcquire = Math.max(Math.min(toAcquire, available / 2), minBlockSize);
+    available -= toAcquire;
+    return new Block(toAcquire);
+  }
+
+  @Override
+  public long minBlockSize() {
+    return minBlockSize;
+  }
+
+  @Override
+  public long maxBlockSize() {
+    return maxBlockSize;
+  }
+
+  private synchronized void release(long toRelease) {
+    available += toRelease;
+    checkState(available <= maxAvailable);
+  }
+
+  public class Block implements MemoryLimiter.Block {
+    public final long claimed;
+    private boolean released = false;
+
+    private Block(long claimed) {
+      this.claimed = claimed;
+    }
+
+    @Override
+    public long claimed() {
+      return claimed;
+    }
+
+    @Override
+    public void close() {
+      checkState(!released);
+      released = true;
+      release(claimed);
+    }
+
+    @Override
+    public void finalize() {
+      if (!released) {
+        LOG.error("Failed to release memory block- likely SDF implementation error.");
+        close();
+      }
+    }
+  }
+}
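
Note the clamp in claim(): the grant is max(min(request, available / 2), minBlockSize), so large requests are throttled as the shared budget depletes, but a grant never falls below minBlockSize even if that drives available negative; the limit is soft. A worked example under assumed sizes:

    // Assumed sizes, for illustration only.
    MemoryLimiterImpl limiter =
        new MemoryLimiterImpl(/* minBlockSize= */ 4, /* maxBlockSize= */ 100, /* maxAvailable= */ 64);
    MemoryLimiter.Block a = limiter.claim(100); // grants min(100, 64 / 2) = 32; 32 left
    MemoryLimiter.Block b = limiter.claim(100); // grants min(100, 32 / 2) = 16; 16 left
    a.close();                                  // releases 32; 48 available again
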
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/OffsetByteRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/OffsetByteRangeTracker.java
index 44e6246..f4ac39d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/OffsetByteRangeTracker.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/OffsetByteRangeTracker.java
@@ -17,45 +17,28 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import com.google.cloud.pubsublite.Offset;
 import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
-import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
-import org.joda.time.Duration;
 
 /**
  * OffsetByteRangeTracker is an unbounded restriction tracker for Pub/Sub lite partitions that
  * tracks offsets for checkpointing and bytes for progress.
- *
- * <p>Any valid instance of an OffsetByteRangeTracker tracks one of exactly two types of ranges: -
- * Unbounded ranges whose last offset is Long.MAX_VALUE - Completed ranges that are either empty
- * (From == To) or fully claimed (lastClaimed == To - 1)
- *
- * <p>Also prevents splitting until minTrackingTime has passed or minBytesReceived have been
- * received. IMPORTANT: minTrackingTime must be strictly smaller than the SDF read timeout when it
- * would return ProcessContinuation.resume().
  */
 class OffsetByteRangeTracker extends TrackerWithProgress {
   private final TopicBacklogReader unownedBacklogReader;
-  private final Duration minTrackingTime;
-  private final long minBytesReceived;
-  private final Stopwatch stopwatch;
-  private OffsetByteRange range;
+  private final OffsetRangeTracker rangeTracker;
+  private long bytes;
   private @Nullable Long lastClaimed;
 
-  public OffsetByteRangeTracker(
-      OffsetByteRange range,
-      TopicBacklogReader unownedBacklogReader,
-      Stopwatch stopwatch,
-      Duration minTrackingTime,
-      long minBytesReceived) {
+  public OffsetByteRangeTracker(OffsetByteRange range, TopicBacklogReader unownedBacklogReader) {
     checkArgument(
         range.getRange().getTo() == Long.MAX_VALUE,
         "May only construct OffsetByteRangeTracker with an unbounded range with no progress.");
@@ -63,10 +46,8 @@ class OffsetByteRangeTracker extends TrackerWithProgress {
         range.getByteCount() == 0L,
         "May only construct OffsetByteRangeTracker with an unbounded range with no progress.");
     this.unownedBacklogReader = unownedBacklogReader;
-    this.minTrackingTime = minTrackingTime;
-    this.minBytesReceived = minBytesReceived;
-    this.stopwatch = stopwatch.reset().start();
-    this.range = range;
+    this.rangeTracker = new OffsetRangeTracker(range.getRange());
+    this.bytes = range.getByteCount();
   }
 
   @Override
@@ -76,34 +57,17 @@ class OffsetByteRangeTracker extends TrackerWithProgress {
 
   @Override
   public boolean tryClaim(OffsetByteProgress position) {
-    long toClaim = position.lastOffset().value();
-    checkArgument(
-        lastClaimed == null || toClaim > lastClaimed,
-        "Trying to claim offset %s while last attempted was %s",
-        position.lastOffset().value(),
-        lastClaimed);
-    checkArgument(
-        toClaim >= range.getRange().getFrom(),
-        "Trying to claim offset %s before start of the range %s",
-        toClaim,
-        range);
-    // split() has already been called, truncating this range. No more offsets may be claimed.
-    if (range.getRange().getTo() != Long.MAX_VALUE) {
-      boolean isRangeEmpty = range.getRange().getTo() == range.getRange().getFrom();
-      boolean isValidClosedRange = nextOffset() == range.getRange().getTo();
-      checkState(
-          isRangeEmpty || isValidClosedRange,
-          "Violated class precondition: offset range improperly split. Please report a beam bug.");
+    if (!rangeTracker.tryClaim(position.lastOffset().value())) {
       return false;
     }
-    lastClaimed = toClaim;
-    range = OffsetByteRange.of(range.getRange(), range.getByteCount() + position.batchBytes());
+    lastClaimed = position.lastOffset().value();
+    bytes += position.batchBytes();
     return true;
   }
 
   @Override
   public OffsetByteRange currentRestriction() {
-    return range;
+    return OffsetByteRange.of(rangeTracker.currentRestriction(), bytes);
   }
 
   private long nextOffset() {
@@ -111,62 +75,30 @@ class OffsetByteRangeTracker extends TrackerWithProgress {
     return lastClaimed == null ? currentRestriction().getRange().getFrom() : lastClaimed + 1;
   }
 
-  /**
-   * Whether the tracker has received enough data/been running for enough time that it can
-   * checkpoint and be confident it can get sufficient throughput.
-   */
-  private boolean receivedEnough() {
-    Duration duration = Duration.millis(stopwatch.elapsed(TimeUnit.MILLISECONDS));
-    if (duration.isLongerThan(minTrackingTime)) {
-      return true;
-    }
-    if (currentRestriction().getByteCount() >= minBytesReceived) {
-      return true;
-    }
-    return false;
-  }
-
   @Override
   public @Nullable SplitResult<OffsetByteRange> trySplit(double fractionOfRemainder) {
     // Cannot split a bounded range. This should already be completely claimed.
-    if (range.getRange().getTo() != Long.MAX_VALUE) {
+    if (rangeTracker.currentRestriction().getTo() != Long.MAX_VALUE) {
       return null;
     }
-    if (!receivedEnough()) {
+    @Nullable SplitResult<OffsetRange> ranges = rangeTracker.trySplit(fractionOfRemainder);
+    if (ranges == null) {
       return null;
     }
-    range =
-        OffsetByteRange.of(
-            new OffsetRange(currentRestriction().getRange().getFrom(), nextOffset()),
-            range.getByteCount());
+    checkArgument(rangeTracker.currentRestriction().equals(ranges.getPrimary()));
     return SplitResult.of(
-        this.range, OffsetByteRange.of(new OffsetRange(nextOffset(), Long.MAX_VALUE), 0));
+        currentRestriction(), OffsetByteRange.of(checkArgumentNotNull(ranges.getResidual())));
   }
 
   @Override
-  @SuppressWarnings("unboxing.of.nullable")
   public void checkDone() throws IllegalStateException {
-    if (range.getRange().getFrom() == range.getRange().getTo()) {
-      return;
-    }
-    checkState(
-        lastClaimed != null,
-        "Last attempted offset should not be null. No work was claimed in non-empty range %s.",
-        range);
-    long lastClaimedNotNull = checkNotNull(lastClaimed);
-    checkState(
-        lastClaimedNotNull >= range.getRange().getTo() - 1,
-        "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted",
-        lastClaimedNotNull,
-        range,
-        lastClaimedNotNull + 1,
-        range.getRange().getTo());
+    rangeTracker.checkDone();
   }
 
   @Override
   public Progress getProgress() {
     ComputeMessageStatsResponse stats =
         this.unownedBacklogReader.computeMessageStats(Offset.of(nextOffset()));
-    return Progress.from(range.getByteCount(), stats.getMessageBytes());
+    return Progress.from(bytes, stats.getMessageBytes());
   }
 }
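
The tracker now delegates all offset bookkeeping to Beam's OffsetRangeTracker and only layers byte accounting on top. For reference, the delegated checkpoint semantics look like this (a standalone sketch against the Beam SDK; the offsets are arbitrary):

    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
    import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;

    final class CheckpointDemo {
      static SplitResult<OffsetRange> checkpoint() {
        OffsetRangeTracker tracker =
            new OffsetRangeTracker(new OffsetRange(5, Long.MAX_VALUE));
        tracker.tryClaim(5L);
        tracker.tryClaim(9L);
        // fractionOfRemainder = 0 splits just past the last claimed offset: the
        // primary becomes [5, 10) and the residual [10, Long.MAX_VALUE), which
        // is what trySplit() above hands back to the runner.
        return tracker.trySplit(0);
      }
    }
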
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerServerPublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerServerPublisherCache.java
index 07953cb..ee76169 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerServerPublisherCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerServerPublisherCache.java
@@ -17,6 +17,10 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
+import com.google.cloud.pubsublite.MessageMetadata;
+import com.google.cloud.pubsublite.internal.Publisher;
+import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
+
 /**
  * A shared cache per-worker instance of Pub/Sub Lite publishers.
  *
@@ -26,7 +30,8 @@ package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 final class PerServerPublisherCache {
   private PerServerPublisherCache() {}
 
-  static final PublisherCache PUBLISHER_CACHE = new PublisherCache();
+  static final ServiceCache<PublisherOptions, Publisher<MessageMetadata>> PUBLISHER_CACHE =
+      new ServiceCache<>();
 
   static {
     Runtime.getRuntime().addShutdownHook(new Thread(PUBLISHER_CACHE::close));
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerServerPublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerServerSubscriberCache.java
similarity index 66%
copy from sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerServerPublisherCache.java
copy to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerServerSubscriberCache.java
index 07953cb..130ecc8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerServerPublisherCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerServerSubscriberCache.java
@@ -18,17 +18,17 @@
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
 /**
- * A shared cache per-worker instance of Pub/Sub Lite publishers.
+ * A shared cache per-worker instance of Pub/Sub Lite subscribers.
  *
- * <p>Pub/Sub Lite publishers connect to all available partitions: it would be a pessimization for
- * all instances of the PubsubLiteSink to do this.
+ * <p>This allows subscribers to buffer in the background while processElement is not running.
  */
-final class PerServerPublisherCache {
-  private PerServerPublisherCache() {}
+final class PerServerSubscriberCache {
+  private PerServerSubscriberCache() {}
 
-  static final PublisherCache PUBLISHER_CACHE = new PublisherCache();
+  static final ServiceCache<SubscriptionPartition, MemoryBufferedSubscriber> CACHE =
+      new ServiceCache<>();
 
   static {
-    Runtime.getRuntime().addShutdownHook(new Thread(PUBLISHER_CACHE::close));
+    Runtime.getRuntime().addShutdownHook(new Thread(CACHE::close));
   }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
index d387cf1..0996449 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PerSubscriptionPartitionSdf.java
@@ -28,8 +28,11 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedMessage> {
+  private static final Logger LOG = LoggerFactory.getLogger(PerSubscriptionPartitionSdf.class);
   private final Duration maxSleepTime;
   private final ManagedBacklogReaderFactory backlogReaderFactory;
   private final SubscriptionPartitionProcessorFactory processorFactory;
@@ -60,6 +63,13 @@ class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedM
     backlogReaderFactory.close();
   }
 
+  /**
+   * The initial watermark state is not allowed to return less than the element's input timestamp.
+   *
+   * <p>The polling logic for identifying new partitions will export all preexisting partitions with
+   * very old (EPOCH) initial watermarks, and any new partitions with a recent watermark likely to
+   * be before all messages that could exist on that partition given the polling delay.
+   */
   @GetInitialWatermarkEstimatorState
   public Instant getInitialWatermarkState(@Timestamp Instant elementTimestamp) {
     return elementTimestamp;
@@ -76,20 +86,25 @@ class PerSubscriptionPartitionSdf extends DoFn<SubscriptionPartition, SequencedM
       @Element SubscriptionPartition subscriptionPartition,
       OutputReceiver<SequencedMessage> receiver)
       throws Exception {
+    LOG.debug("Starting process for {} at {}", subscriptionPartition, Instant.now());
     SubscriptionPartitionProcessor processor =
         processorFactory.newProcessor(subscriptionPartition, tracker, receiver);
     ProcessContinuation result = processor.runFor(maxSleepTime);
+    LOG.debug("Starting commit for {} at {}", subscriptionPartition, Instant.now());
+    // TODO(dpcollins-google): Move commits to a bundle finalizer for drain correctness
     processor
         .lastClaimed()
         .ifPresent(
-            lastClaimedOffset -> {
-              Offset commitOffset = Offset.of(lastClaimedOffset.value() + 1);
+            lastClaimed -> {
               try {
-                committerFactory.apply(subscriptionPartition).commitOffset(commitOffset);
+                committerFactory
+                    .apply(subscriptionPartition)
+                    .commitOffset(Offset.of(lastClaimed.value() + 1));
               } catch (Exception e) {
                 throw ExtractStatus.toCanonical(e).underlying;
               }
             });
+    LOG.debug("Finishing process for {} at {}", subscriptionPartition, Instant.now());
     return result;
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
index f370919..e23a66d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java
@@ -53,7 +53,9 @@ public class PubsubLiteSink extends DoFn<PubSubMessage, Void> {
     private final Publisher<MessageMetadata> publisher;
 
     RunState(PublisherOptions options) {
-      publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options);
+      publisher =
+          PerServerPublisherCache.PUBLISHER_CACHE.get(
+              options, () -> new PublisherAssembler(options).newPublisher());
     }
 
     void publish(PubSubMessage message) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/ServiceCache.java
similarity index 55%
rename from sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java
rename to sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/ServiceCache.java
index ac85ba9..5e124c0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/ServiceCache.java
@@ -17,59 +17,63 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
+import com.google.api.core.ApiService;
 import com.google.api.core.ApiService.Listener;
 import com.google.api.core.ApiService.State;
 import com.google.api.gax.rpc.ApiException;
-import com.google.cloud.pubsublite.MessageMetadata;
-import com.google.cloud.pubsublite.internal.Publisher;
+import com.google.cloud.pubsublite.internal.wire.ApiServiceUtils;
 import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import java.util.HashMap;
-import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
+import java.util.function.Supplier;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** A map of working publishers by PublisherOptions. */
-class PublisherCache implements AutoCloseable {
-  private final Logger logger = LoggerFactory.getLogger(PublisherCache.class);
+/** A map of working ApiServices by identifying key. The key must be hashable. */
+class ServiceCache<K, V extends ApiService> implements AutoCloseable {
+  private final Logger logger = LoggerFactory.getLogger(ServiceCache.class);
 
   @GuardedBy("this")
-  private final HashMap<PublisherOptions, Publisher<MessageMetadata>> livePublishers =
-      new HashMap<>();
+  private final HashMap<K, V> liveMap = new HashMap<>();
 
-  private synchronized void evict(PublisherOptions options) {
-    livePublishers.remove(options);
+  private synchronized void evict(K key, V service) {
+    liveMap.remove(key, service);
   }
 
-  synchronized Publisher<MessageMetadata> get(PublisherOptions options) throws ApiException {
-    Publisher<MessageMetadata> publisher = livePublishers.get(options);
-    if (publisher != null) {
-      return publisher;
+  synchronized V get(K key, Supplier<V> factory) throws ApiException {
+    V service = liveMap.get(key);
+    if (service != null) {
+      return service;
     }
-    publisher = new PublisherAssembler(options).newPublisher();
-    livePublishers.put(options, publisher);
-    publisher.addListener(
+    V newService = factory.get();
+    liveMap.put(key, newService);
+    newService.addListener(
         new Listener() {
           @Override
           public void failed(State s, Throwable t) {
-            logger.warn("Publisher failed.", t);
-            evict(options);
+            logger.warn(newService.getClass().getSimpleName() + " failed.", t);
+            evict(key, newService);
+          }
+
+          @Override
+          public void terminated(State from) {
+            evict(key, newService);
           }
         },
         SystemExecutors.getFuturesExecutor());
-    publisher.startAsync().awaitRunning();
-    return publisher;
+    newService.startAsync().awaitRunning();
+    return newService;
   }
 
   @VisibleForTesting
-  synchronized void set(PublisherOptions options, Publisher<MessageMetadata> toCache) {
-    livePublishers.put(options, toCache);
+  synchronized void set(K key, V service) {
+    liveMap.put(key, service);
   }
 
   @Override
   public synchronized void close() {
-    livePublishers.forEach((options, publisher) -> publisher.stopAsync());
-    livePublishers.clear();
+    ApiServiceUtils.blockingShutdown(liveMap.values());
+    liveMap.clear();
   }
 }
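
get() now takes the factory as a Supplier, so construction and startAsync() happen only on a cache miss, and the added terminated() listener evicts a stopped service so the next get() rebuilds it. A same-package sketch of the lookup pattern, mirroring the PubsubLiteSink change earlier in this diff:

    import com.google.cloud.pubsublite.MessageMetadata;
    import com.google.cloud.pubsublite.internal.Publisher;
    import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;

    final class ServiceCacheUsage {
      // The supplier runs only when no live publisher is cached for options.
      static Publisher<MessageMetadata> publisherFor(
          ServiceCache<PublisherOptions, Publisher<MessageMetadata>> cache,
          PublisherOptions options) {
        return cache.get(options, () -> new PublisherAssembler(options).newPublisher());
      }
    }
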
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
index 4b572b6..97c2899 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscribeTransform.java
@@ -30,6 +30,7 @@ import com.google.cloud.pubsublite.internal.wire.Subscriber;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import java.util.List;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
@@ -38,12 +39,16 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
 import org.joda.time.Duration;
 
 public class SubscribeTransform extends PTransform<PBegin, PCollection<SequencedMessage>> {
   private static final long MEBIBYTE = 1L << 20;
+  private static final long SOFT_MEMORY_LIMIT = 512 * MEBIBYTE;
+  private static final long MIN_PER_PARTITION_MEMORY = 4 * MEBIBYTE;
+  private static final long MAX_PER_PARTITION_MEMORY = 100 * MEBIBYTE;
+
+  private static final MemoryLimiter LIMITER =
+      new MemoryLimiterImpl(MIN_PER_PARTITION_MEMORY, MAX_PER_PARTITION_MEMORY, SOFT_MEMORY_LIMIT);
 
   private final SubscriberOptions options;
 
@@ -74,40 +79,37 @@ public class SubscribeTransform extends PTransform<PBegin, PCollection<Sequenced
   private SubscriptionPartitionProcessor newPartitionProcessor(
       SubscriptionPartition subscriptionPartition,
       RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
-      OutputReceiver<SequencedMessage> receiver)
-      throws ApiException {
-    checkSubscription(subscriptionPartition);
+      OutputReceiver<SequencedMessage> receiver) {
+    Supplier<MemoryBufferedSubscriber> newSubscriber =
+        () ->
+            newBufferedSubscriber(
+                subscriptionPartition,
+                Offset.of(tracker.currentRestriction().getRange().getFrom()));
     return new SubscriptionPartitionProcessorImpl(
+        subscriptionPartition,
         tracker,
         receiver,
-        consumer ->
-            newSubscriber(
-                subscriptionPartition.partition(),
-                Offset.of(tracker.currentRestriction().getRange().getFrom()),
-                consumer),
-        options.flowControlSettings());
+        () -> PerServerSubscriberCache.CACHE.get(subscriptionPartition, newSubscriber));
   }
 
-  private TopicBacklogReader newBacklogReader(SubscriptionPartition subscriptionPartition) {
+  private MemoryBufferedSubscriber newBufferedSubscriber(
+      SubscriptionPartition subscriptionPartition, Offset startOffset) throws ApiException {
     checkSubscription(subscriptionPartition);
-    return new SubscriberAssembler(options, subscriptionPartition.partition()).getBacklogReader();
+    return new MemoryBufferedSubscriberImpl(
+        subscriptionPartition.partition(),
+        startOffset,
+        LIMITER,
+        consumer -> newSubscriber(subscriptionPartition.partition(), startOffset, consumer));
   }
 
-  private long calculateMinWindowBytes() {
-    long minFromFlowControl =
-        LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10);
-    // Dataflow will not accept outputs larger than 1 GiB. Cap the maximum at 750 MiB to avoid this.
-    return Math.min(minFromFlowControl, 750 * MEBIBYTE);
+  private TopicBacklogReader newBacklogReader(SubscriptionPartition subscriptionPartition) {
+    checkSubscription(subscriptionPartition);
+    return new SubscriberAssembler(options, subscriptionPartition.partition()).getBacklogReader();
   }
 
   private TrackerWithProgress newRestrictionTracker(
       TopicBacklogReader backlogReader, OffsetByteRange initial) {
-    return new OffsetByteRangeTracker(
-        initial,
-        backlogReader,
-        Stopwatch.createUnstarted(),
-        options.minBundleTimeout(),
-        calculateMinWindowBytes());
+    return new OffsetByteRangeTracker(initial, backlogReader);
   }
 
   private InitialOffsetReader newInitialOffsetReader(SubscriptionPartition subscriptionPartition) {
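
For scale, the new constants give each worker a 512 MiB soft budget shared by all partitions, with per-partition buffers between 4 MiB and 100 MiB. A back-of-envelope sketch (constants redeclared here since they are private to SubscribeTransform):

    long MEBIBYTE = 1L << 20;
    long SOFT_MEMORY_LIMIT = 512 * MEBIBYTE;
    long MIN_PER_PARTITION_MEMORY = 4 * MEBIBYTE;
    // About 128 partitions can hold a minimum-size buffer before the shared
    // budget is exhausted and MemoryLimiterImpl begins oversubscribing.
    long partitionsAtMinimum = SOFT_MEMORY_LIMIT / MIN_PER_PARTITION_MEMORY; // = 128
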
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriberAssembler.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriberAssembler.java
index 7825ab8..d65375e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriberAssembler.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriberAssembler.java
@@ -114,6 +114,7 @@ class SubscriberAssembler {
             .setMessageConsumer(consumer)
             .setSubscriptionPath(options.subscriptionPath())
             .setPartition(partition)
+            .setRetryStreamRaces(false)
             .setStreamFactory(
                 responseStream -> {
                   ApiCallContext context =
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
index 8b2eff9..818ccf7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.java
@@ -17,161 +17,111 @@
  */
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
-import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
-import com.google.api.core.ApiService.Listener;
-import com.google.api.core.ApiService.State;
 import com.google.cloud.pubsublite.Offset;
-import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.ExtractStatus;
-import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
-import com.google.cloud.pubsublite.proto.FlowControlRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
-import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import javax.annotation.Nullable;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class SubscriptionPartitionProcessorImpl extends Listener
-    implements SubscriptionPartitionProcessor, AutoCloseable {
+class SubscriptionPartitionProcessorImpl implements SubscriptionPartitionProcessor {
   private static final Logger LOG =
       LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);
+  private final SubscriptionPartition subscriptionPartition;
   private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
   private final OutputReceiver<SequencedMessage> receiver;
-  private final Subscriber subscriber;
-  private final SettableFuture<Void> completionFuture = SettableFuture.create();
-  // Queue to transfer messages from subscriber callback to runFor downcall.
-  private final SynchronousQueue<List<SequencedMessage>> transfer = new SynchronousQueue<>();
-  private final FlowControlSettings flowControlSettings;
+  private final MemoryBufferedSubscriber subscriber;
   private Optional<Offset> lastClaimedOffset = Optional.empty();
 
-  @SuppressWarnings("methodref.receiver.bound.invalid")
+  // getReadySubscriber doesn't reference the subscriber member.
+  @SuppressWarnings("method.invocation.invalid")
   SubscriptionPartitionProcessorImpl(
+      SubscriptionPartition subscriptionPartition,
       RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker,
       OutputReceiver<SequencedMessage> receiver,
-      Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory,
-      FlowControlSettings flowControlSettings) {
+      Supplier<MemoryBufferedSubscriber> subscriberFactory) {
+    this.subscriptionPartition = subscriptionPartition;
     this.tracker = tracker;
     this.receiver = receiver;
-    this.subscriber = subscriberFactory.apply(this::onSubscriberMessages);
-    this.flowControlSettings = flowControlSettings;
-  }
-
-  @Override
-  public void failed(State from, Throwable failure) {
-    completionFuture.setException(ExtractStatus.toCanonical(failure));
-  }
-
-  private void onSubscriberMessages(List<SequencedMessage> messages) {
-    try {
-      while (!completionFuture.isDone()) {
-        if (transfer.offer(messages, 10, TimeUnit.MILLISECONDS)) {
-          return;
-        }
-      }
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
-  @SuppressWarnings("argument.type.incompatible")
-  private void start() {
-    this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor());
-    this.subscriber.startAsync();
-    this.subscriber.awaitRunning();
-    try {
-      this.subscriber.allowFlow(
-          FlowControlRequest.newBuilder()
-              .setAllowedBytes(flowControlSettings.bytesOutstanding())
-              .setAllowedMessages(flowControlSettings.messagesOutstanding())
-              .build());
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-  }
-
-  private void handleMessages(List<SequencedMessage> messages) {
-    if (completionFuture.isDone()) {
-      return;
-    }
-    Offset lastOffset = Offset.of(Iterables.getLast(messages).getCursor().getOffset());
-    long byteSize = messages.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
-    if (tracker.tryClaim(OffsetByteProgress.of(lastOffset, byteSize))) {
-      lastClaimedOffset = Optional.of(lastOffset);
-      messages.forEach(
-          message ->
-              receiver.outputWithTimestamp(
-                  message, new Instant(Timestamps.toMillis(message.getPublishTime()))));
-      try {
-        subscriber.allowFlow(
-            FlowControlRequest.newBuilder()
-                .setAllowedBytes(byteSize)
-                .setAllowedMessages(messages.size())
-                .build());
-      } catch (CheckedApiException e) {
-        completionFuture.setException(e);
-      }
-    } else {
-      completionFuture.set(null);
-    }
+    this.subscriber = getReadySubscriber(subscriberFactory);
   }
 
   @Override
   @SuppressWarnings("argument.type.incompatible")
   public ProcessContinuation runFor(Duration duration) {
-    Instant deadline = Instant.now().plus(duration);
-    start();
-    try (SubscriptionPartitionProcessorImpl closeThis = this) {
-      while (!completionFuture.isDone() && deadline.isAfterNow()) {
-        @Nullable List<SequencedMessage> messages = transfer.poll(10, TimeUnit.MILLISECONDS);
-        if (messages != null) {
-          handleMessages(messages);
+    Instant maxReadTime = Instant.now().plus(duration);
+    while (subscriber.isRunning()) {
+      // Read any available data.
+      for (Optional<SequencedMessage> next = subscriber.peek();
+          next.isPresent();
+          next = subscriber.peek()) {
+        SequencedMessage message = next.get();
+        Offset messageOffset = Offset.of(message.getCursor().getOffset());
+        if (tracker.tryClaim(OffsetByteProgress.of(messageOffset, message.getSizeBytes()))) {
+          subscriber.pop();
+          lastClaimedOffset = Optional.of(messageOffset);
+          receiver.outputWithTimestamp(
+              message, new Instant(Timestamps.toMillis(message.getPublishTime())));
+        } else {
+          // Our claim failed, return stop()
+          return ProcessContinuation.stop();
         }
       }
-    } catch (Throwable t) {
-      throw ExtractStatus.toCanonical(t).underlying;
-    }
-    // Determine return code after shutdown.
-    if (completionFuture.isDone()) {
-      // Call get() to ensure there is no exception.
+      // Try waiting for new data.
       try {
-        completionFuture.get();
-      } catch (Throwable t) {
-        throw ExtractStatus.toCanonical(t).underlying;
+        Duration readTime = new Duration(Instant.now(), maxReadTime);
+        Future<Void> onData = subscriber.onData();
+        checkArgumentNotNull(onData);
+        onData.get(readTime.getMillis(), TimeUnit.MILLISECONDS);
+      } catch (TimeoutException e) {
+        // Read timed out without us being cut off, yield to the runtime.
+        return ProcessContinuation.resume();
+      } catch (InterruptedException | ExecutionException e2) {
+        // We should never be interrupted by beam, and onData should never return an error.
+        throw new RuntimeException(e2);
       }
-      // CompletionFuture set with null when tryClaim returned false.
-      return ProcessContinuation.stop();
     }
+    // Subscriber is no longer running, it has likely failed. Yield to the runtime to retry reading
+    // with a new subscriber.
     return ProcessContinuation.resume();
   }
 
   @Override
-  public void close() {
-    try {
-      blockingShutdown(subscriber);
-    } catch (Throwable t) {
-      // Don't propagate errors on subscriber shutdown.
-      LOG.info("Error on subscriber shutdown.", t);
-    }
-  }
-
-  @Override
   public Optional<Offset> lastClaimed() {
     return lastClaimedOffset;
   }
+
+  private MemoryBufferedSubscriber getReadySubscriber(
+      Supplier<MemoryBufferedSubscriber> getOrCreate) {
+    Offset startOffset = Offset.of(tracker.currentRestriction().getRange().getFrom());
+    while (true) {
+      MemoryBufferedSubscriber subscriber = getOrCreate.get();
+      Offset fetchOffset = subscriber.fetchOffset();
+      if (startOffset.equals(fetchOffset)) {
+        subscriber.rebuffer(); // TODO(dpcollins-google): Move this to a bundle finalizer
+        return subscriber;
+      }
+      LOG.info(
+          "Discarding subscriber due to mismatch, this should be rare. {}, start: {} fetch: {}",
+          subscriptionPartition,
+          startOffset,
+          fetchOffset);
+      try {
+        subscriber.stopAsync().awaitTerminated();
+      } catch (Exception ignored) {
+      }
+    }
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImplTest.java
new file mode 100644
index 0000000..31eae3c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImplTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite.internal;
+
+import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.internal.testing.FakeApiService;
+import com.google.cloud.pubsublite.internal.wire.Subscriber;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Spy;
+
+@RunWith(JUnit4.class)
+public class MemoryBufferedSubscriberImplTest {
+  private static final long MAX_MEMORY = 1024;
+
+  @Rule public Timeout globalTimeout = Timeout.seconds(30);
+
+  abstract static class FakeSubscriber extends FakeApiService implements Subscriber {}
+
+  @Spy FakeSubscriber subscriber;
+  @Mock Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory;
+  @Mock MemoryLimiter limiter;
+  @Mock MemoryLimiter.Block block;
+
+  MemoryBufferedSubscriber bufferedSubscriber;
+
+  Consumer<List<SequencedMessage>> consumer;
+
+  private static SequencedMessage messageWithSize(long size) {
+    return SequencedMessage.newBuilder().setSizeBytes(size).build();
+  }
+
+  @Before
+  public void setUp() {
+    initMocks(this);
+    doAnswer(
+            args -> {
+              consumer = args.getArgument(0);
+              return subscriber;
+            })
+        .when(subscriberFactory)
+        .apply(any());
+    doReturn(1L).when(limiter).minBlockSize();
+    doReturn(MAX_MEMORY).when(limiter).maxBlockSize();
+    checkNotNull(block);
+    checkNotNull(limiter);
+    doReturn(block).when(limiter).claim(anyLong());
+    doReturn(MAX_MEMORY).when(block).claimed();
+    bufferedSubscriber =
+        new MemoryBufferedSubscriberImpl(
+            example(Partition.class), example(Offset.class), limiter, subscriberFactory);
+    checkNotNull(consumer);
+    bufferedSubscriber.startAsync().awaitRunning();
+    verify(subscriber).startAsync();
+    assertTrue(subscriber.isRunning());
+  }
+
+  @Test
+  public void underlyingFailureFails() {
+    subscriber.fail(new RuntimeException("bad"));
+    assertThrows(Exception.class, subscriber::awaitTerminated);
+  }
+
+  @Test
+  public void rebufferReducesToOutstandingWhenLittleData() {
+    consumer.accept(ImmutableList.of(messageWithSize(MAX_MEMORY / 4)));
+    bufferedSubscriber.pop();
+    bufferedSubscriber.rebuffer();
+    verify(block).close();
+    verify(limiter).claim(3 * MAX_MEMORY / 4);
+  }
+
+  @Test
+  public void rebufferCannotGoBelowMin() {
+    long minBlock = MAX_MEMORY * 4 / 5;
+    doReturn(minBlock).when(limiter).minBlockSize();
+    for (int i = 0; i < 1000; ++i) {
+      // Rebuffer many times with no data to bring down the target value
+      bufferedSubscriber.rebuffer();
+    }
+    reset(limiter);
+    doReturn(minBlock).when(limiter).minBlockSize();
+    doReturn(block).when(limiter).claim(anyLong());
+    // Deliver enough data that 3 * minBlock / 4 is outstanding; the buffer is allowed to and
+    // will shrink, except that it is limited by the min block size.
+    consumer.accept(ImmutableList.of(messageWithSize(2 * MAX_MEMORY / 5)));
+    bufferedSubscriber.pop();
+    bufferedSubscriber.rebuffer();
+    verify(limiter).claim(minBlock);
+  }
+
+  @Test
+  public void rebufferStaysSameOnHalfDelivered() {
+    consumer.accept(ImmutableList.of(messageWithSize(MAX_MEMORY / 4)));
+    bufferedSubscriber.pop();
+    bufferedSubscriber.rebuffer();
+    verify(limiter).claim(3 * MAX_MEMORY / 4);
+    consumer.accept(ImmutableList.of(messageWithSize(3 * MAX_MEMORY / 8)));
+    bufferedSubscriber.pop();
+    bufferedSubscriber.rebuffer();
+    verify(limiter).claim(3 * MAX_MEMORY / 4);
+  }
+
+  @Test
+  public void rebufferGrowsOnMoreDelivered() {
+    consumer.accept(ImmutableList.of(messageWithSize(MAX_MEMORY / 4)));
+    bufferedSubscriber.pop();
+    bufferedSubscriber.rebuffer();
+    verify(limiter).claim(3 * MAX_MEMORY / 4);
+    consumer.accept(
+        ImmutableList.of(messageWithSize(MAX_MEMORY / 2), messageWithSize(MAX_MEMORY / 8)));
+    bufferedSubscriber.rebuffer();
+    verify(limiter, times(2)).claim(MAX_MEMORY); // once in setup
+  }
+
+  @Test
+  public void onDataSatisfiedOnShutdown() throws Exception {
+    Future<Void> onData = bufferedSubscriber.onData();
+    bufferedSubscriber.stopAsync();
+    onData.get();
+  }
+
+  @Test
+  public void onDataSatisfiedOnError() throws Exception {
+    Future<Void> onData = bufferedSubscriber.onData();
+    subscriber.fail(new RuntimeException("bad"));
+    onData.get();
+  }
+
+  @Test
+  public void onDataSatisfiedOnData() throws Exception {
+    SequencedMessage message1 =
+        SequencedMessage.newBuilder()
+            .setCursor(Cursor.newBuilder().setOffset(example(Offset.class).value() + 10))
+            .setSizeBytes(1)
+            .build();
+    SequencedMessage message2 =
+        SequencedMessage.newBuilder()
+            .setCursor(Cursor.newBuilder().setOffset(example(Offset.class).value() + 20))
+            .setSizeBytes(1)
+            .build();
+    Future<Void> onData = bufferedSubscriber.onData();
+    assertFalse(onData.isDone());
+    consumer.accept(ImmutableList.of(message1, message2));
+    onData.get();
+    assertTrue(bufferedSubscriber.onData().isDone()); // Still messages, onData is satisfied.
+    assertEquals(bufferedSubscriber.fetchOffset(), example(Offset.class));
+    assertEquals(bufferedSubscriber.peek().get(), message1);
+    bufferedSubscriber.pop();
+    assertEquals(bufferedSubscriber.fetchOffset(), Offset.of(message1.getCursor().getOffset() + 1));
+    assertTrue(bufferedSubscriber.onData().isDone()); // Still messages, onData is satisfied.
+    assertEquals(bufferedSubscriber.peek().get(), message2);
+    bufferedSubscriber.pop();
+    assertEquals(bufferedSubscriber.fetchOffset(), Offset.of(message2.getCursor().getOffset() + 1));
+    assertFalse(bufferedSubscriber.onData().isDone());
+  }
+}
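
Taken together, the rebuffer tests above pin down the adaptive buffer-sizing behavior without fixing an exact formula. One reading consistent with the expectations, under this file's MAX_MEMORY = 1024 setup (illustrative only; the real heuristic lives in MemoryBufferedSubscriberImpl, whose body is not shown in this mail):

    // claim() sizes implied per rebuffer() call by the tests above:
    //   setUp:                         claim(1024)  (starts at maxBlockSize)
    //   deliver 256, pop, rebuffer:    claim(768)   (shrinks toward what is still needed)
    //   deliver 384, pop, rebuffer:    claim(768)   (unchanged when about half was used)
    //   deliver 512 + 128, rebuffer:   claim(1024)  (grows, capped at maxBlockSize)
    //   rebufferCannotGoBelowMin: any shrink is floored at limiter.minBlockSize().
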
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/OffsetByteRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/OffsetByteRangeTrackerTest.java
index f7d206d..79585ab 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/OffsetByteRangeTrackerTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/OffsetByteRangeTrackerTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
@@ -34,9 +35,7 @@ import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
-import org.joda.time.Duration;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -58,13 +57,7 @@ public class OffsetByteRangeTrackerTest {
   public void setUp() {
     initMocks(this);
     when(ticker.read()).thenReturn(0L);
-    tracker =
-        new OffsetByteRangeTracker(
-            OffsetByteRange.of(RANGE, 0),
-            unownedBacklogReader,
-            Stopwatch.createUnstarted(ticker),
-            Duration.millis(500),
-            MIN_BYTES);
+    tracker = new OffsetByteRangeTracker(OffsetByteRange.of(RANGE, 0), unownedBacklogReader);
   }
 
   @Test
@@ -136,31 +129,7 @@ public class OffsetByteRangeTrackerTest {
   @Test
   public void cannotClaimSplitRange() {
     assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES)));
-    assertTrue(tracker.trySplit(IGNORED_FRACTION) != null);
+    assertNotNull(tracker.trySplit(IGNORED_FRACTION));
     assertFalse(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_001), MIN_BYTES)));
   }
-
-  @Test
-  public void cannotSplitNotEnoughBytesOrTime() {
-    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES - 2)));
-    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_001), 1)));
-    when(ticker.read()).thenReturn(100_000_000L);
-    assertTrue(tracker.trySplit(IGNORED_FRACTION) == null);
-  }
-
-  @Test
-  public void canSplitTimeOnly() {
-    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES - 2)));
-    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_001), 1)));
-    when(ticker.read()).thenReturn(1_000_000_000L);
-    assertTrue(tracker.trySplit(IGNORED_FRACTION) != null);
-  }
-
-  @Test
-  public void canSplitBytesOnly() {
-    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES - 2)));
-    assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_001), 2)));
-    when(ticker.read()).thenReturn(100_000_000L);
-    assertTrue(tracker.trySplit(IGNORED_FRACTION) != null);
-  }
 }
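
The three deleted cases covered time- and byte-based split gating that the simplified tracker no longer performs; after this change a checkpoint split is always honored. The surviving cannotClaimSplitRange case pins the invariant that matters, which in isolation reads as follows (a hypothetical fragment; range, backlogReader, and bytes stand in for the test fixtures):

    // After a successful trySplit, the primary range is truncated at the
    // last claimed offset, so any further claim must be refused.
    OffsetByteRangeTracker tracker =
        new OffsetByteRangeTracker(OffsetByteRange.of(range, 0), backlogReader);
    tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), bytes)); // -> true
    tracker.trySplit(0.5);                                            // -> non-null remainder
    tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_001), bytes)); // -> false
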
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImplTest.java
index 5977560..679861b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImplTest.java
@@ -18,40 +18,37 @@
 package org.apache.beam.sdk.io.gcp.pubsublite.internal;
 
 import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example;
-import static org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions.DEFAULT_FLOW_CONTROL;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.initMocks;
 
+import com.google.api.core.ApiFutures;
 import com.google.api.core.SettableApiFuture;
 import com.google.api.gax.rpc.ApiException;
 import com.google.api.gax.rpc.StatusCode.Code;
 import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SubscriptionPath;
 import com.google.cloud.pubsublite.internal.CheckedApiException;
 import com.google.cloud.pubsublite.internal.testing.FakeApiService;
-import com.google.cloud.pubsublite.internal.wire.Subscriber;
-import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
 import com.google.cloud.pubsublite.proto.Cursor;
-import com.google.cloud.pubsublite.proto.FlowControlRequest;
 import com.google.cloud.pubsublite.proto.SequencedMessage;
 import com.google.protobuf.util.Timestamps;
-import java.util.List;
-import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.Optional;
+import java.util.function.Supplier;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
@@ -63,24 +60,23 @@ import org.junit.runners.JUnit4;
 import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.Spy;
-import org.mockito.stubbing.Answer;
 
 @RunWith(JUnit4.class)
 @SuppressWarnings("initialization.fields.uninitialized")
 public class SubscriptionPartitionProcessorImplTest {
+  private static final SubscriptionPartition PARTITION =
+      SubscriptionPartition.of(example(SubscriptionPath.class), example(Partition.class));
+
   @Spy RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
   @Mock OutputReceiver<SequencedMessage> receiver;
-  @Mock Function<Consumer<List<SequencedMessage>>, Subscriber> subscriberFactory;
+  @Mock Supplier<MemoryBufferedSubscriber> subscriberFactory;
 
   @Rule public Timeout globalTimeout = Timeout.seconds(30);
 
-  abstract static class FakeSubscriber extends FakeApiService implements Subscriber {}
+  abstract static class FakeSubscriber extends FakeApiService implements MemoryBufferedSubscriber {}
 
   @Spy FakeSubscriber subscriber;
 
-  Consumer<List<SequencedMessage>> leakedConsumer;
-  SubscriptionPartitionProcessor processor;
-
   private static SequencedMessage messageWithOffset(long offset) {
     return SequencedMessage.newBuilder()
         .setCursor(Cursor.newBuilder().setOffset(offset))
@@ -96,134 +92,106 @@ public class SubscriptionPartitionProcessorImplTest {
   @Before
   public void setUp() {
     initMocks(this);
-    when(subscriberFactory.apply(any()))
-        .then(
-            args -> {
-              leakedConsumer = args.getArgument(0);
-              return subscriber;
-            });
-    processor =
-        new SubscriptionPartitionProcessorImpl(
-            tracker, receiver, subscriberFactory, DEFAULT_FLOW_CONTROL);
-    assertNotNull(leakedConsumer);
+    when(subscriberFactory.get()).thenReturn(subscriber);
+    when(tracker.currentRestriction()).thenReturn(initialRange());
+    doReturn(true).when(subscriber).isRunning();
+    doReturn(example(Offset.class)).when(subscriber).fetchOffset();
+    doReturn(SettableApiFuture.create()).when(subscriber).onData();
+  }
+
+  private SubscriptionPartitionProcessor newProcessor() {
+    return new SubscriptionPartitionProcessorImpl(PARTITION, tracker, receiver, subscriberFactory);
   }
 
   @Test
-  public void lifecycle() throws Exception {
-    when(tracker.currentRestriction()).thenReturn(initialRange());
+  public void lifecycle() {
+    SubscriptionPartitionProcessor processor = newProcessor();
     assertEquals(ProcessContinuation.resume(), processor.runFor(Duration.millis(10)));
-    InOrder order = inOrder(subscriber);
-    order.verify(subscriber).startAsync();
-    order.verify(subscriber).awaitRunning();
-    order
-        .verify(subscriber)
-        .allowFlow(
-            FlowControlRequest.newBuilder()
-                .setAllowedBytes(DEFAULT_FLOW_CONTROL.bytesOutstanding())
-                .setAllowedMessages(DEFAULT_FLOW_CONTROL.messagesOutstanding())
-                .build());
-    order.verify(subscriber).stopAsync();
-    order.verify(subscriber).awaitTerminated();
+    InOrder order = inOrder(subscriberFactory, subscriber);
+    order.verify(subscriberFactory).get();
+    order.verify(subscriber).fetchOffset();
+    order.verify(subscriber).rebuffer();
   }
 
   @Test
-  public void lifecycleFlowControlThrows() throws Exception {
-    when(tracker.currentRestriction()).thenReturn(initialRange());
-    doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any());
-    assertThrows(ApiException.class, () -> processor.runFor(Duration.ZERO));
+  public void lifecycleOffsetMismatch() {
+    MemoryBufferedSubscriber badSubscriber = spy(FakeSubscriber.class);
+    doReturn(Offset.of(example(Offset.class).value() + 1)).when(badSubscriber).fetchOffset();
+    doThrow(new RuntimeException("Ignored")).when(badSubscriber).awaitTerminated();
+    doReturn(badSubscriber, subscriber).when(subscriberFactory).get();
+    SubscriptionPartitionProcessor processor = newProcessor();
+    assertEquals(ProcessContinuation.resume(), processor.runFor(Duration.millis(10)));
+    InOrder order = inOrder(subscriberFactory, badSubscriber, subscriber);
+    order.verify(subscriberFactory).get();
+    order.verify(badSubscriber).fetchOffset();
+    order.verify(badSubscriber).stopAsync();
+    order.verify(badSubscriber).awaitTerminated();
+    order.verify(subscriberFactory).get();
+    order.verify(subscriber).fetchOffset();
+    order.verify(subscriber).rebuffer();
   }
 
   @Test
-  public void subscriberFailureFails() throws Exception {
-    when(tracker.currentRestriction()).thenReturn(initialRange());
-    doAnswer(
-            (Answer<Void>)
-                args -> {
-                  subscriber.fail(new CheckedApiException(Code.OUT_OF_RANGE));
-                  return null;
-                })
-        .when(subscriber)
-        .awaitRunning();
-    ApiException e =
-        assertThrows(
-            // Longer wait is needed due to listener asynchrony, but should never wait this long.
-            ApiException.class, () -> processor.runFor(Duration.standardMinutes(2)));
-    assertEquals(Code.OUT_OF_RANGE, e.getStatusCode().getCode());
+  public void lifecycleRebufferThrows() throws Exception {
+    doThrow(new CheckedApiException(Code.OUT_OF_RANGE).underlying).when(subscriber).rebuffer();
+    assertThrows(ApiException.class, this::newProcessor);
   }
 
   @Test
-  public void allowFlowFailureFails() throws Exception {
-    when(tracker.currentRestriction()).thenReturn(initialRange());
-    when(tracker.tryClaim(any())).thenReturn(true);
-    doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any());
-    SystemExecutors.getFuturesExecutor()
-        .execute(() -> leakedConsumer.accept(ImmutableList.of(messageWithOffset(1))));
-    ApiException e =
-        assertThrows(ApiException.class, () -> processor.runFor(Duration.standardHours(10)));
-    assertEquals(Code.OUT_OF_RANGE, e.getStatusCode().getCode());
+  public void subscriberFailureReturnsResume() throws Exception {
+    SubscriptionPartitionProcessor processor = newProcessor();
+    doReturn(ApiFutures.immediateFuture(null)).when(subscriber).onData();
+    doReturn(false).when(subscriber).isRunning();
+    assertEquals(ProcessContinuation.resume(), processor.runFor(Duration.standardHours(1)));
   }
 
   @Test
   public void timeoutReturnsResume() {
+    SubscriptionPartitionProcessor processor = newProcessor();
     assertEquals(ProcessContinuation.resume(), processor.runFor(Duration.millis(10)));
     assertFalse(processor.lastClaimed().isPresent());
   }
 
   @Test
-  public void failedClaimCausesStop() throws Exception {
+  public void failedClaimCausesStop() {
+    SubscriptionPartitionProcessor processor = newProcessor();
+
     when(tracker.tryClaim(any())).thenReturn(false);
-    SettableApiFuture<Void> runDone = SettableApiFuture.create();
-    SystemExecutors.getFuturesExecutor()
-        .execute(
-            () -> {
-              assertEquals(
-                  ProcessContinuation.stop(), processor.runFor(Duration.standardHours(10)));
-              runDone.set(null);
-            });
-    leakedConsumer.accept(ImmutableList.of(messageWithOffset(1)));
-    runDone.get();
+    doReturn(ApiFutures.immediateFuture(null)).when(subscriber).onData();
+    doReturn(Optional.of(messageWithOffset(1))).when(subscriber).peek();
+
+    assertEquals(ProcessContinuation.stop(), processor.runFor(Duration.standardHours(10)));
 
     verify(tracker, times(1)).tryClaim(any());
+    verify(subscriber, times(0)).pop();
     assertFalse(processor.lastClaimed().isPresent());
-    // Future calls to process don't try to claim.
-    leakedConsumer.accept(ImmutableList.of(messageWithOffset(2)));
-    verify(tracker, times(1)).tryClaim(any());
   }
 
   @Test
-  public void successfulClaimThenTimeout() throws Exception {
-    when(tracker.tryClaim(any())).thenReturn(true);
-    SettableApiFuture<Void> runDone = SettableApiFuture.create();
-    SystemExecutors.getFuturesExecutor()
-        .execute(
-            () -> {
-              assertEquals(
-                  ProcessContinuation.resume(), processor.runFor(Duration.standardSeconds(3)));
-              runDone.set(null);
-            });
+  public void successfulClaimThenTimeout() {
+    doReturn(true).when(tracker).tryClaim(any());
+    doReturn(ApiFutures.immediateFuture(null), SettableApiFuture.create())
+        .when(subscriber)
+        .onData();
 
     SequencedMessage message1 = messageWithOffset(1);
     SequencedMessage message3 = messageWithOffset(3);
-    leakedConsumer.accept(ImmutableList.of(message1, message3));
-    runDone.get();
-    InOrder order = inOrder(tracker, receiver, subscriber);
-    order
-        .verify(tracker)
-        .tryClaim(
-            OffsetByteProgress.of(Offset.of(3), message1.getSizeBytes() + message3.getSizeBytes()));
+    doReturn(Optional.of(message1), Optional.of(message3), Optional.empty())
+        .when(subscriber)
+        .peek();
+
+    SubscriptionPartitionProcessor processor = newProcessor();
+    assertEquals(ProcessContinuation.resume(), processor.runFor(Duration.standardSeconds(3)));
+
+    InOrder order = inOrder(tracker, receiver);
+    order.verify(tracker).tryClaim(OffsetByteProgress.of(Offset.of(1), message1.getSizeBytes()));
     order
         .verify(receiver)
         .outputWithTimestamp(message1, new Instant(Timestamps.toMillis(message1.getPublishTime())));
+    order.verify(tracker).tryClaim(OffsetByteProgress.of(Offset.of(3), message3.getSizeBytes()));
     order
         .verify(receiver)
         .outputWithTimestamp(message3, new Instant(Timestamps.toMillis(message3.getPublishTime())));
-    order
-        .verify(subscriber)
-        .allowFlow(
-            FlowControlRequest.newBuilder()
-                .setAllowedMessages(2)
-                .setAllowedBytes(message1.getSizeBytes() + message3.getSizeBytes())
-                .build());
     assertEquals(processor.lastClaimed().get(), Offset.of(3));
   }
 }
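
A note on the Mockito idiom the rewritten tests lean on: doReturn with several arguments stubs consecutive calls, and the final value repeats on every later call. successfulClaimThenTimeout uses it to script the whole buffer lifecycle without threads, for example:

    // peek() yields message1, then message3, then empty (and empty on every
    // later call), so the read loop drains two messages and then waits.
    doReturn(Optional.of(message1), Optional.of(message3), Optional.empty())
        .when(subscriber)
        .peek();
    // onData() first completes immediately (data may be buffered), then
    // never completes, forcing the timeout/resume path.
    doReturn(ApiFutures.immediateFuture(null), SettableApiFuture.create())
        .when(subscriber)
        .onData();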
