[jira] [Updated] (BEAM-3234) PubsubIO batch size should be configurable

2017-11-21 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-3234:
-
Description: 
Looks like there's a payload size limit in Pub/Sub (10485760 bytes per publish request), and PubsubIO has a hard-coded batch size that can cause this limit to be exceeded when individual messages are large.

{code}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(GenerateSequence.from(0).to(100))
        .apply(MapElements
            .into(TypeDescriptor.of(String.class))
            .via(x -> {
              // Build a 1,000-character payload for each element.
              StringBuilder b = new StringBuilder();
              for (int i = 0; i < 1000; i++) {
                b.append("x");
              }
              return b.toString();
            }))
        .apply(PubsubIO
            .writeStrings()
            .to("projects/scio-playground/topics/payload-test"));

    pipeline.run().waitUntilFinish();
  }
}
{code}

The above code throws the following error:
{code}
[error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
[error] {
[error]   "code" : 400,
[error]   "errors" : [ {
[error]     "domain" : "global",
[error]     "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]     "reason" : "badRequest"
[error]   } ],
[error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]   "status" : "INVALID_ARGUMENT"
{code}
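To see why a count-only batch size can exceed a byte limit: a publish request carrying N messages of roughly S bytes each is about N × S bytes, so any fixed N overflows once S grows past 10485760 / N. A minimal sketch of that arithmetic (plain Java; the counts and sizes below are hypothetical, not Beam's actual constants):

```java
public class BatchSizeSketch {
  // Pub/Sub's per-request payload cap (10 MiB), as reported in the error above.
  static final long MAX_REQUEST_BYTES = 10L * 1024 * 1024;

  // Approximate payload of one publish request: message count times bytes per
  // message (ignores per-message attribute and encoding overhead).
  static long batchPayloadBytes(int messagesPerBatch, long bytesPerMessage) {
    return messagesPerBatch * bytesPerMessage;
  }

  public static void main(String[] args) {
    // A fixed count of 1,000 messages at ~100 KB each is ~100 MB per request.
    System.out.println(batchPayloadBytes(1000, 100 * 1024) > MAX_REQUEST_BYTES);
    // The same count stays under the cap only while messages average ~10 KB.
    System.out.println(batchPayloadBytes(1000, 10_000) <= MAX_REQUEST_BYTES);
  }
}
```

Whatever the exact hard-coded count is, no message-count cap alone can guarantee staying under a byte limit.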



> PubsubIO batch size should be configurable
> --
>
> Key: BEAM-3234
> URL: https://issues.apache.org/jira/browse/BEAM-3234
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Affects Versions: 2.1.0
>Reporter: Neville Li
>Priority: Minor
>


[jira] [Updated] (BEAM-3234) PubsubIO batch size should be configurable

2017-11-21 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-3234:
-
Description: 
Looks like there's a payload size limit in Pub/Sub (10485760 bytes per publish request), and PubsubIO has a hard-coded batch size that can cause this limit to be exceeded when individual messages are large.

https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885





[jira] [Updated] (BEAM-3234) PubsubIO batch size should be configurable

2017-12-01 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-3234:
-
Affects Version/s: 2.2.0

> PubsubIO batch size should be configurable
> --
>
> Key: BEAM-3234
> URL: https://issues.apache.org/jira/browse/BEAM-3234
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Neville Li
>Priority: Minor
>
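Making the batch size configurable, as the title proposes, could also bound batches by total bytes rather than message count alone. A minimal byte-aware batching sketch (plain Java, independent of Beam's actual internals; names are hypothetical):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ByteBoundedBatcher {
  // Splits messages into publish batches, starting a new batch whenever adding
  // the next message would exceed either the message-count cap or the byte cap.
  static List<List<String>> batch(List<String> messages, int maxCount, long maxBytes) {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    long currentBytes = 0;
    for (String m : messages) {
      long size = m.getBytes(StandardCharsets.UTF_8).length;
      boolean full = !current.isEmpty()
          && (current.size() >= maxCount || currentBytes + size > maxBytes);
      if (full) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(m); // A single oversized message still gets its own batch.
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```

With the byte cap set safely below Pub/Sub's 10485760-byte request limit (leaving headroom for attributes and request overhead), no batch can trigger the 400 above unless a single message on its own exceeds the limit.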



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)