[jira] [Resolved] (BEAM-3247) Sample.any memory constraint

2018-09-24 Thread Neville Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li resolved BEAM-3247.
--
   Resolution: Fixed
Fix Version/s: 2.2.0

> Sample.any memory constraint
> 
>
> Key: BEAM-3247
> URL: https://issues.apache.org/jira/browse/BEAM-3247
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Affects Versions: 2.1.0
>Reporter: Neville Li
>Assignee: Neville Li
>Priority: Minor
> Fix For: 2.2.0
>
>
> Right now {{Sample.any}} converts the collection to an iterable view and takes 
> the first n elements in a side input. This may require materializing the entire 
> collection to disk and is potentially inefficient.
> https://github.com/apache/beam/blob/v2.1.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java#L74
> It can be fixed by applying a truncating {{DoFn}} first, then combining into a 
> {{List}} of bounded size, and finally flattening the list.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()

2018-08-29 Thread Neville Li (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596565#comment-16596565
 ] 

Neville Li commented on BEAM-5036:
--

Yeah, that's what I figured. So there's no way to reduce this overhead on GCS 
unless GCS starts to support efficient object {{rename}}.

> Optimize FileBasedSink's WriteOperation.moveToOutput()
> --
>
> Key: BEAM-5036
> URL: https://issues.apache.org/jira/browse/BEAM-5036
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-files
>Affects Versions: 2.5.0
>Reporter: Jozef Vilcek
>Assignee: Tim Robertson
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The moveToOutput() methods in FileBasedSink.WriteOperation implement a move 
> as copy+delete. It would be better to use rename(), which can be much more 
> efficient on some filesystems.
> The filesystem must support cross-directory rename. BEAM-4861 tracks this for 
> the HDFS filesystem.
> The feature was discussed here:
> http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E
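
For illustration, a minimal sketch of both move strategies via Beam's 
{{FileSystems}} API; {{copy}}, {{delete}}, and {{rename}} are real SDK calls, 
while the two helper methods are hypothetical:

{code}
import java.io.IOException;
import java.util.Collections;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;

class MoveSketch {
  // Current behavior: two filesystem round trips, expensive on object stores
  // like GCS.
  static void moveViaCopyDelete(ResourceId src, ResourceId dst) throws IOException {
    FileSystems.copy(Collections.singletonList(src), Collections.singletonList(dst));
    FileSystems.delete(Collections.singletonList(src));
  }

  // Proposed: a single rename, which filesystems like HDFS can implement as a
  // cheap metadata operation (GCS still rewrites the object server-side).
  static void moveViaRename(ResourceId src, ResourceId dst) throws IOException {
    FileSystems.rename(Collections.singletonList(src), Collections.singletonList(dst));
  }
}
{code}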



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()

2018-08-29 Thread Neville Li (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596565#comment-16596565
 ] 

Neville Li edited comment on BEAM-5036 at 8/29/18 4:23 PM:
---

Yeah, that's what I figured. So there's no way to reduce this overhead on GCS 
unless GCS starts to support efficient object {{rename}}.


was (Author: sinisa_lyh):
Yeah that's why I figured. So there's no way to reduce this overhead on GCS 
unless if GCS starts to support efficient object {{rename}}.

> Optimize FileBasedSink's WriteOperation.moveToOutput()
> --
>
> Key: BEAM-5036
> URL: https://issues.apache.org/jira/browse/BEAM-5036
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-files
>Affects Versions: 2.5.0
>Reporter: Jozef Vilcek
>Assignee: Tim Robertson
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The moveToOutput() methods in FileBasedSink.WriteOperation implement a move 
> as copy+delete. It would be better to use rename(), which can be much more 
> efficient on some filesystems.
> The filesystem must support cross-directory rename. BEAM-4861 tracks this for 
> the HDFS filesystem.
> The feature was discussed here:
> http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()

2018-08-29 Thread Neville Li (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596513#comment-16596513
 ] 

Neville Li commented on BEAM-5036:
--

{{copy+delete}} is still expensive on GCS, especially when running 10Ks of jobs 
writing TBs of data daily. My memory is a bit vague, but was there a time when 
{{AvroIO}} wrote to output files directly without a {{rename}} or 
{{copy+delete}}?

> Optimize FileBasedSink's WriteOperation.moveToOutput()
> --
>
> Key: BEAM-5036
> URL: https://issues.apache.org/jira/browse/BEAM-5036
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-files
>Affects Versions: 2.5.0
>Reporter: Jozef Vilcek
>Assignee: Tim Robertson
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The moveToOutput() methods in FileBasedSink.WriteOperation implement a move 
> as copy+delete. It would be better to use rename(), which can be much more 
> efficient on some filesystems.
> The filesystem must support cross-directory rename. BEAM-4861 tracks this for 
> the HDFS filesystem.
> The feature was discussed here:
> http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()

2018-08-29 Thread Neville Li (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596410#comment-16596410
 ] 

Neville Li commented on BEAM-5036:
--

Yeah that's my main concern. We use GCS almost exclusively so all our jobs are 
affected by this. 

> Optimize FileBasedSink's WriteOperation.moveToOutput()
> --
>
> Key: BEAM-5036
> URL: https://issues.apache.org/jira/browse/BEAM-5036
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-files
>Affects Versions: 2.5.0
>Reporter: Jozef Vilcek
>Assignee: Tim Robertson
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The moveToOutput() methods in FileBasedSink.WriteOperation implement a move 
> as copy+delete. It would be better to use rename(), which can be much more 
> efficient on some filesystems.
> The filesystem must support cross-directory rename. BEAM-4861 tracks this for 
> the HDFS filesystem.
> The feature was discussed here:
> http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5036) Optimize FileBasedSink's WriteOperation.moveToOutput()

2018-08-29 Thread Neville Li (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16596392#comment-16596392
 ] 

Neville Li commented on BEAM-5036:
--

> If I understand this correctly, this issue affects all file-based IOs, 
> including Avro? We have a lot of jobs with huge Avro outputs.

> Optimize FileBasedSink's WriteOperation.moveToOutput()
> --
>
> Key: BEAM-5036
> URL: https://issues.apache.org/jira/browse/BEAM-5036
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-files
>Affects Versions: 2.5.0
>Reporter: Jozef Vilcek
>Assignee: Tim Robertson
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The moveToOutput() methods in FileBasedSink.WriteOperation implement a move 
> as copy+delete. It would be better to use rename(), which can be much more 
> efficient on some filesystems.
> The filesystem must support cross-directory rename. BEAM-4861 tracks this for 
> the HDFS filesystem.
> The feature was discussed here:
> http://mail-archives.apache.org/mod_mbox/beam-dev/201807.mbox/%3CCAF9t7_4Mp54pQ+vRrJrBh9Vx0=uaknupzd_qdh_qdm9vxll...@mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3234) PubsubIO batch size should be configurable

2017-12-01 Thread Neville Li (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16275383#comment-16275383
 ] 

Neville Li commented on BEAM-3234:
--

Affects 2.2.0 as well.
https://github.com/apache/beam/blob/release-2.2.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L873

> PubsubIO batch size should be configurable
> --
>
> Key: BEAM-3234
> URL: https://issues.apache.org/jira/browse/BEAM-3234
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Neville Li
>Priority: Minor
>
> Looks like there's a payload size limit in Pubsub, and PubsubIO has a 
> hard-coded batch size that may cause this limit to be exceeded in some cases.
> https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885
> {code}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.TypeDescriptor;
> public class Test {
>   public static void main(String[] args) {
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline pipeline = Pipeline.create(options);
> pipeline
> .apply(GenerateSequence.from(0).to(100))
> .apply(MapElements
> .into(TypeDescriptor.of(String.class))
> .via(x -> {
>   StringBuilder b = new StringBuilder();
>   for (int i = 0; i < 1000; i++) {
> b.append("x");
>   }
>   return b.toString();
> }))
> .apply(PubsubIO
> .writeStrings()
> .to("projects/scio-playground/topics/payload-test"));
> pipeline.run().waitUntilFinish();
>   }
> }
> {code}
> The above code throws the following error:
> {code}
> [error] Caused by: 
> com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
> Request
> [error] {
> [error]   "code" : 400,
> [error]   "errors" : [ {
> [error] "domain" : "global",
> [error] "message" : "Request payload size exceeds the limit: 10485760 
> bytes.",
> [error] "reason" : "badRequest"
> [error]   } ],
> [error]   "message" : "Request payload size exceeds the limit: 10485760 
> bytes.",
> [error]   "status" : "INVALID_ARGUMENT"
> {code}
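
A hedged sketch of the knob this issue asks for, adapted from the repro above. 
The setter names {{withMaxBatchSize}} and {{withMaxBatchBytesSize}} are 
assumptions (they match options later added to {{PubsubIO.Write}}), so treat 
this as an API sketch rather than working code:

{code}
// Cap both the message count and the byte size of each publish request so a
// batch can never exceed Pubsub's 10485760-byte payload limit.
pipeline
    .apply(GenerateSequence.from(0).to(100))
    .apply(MapElements
        .into(TypeDescriptor.of(String.class))
        .via(x -> "payload"))
    .apply(PubsubIO
        .writeStrings()
        .withMaxBatchSize(100)           // assumed setter: max messages per request
        .withMaxBatchBytesSize(1 << 20)  // assumed setter: max bytes per request (~1 MB)
        .to("projects/scio-playground/topics/payload-test"));
{code}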



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-3234) PubsubIO batch size should be configurable

2017-12-01 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-3234:
-
Affects Version/s: 2.2.0

> PubsubIO batch size should be configurable
> --
>
> Key: BEAM-3234
> URL: https://issues.apache.org/jira/browse/BEAM-3234
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Neville Li
>Priority: Minor
>
> Looks like there's a payload size limit in Pubsub, and PubsubIO has a 
> hard-coded batch size that may cause this limit to be exceeded in some cases.
> https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885
> {code}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.TypeDescriptor;
> public class Test {
>   public static void main(String[] args) {
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline pipeline = Pipeline.create(options);
> pipeline
> .apply(GenerateSequence.from(0).to(100))
> .apply(MapElements
> .into(TypeDescriptor.of(String.class))
> .via(x -> {
>   StringBuilder b = new StringBuilder();
>   for (int i = 0; i < 1000; i++) {
> b.append("x");
>   }
>   return b.toString();
> }))
> .apply(PubsubIO
> .writeStrings()
> .to("projects/scio-playground/topics/payload-test"));
> pipeline.run().waitUntilFinish();
>   }
> }
> {code}
> The above code throws the following error:
> {code}
> [error] Caused by: 
> com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
> Request
> [error] {
> [error]   "code" : 400,
> [error]   "errors" : [ {
> [error] "domain" : "global",
> [error] "message" : "Request payload size exceeds the limit: 10485760 
> bytes.",
> [error] "reason" : "badRequest"
> [error]   } ],
> [error]   "message" : "Request payload size exceeds the limit: 10485760 
> bytes.",
> [error]   "status" : "INVALID_ARGUMENT"
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-991) DatastoreIO Write should flush early for large batches

2017-11-28 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-991:

Fix Version/s: 2.1.0

> DatastoreIO Write should flush early for large batches
> --
>
> Key: BEAM-991
> URL: https://issues.apache.org/jira/browse/BEAM-991
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Reporter: Vikas Kedigehalli
>Assignee: Vikas Kedigehalli
> Fix For: 2.1.0
>
>
> If entities are large (avg size > 20KB), then a single batched write (500 
> entities) would exceed the Datastore 10MB limit on a single request; see 
> https://cloud.google.com/datastore/docs/concepts/limits.
> First reported in: 
> http://stackoverflow.com/questions/40156400/why-does-dataflow-erratically-fail-in-datastore-access
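
A minimal sketch of a flush-early buffer (illustrative, not the actual 
DatastoreIO patch): since 500 entities at a ~20KB average is roughly 10MB, the 
writer should flush on whichever limit is hit first, entity count or 
accumulated bytes:

{code}
import java.util.ArrayList;
import java.util.List;
import com.google.datastore.v1.Entity;

class EntityBuffer {
  private static final int MAX_ENTITIES = 500;
  private static final long MAX_BYTES = 9L * 1024 * 1024;  // margin under the 10MB cap

  private final List<Entity> buffer = new ArrayList<>();
  private long bufferedBytes = 0;

  void add(Entity entity) {
    buffer.add(entity);
    bufferedBytes += entity.getSerializedSize();  // protobuf-encoded size
    if (buffer.size() >= MAX_ENTITIES || bufferedBytes >= MAX_BYTES) {
      flush();  // flush early instead of waiting for a full 500-entity batch
    }
  }

  void flush() {
    // Issue the Datastore commit RPC for `buffer` here, then reset.
    buffer.clear();
    bufferedBytes = 0;
  }
}
{code}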



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-3247) Sample.any memory constraint

2017-11-24 Thread Neville Li (JIRA)
Neville Li created BEAM-3247:


 Summary: Sample.any memory constraint
 Key: BEAM-3247
 URL: https://issues.apache.org/jira/browse/BEAM-3247
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-core
Affects Versions: 2.1.0
Reporter: Neville Li
Assignee: Kenneth Knowles
Priority: Minor


Right now {{Sample.any}} converts the collection to an iterable view and takes 
the first n elements in a side input. This may require materializing the entire 
collection to disk and is potentially inefficient.
https://github.com/apache/beam/blob/v2.1.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java#L74

It can be fixed by applying a truncating {{DoFn}} first, then combining into a 
{{List}} of bounded size, and finally flattening the list.
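
A minimal sketch of that approach, specialized to {{String}} elements for 
brevity; the transform names and the {{CombineFn}} are illustrative, not the 
actual fix:

{code}
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

class SampleAnySketch {
  static PCollection<String> sampleAny(PCollection<String> input, final int n) {
    return input
        // Step 1: emit at most n elements per bundle, discarding the rest early.
        .apply("Truncate", ParDo.of(new DoFn<String, String>() {
          private transient int emitted;
          @StartBundle
          public void startBundle() { emitted = 0; }
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (emitted < n) { emitted++; c.output(c.element()); }
          }
        }))
        // Step 2: combine the survivors into a single list capped at n elements.
        .apply("CapList", Combine.globally(
            new Combine.CombineFn<String, List<String>, List<String>>() {
              @Override public List<String> createAccumulator() { return new ArrayList<>(); }
              @Override public List<String> addInput(List<String> acc, String in) {
                if (acc.size() < n) { acc.add(in); }
                return acc;
              }
              @Override public List<String> mergeAccumulators(Iterable<List<String>> accs) {
                List<String> merged = new ArrayList<>();
                for (List<String> acc : accs) {
                  for (String s : acc) {
                    if (merged.size() >= n) { return merged; }
                    merged.add(s);
                  }
                }
                return merged;
              }
              @Override public List<String> extractOutput(List<String> acc) { return acc; }
            }))
        // Step 3: flatten the single bounded list back into a PCollection.
        .apply("FlattenList", Flatten.<String>iterables());
  }
}
{code}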



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-3247) Sample.any memory constraint

2017-11-24 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li reassigned BEAM-3247:


Assignee: Neville Li  (was: Kenneth Knowles)

> Sample.any memory constraint
> 
>
> Key: BEAM-3247
> URL: https://issues.apache.org/jira/browse/BEAM-3247
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Affects Versions: 2.1.0
>Reporter: Neville Li
>Assignee: Neville Li
>Priority: Minor
>
> Right now {{Sample.any}} converts the collection to an iterable view and takes 
> the first n elements in a side input. This may require materializing the entire 
> collection to disk and is potentially inefficient.
> https://github.com/apache/beam/blob/v2.1.0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java#L74
> It can be fixed by applying a truncating {{DoFn}} first, then combining into a 
> {{List}} of bounded size, and finally flattening the list.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-3234) PubsubIO batch size should be configurable

2017-11-21 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-3234:
-
Description: 
Looks like there's a payload size limit in Pubsub, and PubsubIO has a 
hard-coded batch size that may cause this limit to be exceeded in some cases.

https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885

{code}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

pipeline
.apply(GenerateSequence.from(0).to(100))
.apply(MapElements
.into(TypeDescriptor.of(String.class))
.via(x -> {
  StringBuilder b = new StringBuilder();
  for (int i = 0; i < 1000; i++) {
b.append("x");
  }
  return b.toString();
}))
.apply(PubsubIO
.writeStrings()
.to("projects/scio-playground/topics/payload-test"));

pipeline.run().waitUntilFinish();
  }
}
{code}

The above code throws the following error:
{code}
[error] Caused by: 
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
Request
[error] {
[error]   "code" : 400,
[error]   "errors" : [ {
[error] "domain" : "global",
[error] "message" : "Request payload size exceeds the limit: 10485760 
bytes.",
[error] "reason" : "badRequest"
[error]   } ],
[error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]   "status" : "INVALID_ARGUMENT"
{code}

  was:
Looks like there's a payload size limit in Pubsub, and PubsubIO has a 
hard-coded batch size that may cause this limit to be exceeded in some cases.

{{{
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

pipeline
.apply(GenerateSequence.from(0).to(100))
.apply(MapElements
.into(TypeDescriptor.of(String.class))
.via(x -> {
  StringBuilder b = new StringBuilder();
  for (int i = 0; i < 1000; i++) {
b.append("x");
  }
  return b.toString();
}))
.apply(PubsubIO
.writeStrings()
.to("projects/scio-playground/topics/payload-test"));

pipeline.run().waitUntilFinish();
  }
}
}}}

The above code throws the following error:
{{{
[error] Caused by: 
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
Request
[error] {
[error]   "code" : 400,
[error]   "errors" : [ {
[error] "domain" : "global",
[error] "message" : "Request payload size exceeds the limit: 10485760 
bytes.",
[error] "reason" : "badRequest"
[error]   } ],
[error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]   "status" : "INVALID_ARGUMENT"
}}}


> PubsubIO batch size should be configurable
> --
>
> Key: BEAM-3234
> URL: https://issues.apache.org/jira/browse/BEAM-3234
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Affects Versions: 2.1.0
>Reporter: Neville Li
>Priority: Minor
>
> Looks like there's a payload size limit in Pubsub, and PubsubIO has a 
> hard-coded batch size that may cause this limit to be exceeded in some cases.
> https://github.com/apache/beam/blob/release-2.1.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L885
> {code}
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.TypeDescriptor;
> public class Test {
>   public static void main(String[] args) {
> PipelineOptions options = 

[jira] [Updated] (BEAM-3234) PubsubIO batch size should be configurable

2017-11-21 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-3234:
-
Description: 
Looks like there's a payload size limit in Pubsub, and PubsubIO has a 
hard-coded batch size that may cause this limit to be exceeded in some cases.

{{{
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

pipeline
.apply(GenerateSequence.from(0).to(100))
.apply(MapElements
.into(TypeDescriptor.of(String.class))
.via(x -> {
  StringBuilder b = new StringBuilder();
  for (int i = 0; i < 1000; i++) {
b.append("x");
  }
  return b.toString();
}))
.apply(PubsubIO
.writeStrings()
.to("projects/scio-playground/topics/payload-test"));

pipeline.run().waitUntilFinish();
  }
}
}}}

The above code throws the following error:
{{{
[error] Caused by: 
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
Request
[error] {
[error]   "code" : 400,
[error]   "errors" : [ {
[error] "domain" : "global",
[error] "message" : "Request payload size exceeds the limit: 10485760 
bytes.",
[error] "reason" : "badRequest"
[error]   } ],
[error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]   "status" : "INVALID_ARGUMENT"
}}}

  was:
Looks like there's a payload size limit in Pubsub, and PubsubIO has a 
hard-coded batch size that may cause this limit to be exceeded in some cases.

{{
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

pipeline
.apply(GenerateSequence.from(0).to(100))
.apply(MapElements
.into(TypeDescriptor.of(String.class))
.via(x -> {
  StringBuilder b = new StringBuilder();
  for (int i = 0; i < 1000; i++) {
b.append("x");
  }
  return b.toString();
}))
.apply(PubsubIO
.writeStrings()
.to("projects/scio-playground/topics/payload-test"));

pipeline.run().waitUntilFinish();
  }
}
}}

The above code throws the following error:
{{
[error] Caused by: 
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
Request
[error] {
[error]   "code" : 400,
[error]   "errors" : [ {
[error] "domain" : "global",
[error] "message" : "Request payload size exceeds the limit: 10485760 
bytes.",
[error] "reason" : "badRequest"
[error]   } ],
[error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]   "status" : "INVALID_ARGUMENT"
}}


> PubsubIO batch size should be configurable
> --
>
> Key: BEAM-3234
> URL: https://issues.apache.org/jira/browse/BEAM-3234
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Affects Versions: 2.1.0
>Reporter: Neville Li
>Priority: Minor
>
> Looks like there's a payload size limit in Pubsub, and PubsubIO has a 
> hard-coded batch size that may cause this limit to be exceeded in some cases.
> {{{
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.TypeDescriptor;
> public class Test {
>   public static void main(String[] args) {
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline pipeline = Pipeline.create(options);
> pipeline
> .apply(GenerateSequence.from(0).to(100))
> .apply(MapElements
> .into(TypeDescriptor.of(String.class))
> .via(x -> {
>   StringBuilder b = new StringBuilder();
>   for (int i = 0; i < 

[jira] [Updated] (BEAM-3234) PubsubIO batch size should be configurable

2017-11-21 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-3234:
-
Description: 
Looks like there's a payload size limit in Pubsub, and PubsubIO has a 
hard-coded batch size that may cause this limit to be exceeded in some cases.

{{
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

pipeline
.apply(GenerateSequence.from(0).to(100))
.apply(MapElements
.into(TypeDescriptor.of(String.class))
.via(x -> {
  StringBuilder b = new StringBuilder();
  for (int i = 0; i < 1000; i++) {
b.append("x");
  }
  return b.toString();
}))
.apply(PubsubIO
.writeStrings()
.to("projects/scio-playground/topics/payload-test"));

pipeline.run().waitUntilFinish();
  }
}
}}

The above code throws the following error:
{{
[error] Caused by: 
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
Request
[error] {
[error]   "code" : 400,
[error]   "errors" : [ {
[error] "domain" : "global",
[error] "message" : "Request payload size exceeds the limit: 10485760 
bytes.",
[error] "reason" : "badRequest"
[error]   } ],
[error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]   "status" : "INVALID_ARGUMENT"
}}

  was:
Looks like there's a payload size limit in Pubsub, and PubsubIO has a 
hard-coded batch size that may cause this limit to be exceeded in some cases.

{{import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

pipeline
.apply(GenerateSequence.from(0).to(100))
.apply(MapElements
.into(TypeDescriptor.of(String.class))
.via(x -> {
  StringBuilder b = new StringBuilder();
  for (int i = 0; i < 1000; i++) {
b.append("x");
  }
  return b.toString();
}))
.apply(PubsubIO
.writeStrings()
.to("projects/scio-playground/topics/payload-test"));

pipeline.run().waitUntilFinish();
  }
}
}}

The above code throws the following error:
{{
[error] Caused by: 
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
Request
[error] {
[error]   "code" : 400,
[error]   "errors" : [ {
[error] "domain" : "global",
[error] "message" : "Request payload size exceeds the limit: 10485760 
bytes.",
[error] "reason" : "badRequest"
[error]   } ],
[error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]   "status" : "INVALID_ARGUMENT"
}}


> PubsubIO batch size should be configurable
> --
>
> Key: BEAM-3234
> URL: https://issues.apache.org/jira/browse/BEAM-3234
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Affects Versions: 2.1.0
>Reporter: Neville Li
>Priority: Minor
>
> Looks like there's a payload size limit in Pubsub, and PubsubIO has a 
> hard-coded batch size that may cause this limit to be exceeded in some cases.
> {{
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.GenerateSequence;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.TypeDescriptor;
> public class Test {
>   public static void main(String[] args) {
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline pipeline = Pipeline.create(options);
> pipeline
> .apply(GenerateSequence.from(0).to(100))
> .apply(MapElements
> .into(TypeDescriptor.of(String.class))
> .via(x -> {
>   StringBuilder b = new StringBuilder();
>   for (int i = 0; i < 

[jira] [Created] (BEAM-3234) PubsubIO batch size should be configurable

2017-11-21 Thread Neville Li (JIRA)
Neville Li created BEAM-3234:


 Summary: PubsubIO batch size should be configurable
 Key: BEAM-3234
 URL: https://issues.apache.org/jira/browse/BEAM-3234
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-gcp
Affects Versions: 2.1.0
Reporter: Neville Li
Priority: Minor


Looks like there's a payload size limit in Pubsub, and PubsubIO has a 
hard-coded batch size that may cause this limit to be exceeded in some cases.

{{import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

pipeline
.apply(GenerateSequence.from(0).to(100))
.apply(MapElements
.into(TypeDescriptor.of(String.class))
.via(x -> {
  StringBuilder b = new StringBuilder();
  for (int i = 0; i < 1000; i++) {
b.append("x");
  }
  return b.toString();
}))
.apply(PubsubIO
.writeStrings()
.to("projects/scio-playground/topics/payload-test"));

pipeline.run().waitUntilFinish();
  }
}
}}

The above code throws the following error:
{{
[error] Caused by: 
com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad 
Request
[error] {
[error]   "code" : 400,
[error]   "errors" : [ {
[error] "domain" : "global",
[error] "message" : "Request payload size exceeds the limit: 10485760 
bytes.",
[error] "reason" : "badRequest"
[error]   } ],
[error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]   "status" : "INVALID_ARGUMENT"
}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-2960) Missing type parameter in some AvroIO.Write API

2017-09-15 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li reassigned BEAM-2960:


Assignee: Neville Li  (was: Kenneth Knowles)

> Missing type parameter in some AvroIO.Write API
> ---
>
> Key: BEAM-2960
> URL: https://issues.apache.org/jira/browse/BEAM-2960
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.1.0
>Reporter: Neville Li
>Assignee: Neville Li
>Priority: Minor
>
> Like
> {{public Write to(DynamicAvroDestinations dynamicDestinations)}}
> {{public Write withSchema(Schema schema)}}
> {{public Write withWindowedWrites()}}
> {{public Write withMetadata(Map metadata)}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-2766) HadoopInputFormatIO should support Void/null key/values

2017-08-14 Thread Neville Li (JIRA)
Neville Li created BEAM-2766:


 Summary: HadoopInputFormatIO should support Void/null key/values
 Key: BEAM-2766
 URL: https://issues.apache.org/jira/browse/BEAM-2766
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-extensions
Affects Versions: 2.2.0
Reporter: Neville Li
Assignee: Reuven Lax
Priority: Minor


Many Hadoop {{InputFormat}} implementations use {{Void}} as the key/value type 
and generate null values, which causes a {{NullPointerException}} in 
https://github.com/apache/beam/blob/master/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java#L714

{{HadoopInputFormatIO}} should ignore these and not clone them.
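
A minimal sketch of the suggested guard; {{CoderUtils.clone}} is the SDK helper 
that does the cloning, and the wrapper method is hypothetical:

{code}
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.util.CoderUtils;

class NullSafeClone {
  // Pass nulls through instead of cloning them; cloning a null value is what
  // currently triggers the NullPointerException.
  static <T> T cloneIfNotNull(T value, Coder<T> coder) throws CoderException {
    return value == null ? null : CoderUtils.clone(coder, value);
  }
}
{code}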



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-2765) HadoopInputFormatIO should support custom key/value coder

2017-08-14 Thread Neville Li (JIRA)
Neville Li created BEAM-2765:


 Summary: HadoopInputFormatIO should support custom key/value coder
 Key: BEAM-2765
 URL: https://issues.apache.org/jira/browse/BEAM-2765
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-extensions
Affects Versions: 2.2.0
Reporter: Neville Li
Assignee: Reuven Lax
Priority: Minor


Right now {{HadoopInputFormatIO}} infers coders with {{getDefaultCoder}}, but 
this doesn't work for cases like {{AvroCoder}} with {{GenericRecord}}, for 
example when Avro is used with Parquet.

https://github.com/apache/beam/blob/master/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java#L288
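
For comparison, a sketch of what an explicit override could look like; 
{{read()}} and {{withConfiguration()}} are the existing entry points, while 
{{withKeyCoder}}/{{withValueCoder}} are hypothetical setter names for the 
requested feature:

{code}
// Hypothetical: let the caller supply coders directly when inference fails,
// e.g. AvroCoder for GenericRecord when Avro is used with Parquet.
// `hadoopConf` and `avroSchema` are assumed to be defined elsewhere.
HadoopInputFormatIO.Read<GenericRecord, GenericRecord> read =
    HadoopInputFormatIO.<GenericRecord, GenericRecord>read()
        .withConfiguration(hadoopConf)             // existing setter
        .withKeyCoder(AvroCoder.of(avroSchema))    // hypothetical
        .withValueCoder(AvroCoder.of(avroSchema)); // hypothetical
{code}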



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2658) SerializableCoder has higher precedence over ProtoCoder in CoderRegistry#getCoder

2017-07-23 Thread Neville Li (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16097699#comment-16097699
 ] 

Neville Li commented on BEAM-2658:
--

However, I'd still argue that {{DefaultCoder}} and {{SerializableCoder}} are two 
special cases. {{DefaultCoder}} should have the highest precedence and override 
type-based lookup, while {{SerializableCoder}} should have the lowest, as a 
fallback. Other coders in between don't really overlap.

> SerializableCoder has higher precedence over ProtoCoder in 
> CoderRegistry#getCoder
> -
>
> Key: BEAM-2658
> URL: https://issues.apache.org/jira/browse/BEAM-2658
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0
>Reporter: Neville Li
>Assignee: Davor Bonaci
>Priority: Minor
>
> {code}
> import com.google.protobuf.Timestamp;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.coders.CannotProvideCoderException;
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> public class CoderTest {
>   public static void main(String[] args) throws CannotProvideCoderException {
> PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
> Pipeline pipeline = Pipeline.create(options);
> Coder coder = 
> pipeline.getCoderRegistry().getCoder(Timestamp.class);
> // class org.apache.beam.sdk.coders.SerializableCoder
> System.out.println(coder.getClass());
>   }
> }
> {code}
> Right now we're sorting {{CoderProviderRegistrar}}s by canonical name but 
> {{SerializableCoderProvider}} should be added last as a fallback if there're 
> other {{CoderProvider}}s that support the same type.
> {code}
> Set registrars = 
> Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2658) SerializableCoder has higher precedence over ProtoCoder in CoderRegistry#getCoder

2017-07-23 Thread Neville Li (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16097693#comment-16097693
 ] 

Neville Li commented on BEAM-2658:
--

Types covered by each {{CoderProvider}} may overlap, and we might want to sort 
them based on scope rather than name.

Two ideas:
1. Add a `List getSupportedTypes()` and make sure that when two providers 
overlap, the one with the narrower supported types gets precedence. This won't 
work with `DefaultCoder`, though. Also, a class can implement multiple 
interfaces, and it doesn't specify which one has higher precedence. For example, 
`ProtoCoder` supports `Message.class` while `SerializableCoder` supports 
`Serializable.class`, but generated Protobuf classes also implement 
`Serializable`.
2. Assign an arbitrary int precedence to each coder, similar to UNIX rc/motd 
files, and store registered coders in a `TreeMap`. Not elegant, but it works 
(see the sketch below).
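
A minimal sketch of idea 2, with all names hypothetical (this is not Beam API):

{code}
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Providers keyed by an explicit int precedence; lookup walks them in order,
// so a fallback like SerializableCoderProvider registers at Integer.MAX_VALUE.
class RankedProviders<P> {
  private final TreeMap<Integer, P> byPrecedence = new TreeMap<>();

  void register(int precedence, P provider) {
    byPrecedence.put(precedence, provider);
  }

  // Returns the first (lowest-precedence-value) provider that can handle the
  // query, or null if none matches.
  P lookup(Predicate<P> canHandle) {
    for (Map.Entry<Integer, P> entry : byPrecedence.entrySet()) {
      if (canHandle.test(entry.getValue())) {
        return entry.getValue();
      }
    }
    return null;
  }
}
{code}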

> SerializableCoder has higher precedence over ProtoCoder in 
> CoderRegistry#getCoder
> -
>
> Key: BEAM-2658
> URL: https://issues.apache.org/jira/browse/BEAM-2658
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0
>Reporter: Neville Li
>Assignee: Davor Bonaci
>Priority: Minor
>
> {code}
> import com.google.protobuf.Timestamp;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.coders.CannotProvideCoderException;
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> public class CoderTest {
>   public static void main(String[] args) throws CannotProvideCoderException {
> PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
> Pipeline pipeline = Pipeline.create(options);
> Coder coder = 
> pipeline.getCoderRegistry().getCoder(Timestamp.class);
> // class org.apache.beam.sdk.coders.SerializableCoder
> System.out.println(coder.getClass());
>   }
> }
> {code}
> Right now we're sorting {{CoderProviderRegistrar}}s by canonical name but 
> {{SerializableCoderProvider}} should be added last as a fallback if there're 
> other {{CoderProvider}}s that support the same type.
> {code}
> Set registrars = 
> Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2658) SerializableCoder has higher precedence over ProtoCoder in CoderRegistry#getCoder

2017-07-22 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-2658:
-
Description: 
{code}
import com.google.protobuf.Timestamp;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CoderTest {
  public static void main(String[] args) throws CannotProvideCoderException {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);

Coder coder = 
pipeline.getCoderRegistry().getCoder(Timestamp.class);

// class org.apache.beam.sdk.coders.SerializableCoder
System.out.println(coder.getClass());
  }
}
{code}

Right now we're sorting {{CoderProviderRegistrar}}s by canonical name but 
{{SerializableCoderProvider}} should be added last as a fallback if there're 
other {{CoderProvider}}s that support the same type.
{code}
Set registrars = 
Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
{code}

  was:
{code}
import com.google.protobuf.Timestamp;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CoderTest {
  public static void main(String[] args) throws CannotProvideCoderException {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);

Coder coder = 
pipeline.getCoderRegistry().getCoder(Timestamp.class);

// class org.apache.beam.sdk.coders.SerializableCoder
System.out.println(coder.getClass());
  }
}
{code}


> SerializableCoder has higher precedence over ProtoCoder in 
> CoderRegistry#getCoder
> -
>
> Key: BEAM-2658
> URL: https://issues.apache.org/jira/browse/BEAM-2658
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0
>Reporter: Neville Li
>Assignee: Davor Bonaci
>Priority: Minor
>
> {code}
> import com.google.protobuf.Timestamp;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.coders.CannotProvideCoderException;
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> public class CoderTest {
>   public static void main(String[] args) throws CannotProvideCoderException {
> PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
> Pipeline pipeline = Pipeline.create(options);
> Coder coder = 
> pipeline.getCoderRegistry().getCoder(Timestamp.class);
> // class org.apache.beam.sdk.coders.SerializableCoder
> System.out.println(coder.getClass());
>   }
> }
> {code}
> Right now we're sorting {{CoderProviderRegistrar}}s by canonical name but 
> {{SerializableCoderProvider}} should be added last as a fallback if there're 
> other {{CoderProvider}}s that support the same type.
> {code}
> Set registrars = 
> Sets.newTreeSet(ObjectsClassComparator.INSTANCE);
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2658) SerializableCoder has higher precedence over ProtoCoder in CoderRegistry#getCoder

2017-07-22 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-2658:
-
Description: 
{code)
public class CoderTest {
  public static void main(String[] args) throws CannotProvideCoderException {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);

Coder coder = 
pipeline.getCoderRegistry().getCoder(Timestamp.class);

// class org.apache.beam.sdk.coders.SerializableCoder
System.out.println(coder.getClass());
  }
}
{code)

  was:
{{
public class CoderTest {
  public static void main(String[] args) throws CannotProvideCoderException {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);

Coder coder = 
pipeline.getCoderRegistry().getCoder(Timestamp.class);

// class org.apache.beam.sdk.coders.SerializableCoder
System.out.println(coder.getClass());
  }
}
}}


> SerializableCoder has higher precedence over ProtoCoder in 
> CoderRegistry#getCoder
> -
>
> Key: BEAM-2658
> URL: https://issues.apache.org/jira/browse/BEAM-2658
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0
>Reporter: Neville Li
>Assignee: Davor Bonaci
>Priority: Minor
>
> {code)
> public class CoderTest {
>   public static void main(String[] args) throws CannotProvideCoderException {
> PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
> Pipeline pipeline = Pipeline.create(options);
> Coder coder = 
> pipeline.getCoderRegistry().getCoder(Timestamp.class);
> // class org.apache.beam.sdk.coders.SerializableCoder
> System.out.println(coder.getClass());
>   }
> }
> {code)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2658) SerializableCoder has higher precedence over ProtoCoder in CoderRegistry#getCoder

2017-07-22 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-2658:
-
Description: 
{code}
import com.google.protobuf.Timestamp;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CoderTest {
  public static void main(String[] args) throws CannotProvideCoderException {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);

Coder coder = 
pipeline.getCoderRegistry().getCoder(Timestamp.class);

// class org.apache.beam.sdk.coders.SerializableCoder
System.out.println(coder.getClass());
  }
}
{code}

  was:
{code}
public class CoderTest {
  public static void main(String[] args) throws CannotProvideCoderException {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);

Coder coder = 
pipeline.getCoderRegistry().getCoder(Timestamp.class);

// class org.apache.beam.sdk.coders.SerializableCoder
System.out.println(coder.getClass());
  }
}
{code}


> SerializableCoder has higher precedence over ProtoCoder in 
> CoderRegistry#getCoder
> -
>
> Key: BEAM-2658
> URL: https://issues.apache.org/jira/browse/BEAM-2658
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0
>Reporter: Neville Li
>Assignee: Davor Bonaci
>Priority: Minor
>
> {code}
> import com.google.protobuf.Timestamp;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.coders.CannotProvideCoderException;
> import org.apache.beam.sdk.coders.Coder;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> public class CoderTest {
>   public static void main(String[] args) throws CannotProvideCoderException {
> PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
> Pipeline pipeline = Pipeline.create(options);
> Coder coder = 
> pipeline.getCoderRegistry().getCoder(Timestamp.class);
> // class org.apache.beam.sdk.coders.SerializableCoder
> System.out.println(coder.getClass());
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2658) SerializableCoder has higher precedence over ProtoCoder in CoderRegistry#getCoder

2017-07-22 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-2658:
-
Description: 
{code}
public class CoderTest {
  public static void main(String[] args) throws CannotProvideCoderException {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);

Coder coder = 
pipeline.getCoderRegistry().getCoder(Timestamp.class);

// class org.apache.beam.sdk.coders.SerializableCoder
System.out.println(coder.getClass());
  }
}
{code}

  was:
{code)
public class CoderTest {
  public static void main(String[] args) throws CannotProvideCoderException {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);

Coder coder = 
pipeline.getCoderRegistry().getCoder(Timestamp.class);

// class org.apache.beam.sdk.coders.SerializableCoder
System.out.println(coder.getClass());
  }
}
{code)


> SerializableCoder has higher precedence over ProtoCoder in 
> CoderRegistry#getCoder
> -
>
> Key: BEAM-2658
> URL: https://issues.apache.org/jira/browse/BEAM-2658
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0
>Reporter: Neville Li
>Assignee: Davor Bonaci
>Priority: Minor
>
> {code}
> public class CoderTest {
>   public static void main(String[] args) throws CannotProvideCoderException {
> PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
> Pipeline pipeline = Pipeline.create(options);
> Coder coder = 
> pipeline.getCoderRegistry().getCoder(Timestamp.class);
> // class org.apache.beam.sdk.coders.SerializableCoder
> System.out.println(coder.getClass());
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-2658) SerializableCoder has high precedence over ProtoCoder in CoderRegistry#getCoder

2017-07-22 Thread Neville Li (JIRA)
Neville Li created BEAM-2658:


 Summary: SerializableCoder has high precedence over ProtoCoder in 
CoderRegistry#getCoder
 Key: BEAM-2658
 URL: https://issues.apache.org/jira/browse/BEAM-2658
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core
Affects Versions: 2.0.0
Reporter: Neville Li
Assignee: Davor Bonaci
Priority: Minor


{{
public class CoderTest {
  public static void main(String[] args) throws CannotProvideCoderException {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);

Coder coder = 
pipeline.getCoderRegistry().getCoder(Timestamp.class);

// class org.apache.beam.sdk.coders.SerializableCoder
System.out.println(coder.getClass());
  }
}
}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2658) SerializableCoder has higher precedence over ProtoCoder in CoderRegistry#getCoder

2017-07-22 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-2658:
-
Summary: SerializableCoder has higher precedence over ProtoCoder in 
CoderRegistry#getCoder  (was: SerializableCoder has high precedence over 
ProtoCoder in CoderRegistry#getCoder)

> SerializableCoder has higher precedence over ProtoCoder in 
> CoderRegistry#getCoder
> -
>
> Key: BEAM-2658
> URL: https://issues.apache.org/jira/browse/BEAM-2658
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Affects Versions: 2.0.0
>Reporter: Neville Li
>Assignee: Davor Bonaci
>Priority: Minor
>
> {{
> public class CoderTest {
>   public static void main(String[] args) throws CannotProvideCoderException {
> PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
> Pipeline pipeline = Pipeline.create(options);
> Coder coder = 
> pipeline.getCoderRegistry().getCoder(Timestamp.class);
> // class org.apache.beam.sdk.coders.SerializableCoder
> System.out.println(coder.getClass());
>   }
> }
> }}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2453) The Java DirectRunner should exercise all parts of a CombineFn

2017-07-19 Thread Neville Li (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16093359#comment-16093359
 ] 

Neville Li commented on BEAM-2453:
--

Here's an example of incorrect use of {{Combine.perKey}} that could be 
identified by this fix:
https://github.com/spotify/scio/issues/729

> The Java DirectRunner should exercise all parts of a CombineFn
> --
>
> Key: BEAM-2453
> URL: https://issues.apache.org/jira/browse/BEAM-2453
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>
> Specifically it should:
> Create some number of accumulators; add elements to these accumulators, merge 
> the created accumulators, and extract the output.
> This can be performed by replacing the {{Combine.perKey}} composite transform 
> with a multi-step combine {{CombineBundles -> GroupByKey -> 
> MergeAccumulators}}
> Where {{CombineBundles}} is a {{ParDo}} which takes input {{KV}} 
> and produces {{KV}}, outputting in {{FinishBundle}} (this can only 
> be performed if the Combine takes no side inputs or does not have merging 
> windows). {{MergeAccumulators}} takes in {{KV}} and 
> produces {{KV}} by merging all of the accumulators and extracting 
> the output.
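
For illustration, a hand-rolled {{CombineFn}} exercised through every lifecycle 
method, mirroring the multi-step expansion described above; this is a usage 
sketch, not runner code:

{code}
import java.util.Arrays;
import org.apache.beam.sdk.transforms.Combine;

class CombineLifecycleDemo {
  public static void main(String[] args) {
    Combine.CombineFn<Integer, Long, Long> sum =
        new Combine.CombineFn<Integer, Long, Long>() {
          @Override public Long createAccumulator() { return 0L; }
          @Override public Long addInput(Long acc, Integer in) { return acc + in; }
          @Override public Long mergeAccumulators(Iterable<Long> accs) {
            long total = 0;
            for (long a : accs) { total += a; }
            return total;
          }
          @Override public Long extractOutput(Long acc) { return acc; }
        };

    // Two "bundles" each build their own accumulator (the CombineBundles step)...
    Long bundle1 = sum.addInput(sum.addInput(sum.createAccumulator(), 1), 2);
    Long bundle2 = sum.addInput(sum.createAccumulator(), 4);
    // ...which are then merged and finalized (the MergeAccumulators step): prints 7.
    System.out.println(sum.extractOutput(sum.mergeAccumulators(Arrays.asList(bundle1, bundle2))));
  }
}
{code}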



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2532) BigQueryIO source should avoid expensive JSON schema parsing for every record

2017-07-17 Thread Neville Li (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16090599#comment-16090599
 ] 

Neville Li commented on BEAM-2532:
--

Would love to see a fix in the next release. This is a big performance 
regression for us since we use BigQuery heavily.

> BigQueryIO source should avoid expensive JSON schema parsing for every record
> -
>
> Key: BEAM-2532
> URL: https://issues.apache.org/jira/browse/BEAM-2532
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-gcp
>Affects Versions: 2.0.0
>Reporter: Marian Dvorsky
>Assignee: Chamikara Jayalath
>Priority: Minor
>
> BigQueryIO source converts the schema from JSON for every input row, here:
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java#L159
> This is the performance bottleneck in a simple pipeline with BigQueryIO 
> source.
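
A hedged sketch of the straightforward fix: parse the JSON schema once per 
worker and cache it, rather than once per record. {{BigQueryHelpers.fromJsonString}} 
is assumed to be the SDK's JSON helper; the wrapper class is illustrative:

{code}
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;

class CachedSchema {
  private final String jsonSchema;
  private transient TableSchema parsed;  // not serialized; rebuilt lazily per worker

  CachedSchema(String jsonSchema) {
    this.jsonSchema = jsonSchema;
  }

  TableSchema get() {
    if (parsed == null) {
      parsed = BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
    }
    return parsed;
  }
}
{code}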



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-302) Add Scio Scala DSL to Beam

2017-05-04 Thread Neville Li (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996831#comment-15996831
 ] 

Neville Li commented on BEAM-302:
-

Yes, that ecosystem has too many build parameters: Scala version, Spark version, 
Hadoop version, etc.
Scala 2.10 is outdated, quite different from 2.11/2.12, and hard to maintain; 
that's why we stopped supporting it. 2.12 support should be available soon, once 
some compiler lambda serialization issues are addressed.

> Add Scio Scala DSL to Beam
> --
>
> Key: BEAM-302
> URL: https://issues.apache.org/jira/browse/BEAM-302
> Project: Beam
>  Issue Type: Wish
>  Components: sdk-ideas
>Reporter: Jean-Baptiste Onofré
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-302) Add Scio Scala DSL to Beam

2017-05-03 Thread Neville Li (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15995075#comment-15995075
 ] 

Neville Li commented on BEAM-302:
-

Looks like the Spark runner still depends on Spark 1.6.3. Can you give Spark 1.6 
a shot instead?
https://mvnrepository.com/artifact/org.apache.beam/beam-runners-spark/0.6.0

We'd love to support all runners, but we only use the Dataflow runner and 
vanilla Spark. Contributions would be awesome and are definitely welcome. Feel 
free to submit issues or PRs on our GH repo. There's also a Gitter room and a 
Google group for discussions.
https://github.com/spotify/scio

> Add Scio Scala DSL to Beam
> --
>
> Key: BEAM-302
> URL: https://issues.apache.org/jira/browse/BEAM-302
> Project: Beam
>  Issue Type: Wish
>  Components: sdk-ideas
>Reporter: Jean-Baptiste Onofré
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-302) Add Scio Scala DSL to Beam

2017-05-02 Thread Neville Li (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15994005#comment-15994005
 ] 

Neville Li commented on BEAM-302:
-

You need the Spark runner dependency, which is not included by default.

> Add Scio Scala DSL to Beam
> --
>
> Key: BEAM-302
> URL: https://issues.apache.org/jira/browse/BEAM-302
> Project: Beam
>  Issue Type: Wish
>  Components: sdk-ideas
>Reporter: Jean-Baptiste Onofré
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-302) Add Scio Scala DSL to Beam

2017-04-03 Thread Neville Li (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953584#comment-15953584
 ] 

Neville Li commented on BEAM-302:
-

We prefer to keep it separate for now mainly for logistics reasons:

- we use SBT with lots of custom logic
- we release very often, once every 1-2 weeks
- we monkey patch Beam bugs, test in our production jobs, before upstream Beam 
release
- we use a lightweight collaboration model, mainly just Github issues & PRs
- there're only 3 Scio developers at Spotify supporting 150+ internal users and 
many external ones, all running on Dataflow

However, I also want to point out that nothing should stop those interested 
from trying it out or contributing:
- we decoupled the Dataflow runner as much as possible
- Scio should run on other runners without modification; it's just a matter 
of changing dependencies and arguments (see the sketch after this list)
- there are still parts coupled to GCP and the Dataflow runner, but hopefully 
we can gradually decouple them as the FileSystem and other related APIs improve
- it'd be great to see bug reports and PRs from the community
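
As a rough sketch of the dependencies-and-arguments point (the job name, 
paths, and flag values here are hypothetical; the Scio code itself is 
runner-agnostic):

{code}
// Sketch: one Scio job, switched between runners purely via --runner,
// assuming the matching runner artifact is on the classpath, e.g.
//   sbt "runMain MyJob --runner=SparkRunner --input=... --output=..."
//   sbt "runMain MyJob --runner=DataflowRunner --project=my-gcp-project ..."
import com.spotify.scio._

object MyJob {
  def main(cmdlineArgs: Array[String]): Unit = {
    // ContextAndArgs parses --runner and the other pipeline options
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.textFile(args("input"))
      .countByValue
      .map { case (word, n) => s"$word\t$n" }
      .saveAsTextFile(args("output"))
    sc.close()
  }
}
{code}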

> Add Scio Scala DSL to Beam
> --
>
> Key: BEAM-302
> URL: https://issues.apache.org/jira/browse/BEAM-302
> Project: Beam
>  Issue Type: Wish
>  Components: sdk-ideas
>Reporter: Jean-Baptiste Onofré
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (BEAM-302) Add Scio Scala DSL to Beam

2017-04-01 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li reassigned BEAM-302:
---

Assignee: (was: Neville Li)

> Add Scio Scala DSL to Beam
> --
>
> Key: BEAM-302
> URL: https://issues.apache.org/jira/browse/BEAM-302
> Project: Beam
>  Issue Type: Wish
>  Components: sdk-ideas
>Reporter: Jean-Baptiste Onofré
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (BEAM-1518) Support deflate (zlib) in CompressedSource and FileBasedSink

2017-03-20 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li closed BEAM-1518.

Resolution: Fixed

> Support deflate (zlib) in CompressedSource and FileBasedSink
> 
>
> Key: BEAM-1518
> URL: https://issues.apache.org/jira/browse/BEAM-1518
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Affects Versions: 0.5.0
>Reporter: Neville Li
>Assignee: Neville Li
>Priority: Minor
> Fix For: 0.6.0
>
>
> `.deflate` files are quite common in Hadoop and are also supported by 
> TensorFlow in the TFRecord file format.
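
With this in place, reading deflate-compressed text might look like the 
sketch below (method and enum names as of the 0.6.0-era TextIO API, to the 
best of my recollection; paths hypothetical):

{code}
// Sketch: reading .deflate text (0.6.0-era Java SDK API, called from Scala)
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.options.PipelineOptionsFactory

object DeflateReadSketch {
  def main(args: Array[String]): Unit = {
    val p = Pipeline.create(PipelineOptionsFactory.fromArgs(args: _*).create())
    // DEFLATE is the CompressionType member this issue adds
    p.apply(TextIO.Read.from("hdfs:///logs/part-*.deflate")
      .withCompressionType(TextIO.CompressionType.DEFLATE))
    p.run()
  }
}
{code}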



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1518) Support deflate (zlib) in CompressedSource and FileBasedSink

2017-03-20 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-1518:
-
Fix Version/s: 0.6.0

> Support deflate (zlib) in CompressedSource and FileBasedSink
> 
>
> Key: BEAM-1518
> URL: https://issues.apache.org/jira/browse/BEAM-1518
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Affects Versions: 0.5.0
>Reporter: Neville Li
>Assignee: Neville Li
>Priority: Minor
> Fix For: 0.6.0
>
>
> `.deflate` files are quite common in Hadoop and are also supported by 
> TensorFlow in the TFRecord file format.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1520) Implement TFRecordIO (Reading/writing Tensorflow Standard format)

2017-02-21 Thread Neville Li (JIRA)
Neville Li created BEAM-1520:


 Summary: Implement TFRecordIO (Reading/writing Tensorflow Standard 
format)
 Key: BEAM-1520
 URL: https://issues.apache.org/jira/browse/BEAM-1520
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-core
Affects Versions: 0.5.0
Reporter: Neville Li
Assignee: Davor Bonaci
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (BEAM-1520) Implement TFRecordIO (Reading/writing Tensorflow Standard format)

2017-02-21 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li reassigned BEAM-1520:


Assignee: Neville Li  (was: Davor Bonaci)

> Implement TFRecordIO (Reading/writing Tensorflow Standard format)
> -
>
> Key: BEAM-1520
> URL: https://issues.apache.org/jira/browse/BEAM-1520
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Affects Versions: 0.5.0
>Reporter: Neville Li
>Assignee: Neville Li
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (BEAM-1519) Support snappy in CompressedSource and FileBasedSink

2017-02-21 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li reassigned BEAM-1519:


Assignee: (was: Neville Li)

> Support snappy in CompressedSource and FileBasedSink
> 
>
> Key: BEAM-1519
> URL: https://issues.apache.org/jira/browse/BEAM-1519
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Affects Versions: 0.5.0
>Reporter: Neville Li
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1518) Support deflate (zlib) in CompressedSource and FileBasedSink

2017-02-21 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-1518:
-
Summary: Support deflate (zlib) in CompressedSource and FileBasedSink  
(was: Support ZLIB (deflate) in CompressedSource and FileBasedSink)

> Support deflate (zlib) in CompressedSource and FileBasedSink
> 
>
> Key: BEAM-1518
> URL: https://issues.apache.org/jira/browse/BEAM-1518
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Affects Versions: 0.5.0
>Reporter: Neville Li
>Assignee: Neville Li
>Priority: Minor
>
> `.deflate` files are quite common in Hadoop and are also supported by 
> TensorFlow in the TFRecord file format.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1518) Support ZLIB (deflate) in CompressedSource and FileBasedSink

2017-02-21 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-1518:
-
Description: `.deflate` files are quite common in Hadoop and are also 
supported by TensorFlow in the TFRecord file format.

> Support ZLIB (deflate) in CompressedSource and FileBasedSink
> 
>
> Key: BEAM-1518
> URL: https://issues.apache.org/jira/browse/BEAM-1518
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Affects Versions: 0.5.0
>Reporter: Neville Li
>Assignee: Neville Li
>Priority: Minor
>
> `.deflate` files are quite common in Hadoop and are also supported by 
> TensorFlow in the TFRecord file format.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1519) CLONE - Support snappy in CompressedSource and FileBasedSink

2017-02-21 Thread Neville Li (JIRA)
Neville Li created BEAM-1519:


 Summary: CLONE - Support snappy in CompressedSource and 
FileBasedSink
 Key: BEAM-1519
 URL: https://issues.apache.org/jira/browse/BEAM-1519
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-core
Affects Versions: 0.5.0
Reporter: Neville Li
Assignee: Neville Li
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1519) Support snappy in CompressedSource and FileBasedSink

2017-02-21 Thread Neville Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neville Li updated BEAM-1519:
-
Summary: Support snappy in CompressedSource and FileBasedSink  (was: CLONE 
- Support snappy in CompressedSource and FileBasedSink)

> Support snappy in CompressedSource and FileBasedSink
> 
>
> Key: BEAM-1519
> URL: https://issues.apache.org/jira/browse/BEAM-1519
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Affects Versions: 0.5.0
>Reporter: Neville Li
>Assignee: Neville Li
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1518) Support ZLIB (deflate) in CompressedSource and FileBasedSink

2017-02-21 Thread Neville Li (JIRA)
Neville Li created BEAM-1518:


 Summary: Support ZLIB (deflate) in CompressedSource and 
FileBasedSink
 Key: BEAM-1518
 URL: https://issues.apache.org/jira/browse/BEAM-1518
 Project: Beam
  Issue Type: Improvement
  Components: sdk-java-core
Affects Versions: 0.5.0
Reporter: Neville Li
Assignee: Neville Li
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (BEAM-298) Make TestPipeline implement the TestRule interface

2017-02-10 Thread Neville Li (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15862201#comment-15862201
 ] 

Neville Li edited comment on BEAM-298 at 2/11/17 4:12 AM:
--

That didn't work for me. I had to add it as a {{compile}} scope.


was (Author: sinisa_lyh):
That didn't work for me. I had to add it as a {compile} scope.
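
In sbt terms, the workaround is to drop the test-only scoping on junit; a 
sketch, with the version illustrative:

{code}
// build.sbt -- junit on the main (compile) classpath, since TestPipeline
// now references org.junit.rules.TestRule outside of tests
libraryDependencies += "junit" % "junit" % "4.12"
// instead of the usual test-only scope:
// libraryDependencies += "junit" % "junit" % "4.12" % Test
{code}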

> Make TestPipeline implement the TestRule interface
> --
>
> Key: BEAM-298
> URL: https://issues.apache.org/jira/browse/BEAM-298
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Thomas Groh
>Assignee: Stas Levin
>Priority: Minor
> Fix For: 0.5.0
>
>
> https://github.com/junit-team/junit4/wiki/Rules
> A JUnit Rule allows a test to use a field annotated with @Rule to wrap 
> executing tests. Doing so allows the TestPipeline to, at the time the test 
> completes, assert that all applied transforms have been executed. This 
> ensures that all unit tests that utilize a TestPipeline rule either are 
> declared to explicitly not expect to execute or have executed the pipeline.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-298) Make TestPipeline implement the TestRule interface

2017-02-10 Thread Neville Li (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15862201#comment-15862201
 ] 

Neville Li commented on BEAM-298:
-

That didn't work for me. I had to add it as a {compile} scope.

> Make TestPipeline implement the TestRule interface
> --
>
> Key: BEAM-298
> URL: https://issues.apache.org/jira/browse/BEAM-298
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Thomas Groh
>Assignee: Stas Levin
>Priority: Minor
> Fix For: 0.5.0
>
>
> https://github.com/junit-team/junit4/wiki/Rules
> A JUnit Rule allows a test to use a field annotated with @Rule to wrap 
> executing tests. Doing so allows the TestPipeline to, at the time the test 
> completes, assert that all applied transforms have been executed. This 
> ensures that all unit tests that utilize a TestPipeline rule either are 
> declared to explicitly not expect to execute or have executed the pipeline.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (BEAM-298) Make TestPipeline implement the TestRule interface

2017-02-10 Thread Neville Li (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15861795#comment-15861795
 ] 

Neville Li edited comment on BEAM-298 at 2/10/17 8:08 PM:
--

As a result of this change I need to include {{junit}} in my dependencies or 
it'll fail with a class not found exception. Is this expected?

{code}
java.lang.NoClassDefFoundError: org/junit/rules/TestRule
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at com.spotify.scio.ContextAndArgs$.apply(ScioContext.scala:65)
at 
com.spotify.scio.examples.MinimalWordCount$.main(MinimalWordCount.scala:35)
at 
com.spotify.scio.examples.MinimalWordCount.main(MinimalWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sbt.Run.invokeMain(Run.scala:67)
at sbt.Run.run0(Run.scala:61)
at sbt.Run.sbt$Run$$execute$1(Run.scala:51)
at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55)
at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
at sbt.Logger$$anon$4.apply(Logger.scala:84)
at sbt.TrapExit$App.run(TrapExit.scala:248)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.junit.rules.TestRule
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
{code}


was (Author: sinisa_lyh):
As a result of this change I need to include {{junit}} in my dependencies or 
it'll fail with a class not found exception. Is this expected?

{{
java.lang.NoClassDefFoundError: org/junit/rules/TestRule
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at com.spotify.scio.ContextAndArgs$.apply(ScioContext.scala:65)
at 
com.spotify.scio.examples.MinimalWordCount$.main(MinimalWordCount.scala:35)
at 
com.spotify.scio.examples.MinimalWordCount.main(MinimalWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sbt.Run.invokeMain(Run.scala:67)
at sbt.Run.run0(Run.scala:61)
at sbt.Run.sbt$Run$$execute$1(Run.scala:51)
at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55)
at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
at sbt.Logger$$anon$4.apply(Logger.scala:84)
at sbt.TrapExit$App.run(TrapExit.scala:248)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.junit.rules.TestRule
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
}}

> Make TestPipeline implement the TestRule interface
> --
>
> Key: BEAM-298
> URL: 

[jira] [Commented] (BEAM-298) Make TestPipeline implement the TestRule interface

2017-02-10 Thread Neville Li (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15861795#comment-15861795
 ] 

Neville Li commented on BEAM-298:
-

As a result of this change I need to include {{junit}} in my dependencies or 
it'll fail with a class not found exception. Is this expected?

{{
java.lang.NoClassDefFoundError: org/junit/rules/TestRule
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at com.spotify.scio.ContextAndArgs$.apply(ScioContext.scala:65)
at 
com.spotify.scio.examples.MinimalWordCount$.main(MinimalWordCount.scala:35)
at 
com.spotify.scio.examples.MinimalWordCount.main(MinimalWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sbt.Run.invokeMain(Run.scala:67)
at sbt.Run.run0(Run.scala:61)
at sbt.Run.sbt$Run$$execute$1(Run.scala:51)
at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:55)
at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
at sbt.Run$$anonfun$run$1.apply(Run.scala:55)
at sbt.Logger$$anon$4.apply(Logger.scala:84)
at sbt.TrapExit$App.run(TrapExit.scala:248)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.junit.rules.TestRule
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
}}

> Make TestPipeline implement the TestRule interface
> --
>
> Key: BEAM-298
> URL: https://issues.apache.org/jira/browse/BEAM-298
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Thomas Groh
>Assignee: Stas Levin
>Priority: Minor
> Fix For: 0.5.0
>
>
> https://github.com/junit-team/junit4/wiki/Rules
> A JUnit Rule allows a test to use a field annotated with @Rule to wrap 
> executing tests. Doing so allows the TestPipeline to, at the time the test 
> completes, assert that all applied transforms have been executed. This 
> ensures that all unit tests that utilize a TestPipeline rule either are 
> declared to explicitly not expect to execute or have executed the pipeline.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-302) Add Scio Scala DSL to Beam

2017-01-24 Thread Neville Li (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836553#comment-15836553
 ] 

Neville Li commented on BEAM-302:
-

WIP branch, using Beam 0.4.0: https://github.com/spotify/scio/tree/apache-beam
Tracking ticket: https://github.com/spotify/scio/issues/279

> Add Scio Scala DSL to Beam
> --
>
> Key: BEAM-302
> URL: https://issues.apache.org/jira/browse/BEAM-302
> Project: Beam
>  Issue Type: Wish
>  Components: sdk-ideas
>Reporter: Jean-Baptiste Onofré
>Assignee: Neville Li
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)