scwhittle commented on code in PR #36935:
URL: https://github.com/apache/beam/pull/36935#discussion_r2571578547
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+/**
+ * A {@link RestrictionTracker} for wrapping a {@link RestrictionTracker} with unsplittable
+ * restrictions.
+ *
+ * <p>A restriction is considered unsplittable when restrictions of an element must not be processed
+ * simultaneously (e.g., Kafka topic partition).
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class UnsplittableRestrictionTracker<RestrictionT, PositionT>
+    extends RestrictionTracker<RestrictionT, PositionT> implements RestrictionTracker.HasProgress {
+  private final RestrictionTracker<RestrictionT, PositionT> tracker;
+
+  public UnsplittableRestrictionTracker(RestrictionTracker<RestrictionT, PositionT> tracker) {
+    this.tracker = tracker;
+  }
+
+  @Override
+  public boolean tryClaim(PositionT position) {
+    return tracker.tryClaim(position);
+  }
+
+  @Override
+  public RestrictionT currentRestriction() {
+    return tracker.currentRestriction();
+  }
+
+  @Override
+  public SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {
+    return fractionOfRemainder < 1.0 ? null : tracker.trySplit(fractionOfRemainder);

Review Comment:
It seems like 0 should also be handled specially. From the base method's Javadoc:
```
 * @return a {@link SplitResult} if a split was possible, otherwise returns {@code null}. If the
 * {@code fractionOfRemainder == 0}, a {@code null} result MUST imply that the restriction
 * tracker is done and there is no more work left to do.
```
The current `fractionOfRemainder < 1.0` check also returns `null` for 0, which would break that contract whenever work remains. Delegating 0 to the wrapped tracker also seems OK: a split at 0 is a checkpoint that leaves no remaining work in the primary, so we shouldn't be doing parallel processing in that case either.

##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -108,6 +109,12 @@
  *
  * <h4>Splitting</h4>
  *
+ * <p>Consumer groups must not consume from the same {@link TopicPartition} simultaneously. Doing so
+ * may arbitrarily overwrite a consumer group's committed offset for a {@link TopicPartition}.

Review Comment:
This committed offset is mostly for external monitoring, correct? Beam maintains its own offsets for reading, for correctness reasons, and I think it could handle reading the same partition in parallel if it weren't sharing a consumer.
If offsets are committed during bundle finalization, the committed offset could be out of order or stale anyway. So I think we are mainly preventing splitting because the current consumer caching strategy doesn't work well with it. I'm also not sure whether reading the same partition from different threads would actually improve throughput, but if we are reading fixed, non-overlapping ranges it seems like it possibly could.

##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/UnsplittableRestrictionTracker.java:
##########
@@ -0,0 +1,69 @@
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class UnsplittableRestrictionTracker<RestrictionT, PositionT>
+    extends RestrictionTracker<RestrictionT, PositionT> implements RestrictionTracker.HasProgress {
+  private final RestrictionTracker<RestrictionT, PositionT> tracker;
+
+  public UnsplittableRestrictionTracker(RestrictionTracker<RestrictionT, PositionT> tracker) {
+    this.tracker = tracker;
+  }
+
+  @Override
+  public boolean tryClaim(PositionT position) {
+    return tracker.tryClaim(position);
+  }
+
+  @Override
+  public RestrictionT currentRestriction() {
+    return tracker.currentRestriction();
+  }
+
+  @Override
+  public SplitResult<RestrictionT> trySplit(double fractionOfRemainder) {

Review Comment:
The base method is @Nullable. If you mark this override @Nullable as well, can you remove the suppression above? If for some reason it still doesn't work, can the suppression be limited to this method?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
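The following is a minimal, self-contained Java sketch of the `trySplit` behavior suggested in the first comment, combined with the `@Nullable` return type from the last comment. `Tracker`, `SplitResult`, `Nullable`, `UnsplittableTracker` and `RecordingTracker` are hypothetical stand-ins, not Beam's `RestrictionTracker` API (Beam code generally uses the Checker Framework's `@Nullable`). The sketch also assumes, as the PR's existing condition does, that fractions of 1.0 and above are safe to delegate.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

// Self-contained sketch of the suggested trySplit behavior. Tracker, SplitResult, Nullable and
// RecordingTracker are hypothetical stand-ins, not Beam's real RestrictionTracker API.
public class UnsplittableTrySplitSketch {

  /** Hypothetical stand-in for a type-use nullness annotation (e.g. the Checker Framework's). */
  @Target(ElementType.TYPE_USE)
  @interface Nullable {}

  /** Hypothetical stand-in for Beam's SplitResult: a primary and a residual restriction. */
  static final class SplitResult<R> {
    final R primary;
    final R residual;

    SplitResult(R primary, R residual) {
      this.primary = primary;
      this.residual = residual;
    }
  }

  /** Hypothetical stand-in for the trySplit part of a restriction tracker. */
  interface Tracker<R> {
    @Nullable SplitResult<R> trySplit(double fractionOfRemainder);
  }

  /** Wrapper that only passes through splits that cannot create parallel work. */
  static final class UnsplittableTracker<R> implements Tracker<R> {
    private final Tracker<R> tracker;

    UnsplittableTracker(Tracker<R> tracker) {
      this.tracker = tracker;
    }

    // The return type is annotated to match the base method. In Beam, this is what the review
    // suggests should make a class-wide "nullness" suppression unnecessary.
    @Override
    public @Nullable SplitResult<R> trySplit(double fractionOfRemainder) {
      // 0 is a checkpoint and 1.0 keeps all remaining work in the primary. Delegate both so that
      // a null result at 0 still means the wrapped tracker has no work left.
      if (fractionOfRemainder == 0 || fractionOfRemainder >= 1.0) {
        return tracker.trySplit(fractionOfRemainder);
      }
      // Any other fraction would hand part of the remaining work to a residual that could run
      // in parallel with the primary, so refuse the split.
      return null;
    }
  }

  /** Hypothetical fake tracker that records the last fraction it was asked to split at. */
  static final class RecordingTracker implements Tracker<String> {
    double lastFraction = -1; // -1 means no split has been delegated yet

    @Override
    public @Nullable SplitResult<String> trySplit(double fractionOfRemainder) {
      lastFraction = fractionOfRemainder;
      return new SplitResult<>("primary", "residual");
    }
  }

  public static void main(String[] args) {
    RecordingTracker inner = new RecordingTracker();
    UnsplittableTracker<String> wrapper = new UnsplittableTracker<>(inner);

    System.out.println(wrapper.trySplit(0.5) == null); // true: refused, not delegated
    System.out.println(inner.lastFraction); // -1.0
    System.out.println(wrapper.trySplit(0) != null); // true: checkpoint delegated
    System.out.println(inner.lastFraction); // 0.0
  }
}
```

Running `main` prints `true`, `-1.0`, `true` and `0.0`. The 0.5 split never reaches the wrapped tracker, while the checkpoint at 0 is passed through. A `null` at 0 therefore still means the wrapped tracker is done.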
