egalpin commented on a change in pull request #17097:
URL: https://github.com/apache/beam/pull/17097#discussion_r828029641
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -221,6 +221,31 @@ private GroupIntoBatches(BatchingParams<InputT> params) {
duration));
}
+ public GroupIntoBatches<K, InputT> withByteSize(Long batchSizeBytes) {
+ checkArgument(
+ batchSizeBytes != null && batchSizeBytes < Long.MAX_VALUE &&
batchSizeBytes > 0,
+ "batchSizeBytes should be a non-negative value less than " +
Long.MAX_VALUE);
+ return new GroupIntoBatches<>(
+ BatchingParams.create(
+ params.getBatchSize(),
+ batchSizeBytes,
+ params.getElementByteSize(),
+ params.getMaxBufferingDuration()));
+ }
+
+ public GroupIntoBatches<K, InputT> withByteSize(
Review comment:
cc: @reuvenlax since you seem to have made many of the recent changes to
GroupIntoBatches. I added these methods to allow for users to make use of
multiple batching criteria simultaneously. Another option for exposing this
configuration option would be to implement a method like:
```java
public static <K, InputT> GroupIntoBatches<K, InputT>
fromBatchingParams(BatchingParams params) {
return new GroupIntoBatches<>(params);
}
```
Thoughts on the preferred approach?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]