[ 
https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16286710#comment-16286710
 ] 

Eugene Kirpichov commented on BEAM-3234:
----------------------------------------

Agreed. From looking at the code, this seems easy to fix in
PubsubBoundedWriter. The unbounded write path already appears to handle
byte-size limits properly. I'm not sure why the two use separate code paths.

> PubsubIO batch size should be configurable
> ------------------------------------------
>
>                 Key: BEAM-3234
>                 URL: https://issues.apache.org/jira/browse/BEAM-3234
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-gcp
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Neville Li
>            Priority: Minor
>
> Looks like there's a payload size limit in Pubsub, and PubsubIO has a
> hard-coded batch size that may cause this limit to be exceeded in some cases.
> https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885
> {code}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.TypeDescriptor;
> public class Test {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline pipeline = Pipeline.create(options);
>     pipeline
>         .apply(GenerateSequence.from(0).to(100))
>         .apply(MapElements
>             .into(TypeDescriptor.of(String.class))
>             .via(x -> {
>               StringBuilder b = new StringBuilder();
>               for (int i = 0; i < 10000000; i++) {
>                 b.append("x");
>               }
>               return b.toString();
>             }))
>         .apply(PubsubIO
>             .writeStrings()
>             .to("projects/scio-playground/topics/payload-test"));
>     pipeline.run().waitUntilFinish();
>   }
> }
> {code}
> The above code throws the following error:
> {code}
> [error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
> [error] {
> [error]   "code" : 400,
> [error]   "errors" : [ {
> [error]     "domain" : "global",
> [error]     "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> [error]     "reason" : "badRequest"
> [error]   } ],
> [error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
> [error]   "status" : "INVALID_ARGUMENT"
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
