[ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275383#comment-16275383 ]

Neville Li commented on BEAM-3234:
----------------------------------

Affects 2.2.0 as well.
https://github.com/apache/beam/blob/release-2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L873

> PubsubIO batch size should be configurable
> ------------------------------------------
>
>                 Key: BEAM-3234
>                 URL: https://issues.apache.org/jira/browse/BEAM-3234
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-gcp
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Neville Li
>            Priority: Minor
>
> Looks like Pubsub enforces a request payload size limit (10485760 bytes, per the
> error below), and PubsubIO has a hard-coded batch size that can cause this limit
> to be exceeded in some cases.
> https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885
> {code}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.TypeDescriptor;
> public class Test {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline pipeline = Pipeline.create(options);
>     pipeline
>         .apply(GenerateSequence.from(0).to(100))
>         .apply(MapElements
>             .into(TypeDescriptor.of(String.class))
>             .via(x -> {
>               // Build a 10,000,000-character string (about 10 MB) for every element.
>               StringBuilder b = new StringBuilder();
>               for (int i = 0; i < 10000000; i++) {
>                 b.append("x");
>               }
>               return b.toString();
>             }))
>         .apply(PubsubIO
>             .writeStrings()
>             .to("projects/scio-playground/topics/payload-test"));
>     pipeline.run().waitUntilFinish();
>   }
> }
> {code}
> The above code throws the following error:
> {code}
> [error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
> [error] {
> [error]   "code" : 400,
> [error]   "errors" : [ {
> [error]     "domain" : "global",
> [error]     "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> [error]     "reason" : "badRequest"
> [error]   } ],
> [error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> [error]   "status" : "INVALID_ARGUMENT"
> {code}
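
One possible shape for the fix the issue title asks for is a batcher bounded by both a message count and a total byte size, with both limits exposed as settings instead of a single hard-coded count. The following is a minimal sketch under that assumption only: `SizeBoundedBatcher`, `batch`, `maxCount`, and `maxBytes` are hypothetical names, not part of PubsubIO's API. It also counts raw payload bytes only, so a real implementation would need to leave headroom below 10485760 bytes for request encoding overhead.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Beam's PubsubIO): groups message payloads into
 * publish batches bounded by both a message count and a total raw byte size.
 */
public class SizeBoundedBatcher {

  public static List<List<byte[]>> batch(List<byte[]> payloads, int maxCount, long maxBytes) {
    if (maxCount < 1 || maxBytes < 1) {
      throw new IllegalArgumentException("maxCount and maxBytes must be positive");
    }
    List<List<byte[]>> batches = new ArrayList<>();
    List<byte[]> current = new ArrayList<>();
    long currentBytes = 0;
    for (byte[] payload : payloads) {
      if (payload.length > maxBytes) {
        // A single message larger than the byte budget can never fit; fail fast.
        throw new IllegalArgumentException(
            "Message of " + payload.length + " bytes exceeds maxBytes=" + maxBytes);
      }
      // Flush the current batch if adding this payload would break either bound.
      if (!current.isEmpty()
          && (current.size() == maxCount || currentBytes + payload.length > maxBytes)) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(payload);
      currentBytes += payload.length;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    // Five 4,000,000-byte messages against the 10485760-byte limit from the error above.
    List<byte[]> payloads = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      payloads.add(new byte[4_000_000]);
    }
    List<List<byte[]>> batches = batch(payloads, 100, 10_485_760L);
    System.out.println(batches.size()); // prints 3 (batches of 2, 2 and 1 messages)
  }
}
```

With a count-only limit of 100, all five messages (20,000,000 bytes) would go into one request; the byte bound splits them so that each batch stays at or below 8,000,000 raw bytes.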



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
