[jira] [Updated] (BEAM-3234) PubsubIO batch size should be configurable
[ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neville Li updated BEAM-3234:
Description:
Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard-coded batch size that may cause this limit to be exceeded in some cases.

{{
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(GenerateSequence.from(0).to(100))
        .apply(MapElements
            .into(TypeDescriptor.of(String.class))
            .via(x -> {
              StringBuilder b = new StringBuilder();
              for (int i = 0; i < 1000; i++) {
                b.append("x");
              }
              return b.toString();
            }))
        .apply(PubsubIO
            .writeStrings()
            .to("projects/scio-playground/topics/payload-test"));

    pipeline.run().waitUntilFinish();
  }
}
}}

The above code throws the following error:

{{
[error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
[error] {
[error]   "code" : 400,
[error]   "errors" : [ {
[error]     "domain" : "global",
[error]     "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]     "reason" : "badRequest"
[error]   } ],
[error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]   "status" : "INVALID_ARGUMENT"
}}

> PubsubIO batch size should be configurable
> --
>
> Key: BEAM-3234
> URL: https://issues.apache.org/jira/browse/BEAM-3234
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-gcp
> Affects Versions: 2.1.0
> Reporter: Neville Li
> Priority: Minor
[jira] [Updated] (BEAM-3234) PubsubIO batch size should be configurable
[ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neville Li updated BEAM-3234:
Description:
Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard-coded batch size that may cause this limit to be exceeded in some cases.

{{{
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(GenerateSequence.from(0).to(100))
        .apply(MapElements
            .into(TypeDescriptor.of(String.class))
            .via(x -> {
              StringBuilder b = new StringBuilder();
              for (int i = 0; i < 1000; i++) {
                b.append("x");
              }
              return b.toString();
            }))
        .apply(PubsubIO
            .writeStrings()
            .to("projects/scio-playground/topics/payload-test"));

    pipeline.run().waitUntilFinish();
  }
}
}}}

The above code throws the following error:

{{{
[error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
[error] {
[error]   "code" : 400,
[error]   "errors" : [ {
[error]     "domain" : "global",
[error]     "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]     "reason" : "badRequest"
[error]   } ],
[error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]   "status" : "INVALID_ARGUMENT"
}}}
[jira] [Updated] (BEAM-3234) PubsubIO batch size should be configurable
[ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neville Li updated BEAM-3234:
Description:
Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard-coded batch size that may cause this limit to be exceeded in some cases.

https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885

{code}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(GenerateSequence.from(0).to(100))
        .apply(MapElements
            .into(TypeDescriptor.of(String.class))
            .via(x -> {
              StringBuilder b = new StringBuilder();
              for (int i = 0; i < 1000; i++) {
                b.append("x");
              }
              return b.toString();
            }))
        .apply(PubsubIO
            .writeStrings()
            .to("projects/scio-playground/topics/payload-test"));

    pipeline.run().waitUntilFinish();
  }
}
{code}

The above code throws the following error:

{code}
[error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
[error] {
[error]   "code" : 400,
[error]   "errors" : [ {
[error]     "domain" : "global",
[error]     "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]     "reason" : "badRequest"
[error]   } ],
[error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]   "status" : "INVALID_ARGUMENT"
{code}
[jira] [Updated] (BEAM-3234) PubsubIO batch size should be configurable
[ https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neville Li updated BEAM-3234:
Affects Version/s: 2.2.0

> PubsubIO batch size should be configurable
> --
>
> Key: BEAM-3234
> URL: https://issues.apache.org/jira/browse/BEAM-3234
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-gcp
> Affects Versions: 2.1.0, 2.2.0
> Reporter: Neville Li
> Priority: Minor

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
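The description attributes the 400 error to PubsubIO's hard-coded batch size letting a single publish request grow past Pub/Sub's 10485760-byte limit. One way to keep requests under a byte limit is to cap each batch by both message count and total bytes. The sketch below assumes the hard-coded limit counts messages rather than bytes, and measures each payload as the UTF-8 length of its string. `ByteAwareBatcher` and `batch` are hypothetical names for illustration, not PubsubIO API, and a real publish request may carry encoding and attribute overhead that this sketch does not count. The 30,000-byte cap in `main` is an arbitrary illustrative value, not a Pub/Sub limit.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical illustration, not PubsubIO code: split messages into publish
 * batches capped by BOTH a message count and a total payload size in bytes.
 */
public class ByteAwareBatcher {

  /**
   * Groups messages so that no batch holds more than maxCount messages or more
   * than maxBytes bytes, measuring each message as its UTF-8 length. Encoding
   * and attribute overhead of a real publish request is NOT counted here.
   */
  public static List<List<String>> batch(List<String> messages, int maxCount, long maxBytes) {
    if (maxCount <= 0 || maxBytes <= 0) {
      throw new IllegalArgumentException("limits must be positive");
    }
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    long currentBytes = 0;
    for (String message : messages) {
      long size = message.getBytes(StandardCharsets.UTF_8).length;
      if (size > maxBytes) {
        // No batch can hold this message, so fail instead of sending an oversized request.
        throw new IllegalArgumentException("message of " + size + " bytes exceeds maxBytes");
      }
      // Flush the current batch if adding this message would break either limit.
      if (!current.isEmpty() && (current.size() >= maxCount || currentBytes + size > maxBytes)) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(message);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args) {
    // Same shape as the repro: 100 messages of 1000 "x" characters each.
    List<String> messages = new ArrayList<>();
    for (int i = 0; i < 100; i++) {
      messages.add(String.join("", Collections.nCopies(1000, "x")));
    }
    // Illustrative limits: a count cap of 100 alone would put all 100,000 bytes
    // in one request; adding a 30,000-byte cap splits them up.
    List<List<String>> batches = batch(messages, 100, 30_000);
    List<Integer> sizes = new ArrayList<>();
    for (List<String> b : batches) {
      sizes.add(b.size());
    }
    System.out.println(sizes); // prints [30, 30, 30, 10]
  }
}
```

Because the byte check runs before a message is added, a batch is flushed as soon as the next message would overflow it. A single message larger than `maxBytes` cannot fit in any batch, so the sketch rejects it rather than emitting an oversized request.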