mosche commented on code in PR #26946:
URL: https://github.com/apache/beam/pull/26946#discussion_r1259626710
##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java:
##########
@@ -241,7 +277,442 @@ public void setup(PipelineOptions options) throws
Exception {
@ProcessElement
public void processElement(ProcessContext processContext) throws Exception
{
+ if (sqs == null) {
+ throw new IllegalStateException("No SQS client");
+ }
sqs.sendMessage(processContext.element());
}
}
+
+ /**
+ * A {@link PTransform} to send messages to SQS. See {@link SqsIO} for more
information on usage
+ * and configuration.
+ */
+ @AutoValue
+ public abstract static class WriteBatches<T>
+ extends PTransform<PCollection<T>, WriteBatches.Result> {
+ private static final int DEFAULT_CONCURRENCY = 5;
+ private static final int MAX_BATCH_SIZE = 10;
+ private static final Duration DEFAULT_BATCH_TIMEOUT =
Duration.standardSeconds(3);
+
+ abstract @Pure int concurrentRequests();
+
+ abstract @Pure Duration batchTimeout();
+
+ abstract @Pure int batchSize();
+
+ abstract @Pure ClientConfiguration clientConfiguration();
+
+ abstract @Pure EntryBuilder<T> entryBuilder();
+
+ abstract @Pure @Nullable DynamicDestination<T> dynamicDestination();
+
+ abstract @Pure @Nullable String queueUrl();
+
+ abstract Builder<T> builder();
+
+ public interface DynamicDestination<T> extends Serializable {
+ String queueUrl(T message);
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+ abstract Builder<T> batchTimeout(Duration duration);
+
+ abstract Builder<T> batchSize(int batchSize);
+
+ abstract Builder<T> clientConfiguration(ClientConfiguration config);
+
+ abstract Builder<T> entryBuilder(EntryBuilder<T> entryBuilder);
+
+ abstract Builder<T> dynamicDestination(@Nullable DynamicDestination<T>
destination);
+
+ abstract Builder<T> queueUrl(@Nullable String queueUrl);
+
+ abstract WriteBatches<T> build();
+ }
+
+ /** Configuration of SQS client. */
+ public WriteBatches<T> withClientConfiguration(ClientConfiguration config)
{
+ checkArgument(config != null, "ClientConfiguration cannot be null");
+ return builder().clientConfiguration(config).build();
+ }
+
+ /** Max number of concurrent batch write requests per bundle, default is
{@code 5}. */
+ public WriteBatches<T> withConcurrentRequests(int concurrentRequests) {
+ checkArgument(concurrentRequests > 0, "concurrentRequests must be > 0");
+ return builder().concurrentRequests(concurrentRequests).build();
+ }
+
+ /** The batch size to use, default (and AWS limit) is {@code 10}. */
+ public WriteBatches<T> withBatchSize(int batchSize) {
+ checkArgument(
+ batchSize > 0 && batchSize <= MAX_BATCH_SIZE,
+ "Maximum allowed batch size is " + MAX_BATCH_SIZE);
+ return builder().batchSize(batchSize).build();
+ }
+
+ /**
+ * The duration to accumulate records before timing out, default is 3 secs.
+ *
+ * <p>Timeouts will be checked upon arrival of new messages.
+ */
+ public WriteBatches<T> withBatchTimeout(Duration timeout) {
+ return builder().batchTimeout(timeout).build();
+ }
+
+ /** Dynamic record based destination to write to. */
+ public WriteBatches<T> to(DynamicDestination<T> destination) {
+ checkArgument(destination != null, "DynamicDestination cannot be null");
+ return builder().queueUrl(null).dynamicDestination(destination).build();
+ }
+
+ /** Queue url to write to. */
+ public WriteBatches<T> to(String queueUrl) {
+ checkArgument(queueUrl != null, "queueUrl cannot be null");
+ return builder().dynamicDestination(null).queueUrl(queueUrl).build();
+ }
+
+ @Override
+ public Result expand(PCollection<T> input) {
+ checkState(dynamicDestination() != null || queueUrl() != null, "to is
required");
Review Comment:
Removed this one; the check below validates exactly the same condition again.
##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java:
##########
@@ -241,7 +277,442 @@ public void setup(PipelineOptions options) throws
Exception {
@ProcessElement
public void processElement(ProcessContext processContext) throws Exception
{
+ if (sqs == null) {
+ throw new IllegalStateException("No SQS client");
+ }
sqs.sendMessage(processContext.element());
}
}
+
+ /**
+ * A {@link PTransform} to send messages to SQS. See {@link SqsIO} for more
information on usage
+ * and configuration.
+ */
+ @AutoValue
+ public abstract static class WriteBatches<T>
+ extends PTransform<PCollection<T>, WriteBatches.Result> {
+ private static final int DEFAULT_CONCURRENCY = 5;
+ private static final int MAX_BATCH_SIZE = 10;
+ private static final Duration DEFAULT_BATCH_TIMEOUT =
Duration.standardSeconds(3);
+
+ abstract @Pure int concurrentRequests();
+
+ abstract @Pure Duration batchTimeout();
+
+ abstract @Pure int batchSize();
+
+ abstract @Pure ClientConfiguration clientConfiguration();
+
+ abstract @Pure EntryBuilder<T> entryBuilder();
+
+ abstract @Pure @Nullable DynamicDestination<T> dynamicDestination();
+
+ abstract @Pure @Nullable String queueUrl();
+
+ abstract Builder<T> builder();
+
+ public interface DynamicDestination<T> extends Serializable {
+ String queueUrl(T message);
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+ abstract Builder<T> batchTimeout(Duration duration);
+
+ abstract Builder<T> batchSize(int batchSize);
+
+ abstract Builder<T> clientConfiguration(ClientConfiguration config);
+
+ abstract Builder<T> entryBuilder(EntryBuilder<T> entryBuilder);
+
+ abstract Builder<T> dynamicDestination(@Nullable DynamicDestination<T>
destination);
+
+ abstract Builder<T> queueUrl(@Nullable String queueUrl);
+
+ abstract WriteBatches<T> build();
+ }
+
+ /** Configuration of SQS client. */
+ public WriteBatches<T> withClientConfiguration(ClientConfiguration config)
{
+ checkArgument(config != null, "ClientConfiguration cannot be null");
+ return builder().clientConfiguration(config).build();
+ }
+
+ /** Max number of concurrent batch write requests per bundle, default is
{@code 5}. */
+ public WriteBatches<T> withConcurrentRequests(int concurrentRequests) {
+ checkArgument(concurrentRequests > 0, "concurrentRequests must be > 0");
+ return builder().concurrentRequests(concurrentRequests).build();
+ }
+
+ /** The batch size to use, default (and AWS limit) is {@code 10}. */
+ public WriteBatches<T> withBatchSize(int batchSize) {
+ checkArgument(
+ batchSize > 0 && batchSize <= MAX_BATCH_SIZE,
+ "Maximum allowed batch size is " + MAX_BATCH_SIZE);
+ return builder().batchSize(batchSize).build();
+ }
+
+ /**
+ * The duration to accumulate records before timing out, default is 3 secs.
+ *
+ * <p>Timeouts will be checked upon arrival of new messages.
+ */
+ public WriteBatches<T> withBatchTimeout(Duration timeout) {
+ return builder().batchTimeout(timeout).build();
+ }
+
+ /** Dynamic record based destination to write to. */
+ public WriteBatches<T> to(DynamicDestination<T> destination) {
+ checkArgument(destination != null, "DynamicDestination cannot be null");
+ return builder().queueUrl(null).dynamicDestination(destination).build();
+ }
+
+ /** Queue url to write to. */
+ public WriteBatches<T> to(String queueUrl) {
+ checkArgument(queueUrl != null, "queueUrl cannot be null");
+ return builder().dynamicDestination(null).queueUrl(queueUrl).build();
+ }
+
+ @Override
+ public Result expand(PCollection<T> input) {
+ checkState(dynamicDestination() != null || queueUrl() != null, "to is
required");
+ AwsOptions awsOptions =
input.getPipeline().getOptions().as(AwsOptions.class);
+ ClientBuilderFactory.validate(awsOptions, clientConfiguration());
+
+ input.apply(
+ ParDo.of(
+ new DoFn<T, Void>() {
+ private @Nullable BatchHandler<T> handler = null;
+
+ @Setup
+ public void setup(PipelineOptions options) {
+ handler = new BatchHandler<>(WriteBatches.this,
options.as(AwsOptions.class));
+ }
+
+ @StartBundle
+ public void startBundle() {
+ handler().startBundle();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext cxt) throws
Throwable {
+ handler().process(cxt.element());
+ }
+
+ @FinishBundle
+ public void finishBundle() throws Throwable {
+ handler().finishBundle();
+ }
+
+ @Teardown
+ public void teardown() throws Exception {
+ if (handler != null) {
+ handler.close();
+ handler = null;
+ }
+ }
+
+ private BatchHandler<T> handler() {
+ return checkStateNotNull(handler, "SQS handler is null");
+ }
+ }));
+ return new Result(input.getPipeline());
+ }
+
+ /** Batch entry builder. */
+ public interface EntryBuilder<T>
+ extends BiConsumer<SendMessageBatchRequestEntry.Builder, T>,
Serializable {}
+
+ /** Result of {@link #writeBatches}. */
+ public static class Result implements POutput {
+ private final Pipeline pipeline;
+
+ private Result(Pipeline pipeline) {
+ this.pipeline = pipeline;
+ }
+
+ @Override
+ public Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ @Override
+ public Map<TupleTag<?>, PValue> expand() {
+ return ImmutableMap.of();
+ }
+
+ @Override
+ public void finishSpecifyingOutput(
+ String transformName, PInput input, PTransform<?, ?> transform) {}
+ }
+
+ private static class BatchHandler<T> implements AutoCloseable {
+ private final WriteBatches<T> spec;
+ private final SqsAsyncClient sqs;
+ private final Batches batches;
+ private final AsyncBatchWriteHandler<SendMessageBatchRequestEntry,
BatchResultErrorEntry>
+ handler;
+
+ BatchHandler(WriteBatches<T> spec, AwsOptions options) {
+ this.spec = spec;
+ this.sqs = buildClient(options, SqsAsyncClient.builder(),
spec.clientConfiguration());
+ this.handler =
+ AsyncBatchWriteHandler.byId(
+ spec.concurrentRequests(),
+ spec.batchSize(),
+ spec.clientConfiguration().retry(),
+ Stats.NONE,
+ (queue, records) -> sendMessageBatch(sqs, queue, records),
+ error -> error.code(),
+ record -> record.id(),
+ error -> error.id());
+ if (spec.queueUrl() != null) {
+ this.batches = new Single(spec.queueUrl());
+ } else if (spec.dynamicDestination() != null) {
+ this.batches = new Dynamic(spec.dynamicDestination());
+ } else {
+ throw new IllegalStateException("queueUrl or dynamicDestination
required");
Review Comment:
👍
##########
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsIO.java:
##########
@@ -241,7 +277,442 @@ public void setup(PipelineOptions options) throws
Exception {
@ProcessElement
public void processElement(ProcessContext processContext) throws Exception
{
+ if (sqs == null) {
+ throw new IllegalStateException("No SQS client");
+ }
sqs.sendMessage(processContext.element());
}
}
+
+ /**
+ * A {@link PTransform} to send messages to SQS. See {@link SqsIO} for more
information on usage
+ * and configuration.
+ */
+ @AutoValue
+ public abstract static class WriteBatches<T>
+ extends PTransform<PCollection<T>, WriteBatches.Result> {
+ private static final int DEFAULT_CONCURRENCY = 5;
+ private static final int MAX_BATCH_SIZE = 10;
+ private static final Duration DEFAULT_BATCH_TIMEOUT =
Duration.standardSeconds(3);
+
+ abstract @Pure int concurrentRequests();
+
+ abstract @Pure Duration batchTimeout();
+
+ abstract @Pure int batchSize();
+
+ abstract @Pure ClientConfiguration clientConfiguration();
+
+ abstract @Pure EntryBuilder<T> entryBuilder();
+
+ abstract @Pure @Nullable DynamicDestination<T> dynamicDestination();
+
+ abstract @Pure @Nullable String queueUrl();
+
+ abstract Builder<T> builder();
+
+ public interface DynamicDestination<T> extends Serializable {
+ String queueUrl(T message);
+ }
+
+ @AutoValue.Builder
+ abstract static class Builder<T> {
+ abstract Builder<T> concurrentRequests(int concurrentRequests);
+
+ abstract Builder<T> batchTimeout(Duration duration);
+
+ abstract Builder<T> batchSize(int batchSize);
+
+ abstract Builder<T> clientConfiguration(ClientConfiguration config);
+
+ abstract Builder<T> entryBuilder(EntryBuilder<T> entryBuilder);
+
+ abstract Builder<T> dynamicDestination(@Nullable DynamicDestination<T>
destination);
+
+ abstract Builder<T> queueUrl(@Nullable String queueUrl);
+
+ abstract WriteBatches<T> build();
+ }
+
+ /** Configuration of SQS client. */
+ public WriteBatches<T> withClientConfiguration(ClientConfiguration config)
{
+ checkArgument(config != null, "ClientConfiguration cannot be null");
+ return builder().clientConfiguration(config).build();
+ }
+
+ /** Max number of concurrent batch write requests per bundle, default is
{@code 5}. */
+ public WriteBatches<T> withConcurrentRequests(int concurrentRequests) {
+ checkArgument(concurrentRequests > 0, "concurrentRequests must be > 0");
+ return builder().concurrentRequests(concurrentRequests).build();
+ }
+
+ /** The batch size to use, default (and AWS limit) is {@code 10}. */
+ public WriteBatches<T> withBatchSize(int batchSize) {
+ checkArgument(
+ batchSize > 0 && batchSize <= MAX_BATCH_SIZE,
+ "Maximum allowed batch size is " + MAX_BATCH_SIZE);
+ return builder().batchSize(batchSize).build();
+ }
+
+ /**
+ * The duration to accumulate records before timing out, default is 3 secs.
+ *
+ * <p>Timeouts will be checked upon arrival of new messages.
+ */
+ public WriteBatches<T> withBatchTimeout(Duration timeout) {
+ return builder().batchTimeout(timeout).build();
+ }
+
+ /** Dynamic record based destination to write to. */
+ public WriteBatches<T> to(DynamicDestination<T> destination) {
+ checkArgument(destination != null, "DynamicDestination cannot be null");
+ return builder().queueUrl(null).dynamicDestination(destination).build();
+ }
+
+ /** Queue url to write to. */
+ public WriteBatches<T> to(String queueUrl) {
+ checkArgument(queueUrl != null, "queueUrl cannot be null");
+ return builder().dynamicDestination(null).queueUrl(queueUrl).build();
+ }
+
+ @Override
+ public Result expand(PCollection<T> input) {
+ checkState(dynamicDestination() != null || queueUrl() != null, "to is
required");
+ AwsOptions awsOptions =
input.getPipeline().getOptions().as(AwsOptions.class);
+ ClientBuilderFactory.validate(awsOptions, clientConfiguration());
+
+ input.apply(
+ ParDo.of(
+ new DoFn<T, Void>() {
+ private @Nullable BatchHandler<T> handler = null;
+
+ @Setup
+ public void setup(PipelineOptions options) {
+ handler = new BatchHandler<>(WriteBatches.this,
options.as(AwsOptions.class));
+ }
+
+ @StartBundle
+ public void startBundle() {
+ handler().startBundle();
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext cxt) throws
Throwable {
+ handler().process(cxt.element());
+ }
+
+ @FinishBundle
+ public void finishBundle() throws Throwable {
+ handler().finishBundle();
+ }
+
+ @Teardown
+ public void teardown() throws Exception {
+ if (handler != null) {
+ handler.close();
+ handler = null;
+ }
+ }
+
+ private BatchHandler<T> handler() {
+ return checkStateNotNull(handler, "SQS handler is null");
+ }
+ }));
+ return new Result(input.getPipeline());
+ }
+
+ /** Batch entry builder. */
+ public interface EntryBuilder<T>
+ extends BiConsumer<SendMessageBatchRequestEntry.Builder, T>,
Serializable {}
+
+ /** Result of {@link #writeBatches}. */
+ public static class Result implements POutput {
+ private final Pipeline pipeline;
+
+ private Result(Pipeline pipeline) {
+ this.pipeline = pipeline;
+ }
+
+ @Override
+ public Pipeline getPipeline() {
+ return pipeline;
+ }
+
+ @Override
+ public Map<TupleTag<?>, PValue> expand() {
+ return ImmutableMap.of();
+ }
+
+ @Override
+ public void finishSpecifyingOutput(
+ String transformName, PInput input, PTransform<?, ?> transform) {}
+ }
+
+ private static class BatchHandler<T> implements AutoCloseable {
+ private final WriteBatches<T> spec;
+ private final SqsAsyncClient sqs;
+ private final Batches batches;
+ private final AsyncBatchWriteHandler<SendMessageBatchRequestEntry,
BatchResultErrorEntry>
+ handler;
+
+ BatchHandler(WriteBatches<T> spec, AwsOptions options) {
+ this.spec = spec;
+ this.sqs = buildClient(options, SqsAsyncClient.builder(),
spec.clientConfiguration());
+ this.handler =
+ AsyncBatchWriteHandler.byId(
+ spec.concurrentRequests(),
+ spec.batchSize(),
+ spec.clientConfiguration().retry(),
+ Stats.NONE,
+ (queue, records) -> sendMessageBatch(sqs, queue, records),
+ error -> error.code(),
+ record -> record.id(),
+ error -> error.id());
+ if (spec.queueUrl() != null) {
+ this.batches = new Single(spec.queueUrl());
+ } else if (spec.dynamicDestination() != null) {
+ this.batches = new Dynamic(spec.dynamicDestination());
+ } else {
+ throw new IllegalStateException("queueUrl or dynamicDestination
required");
Review Comment:
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]