lukecwik commented on a change in pull request #12016:
URL: https://github.com/apache/beam/pull/12016#discussion_r447133216



##########
File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
##########
@@ -113,6 +113,8 @@
   // SplittableParDoComponents
   public static final String SPLITTABLE_PAIR_WITH_RESTRICTION_URN =
       "beam:transform:sdf_pair_with_restriction:v1";
+  public static final String SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN =
+      "beam:transform:sdf_truncate_sized_restrictions:v1";

Review comment:
       please add a checkState in static block like the others below

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
##########
@@ -306,6 +308,32 @@ public static void 
invokeSplitRestriction(DoFnInvoker.ArgumentProvider argumentP
     }
   }
 
+  /**
+   * Default implementation of {@link DoFn.TruncateSizedRestriction}, for 
delegation by bytebuddy.
+   */
+  public static class DefaultTruncateSizedRestriction {
+    private final boolean isBoundedPerElement;
+
+    DefaultTruncateSizedRestriction(DoFnSignature doFnSignature) {
+      this.isBoundedPerElement = 
doFnSignature.isBoundedPerElement().equals(IsBounded.BOUNDED);
+    }
+    /** Return the current restriction if it's bounded.Otherwise, return null. 
*/
+    @SuppressWarnings("unused")
+    public void invokeTruncateSizedRestriction(DoFnInvoker.ArgumentProvider 
argumentProvider) {
+      boolean isBounded;
+      try {
+        isBounded = argumentProvider.restrictionTracker().isBounded();
+      } catch (NotImplementedException expectedException) {
+        isBounded = this.isBoundedPerElement;
+      }
+      if (isBounded) {
+        
argumentProvider.outputReceiver(null).output(argumentProvider.restriction());
+      } else {
+        argumentProvider.outputReceiver(null).output(null);
+      }
+    }
+  }

Review comment:
       ```suggestion
     /**
      * Default implementation of {@link DoFn.TruncateRestriction}, for 
delegation by bytebuddy.
      */
     public static class DefaultTruncateRestriction {
       private final boolean isBoundedPerElement;
   
       DefaultTruncateRestriction(DoFnSignature doFnSignature) {
         this.isBoundedPerElement = 
doFnSignature.isBoundedPerElement().equals(IsBounded.BOUNDED);
       }
       /** Return the current restriction if it's bounded.Otherwise, return 
null. */
       @SuppressWarnings("unused")
       public void invokeTruncateRestriction(DoFnInvoker.ArgumentProvider 
argumentProvider) {
         boolean isBounded = this.isBoundedPerElement;
         try {
           isBounded = argumentProvider.restrictionTracker().isBounded();
         } catch (NotImplementedException expectedException) {
         }
         if (isBounded) {
           
argumentProvider.outputReceiver(null).output(argumentProvider.restriction());
         }
       }
     }
   ```

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
##########
@@ -140,4 +140,9 @@ public Progress getProgress() {
         totalWork.subtract(workRemaining, 
MathContext.DECIMAL128).doubleValue(),
         workRemaining.doubleValue());
   }
+
+  @Override
+  public boolean isBounded() {
+    return false;

Review comment:
       Shouldn't this be true if `range.getTo() != Long.MAX_VALUE`?

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -574,6 +578,99 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
               };
         }
         break;
+      case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
+        if ((doFnSignature.truncateSizedRestriction() != null
+                && doFnSignature.truncateSizedRestriction().observesWindow())
+            || (doFnSignature.newTracker() != null && 
doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && 
doFnSignature.getSize().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer = 
this::processElementForWindowObservingTruncateSizedRestriction;
+          // OutputT == RestrictionT
+          this.processContext =
+              new WindowObservingProcessBundleContext() {
+                @Override
+                public void outputWithTimestamp(OutputT output, Instant 
timestamp) {
+                  double size =
+                      doFnInvoker.invokeGetSize(
+                          new DelegatingArgumentProvider<InputT, OutputT>(
+                              this,
+                              
PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                                  + "/GetSize") {

Review comment:
       ```suggestion
                                     + "/TruncateRestriction") {
   ```

##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1428,6 +1428,36 @@ def process(self, element_restriction, *args, **kwargs):
   return _create_sdf_operation(SplitAndSizeRestrictions, *args)
 
 
+@BeamTransformFactory.register_urn(
+    common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn,
+    beam_runner_api_pb2.ParDoPayload)
+def create_truncate_sized_restriction(*args):
+  class TruncateAndSizeRestriction(beam.DoFn):
+    def __init__(self, fn, restriction_provider, watermark_estimator_provider):
+      self.restriction_provider = restriction_provider
+      self.dofn_signature = common.DoFnSignature(fn)
+
+    def process(self, element_restrictin, *args, **kwargs):
+      ((element, (restriction, estimator_state)), _) = element_restrictin
+      try:
+        truncated_restriciton = self.restriction_provider.truncate(

Review comment:
       We should be returning the same as split_and_size allowing for 0 
restrictions to be returned.

##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1428,6 +1428,36 @@ def process(self, element_restriction, *args, **kwargs):
   return _create_sdf_operation(SplitAndSizeRestrictions, *args)
 
 
+@BeamTransformFactory.register_urn(
+    common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn,
+    beam_runner_api_pb2.ParDoPayload)
+def create_truncate_sized_restriction(*args):
+  class TruncateAndSizeRestriction(beam.DoFn):
+    def __init__(self, fn, restriction_provider, watermark_estimator_provider):
+      self.restriction_provider = restriction_provider
+      self.dofn_signature = common.DoFnSignature(fn)
+
+    def process(self, element_restrictin, *args, **kwargs):
+      ((element, (restriction, estimator_state)), _) = element_restrictin
+      try:
+        truncated_restriciton = self.restriction_provider.truncate(

Review comment:
       truncated_restriciton -> truncated_restriction

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
##########
@@ -500,6 +505,19 @@ public void splitRestriction(
       return new UnboundedSourceAsSDFRestrictionTracker(restriction, 
pipelineOptions);
     }
 
+    @TruncateSizedRestriction
+    public void truncateSizedRestriction(
+        @Restriction UnboundedSourceRestriction<OutputT, CheckpointT> 
restriction,
+        OutputReceiver<UnboundedSourceRestriction<OutputT, CheckpointT>> 
receiver) {
+      try {
+        restriction.getCheckpoint().finalizeCheckpoint();
+      } catch (Exception e) {
+        LOG.warn("Failed to finalize CheckpointMark {}", 
restriction.getWatermark());
+      } finally {
+        receiver.output(null);
+      }

Review comment:
       I don't think we produce any outputs and finalization is handled already 
by the bundleFinalizer.afterBundleCommit callback in `@ProcessElement`
   ```suggestion
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -1034,6 +1034,47 @@ public Duration getAllowedTimestampSkew() {
    *   <li>If one of its arguments is tagged with the {@link Timestamp} 
annotation, then it will be
    *       passed the timestamp of the current element being processed; the 
argument must be of type
    *       {@link Instant}.
+   *   <li>If one of its arguments is a subtype of {@link BoundedWindow}, then 
it will be passed the
+   *       window of the current element. When applied by {@link ParDo} the 
subtype of {@link
+   *       BoundedWindow} must match the type of windows on the input {@link 
PCollection}. If the
+   *       window is not accessed a runner may perform additional 
optimizations.
+   *   <li>If one of its arguments is of type {@link PaneInfo}, then it will 
be passed information
+   *       about the current triggering pane.
+   *   <li>If one of the parameters is of type {@link PipelineOptions}, then 
it will be passed the
+   *       options for the current pipeline.
+   * </ul>
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface SplitRestriction {}
+
+  /**
+   * Annotation for the method that truncates restriction of a <a

Review comment:
       We should add a comment to the `@ProcessElement` javadoc comment stating 
something like:
   `It may define a TruncateRestriction method to override the default 
implementation where the default is ...`
   

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -1034,6 +1034,47 @@ public Duration getAllowedTimestampSkew() {
    *   <li>If one of its arguments is tagged with the {@link Timestamp} 
annotation, then it will be
    *       passed the timestamp of the current element being processed; the 
argument must be of type
    *       {@link Instant}.
+   *   <li>If one of its arguments is a subtype of {@link BoundedWindow}, then 
it will be passed the
+   *       window of the current element. When applied by {@link ParDo} the 
subtype of {@link
+   *       BoundedWindow} must match the type of windows on the input {@link 
PCollection}. If the
+   *       window is not accessed a runner may perform additional 
optimizations.
+   *   <li>If one of its arguments is of type {@link PaneInfo}, then it will 
be passed information
+   *       about the current triggering pane.
+   *   <li>If one of the parameters is of type {@link PipelineOptions}, then 
it will be passed the
+   *       options for the current pipeline.
+   * </ul>
+   */
+  @Documented
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  @Experimental(Kind.SPLITTABLE_DO_FN)
+  public @interface SplitRestriction {}
+
+  /**
+   * Annotation for the method that truncates restriction of a <a
+   * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} 
into bounded one to be
+   * processed when pipeline starts to drain.
+   *
+   * <p>This method is used to perform truncating the restriction before 
actual processing.
+   *
+   * <p>Signature: {@code void truncateSizedRestriction(<arguments>);}
+   *
+   * <p>This method must satisfy the following constraints:
+   *
+   * <ul>
+   *   <li>If one of the arguments is of type {@link OutputReceiver}, then it 
will be passed an
+   *       output receiver for outputting the splits. All splits must be 
output through this
+   *       parameter.
+   *   <li>If one of its arguments is tagged with the {@link Element} 
annotation, then it will be
+   *       passed the current element being processed; the argument must be of 
type {@code InputT}.
+   *       Note that automatic conversion of {@link Row}s and {@link 
FieldAccess} parameters are
+   *       currently unsupported.
+   *   <li>If one of its arguments is tagged with the {@link Restriction} 
annotation, then it will
+   *       be passed the current restriction being processed; the argument 
must be of type {@code
+   *       RestrictionT}.
+   *   <li>If one of its arguments is tagged with the {@link Timestamp} 
annotation, then it will be
+   *       passed the timestamp of the current element being processed; the 
argument must be of type
+   *       {@link Instant}.

Review comment:
       ```suggestion
      * Annotation for the method that truncates the restriction of a <a
      * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link 
DoFn} into a bounded one to be
      * processed when pipeline starts to drain.
      *
      * <p>This method is used to perform truncation of the restriction while 
it is not actively being processed.
      *
      * <p>Signature: {@code void truncateRestriction(<arguments>);}
      *
      * <p>This method must satisfy the following constraints:
      *
      * <ul>
      *   <li>If one of the arguments is of type {@link OutputReceiver}, then 
it will be passed an
      *       output receiver for outputting the truncated restrictions. All 
truncated restrictions must be output through this
      *       parameter.
      *   <li>If one of its arguments is tagged with the {@link Element} 
annotation, then it will be
      *       passed the current element being processed; the argument must be 
of type {@code InputT}.
      *       Note that automatic conversion of {@link Row}s and {@link 
FieldAccess} parameters are
      *       currently unsupported.
      *   <li>If one of its arguments is tagged with the {@link Restriction} 
annotation, then it will
      *       be passed the current restriction being processed; the argument 
must be of type {@code
      *       RestrictionT}.
      *   <li>If one of its arguments is tagged with the {@link Timestamp} 
annotation, then it will be
      *       passed the timestamp of the current element being processed; the 
argument must be of type
      *       {@link Instant}.
   ```

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
##########
@@ -99,6 +100,10 @@
    */
   public abstract void checkDone() throws IllegalStateException;
 
+  public boolean isBounded() throws NotImplementedException {

Review comment:
       We should use the return type `IsBounded`.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -1051,7 +1092,7 @@ public Duration getAllowedTimestampSkew() {
   @Retention(RetentionPolicy.RUNTIME)
   @Target(ElementType.METHOD)
   @Experimental(Kind.SPLITTABLE_DO_FN)
-  public @interface SplitRestriction {}
+  public @interface TruncateSizedRestriction {}

Review comment:
       The DoFn doesn't need to know whether it is sized or not. How about 
`TruncateRestriction`?
   

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -574,6 +578,99 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
               };
         }
         break;
+      case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
+        if ((doFnSignature.truncateSizedRestriction() != null
+                && doFnSignature.truncateSizedRestriction().observesWindow())
+            || (doFnSignature.newTracker() != null && 
doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && 
doFnSignature.getSize().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer = 
this::processElementForWindowObservingTruncateSizedRestriction;
+          // OutputT == RestrictionT
+          this.processContext =
+              new WindowObservingProcessBundleContext() {
+                @Override
+                public void outputWithTimestamp(OutputT output, Instant 
timestamp) {
+                  double size =
+                      doFnInvoker.invokeGetSize(

Review comment:
       invokeGetSize?

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -574,6 +578,99 @@ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
               };
         }
         break;
+      case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
+        if ((doFnSignature.truncateSizedRestriction() != null
+                && doFnSignature.truncateSizedRestriction().observesWindow())
+            || (doFnSignature.newTracker() != null && 
doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && 
doFnSignature.getSize().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer = 
this::processElementForWindowObservingTruncateSizedRestriction;
+          // OutputT == RestrictionT
+          this.processContext =

Review comment:
       Since this shares the same ProcessBundleContext as GetSize (except for 
the error context string), can we make this a class called 
SizedRestrictionWindowObservingProcessBundleContext that extends 
WindowObservingProcessBundleContext and is used in both places.
   
   Ditto for the non window observing one as well.

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
##########
@@ -99,6 +100,10 @@
    */
   public abstract void checkDone() throws IllegalStateException;
 
+  public boolean isBounded() throws NotImplementedException {
+    throw new NotImplementedException("isBounded is not implemented.");

Review comment:
       If we want to make this optional, we should follow the `HasProgress` 
style and use an interface like `HasBoundedness`
   
   We could also make this a required method and update the changelog stating 
that this is a breaking change. This would remove the "default" inference on 
the `@UnboundedPerElement`/`@BoundedPerElement` annotations on the DoFn.
   
   Depending on the default ordering we want, we'll want to update the 
restriction trackers in PeriodicSequence and Watch as well (note that Watch 
will have to be updated regardless since it has a tracker that converts 
unbounded restrictions to bounded ones).
   

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
##########
@@ -99,6 +100,10 @@
    */
   public abstract void checkDone() throws IllegalStateException;
 
+  public boolean isBounded() throws NotImplementedException {
+    throw new NotImplementedException("isBounded is not implemented.");

Review comment:
       We should add a comment stating that unbounded restriction trackers can 
only be used with `@UnboundedPerElement` SDFs. We should add a comment on each 
restriction tracker whether it could be unbounded.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -608,17 +705,6 @@ public void accept(WindowedValue input) throws Exception {
     pCollectionConsumerRegistry.register(
         pTransform.getInputsOrThrow(mainInput), pTransformId, (FnDataReceiver) 
mainInputConsumer);
 
-    switch (pTransform.getSpec().getUrn()) {

Review comment:
       Thanks

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -858,6 +1008,10 @@ public static WindowedSplitResult forRoots(
   private void processElementForWindowObservingSizedElementAndRestriction(
       WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>> elem) {
     currentElement = elem.withValue(elem.getValue().getKey().getKey());
+    // No need to process if current restriction is null.
+    if (elem.getValue().getKey().getValue().getKey() == null) {

Review comment:
       I don't think this is the right way to go. The output receiver should 
produce 0 outputs instead of null.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -829,6 +917,68 @@ private void 
processElementForWindowObservingSplitRestriction(
     this.stateAccessor.finalizeState();
   }
 
+  private void processElementForTruncateSizedRestriction(
+      WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>> elem) {
+    currentElement = elem.withValue(elem.getValue().getKey().getKey());
+    currentRestriction = elem.getValue().getKey().getValue().getKey();
+    currentWatermarkEstimatorState = 
elem.getValue().getKey().getValue().getValue();
+    currentTracker =

Review comment:
       It looks like currentTracker isn't being set within both 
`processElementForSplitRestriction` methods. Could you fix that in this PR?

##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1428,6 +1428,36 @@ def process(self, element_restriction, *args, **kwargs):
   return _create_sdf_operation(SplitAndSizeRestrictions, *args)
 
 
+@BeamTransformFactory.register_urn(
+    common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn,
+    beam_runner_api_pb2.ParDoPayload)
+def create_truncate_sized_restriction(*args):
+  class TruncateAndSizeRestriction(beam.DoFn):
+    def __init__(self, fn, restriction_provider, watermark_estimator_provider):
+      self.restriction_provider = restriction_provider
+      self.dofn_signature = common.DoFnSignature(fn)
+
+    def process(self, element_restrictin, *args, **kwargs):

Review comment:
       element_restrictin -> element_restriction

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -320,6 +320,22 @@ def split_and_size(self, element, restriction):
     for part in self.split(element, restriction):
       yield part, self.restriction_size(element, part)
 
+  def truncate(self, element, restriction):
+    """Truncate the given restriction into finite amount of work when the
+    pipeline starts to drain.
+
+    By default, if the restriction is bounded, it will return the entire 
current
+    restriction. If the restriction is unbounded, it will return None.

Review comment:
       ```suggestion
       By default, if the restriction is bounded, it will return the entire 
       restriction. If the restriction is unbounded, it will not return 
anything.
   ```

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -320,6 +320,22 @@ def split_and_size(self, element, restriction):
     for part in self.split(element, restriction):
       yield part, self.restriction_size(element, part)
 
+  def truncate(self, element, restriction):
+    """Truncate the given restriction into finite amount of work when the
+    pipeline starts to drain.
+
+    By default, if the restriction is bounded, it will return the entire 
current
+    restriction. If the restriction is unbounded, it will return None.
+
+    The method throws NotImplementError when RestrictionTracker.is_bounded() is
+    not implemented.
+
+    It's recommended to implement this API if more granularity is required.
+    """
+    restriction_tracker = self.create_tracker(restriction)
+    if restriction_tracker.is_bounded():
+      return restriction

Review comment:
       ```suggestion
         yield restriction
   ```

##########
File path: sdks/python/apache_beam/io/iobase.py
##########
@@ -1243,6 +1243,17 @@ def try_claim(self, position):
     """
     raise NotImplementedError
 
+  def is_bounded(self):
+    """Identify whether the output produced by the current restriction is
+    bounded.
+
+    The value is important for the default behavior of truncate when the
+    pipeline starts to drain in streaming. If the current restriction is
+    bounded, it will be processed completely by default. If the restriction is
+    unbounded, it will be truncated into null and finish processing 
immediately.
+    """
+    raise NotImplementedError

Review comment:
       ThreadsafeRestrictionTracker should call the delegate so that users of 
it get the same behavior as if they were calling the real restriction tracker
   
   We might want to update the RestrictionTrackerView if we want to expose the 
boundedness property during execution.

##########
File path: sdks/python/apache_beam/runners/common.py
##########
@@ -968,6 +973,9 @@ def process_with_sized_restriction(self, windowed_value):
     (element, (restriction, estimator_state)), _ = windowed_value.value
     watermark_estimator = (
         self.do_fn_invoker.invoke_create_watermark_estimator(estimator_state))
+    if restriction is None:

Review comment:
       Same issue as in java, we should allow None restrictions and instead 
produce 0 outputs.

##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -383,6 +383,12 @@ message StandardPTransforms {
     //
     // Input: KV(KV(element, restriction), size); output: DoFn's output.
     PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS = 2 [(beam_urn) = 
"beam:transform:sdf_process_sized_element_and_restrictions:v1"];
+
+    // Truncates the restriction of each element/restriction pair and returns 
the
+    // finite restriction which is going to be processed when drain starts.
+    // Input: KV(KV(element, restriction), size);
+    // Output: KV(KV(element, restriction), size).

Review comment:
       Add a link to the drain design doc that Reuven shared with the Beam 
community.
   
   ```suggestion
       // Truncates the restriction of each element/restriction pair and 
returns the
       // finite restriction which will be processed when a pipeline is drained 
<link>.
       //
       // Input: KV(KV(element, restriction), size);
       // Output: KV(KV(element, restriction), size).
   ```

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -320,6 +320,22 @@ def split_and_size(self, element, restriction):
     for part in self.split(element, restriction):
       yield part, self.restriction_size(element, part)
 
+  def truncate(self, element, restriction):

Review comment:
       Should we also provide truncate_and_size?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to