[ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Neville Li updated BEAM-3234:
-----------------------------
Description:
Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard coded batch size that may cause this limit to be exceeded in some cases.

{{
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(GenerateSequence.from(0).to(100))
        .apply(MapElements
            .into(TypeDescriptor.of(String.class))
            .via(x -> {
              StringBuilder b = new StringBuilder();
              for (int i = 0; i < 10000000; i++) {
                b.append("x");
              }
              return b.toString();
            }))
        .apply(PubsubIO
            .writeStrings()
            .to("projects/scio-playground/topics/payload-test"));
    pipeline.run().waitUntilFinish();
  }
}
}}

The above code throws the following error:

{{
[error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
[error] {
[error] "code" : 400,
[error] "errors" : [ {
[error] "domain" : "global",
[error] "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error] "reason" : "badRequest"
[error] } ],
[error] "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error] "status" : "INVALID_ARGUMENT"
}}

was:
Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard coded batch size that may cause this limit to be exceeded in some cases.
{{import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(GenerateSequence.from(0).to(100))
        .apply(MapElements
            .into(TypeDescriptor.of(String.class))
            .via(x -> {
              StringBuilder b = new StringBuilder();
              for (int i = 0; i < 10000000; i++) {
                b.append("x");
              }
              return b.toString();
            }))
        .apply(PubsubIO
            .writeStrings()
            .to("projects/scio-playground/topics/payload-test"));
    pipeline.run().waitUntilFinish();
  }
}
}}

The above code throws the following error:

{{
[error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
[error] {
[error] "code" : 400,
[error] "errors" : [ {
[error] "domain" : "global",
[error] "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error] "reason" : "badRequest"
[error] } ],
[error] "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error] "status" : "INVALID_ARGUMENT"
}}

> PubsubIO batch size should be configurable
> ------------------------------------------
>
> Key: BEAM-3234
> URL: https://issues.apache.org/jira/browse/BEAM-3234
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-gcp
> Affects Versions: 2.1.0
> Reporter: Neville Li
> Priority: Minor
>
> Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard
> coded batch size that may cause this limit to be exceeded in some cases.
> {{
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.TypeDescriptor;
> public class Test {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline pipeline = Pipeline.create(options);
>     pipeline
>         .apply(GenerateSequence.from(0).to(100))
>         .apply(MapElements
>             .into(TypeDescriptor.of(String.class))
>             .via(x -> {
>               StringBuilder b = new StringBuilder();
>               for (int i = 0; i < 10000000; i++) {
>                 b.append("x");
>               }
>               return b.toString();
>             }))
>         .apply(PubsubIO
>             .writeStrings()
>             .to("projects/scio-playground/topics/payload-test"));
>     pipeline.run().waitUntilFinish();
>   }
> }
> }}
> The above code throws the following error:
> {{
> [error] Caused by:
> com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad
> Request
> [error] {
> [error] "code" : 400,
> [error] "errors" : [ {
> [error] "domain" : "global",
> [error] "message" : "Request payload size exceeds the limit: 10485760
> bytes.",
> [error] "reason" : "badRequest"
> [error] } ],
> [error] "message" : "Request payload size exceeds the limit: 10485760
> bytes.",
> [error] "status" : "INVALID_ARGUMENT"
> }}

--
This message was sent by Atlassian JIRA (v6.4.14#64029)