[ 
https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-3234:
-----------------------------
    Description: 
There appears to be a request payload size limit in Pubsub, and PubsubIO has a
hard-coded batch size that may cause this limit to be exceeded in some cases.

https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885

{code}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
 * Minimal reproduction for BEAM-3234.
 *
 * <p>Maps every element of a generated sequence to a string of {@link #PAYLOAD_LENGTH}
 * characters and writes those strings to a Pubsub topic via {@code PubsubIO.writeStrings()}.
 * According to the report, running this fails with "Request payload size exceeds the limit".
 */
public class Test {
  /** Number of characters in each generated message. */
  private static final int PAYLOAD_LENGTH = 10000000;

  /** Upper bound handed to {@code GenerateSequence.from(0).to(...)}. */
  private static final int SEQUENCE_END = 100;

  /** Topic the oversized messages are written to. */
  private static final String TOPIC = "projects/scio-playground/topics/payload-test";

  public static void main(String[] args) {
    Pipeline repro = Pipeline.create(PipelineOptionsFactory.create());

    repro
        .apply(GenerateSequence.from(0).to(SEQUENCE_END))
        .apply(MapElements
            .into(TypeDescriptor.of(String.class))
            // The sequence value itself is irrelevant; only the payload size matters.
            .via(ignored -> buildPayload()))
        .apply(PubsubIO.writeStrings().to(TOPIC));

    // Block until the pipeline terminates so the failure surfaces in this process.
    repro.run().waitUntilFinish();
  }

  /** Returns a string consisting of {@link #PAYLOAD_LENGTH} repetitions of "x". */
  private static String buildPayload() {
    StringBuilder payload = new StringBuilder();
    int remaining = PAYLOAD_LENGTH;
    while (remaining-- > 0) {
      payload.append("x");
    }
    return payload.toString();
  }
}
{code}

The above code throws the following error:
{code}
[error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
[error] {
[error]   "code" : 400,
[error]   "errors" : [ {
[error]     "domain" : "global",
[error]     "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]     "reason" : "badRequest"
[error]   } ],
[error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]   "status" : "INVALID_ARGUMENT"
{code}

  was:
Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard 
coded batch size that may cause this limit to be exceeded in some cases.

{{{
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(GenerateSequence.from(0).to(100))
        .apply(MapElements
            .into(TypeDescriptor.of(String.class))
            .via(x -> {
              StringBuilder b = new StringBuilder();
              for (int i = 0; i < 10000000; i++) {
                b.append("x");
              }
              return b.toString();
            }))
        .apply(PubsubIO
            .writeStrings()
            .to("projects/scio-playground/topics/payload-test"));

    pipeline.run().waitUntilFinish();
  }
}
}}}

The above code throws the following error:
{{{
[error] Caused by: 
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
Request
[error] {
[error]   "code" : 400,
[error]   "errors" : [ {
[error]     "domain" : "global",
[error]     "message" : "Request payload size exceeds the limit: 10485760 
bytes.",
[error]     "reason" : "badRequest"
[error]   } ],
[error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]   "status" : "INVALID_ARGUMENT"
}}}


> PubsubIO batch size should be configurable
> ------------------------------------------
>
>                 Key: BEAM-3234
>                 URL: https://issues.apache.org/jira/browse/BEAM-3234
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-gcp
>    Affects Versions: 2.1.0
>            Reporter: Neville Li
>            Priority: Minor
>
> Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard 
> coded batch size that may cause this limit to be exceeded in some cases.
> https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885
> {code}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.TypeDescriptor;
> public class Test {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline pipeline = Pipeline.create(options);
>     pipeline
>         .apply(GenerateSequence.from(0).to(100))
>         .apply(MapElements
>             .into(TypeDescriptor.of(String.class))
>             .via(x -> {
>               StringBuilder b = new StringBuilder();
>               for (int i = 0; i < 10000000; i++) {
>                 b.append("x");
>               }
>               return b.toString();
>             }))
>         .apply(PubsubIO
>             .writeStrings()
>             .to("projects/scio-playground/topics/payload-test"));
>     pipeline.run().waitUntilFinish();
>   }
> }
> {code}
> The above code throws the following error:
> {code}
> [error] Caused by: 
> com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
> Request
> [error] {
> [error]   "code" : 400,
> [error]   "errors" : [ {
> [error]     "domain" : "global",
> [error]     "message" : "Request payload size exceeds the limit: 10485760 
> bytes.",
> [error]     "reason" : "badRequest"
> [error]   } ],
> [error]   "message" : "Request payload size exceeds the limit: 10485760 
> bytes.",
> [error]   "status" : "INVALID_ARGUMENT"
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to