[ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Neville Li updated BEAM-3234:
-----------------------------
    Affects Version/s: 2.2.0

> PubsubIO batch size should be configurable
> ------------------------------------------
>
>                 Key: BEAM-3234
>                 URL: https://issues.apache.org/jira/browse/BEAM-3234
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-gcp
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Neville Li
>            Priority: Minor
>
> Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard coded batch size that may cause this limit to be exceeded in some cases.
> https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885
> {code}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.TypeDescriptor;
>
> public class Test {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline pipeline = Pipeline.create(options);
>     pipeline
>       .apply(GenerateSequence.from(0).to(100))
>       .apply(MapElements
>         .into(TypeDescriptor.of(String.class))
>         .via(x -> {
>           StringBuilder b = new StringBuilder();
>           for (int i = 0; i < 10000000; i++) {
>             b.append("x");
>           }
>           return b.toString();
>         }))
>       .apply(PubsubIO
>         .writeStrings()
>         .to("projects/scio-playground/topics/payload-test"));
>     pipeline.run().waitUntilFinish();
>   }
> }
> {code}
> The above code throws the following error:
> {code}
> [error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
> [error] {
> [error]   "code" : 400,
> [error]   "errors" : [ {
> [error]     "domain" : "global",
> [error]     "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> [error]     "reason" : "badRequest"
> [error]   } ],
> [error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> [error]   "status" : "INVALID_ARGUMENT"
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)