scwhittle commented on code in PR #36935:
URL: https://github.com/apache/beam/pull/36935#discussion_r2571578547


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+/**
+ * A {@link RestrictionTracker} for wrapping a {@link RestrictionTracker} with unsplittable
+ * restrictions.
+ *
+ * <p>A restriction is considered unsplittable when restrictions of an element must not be processed
+ * simultaneously (e.g., Kafka topic partition).
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class UnsplittableRestrictionTracker<RestrictionT, PositionT>
+    extends RestrictionTracker<RestrictionT, PositionT> implements RestrictionTracker.HasProgress {
+  private final RestrictionTracker<RestrictionT, PositionT> tracker;
+
+  public UnsplittableRestrictionTracker(RestrictionTracker<RestrictionT, PositionT> tracker) {
+    this.tracker = tracker;
+  }
+
+  @Override
+  public boolean tryClaim(PositionT position) {
+    return tracker.tryClaim(position);
+  }
+
+  @Override
+  public RestrictionT currentRestriction() {
+    return tracker.currentRestriction();
+  }
+
+  @Override
+  public SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
+    return fractionOfRemainder < 1.0 ? null : tracker.trySplit(fractionOfRemainder);

Review Comment:
   It seems like 0 should also be handled specially. From the base method's Javadoc:
   ```
      * @return a {@link SplitResult} if a split was possible, otherwise returns {@code null}. If the
      *     {@code fractionOfRemainder == 0}, a {@code null} result MUST imply that the restriction
      *     tracker is done and there is no more work left to do.
   ```
   
   0 also seems OK to delegate, since we shouldn't be doing parallel processing in that case either.
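
A minimal sketch of that delegation rule, under stated assumptions: `Splitter` and `Unsplittable` are hypothetical stand-ins defined here so the example compiles on its own, not Beam's actual `RestrictionTracker` API. Only fractions strictly between 0 and 1 are rejected; 0 (a checkpoint request) and 1 are passed through to the wrapped tracker, so a `null` result at 0 still comes from the delegate and keeps its "no more work" meaning.

```java
public class UnsplittableSplitSketch {
  /** Hypothetical stand-in for the splitting part of a restriction tracker. */
  public interface Splitter<R> {
    /** Returns a split result, or null when no split was possible. */
    R trySplit(double fractionOfRemainder);
  }

  /** Wrapper that refuses any split that would let two parts run in parallel. */
  public static final class Unsplittable<R> implements Splitter<R> {
    private final Splitter<R> delegate;

    public Unsplittable(Splitter<R> delegate) {
      this.delegate = delegate;
    }

    @Override
    public R trySplit(double fractionOfRemainder) {
      // Fractions strictly inside (0, 1) would hand part of the remaining
      // work to another worker while this one keeps processing: refuse them.
      if (fractionOfRemainder > 0.0 && fractionOfRemainder < 1.0) {
        return null;
      }
      // 0 and >= 1 are delegated unchanged, so the delegate alone decides
      // whether a null result (meaning "done") is returned for 0.
      return delegate.trySplit(fractionOfRemainder);
    }
  }
}
```

With this shape, the wrapper never returns `null` for `fractionOfRemainder == 0` on its own, which is the case the base contract singles out.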



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -108,6 +109,12 @@
  *
  * <h4>Splitting</h4>
  *
+ * <p>Consumer groups must not consume from the same {@link TopicPartition} simultaneously. Doing so
+ * may arbitrarily overwrite a consumer group's committed offset for a {@link TopicPartition}.

Review Comment:
   This committed offset is mostly for external monitoring, correct? Beam maintains its own offsets for reading, for correctness reasons, and I think it could handle reading the same partition in parallel if it weren't sharing a consumer.
   
   If the mode is committing the offset in bundle finalization, the offset being committed could be out of order or stale anyway.
   
   So I think we are mainly preventing the splitting because the current caching strategy doesn't work well with it. I'm also not sure whether reading the same partition from different threads would actually improve throughput, but if we are reading fixed, non-overlapping ranges, it seems like it possibly could.
   
   



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+/**
+ * A {@link RestrictionTracker} for wrapping a {@link RestrictionTracker} with unsplittable
+ * restrictions.
+ *
+ * <p>A restriction is considered unsplittable when restrictions of an element must not be processed
+ * simultaneously (e.g., Kafka topic partition).
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class UnsplittableRestrictionTracker<RestrictionT, PositionT>
+    extends RestrictionTracker<RestrictionT, PositionT> implements RestrictionTracker.HasProgress {
+  private final RestrictionTracker<RestrictionT, PositionT> tracker;
+
+  public UnsplittableRestrictionTracker(RestrictionTracker<RestrictionT, PositionT> tracker) {
+    this.tracker = tracker;
+  }
+
+  @Override
+  public boolean tryClaim(PositionT position) {
+    return tracker.tryClaim(position);
+  }
+
+  @Override
+  public RestrictionT currentRestriction() {
+    return tracker.currentRestriction();
+  }
+
+  @Override
+  public SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {

Review Comment:
   The base method is @Nullable. If you mark this override @Nullable as well, can you remove the suppression above? If for some reason that still doesn't work, can the suppression be limited to this function?
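
A rough sketch of what the narrowed annotation could look like, under stated assumptions: the `Nullable` annotation and `BaseTracker` class below are local stand-ins so the example compiles without dependencies. In the real code the annotation would presumably be the Checker Framework's `org.checkerframework.checker.nullness.qual.Nullable`, and whether the method-level suppression is still needed can only be confirmed by running the nullness checker.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

public class NullableOverrideSketch {
  /** Local stand-in for a type-use @Nullable annotation (assumption). */
  @Target(ElementType.TYPE_USE)
  public @interface Nullable {}

  /** Hypothetical stand-in for the base tracker with a nullable trySplit. */
  public abstract static class BaseTracker<R> {
    public abstract @Nullable R trySplit(double fractionOfRemainder);
  }

  /** No class-level suppression: the override declares its nullable return. */
  public static final class Wrapper<R> extends BaseTracker<R> {
    private final BaseTracker<R> tracker;

    public Wrapper(BaseTracker<R> tracker) {
      this.tracker = tracker;
    }

    @Override
    // Fallback only: keep this method-level suppression if the checker
    // still complains once the return type is marked @Nullable.
    @SuppressWarnings("nullness")
    public @Nullable R trySplit(double fractionOfRemainder) {
      return tracker.trySplit(fractionOfRemainder);
    }
  }
}
```

Scoping the suppression to one method keeps nullness checking active for the rest of the class.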



