Neville Li created BEAM-3234:
--------------------------------

             Summary: PubsubIO batch size should be configurable
                 Key: BEAM-3234
                 URL: https://issues.apache.org/jira/browse/BEAM-3234
             Project: Beam
          Issue Type: Bug
          Components: sdk-java-gcp
    Affects Versions: 2.1.0
            Reporter: Neville Li
            Priority: Minor


Looks like Pubsub enforces a request payload size limit, and PubsubIO has a 
hard-coded batch size that can cause this limit to be exceeded when messages are large.

{{import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(GenerateSequence.from(0).to(100))
        .apply(MapElements
            .into(TypeDescriptor.of(String.class))
            .via(x -> {
              // Build a 10,000,000-character payload for each element.
              StringBuilder b = new StringBuilder();
              for (int i = 0; i < 10000000; i++) {
                b.append("x");
              }
              return b.toString();
            }))
        .apply(PubsubIO
            .writeStrings()
            .to("projects/scio-playground/topics/payload-test"));

    pipeline.run().waitUntilFinish();
  }
}
}}

The above code throws the following error:
{{
[error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
[error] {
[error]   "code" : 400,
[error]   "errors" : [ {
[error]     "domain" : "global",
[error]     "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]     "reason" : "badRequest"
[error]   } ],
[error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]   "status" : "INVALID_ARGUMENT"
}}
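
For illustration only, here is a minimal sketch of what size-aware batching could look like: a batch is closed when either a message count or a byte budget would be exceeded. The class `SizeBoundedBatcher` and its `maxCount` and `maxBytes` parameters are hypothetical and are not part of PubsubIO. The sketch counts only the UTF-8 payload bytes and ignores any request encoding overhead, so a real limit would need some headroom.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Beam: bounds batches by message count and by bytes. */
final class SizeBoundedBatcher {

  /**
   * Splits messages into batches of at most maxCount messages and at most
   * maxBytes of UTF-8 payload. A single message larger than maxBytes still
   * gets a batch of its own; this sketch does not reject it.
   */
  static List<List<String>> batch(List<String> messages, int maxCount, long maxBytes) {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    long currentBytes = 0;
    for (String message : messages) {
      long size = message.getBytes(StandardCharsets.UTF_8).length;
      // Close the current batch if adding this message would break either bound.
      boolean full = current.size() >= maxCount || currentBytes + size > maxBytes;
      if (!current.isEmpty() && full) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(message);
      currentBytes += size;
    }
    // Emit whatever is left over.
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```

With a configurable byte budget like this, the batch size would adapt to message size instead of relying on a fixed message count.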



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)