[jira] [Comment Edited] (BEAM-2980) BagState.isEmpty needs a tighter spec

2018-05-23 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16487964#comment-16487964
 ] 

Ben Chambers edited comment on BEAM-2980 at 5/23/18 8:15 PM:
-

I think `StateFuture` was considered, but intentionally not used because it 
would generally suggest we snapshot the value from the time the future is 
created. The intuition behind `ReadableState` should be that it is like a 
ref-cell: when you `read` it, you get the current value stored in the location 
it references.

This is also why the `read()` javadoc indicates that the *current* value is 
returned, rather than the value from when the `ReadableState` was created.
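
Concretely (hypothetical usage, mirroring the snippet in the issue): under 
these ref-cell semantics, {{empty}} below would read {{false}}, because 
{{read()}} observes the bag's contents at read time, not at the time 
{{isEmpty()}} was called.

{code:java}
BagState<String> myBag = // empty
ReadableState<Boolean> isMyBagEmpty = myBag.isEmpty();
myBag.add("bizzle");
boolean empty = isMyBagEmpty.read();   // false: the bag is no longer empty
{code}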


was (Author: bchambers):
I think `StateFuture` was considered, but intentionally not used because it 
would generally suggest we snapshot the value from the time the future is 
created. The intuition behind `ReadableState` should be it is like a ref-cell. 
As a reference, when you `read` it you get the current value stored in the 
location it references.

> BagState.isEmpty needs a tighter spec
> -
>
> Key: BEAM-2980
> URL: https://issues.apache.org/jira/browse/BEAM-2980
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model
>Reporter: Kenneth Knowles
>Assignee: Daniel Mills
>Priority: Major
>
> Consider the following:
> {code}
> BagState myBag = // empty
> ReadableState<Boolean> isMyBagEmpty = myBag.isEmpty();
> myBag.add(bizzle);
> boolean empty = isMyBagEmpty.read();
> {code}
> Should {{empty}} be true or false? We need a consistent answer, across all 
> kinds of state, when snapshots are required.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-2980) BagState.isEmpty needs a tighter spec

2018-05-23 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16487964#comment-16487964
 ] 

Ben Chambers commented on BEAM-2980:


I think `StateFuture` was considered, but intentionally not used because it 
would generally suggest we snapshot the value from the time the future is 
created. The intuition behind `ReadableState` should be it is like a ref-cell. 
As a reference, when you `read` it you get the current value stored in the 
location it references.

> BagState.isEmpty needs a tighter spec
> -
>
> Key: BEAM-2980
> URL: https://issues.apache.org/jira/browse/BEAM-2980
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model
>Reporter: Kenneth Knowles
>Assignee: Daniel Mills
>Priority: Major
>
> Consider the following:
> {code}
> BagState myBag = // empty
> ReadableState<Boolean> isMyBagEmpty = myBag.isEmpty();
> myBag.add(bizzle);
> boolean empty = isMyBagEmpty.read();
> {code}
> Should {{empty}} be true or false? We need a consistent answer, across all 
> kinds of state, when snapshots are required.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (BEAM-3806) DirectRunner hangs if multiple timers set in the same bundle

2018-03-09 Thread Ben Chambers (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Chambers resolved BEAM-3806.

   Resolution: Fixed
Fix Version/s: 2.5.0

Fixed by PR https://github.com/apache/beam/pull/4829

> DirectRunner hangs if multiple timers set in the same bundle
> 
>
> Key: BEAM-3806
> URL: https://issues.apache.org/jira/browse/BEAM-3806
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct
>Reporter: Ben Chambers
>Assignee: Thomas Groh
>Priority: Major
> Fix For: 2.5.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> See the repro below:
> {code:java}
> package com.simbly.beam.cassandra;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.coders.StringUtf8Coder;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.testing.PAssert;
> import org.apache.beam.sdk.testing.TestPipeline;
> import org.apache.beam.sdk.testing.TestStream;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
> import org.junit.Rule;
> import org.junit.Test;
> public class DirectRunnerTest {
>   @Rule
>   public TestPipeline pipeline = TestPipeline.create();
>   @Test
>   public void badTimerBehavior() {
> TestStream<KV<String, String>> stream = TestStream
> .create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
> .addElements(KV.of("key1", "v1"))
> .advanceWatermarkToInfinity();
> PCollection<String> result = pipeline
> .apply(stream)
> .apply(ParDo.of(new TestDoFn()));
> PAssert.that(result).containsInAnyOrder("It works");
> pipeline.run().waitUntilFinish();
>   }
>   private static class TestDoFn extends DoFn<KV<String, String>, String> {
> @TimerId("timer")
> private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
> @ProcessElement
> public void process(ProcessContext c,
> @TimerId("timer") Timer timer) {
>   timer.offset(Duration.standardMinutes(10)).setRelative();
>   timer.offset(Duration.standardMinutes(30)).setRelative();
> }
> @OnTimer("timer")
> public void onTimer(OnTimerContext c, @TimerId("timer") Timer timer) {
>   c.output("It works");
> }
>   }
> }
> {code}
> From inspection, this seems to be caused by the logic in 
> [WatermarkManager|https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java#L313],
>  which does the following if there are multiple timers for a key:
>  # Adds the first timer to the `pendingTimers`, `keyTimers`, and 
> `existingTimersForKey`.
>  # Removes the first timer from `keyTimers`
>  # Adds the second timer to `keyTimers` and `existingTimersForKey`.
> This leads to inconsistencies since pendingTimers has only the first timer, 
> keyTimers only the second, and existingTimers has both. This becomes more 
> problematic since one of these lists is used for *firing* (and thus releasing 
> holds) and the other is used for holds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3825) Gradle presubmit not running all tests

2018-03-09 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-3825:
--

 Summary: Gradle presubmit not running all tests
 Key: BEAM-3825
 URL: https://issues.apache.org/jira/browse/BEAM-3825
 Project: Beam
  Issue Type: Bug
  Components: testing
Reporter: Ben Chambers
Assignee: Jason Kuster


For PR https://github.com/apache/beam/pull/4829

The following test failed (flakily) and succeeded on retry in Maven:

[https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/org.apache.beam$beam-runners-direct-java/18505/testReport/org.apache.beam.sdk.io/FileIOTest/testMatchWatchForNewFiles/]

 

While investigating the original failure, I was unable to find it in the logs 
of the Gradle presubmit, making me worry that it may not be covering all tests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3806) DirectRunner hangs if multiple timers set in the same bundle

2018-03-07 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-3806:
--

 Summary: DirectRunner hangs if multiple timers set in the same 
bundle
 Key: BEAM-3806
 URL: https://issues.apache.org/jira/browse/BEAM-3806
 Project: Beam
  Issue Type: Bug
  Components: runner-direct
Reporter: Ben Chambers
Assignee: Thomas Groh


See the repro below:
{code:java}
package com.simbly.beam.cassandra;

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;

public class DirectRunnerTest {

  @Rule
  public TestPipeline pipeline = TestPipeline.create();

  @Test
  public void badTimerBehavior() {
TestStream<KV<String, String>> stream = TestStream
.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
.addElements(KV.of("key1", "v1"))
.advanceWatermarkToInfinity();

PCollection<String> result = pipeline
.apply(stream)
.apply(ParDo.of(new TestDoFn()));
PAssert.that(result).containsInAnyOrder("It works");

pipeline.run().waitUntilFinish();
  }

  private static class TestDoFn extends DoFn<KV<String, String>, String> {
@TimerId("timer")
private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement
public void process(ProcessContext c,
@TimerId("timer") Timer timer) {
  timer.offset(Duration.standardMinutes(10)).setRelative();
  timer.offset(Duration.standardMinutes(30)).setRelative();
}

@OnTimer("timer")
public void onTimer(OnTimerContext c, @TimerId("timer") Timer timer) {
  c.output("It works");
}
  }
}
{code}
From inspection, this seems to be caused by the logic in 
[WatermarkManager|https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java#L313],
 which does the following if there are multiple timers for a key:
 # Adds the first timer to the `pendingTimers`, `keyTimers`, and 
`existingTimersForKey`.
 # Removes the first timer from `keyTimers`
 # Adds the second timer to `keyTimers` and `existingTimersForKey`.

This leads to inconsistencies since pendingTimers has only the first timer, 
keyTimers only the second, and existingTimers has both. This becomes more 
problematic since one of these lists is used for *firing* (and thus releasing 
holds) and the other is used for holds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-3159) DoFnTester should be deprecated in favor of TestPipeline

2017-11-08 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-3159:
--

 Summary: DoFnTester should be deprecated in favor of TestPipeline
 Key: BEAM-3159
 URL: https://issues.apache.org/jira/browse/BEAM-3159
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core
Reporter: Ben Chambers
Assignee: Kenneth Knowles
Priority: Minor


Reasons:

1. The logical unit within a Beam pipeline is a transform. Either a small 
transform like a ParDo or a larger composite transform. Unit tests should focus 
on these units, rather than probing specific behaviors of the user-defined 
functions.

2. The way that a runner interacts with a user-defined function is not 
necessarily obvious. DoFnTester allows testing nonsensical cases that wouldn't 
arise in practice, since it allows low-level interactions with the actual UDFs.

Instead, we should encourage the use of TestPipeline with the direct runner. 
This allows testing a single transform (such as a ParDo running a UDF) in 
context. It also makes it easier to test things like side-inputs and multiple 
outputs, since you use the same techniques in the test as you would in a real 
pipeline, rather than requiring a whole new API.
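
For example, a minimal sketch of the recommended style ({{MyDoFn}} and the 
expected outputs are placeholders for the transform under test):

{code:java}
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();

@Test
public void testMyDoFn() {
  PCollection<String> output = pipeline
      .apply(Create.of("a", "b"))
      .apply(ParDo.of(new MyDoFn()));       // the unit under test, in context
  PAssert.that(output).containsInAnyOrder("A", "B");
  pipeline.run().waitUntilFinish();
}
{code}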



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2058) BigQuery load job id should be generated at run time, not submission time

2017-09-27 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16183303#comment-16183303
 ] 

Ben Chambers commented on BEAM-2058:


Reuven -- it looks like the PR went in to fix this. Should it be marked as 
closed / added to appropriate release notes / etc?

> BigQuery load job id should be generated at run time, not submission time
> -
>
> Key: BEAM-2058
> URL: https://issues.apache.org/jira/browse/BEAM-2058
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-gcp
>Reporter: Reuven Lax
>Assignee: Reuven Lax
>
> Currently the job id is generated at submission time, which means that 
> rerunning template jobs will produce the same job id. Generate at run time 
> instead, so a different job id is generated on each execution.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-2996) Metric names should not be null or empty

2017-09-27 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-2996:
--

 Summary: Metric names should not be null or empty
 Key: BEAM-2996
 URL: https://issues.apache.org/jira/browse/BEAM-2996
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core, sdk-py-core
Reporter: Ben Chambers
Assignee: Ben Chambers
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-2982) PubSubIO.readMessages().fromSubscription(...) doesn't work with ValueProvider

2017-09-22 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-2982:
--

 Summary: PubSubIO.readMessages().fromSubscription(...) doesn't 
work with ValueProvider
 Key: BEAM-2982
 URL: https://issues.apache.org/jira/browse/BEAM-2982
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-gcp
Reporter: Ben Chambers
Assignee: Thomas Groh


Originally reported on Stack Overflow:

https://stackoverflow.com/questions/46360584/apache-beam-template-runtime-context-error

---

In the `PubsubUnboundedSource#expand` method we create the PubsubSource:

https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L1399

Creating the PubsubSource calls `getSubscription` which attempts to get the 
value out of a value provider.

To support templatization, the PubsubSource needs to take the ValueProvider, 
and only get the subscription out at pipeline execution time.
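
A rough sketch of the shape of the fix (field and method names here are 
illustrative, not the actual PubsubUnboundedSource code):

{code:java}
// Keep the provider itself rather than its value; a RuntimeValueProvider has
// no value at template construction time.
private final ValueProvider<SubscriptionPath> subscription;

private SubscriptionPath subscriptionPath() {
  // Only resolved while the pipeline is executing, when the value is available.
  return subscription.get();
}
{code}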




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2577) IO tests should exercise Runtime Values where supported

2017-08-23 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16139077#comment-16139077
 ] 

Ben Chambers commented on BEAM-2577:


I think that PR demonstrates one place we could do this. This bug is to track 
the fact that it should be done for every IO that uses Runtime Values, and it 
should be done using the runner, etc.

> IO tests should exercise Runtime Values where supported
> ---
>
> Key: BEAM-2577
> URL: https://issues.apache.org/jira/browse/BEAM-2577
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions, testing
>Reporter: Ben Chambers
>Assignee: Davor Bonaci
>
> The only testing I have found for `ValueProvider` parameterized methods is 
> that they are not evaluated during pipeline construction time. This is 
> missing out on several important pieces:
> 1. 
> https://stackoverflow.com/questions/44967898/notify-when-textio-is-done-writing-a-file
>  seems to be a problem with an AvroIO write using a RuntimeValueProvider 
> being non-serializable (current theory is because of an anonymous inner class 
> capturing the enclosing AvroIO.Write instance which has non-serializable 
> fields).
> 2. Testing that the code paths that actually read the file do so correctly 
> when parameterized.
> We should update the developer documentation to describe what the 
> requirements are for a parameterized IO and provide guidance on what tests 
> are needed and how to write them.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-2758) ParDo should indicate what "features" are used in DisplayData

2017-08-09 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-2758:
--

 Summary: ParDo should indicate what "features" are used in 
DisplayData
 Key: BEAM-2758
 URL: https://issues.apache.org/jira/browse/BEAM-2758
 Project: Beam
  Issue Type: Improvement
  Components: runner-core
Reporter: Ben Chambers
Assignee: Kenneth Knowles


ParDo now exposes numerous features, such as SplittableDoFn, State, Timers, 
etc. It would be good if the specific features being used were readily visible 
within the Display Data of the given ParDo.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2708) Decompressing bzip2 files with multiple "streams" only reads the first stream

2017-08-01 Thread Ben Chambers (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Chambers updated BEAM-2708:
---
Summary: Decompressing bzip2 files with multiple "streams" only reads the 
first stream  (was: Support for pbzip2 in IO)

> Decompressing bzip2 files with multiple "streams" only reads the first stream
> -
>
> Key: BEAM-2708
> URL: https://issues.apache.org/jira/browse/BEAM-2708
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions, sdk-py
>Reporter: Pablo Estrada
>Assignee: Ben Chambers
>
> I'm not sure which components to file this against. A user has observed that 
> pbzip2 files are not being properly decompressed:
> https://stackoverflow.com/questions/45439117/google-dataflow-only-partly-uncompressing-files-compressed-with-pbzip2



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (BEAM-2708) Support for pbzip2 in IO

2017-08-01 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16109703#comment-16109703
 ] 

Ben Chambers edited comment on BEAM-2708 at 8/1/17 8:35 PM:


This looks to be a bug in the CompressedSource support for BZIP2. Specifically, 
we create the stream with:

{code:java}
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel)));
{code}

Which defaults to {{decompressConcatenated = false}}. As a result only the 
first "stream" within the {{bz2}} file is actually read.

The fix is easy -- change that code to:

{code:java}

return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel), 
true));
{code}

But coming up with a test is a bit harder.


was (Author: bchambers):
This looks to be a bug in the CompressedSource support for BZIP2. Specifically, 
we create the stream with:

{{
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel)));
}}

Which defaults to {{decompressConcatenated = false}}. As a result only the 
first "stream" within the {{bz2}} file is actually read.

The fix is easy -- change that code to:

{{
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel), 
true));
}}

But coming up with a test is a bit harder.

> Support for pbzip2 in IO
> 
>
> Key: BEAM-2708
> URL: https://issues.apache.org/jira/browse/BEAM-2708
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions, sdk-py
>Reporter: Pablo Estrada
>Assignee: Ben Chambers
>
> I'm not sure which components to file this against. A user has observed that 
> pbzip2 files are not being properly decompressed:
> https://stackoverflow.com/questions/45439117/google-dataflow-only-partly-uncompressing-files-compressed-with-pbzip2



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (BEAM-2708) Support for pbzip2 in IO

2017-08-01 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16109703#comment-16109703
 ] 

Ben Chambers edited comment on BEAM-2708 at 8/1/17 8:35 PM:


This looks to be a bug in the CompressedSource support for BZIP2. Specifically, 
we create the stream with:

{code:java}
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel)));
{code}

Which defaults to {{decompressConcatenated = false}}. As a result only the 
first "stream" within the {{bz2}} file is actually read.

The fix is easy -- change that code to:

{code:java}
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel), true));
{code}

But coming up with a test is a bit harder.


was (Author: bchambers):
This looks to be a bug in the CompressedSource support for BZIP2. Specifically, 
we create the stream with:

{code:java}
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel)));
{code}

Which defaults to {{decompressConcatenated = false}}. As a result only the 
first "stream" within the {{bz2}} file is actually read.

The fix is easy -- change that code to:

{code:java}

return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel), 
true));
{code}

But coming up with a test is a bit harder.

> Support for pbzip2 in IO
> 
>
> Key: BEAM-2708
> URL: https://issues.apache.org/jira/browse/BEAM-2708
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions, sdk-py
>Reporter: Pablo Estrada
>Assignee: Ben Chambers
>
> I'm not sure which components to file this against. A user has observed that 
> pbzip2 files are not being properly decompressed:
> https://stackoverflow.com/questions/45439117/google-dataflow-only-partly-uncompressing-files-compressed-with-pbzip2



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (BEAM-2708) Support for pbzip2 in IO

2017-08-01 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16109703#comment-16109703
 ] 

Ben Chambers edited comment on BEAM-2708 at 8/1/17 8:34 PM:


This looks to be a bug in the CompressedSource support for BZIP2. Specifically, 
we create the stream with:

{{
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel)));
}}

Which defaults to {{decompressConcatenated = false}}. As a result only the 
first "stream" within the {{bz2}} file is actually read.

The fix is easy -- change that code to:

{{
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel), 
true));
}}

But coming up with a test is a bit harder.


was (Author: bchambers):
This looks to be a bug in the CompressedSource support for BZIP2. Specifically, 
we create the stream with:

```
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel)));
```

Which defaults to `decompressConcatenated = false`. As a result only the first 
"stream" within the `bz2` file is actually read.

The fix is easy -- change that code to:

```
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel), 
true));
```

But coming up with a test is a bit harder.

> Support for pbzip2 in IO
> 
>
> Key: BEAM-2708
> URL: https://issues.apache.org/jira/browse/BEAM-2708
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions, sdk-py
>Reporter: Pablo Estrada
>Assignee: Ben Chambers
>
> I'm not sure which components to file this against. A user has observed that 
> pbzip2 files are not being properly decompressed:
> https://stackoverflow.com/questions/45439117/google-dataflow-only-partly-uncompressing-files-compressed-with-pbzip2



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2708) Support for pbzip2 in IO

2017-08-01 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16109703#comment-16109703
 ] 

Ben Chambers commented on BEAM-2708:


This looks to be a bug in the CompressedSource support for BZIP2. Specifically, 
we create the stream with:

```
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel)));
```

Which defaults to `decompressConcatenated = false`. As a result only the first 
"stream" within the `bz2` file is actually read.

The fix is easy -- change that code to:

```
return Channels.newChannel(
new BZip2CompressorInputStream(Channels.newInputStream(channel), 
true));
```

But coming up with a test is a bit harder.

> Support for pbzip2 in IO
> 
>
> Key: BEAM-2708
> URL: https://issues.apache.org/jira/browse/BEAM-2708
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions, sdk-py
>Reporter: Pablo Estrada
>Assignee: Ben Chambers
>
> I'm not sure which components to file this against. A user has observed that 
> pbzip2 files are not being properly decompressed:
> https://stackoverflow.com/questions/45439117/google-dataflow-only-partly-uncompressing-files-compressed-with-pbzip2



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-2708) Support for pbzip2 in IO

2017-08-01 Thread Ben Chambers (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Chambers reassigned BEAM-2708:
--

Assignee: Ben Chambers  (was: Chamikara Jayalath)

> Support for pbzip2 in IO
> 
>
> Key: BEAM-2708
> URL: https://issues.apache.org/jira/browse/BEAM-2708
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions, sdk-py
>Reporter: Pablo Estrada
>Assignee: Ben Chambers
>
> I'm not sure which components to file this against. A user has observed that 
> pbzip2 files are not being properly decompressed:
> https://stackoverflow.com/questions/45439117/google-dataflow-only-partly-uncompressing-files-compressed-with-pbzip2



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (BEAM-2059) Implement Metrics support for streaming Dataflow runner

2017-07-12 Thread Ben Chambers (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Chambers resolved BEAM-2059.

Resolution: Fixed

> Implement Metrics support for streaming Dataflow runner
> ---
>
> Key: BEAM-2059
> URL: https://issues.apache.org/jira/browse/BEAM-2059
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-dataflow
>Reporter: Devon Meunier
>Assignee: Ben Chambers
>Priority: Minor
> Fix For: 2.1.0
>
>
> Metrics are currently only available in batch mode.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2059) Implement Metrics support for streaming Dataflow runner

2017-07-12 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16084320#comment-16084320
 ] 

Ben Chambers commented on BEAM-2059:


This should be fixed in the Dataflow service in the next release.

> Implement Metrics support for streaming Dataflow runner
> ---
>
> Key: BEAM-2059
> URL: https://issues.apache.org/jira/browse/BEAM-2059
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-dataflow
>Reporter: Devon Meunier
>Priority: Minor
> Fix For: 2.1.0
>
>
> Metrics are currently only available in batch mode.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-2059) Implement Metrics support for streaming Dataflow runner

2017-07-12 Thread Ben Chambers (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Chambers reassigned BEAM-2059:
--

 Assignee: Ben Chambers
Fix Version/s: 2.1.0

> Implement Metrics support for streaming Dataflow runner
> ---
>
> Key: BEAM-2059
> URL: https://issues.apache.org/jira/browse/BEAM-2059
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-dataflow
>Reporter: Devon Meunier
>Assignee: Ben Chambers
>Priority: Minor
> Fix For: 2.1.0
>
>
> Metrics are currently only available in batch mode.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2577) IO tests should exercise Runtime Values where supported

2017-07-10 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080633#comment-16080633
 ] 

Ben Chambers commented on BEAM-2577:


I looked for other places in the codebase that did this testing to base things 
on and couldn't find any. I suspect one of the larger difficulties here is that 
there isn't a uniform API for providing values when running a pipeline, which 
makes it hard to write this test.

> IO tests should exercise Runtime Values where supported
> ---
>
> Key: BEAM-2577
> URL: https://issues.apache.org/jira/browse/BEAM-2577
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions, testing
>Reporter: Ben Chambers
>Assignee: Davor Bonaci
>
> The only testing I have found for `ValueProvider` parameterized methods is 
> that they are not evaluated during pipeline construction time. This is 
> missing out on several important pieces:
> 1. 
> https://stackoverflow.com/questions/44967898/notify-when-textio-is-done-writing-a-file
>  seems to be a problem with an AvroIO write using a RuntimeValueProvider 
> being non-serializable (current theory is because of an anonymous inner class 
> capturing the enclosing AvroIO.Write instance which has non-serializable 
> fields).
> 2. Testing that the code paths that actually read the file do so correctly 
> when parameterized.
> We should update the developer documentation to describe what the 
> requirements are for a parameterized IO and provide guidance on what tests 
> are needed and how to write them.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-2577) IO tests should exercise Runtime Values where supported

2017-07-10 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-2577:
--

 Summary: IO tests should exercise Runtime Values where supported
 Key: BEAM-2577
 URL: https://issues.apache.org/jira/browse/BEAM-2577
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-extensions, testing
Reporter: Ben Chambers
Assignee: Davor Bonaci


The only testing I have found for `ValueProvider` parameterized methods is 
that they are not evaluated during pipeline construction time. This is missing 
out on several important pieces:

1. 
https://stackoverflow.com/questions/44967898/notify-when-textio-is-done-writing-a-file
 seems to be a problem with an AvroIO write using a RuntimeValueProvider being 
non-serializable (current theory is because of an anonymous inner class 
capturing the enclosing AvroIO.Write instance which has non-serializable 
fields).

2. Testing that the code paths that actually read the file do so correctly when 
parameterized.

We should update the developer documentation to describe what the requirements 
are for a parameterized IO and provide guidance on what tests are needed and 
how to write them.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2491) Duplicate org.apache.beam.runners.core.metrics.pachage-info.class

2017-06-21 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16057756#comment-16057756
 ] 

Ben Chambers commented on BEAM-2491:


That seems OK. I think both of the classes in the core-construction-java module 
are only used internally by some runners to implement filtering, etc.

> Duplicate org.apache.beam.runners.core.metrics.pachage-info.class
> -
>
> Key: BEAM-2491
> URL: https://issues.apache.org/jira/browse/BEAM-2491
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Etienne Chauchot
>Assignee: Etienne Chauchot
>Priority: Trivial
>
> There is twice the package org.apache.beam.runners.core.metrics in the code 
> base:
> * one in the module beam-runners-core-construction-java
> * one in the module beam-runners-core-java
> Consequently there will be two 
> org.apache.beam.runners.core.metrics.pachage-info.class in the classpath.
> Minor comment : some tools (e.g. Elasticsearch test framework) detect this 
> duplication and fail at runtime.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (BEAM-2406) NullPointerException when writing an empty table to BigQuery

2017-06-02 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-2406:
--

 Summary: NullPointerException when writing an empty table to 
BigQuery
 Key: BEAM-2406
 URL: https://issues.apache.org/jira/browse/BEAM-2406
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-gcp
Affects Versions: 2.0.0
Reporter: Ben Chambers
Assignee: Reuven Lax
Priority: Minor


Originally reported on Stackoverflow:
https://stackoverflow.com/questions/44314030/handling-empty-pcollections-with-bigquery-in-apache-beam

It looks like if there is no data to write, then WritePartitions will return a 
null destination, as explicitly stated in the comments:

https://github.com/apache/beam/blob/v2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java#L126

But the ConstantTableDestination doesn't turn that into the constant 
destination as the comment promises; instead, it returns that `null` destination:

https://github.com/apache/beam/blob/53c9bf4cd325035fabde192c63652ef6d591b93c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java#L74

This leads to a null pointer error here since the `tableDestination` is that 
null result from calling `getTable`:

https://github.com/apache/beam/blob/v2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L97
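
A sketch of the behavior the comment promises (illustrative only, not the 
actual DynamicDestinationsHelpers code):

{code:java}
// A constant-destination wrapper should ignore the (possibly null) incoming
// destination key and always return the configured table, instead of letting
// the null propagate into WriteTables.
class ConstantDestinationSketch {
  private final TableDestination constant;

  ConstantDestinationSketch(TableDestination constant) {
    this.constant = constant;
  }

  TableDestination getTable(Object ignoredDestination) {
    return constant;
  }
}
{code}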



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-2244) Move runner-facing Metrics classes to runners core

2017-05-09 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-2244:
--

 Summary: Move runner-facing Metrics classes to runners core
 Key: BEAM-2244
 URL: https://issues.apache.org/jira/browse/BEAM-2244
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core
Reporter: Ben Chambers
Assignee: Davor Bonaci






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-2186) Use Accumulable MetricsContainers in the DirectRunner

2017-05-05 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-2186:
--

 Summary: Use Accumulable MetricsContainers in the DirectRunner
 Key: BEAM-2186
 URL: https://issues.apache.org/jira/browse/BEAM-2186
 Project: Beam
  Issue Type: Bug
  Components: runner-direct
Reporter: Ben Chambers
Assignee: Thomas Groh


Once PR 2649 is submitted, we should be able to use the accumulable 
functionality to clean up the DirectRunner. Since this runner updates metrics 
from multiple processing threads simultaneously, we will need to figure out how 
to either (1) make the accumulable behavior thread safe or (2) have a 
non-blocking way of performing the accumulation sequentially.

If we go with (2), we can put updated containers in a queue and then have a 
single thread trying to apply those updates.

https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java#L912)
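
A rough sketch of option (2); {{MetricUpdates}} is the per-bundle delta, while 
the accumulated container and its {{merge}} operation are illustrative 
stand-ins rather than the actual DirectRunner types:

{code:java}
BlockingQueue<MetricUpdates> pending = new LinkedBlockingQueue<>();

// Called from the processing threads; enqueueing is cheap and non-blocking.
void onBundleCommitted(MetricUpdates updates) {
  pending.add(updates);
}

// A single thread drains the queue, so the accumulation itself never races
// and doesn't need to be thread safe.
void accumulationLoop(AccumulatedMetrics accumulated) throws InterruptedException {
  while (true) {
    accumulated.merge(pending.take());
  }
}
{code}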



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-2143) (Mis)Running Dataflow Wordcount gives non-helpful errors

2017-05-04 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15997526#comment-15997526
 ] 

Ben Chambers commented on BEAM-2143:


I only noticed this when trying to run a Java SDK pipeline, but it may affect 
both.

> (Mis)Running Dataflow Wordcount gives non-helpful errors
> 
>
> Key: BEAM-2143
> URL: https://issues.apache.org/jira/browse/BEAM-2143
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ben Chambers
>Assignee: Sourabh Bajaj
>Priority: Blocker
> Fix For: First stable release
>
>
> If you run a pipeline and forget to specify `tempLocation` (but specify 
> something else, such as `stagingLocation`) you get two messages indicating 
> you forgot to specify `stagingLocation`. 
> One says "no stagingLocation specified, choosing ..." the other says "error, 
> the staging location isn't readable" (if you give it just a bucket and not an 
> object within a bucket).
> This is surprising to me as a user, since (1) I specified a staging location 
> and (2) the flag I actually need to modify is `--tempLocation`.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-2162) Add logging during and after long running BigQuery jobs

2017-05-03 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-2162:
--

 Summary: Add logging during and after long running BigQuery jobs
 Key: BEAM-2162
 URL: https://issues.apache.org/jira/browse/BEAM-2162
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-gcp
Reporter: Ben Chambers
Assignee: Ben Chambers
Priority: Minor


Currently, we log when the export job begins, but don't clearly log when it has 
completed.

If it takes a long time (over an hour), it would also be useful to log 
periodically to remind the reader that the job is still running, making it 
clear that the pipeline isn't hung but simply waiting.
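
A rough sketch of the intended logging (an illustrative polling loop, not the 
actual BigQuery job-polling code):

{code:java}
LOG.info("Started BigQuery job {}", jobId);
while (!isDone(jobId)) {
  Thread.sleep(pollInterval.getMillis());
  // Periodic reminder so a long-running export doesn't look like a hung pipeline.
  LOG.info("BigQuery job {} is still running...", jobId);
}
LOG.info("BigQuery job {} completed", jobId);
{code}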



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-32) Consider not emitting empty ON_TIME pane unless requested

2017-05-03 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-32?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15995613#comment-15995613
 ] 

Ben Chambers commented on BEAM-32:
--

http://stackoverflow.com/questions/43765921/combine-perkey-receives-empty-groups-when-a-repeatedly-trigger-is-used/43769182#43769182

Emitting an empty pane creates a problem for a CombineFn: it requires an 
identity element. For some classes of CombineFn there is no natural identity. 
For example, "the smallest string" CombineFn would have "the largest string" as 
its identity, but we shouldn't actually instantiate the largest string, so the 
CombineFn instead needs an artificial identity (such as "null") to be created 
and handled.
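
For example, a minimal sketch of such a "smallest string" CombineFn, forced to 
carry an artificial identity (illustrative only):

{code:java}
static class MinString extends Combine.BinaryCombineFn<String> {
  @Override
  public String apply(String left, String right) {
    return left.compareTo(right) <= 0 ? left : right;
  }

  @Override
  public String identity() {
    // There is no "largest string" to use as a real identity, so an artificial
    // one has to be introduced and then handled by every consumer of the output.
    return null;
  }
}
{code}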

> Consider not emitting empty ON_TIME pane unless requested
> -
>
> Key: BEAM-32
> URL: https://issues.apache.org/jira/browse/BEAM-32
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, runner-core
>Reporter: Kenneth Knowles
>Priority: Minor
>  Labels: Triggers, Windowing, backward-incompatible
> Fix For: First stable release
>
>
> Today, the ReduceFnRunner sets a timers and emits an empty ON_TIME pane as 
> long as the trigger allows it. This could be controlled in a manner analogous 
> to the empty final pane at window expiration (also owned by the 
> ReduceFnRunner).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-2143) (Mis)Running Dataflow Wordcount gives non-helpful errors

2017-05-02 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-2143:
--

 Summary: (Mis)Running Dataflow Wordcount gives non-helpful errors
 Key: BEAM-2143
 URL: https://issues.apache.org/jira/browse/BEAM-2143
 Project: Beam
  Issue Type: Bug
  Components: runner-dataflow
Reporter: Ben Chambers
Assignee: Daniel Halperin


If you run a pipeline and forget to specify `tempLocation` (but specify 
something else, such as `stagingLocation`) you get two messages indicating you 
forgot to specify `stagingLocation`. 

One says "no stagingLocation specified, choosing ..." the other says "error, 
the staging location isn't readable" (if you give it just a bucket and not an 
object within a bucket).

This is surprising to me as a user, since (1) I specified a staging location 
and (2) the flag I actually need to modify is `--tempLocation`.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (BEAM-135) Utilities for "batching" elements in a DoFn

2017-04-05 Thread Ben Chambers (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Chambers resolved BEAM-135.
---
   Resolution: Fixed
Fix Version/s: First stable release

> Utilities for "batching" elements in a DoFn
> ---
>
> Key: BEAM-135
> URL: https://issues.apache.org/jira/browse/BEAM-135
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core
>Reporter: Ben Chambers
>Assignee: Etienne Chauchot
> Fix For: First stable release
>
>
> We regularly receive questions about how to write a {{DoFn}} that operates on 
> batches of elements. Example answers include:
> http://stackoverflow.com/questions/35065109/can-datastore-input-in-google-dataflow-pipeline-be-processed-in-a-batch-of-n-ent/35068341#35068341
> http://stackoverflow.com/questions/30177812/partition-data-coming-from-csv-so-i-can-process-larger-patches-rather-then-indiv/30178170#30178170
> Possible APIs could be to wrap a {{DoFn}} and include a batch size, or to 
> create a utility like {{Filter}}, {{Partition}}, etc. that takes a 
> {{SerializableFunction}} or a {{SimpleFunction}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1261) State API should allow state to be managed in different windows

2017-03-30 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15949730#comment-15949730
 ] 

Ben Chambers commented on BEAM-1261:


I believe this use case can be handled with a global window (for example) where 
things are stored, and once both the person and auction have been encountered, 
a timer is set to eventually clear them out. Why are windows necessary for this 
case?

> State API should allow state to be managed in different windows
> ---
>
> Key: BEAM-1261
> URL: https://issues.apache.org/jira/browse/BEAM-1261
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core
>Reporter: Ben Chambers
>
> For example, even if the elements are being processed in fixed windows of an 
> hour, it may be desirable for the state to "roll over" between windows (or be 
> available to all windows).
> It will also be necessary to figure out when this state should be deleted 
> (TTL? maximum retention?)
> Another problem is how to deal with out of order data. If data comes in from 
> the 10:00 AM window, should its state changes be visible to the data in the 
> 9:00 AM window? 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-324) Improve TypeDescriptor inference of DoFn's created inside a generic PTransform

2017-03-30 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15949444#comment-15949444
 ] 

Ben Chambers commented on BEAM-324:
---

I think the specific work would be using the "infer TypeDescriptor from an 
instance" mechanism in other places (such as DoFns, etc.)

I think it is a non-API change, so likely OK to leave as a wish list item.

> Improve TypeDescriptor inference of DoFn's created inside a generic PTransform
> --
>
> Key: BEAM-324
> URL: https://issues.apache.org/jira/browse/BEAM-324
> Project: Beam
>  Issue Type: Wish
>  Components: sdk-java-core
>Reporter: Ben Chambers
>Priority: Minor
>
> Commit 
> https://github.com/apache/incubator-beam/commit/aa7f07fa5b22f3656d52dc9e1d4557bceb87c013
>  introduced the ability to infer a {{TypeDescriptor}} from an object created 
> inside a concrete instance of a {{PTransform}} and used it to simplify 
> {{SimpleFunction}} usage.
> We should probably look at using the same mechanism elsewhere, such as when 
> inferring the output type of a {{ParDo}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-775) Remove Aggregators from the Java SDK

2017-03-21 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15935735#comment-15935735
 ] 

Ben Chambers commented on BEAM-775:
---

(Note BEAM-1148 originally tracked porting PAssert off of Aggregators)

I don't believe Metrics will work for this. Specifically since most runners 
don't track values across committed bundles and instead only support attempted 
metrics, they may over or undercount.

Depending on a flaky value from Metrics for testing seems like it would be a 
step backwards in terms of flakiness of tests.

It may be useful to think about how to move PAsserts off aggregators/metrics 
towards something more reliable -- perhaps a deterministic completion sink that 
just writes the name of the assert that has been executed.

> Remove Aggregators from the Java SDK
> 
>
> Key: BEAM-775
> URL: https://issues.apache.org/jira/browse/BEAM-775
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Ben Chambers
>Assignee: Pablo Estrada
>  Labels: backward-incompatible
> Fix For: First stable release
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (BEAM-775) Remove Aggregators from the Java SDK

2017-03-21 Thread Ben Chambers (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Chambers reassigned BEAM-775:
-

Assignee: Pablo Estrada  (was: Ben Chambers)

> Remove Aggregators from the Java SDK
> 
>
> Key: BEAM-775
> URL: https://issues.apache.org/jira/browse/BEAM-775
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Ben Chambers
>Assignee: Pablo Estrada
>  Labels: backward-incompatible
> Fix For: First stable release
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1738) DataflowRunner should override Reshuffle transform

2017-03-16 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-1738:
--

 Summary: DataflowRunner should override Reshuffle transform
 Key: BEAM-1738
 URL: https://issues.apache.org/jira/browse/BEAM-1738
 Project: Beam
  Issue Type: Bug
  Components: runner-dataflow
Reporter: Ben Chambers
Assignee: Thomas Groh


Verify that the code works, and then remove the reification of windows for the 
Dataflow Runner, since it handles Reshuffle specially and doesn't need the 
explicit reification.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (BEAM-1656) DirectRunner should not call finalize twice in UnboundedSourceExecutorFactory

2017-03-08 Thread Ben Chambers (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Chambers updated BEAM-1656:
---

Even then, we shouldn't call finalize before finishing using the restored
instance, I think.




> DirectRunner should not call finalize twice in UnboundedSourceExecutorFactory
> -
>
> Key: BEAM-1656
> URL: https://issues.apache.org/jira/browse/BEAM-1656
> Project: Beam
>  Issue Type: Bug
>  Components: runner-direct
>Reporter: Ben Chambers
>Assignee: Thomas Groh
>
> In 
> [getReader](https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L174)
>  we call finalize on the checkpoint that arrived in the incoming shard. Then 
> later in 
> [finishRead](https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L205)
>  we finalize the old checkpoint again.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1658) DirectRunner should sometimes encode/decode checkpoints

2017-03-08 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-1658:
--

 Summary: DirectRunner should sometimes encode/decode checkpoints
 Key: BEAM-1658
 URL: https://issues.apache.org/jira/browse/BEAM-1658
 Project: Beam
  Issue Type: Bug
  Components: runner-direct
Reporter: Ben Chambers
Assignee: Thomas Groh


In UnboundedSourceExecutorFactory the checkpoints should be encodeable, since 
they may need to be encoded and decoded. We should test that case.
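
A minimal sketch of the kind of check intended (the checkpoint type is whatever 
the source under test declares; variable names here are illustrative):

{code:java}
Coder<CheckpointMarkT> coder = source.getCheckpointMarkCoder();
// Round-trip the checkpoint through its coder, as a runner that persists
// checkpoints would, before resuming a reader from it.
CheckpointMarkT restored =
    CoderUtils.decodeFromByteArray(coder, CoderUtils.encodeToByteArray(coder, checkpoint));
UnboundedSource.UnboundedReader<OutputT> reader = source.createReader(options, restored);
{code}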



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1657) DirectRunner should not call close twice in UnboundedSourceExecutorFactory

2017-03-08 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-1657:
--

 Summary: DirectRunner should not call close twice in 
UnboundedSourceExecutorFactory
 Key: BEAM-1657
 URL: https://issues.apache.org/jira/browse/BEAM-1657
 Project: Beam
  Issue Type: Bug
  Components: runner-direct
Reporter: Ben Chambers
Assignee: Thomas Groh


At the end of the processElement, in `finishRead` there is a 5% chance we 
decide to `close` the reader. If that happens, it will null-out the reader in 
the returned shard. But then the end of the `processElement` call will attempt 
to `close` the reader again, violating the spec.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1656) DirectRunner should not call finalize twice in UnboundedSourceExecutorFactory

2017-03-08 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-1656:
--

 Summary: DirectRunner should not call finalize twice in 
UnboundedSourceExecutorFactory
 Key: BEAM-1656
 URL: https://issues.apache.org/jira/browse/BEAM-1656
 Project: Beam
  Issue Type: Bug
  Components: runner-direct
Reporter: Ben Chambers
Assignee: Thomas Groh


In 
[getReader](https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L174)
 we call finalize on the checkpoint that arrived in the incoming shard. Then 
later in 
[finishRead](https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L205)
 we finalize the old checkpoint again.




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1655) Evaluate the PubsubUnboundedSource

2017-03-08 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-1655:
--

 Summary: Evaluate the PubsubUnboundedSource
 Key: BEAM-1655
 URL: https://issues.apache.org/jira/browse/BEAM-1655
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-extensions
Reporter: Ben Chambers
Assignee: Davor Bonaci


This source includes a lot of assumptions & assertions that may cause problems 
on runners that implement UnboundedSources differently. For example:

1. It requires that finalizeCheckpoint is called at most once.
2. It requires that the checkpoint be finalized within the pubsub timeout, or 
the messages will be redelivered.
... (possibly other assumptions) ... 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1654) Tests that UnboundedSources are executed correctly

2017-03-08 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-1654:
--

 Summary: Tests that UnboundedSources are executed correctly
 Key: BEAM-1654
 URL: https://issues.apache.org/jira/browse/BEAM-1654
 Project: Beam
  Issue Type: Bug
  Components: testing
Reporter: Ben Chambers
Assignee: Davor Bonaci


Specifically, develop a set of RunnableOnService tests that validate runner 
behavior when executing an Unbounded Source. Validations should include 
behaviors such as finalizeCheckpoint being called at most once, etc.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1616) Gauge Metric type

2017-03-06 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15898267#comment-15898267
 ] 

Ben Chambers commented on BEAM-1616:


I believe the other metrics have solved/documented this:

Attempted = "Aggregate across all attempts at bundles"
Committed = "Aggregate across all committed attempts at bundles"

The point is that the general use case of a Gauge as an "I sampled this value at 
this point in time" measurement doesn't fit that model of aggregation.

So maybe it is enough to say that for a gauge, the attempted = committed = the 
latest value observed?

> Gauge Metric type
> -
>
> Key: BEAM-1616
> URL: https://issues.apache.org/jira/browse/BEAM-1616
> Project: Beam
>  Issue Type: New Feature
>  Components: beam-model, sdk-java-core, sdk-py
>Reporter: Aviem Zur
>Assignee: Ben Chambers
>
> Add support for Gauge metric type to the SDK.
> This will serve to get the last value reported.
> Interface should be along the lines of:
> {code}
> void set(long value);
> {code}
> Compare to 
> http://metrics.dropwizard.io/3.1.0/apidocs/com/codahale/metrics/Gauge.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1616) Gauge Metric type

2017-03-03 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15894733#comment-15894733
 ] 

Ben Chambers commented on BEAM-1616:


One thing we'll need to document clearly is what the behavior is across retries 
and such. Specifically, is this:

1. Last value reported across the pipeline
2. Last value reported for each worker, and then summed (in which case, what 
happens when old workers are shut down)
3. Last value reported for each work unit, and then summed (discarding failed 
work-units?)

> Gauge Metric type
> -
>
> Key: BEAM-1616
> URL: https://issues.apache.org/jira/browse/BEAM-1616
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model, sdk-java-core, sdk-py
>Reporter: Aviem Zur
>Assignee: Ben Chambers
>
> Add support for Gauge metric type to the SDK.
> This will serve to get the last value reported.
> Interface should be along the lines of:
> {code}
> void set(long value);
> {code}
> Compare to 
> http://metrics.dropwizard.io/3.1.0/apidocs/com/codahale/metrics/Gauge.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (BEAM-351) Add DisplayData to KafkaIO

2017-03-03 Thread Ben Chambers (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Chambers closed BEAM-351.
-
Resolution: Fixed

> Add DisplayData to KafkaIO
> --
>
> Key: BEAM-351
> URL: https://issues.apache.org/jira/browse/BEAM-351
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Ben Chambers
>Assignee: Aviem Zur
>Priority: Minor
>  Labels: starter
> Fix For: 0.6.0
>
>
> Any interesting parameters of the sources/sinks should be exposed as display 
> data. See any of the sources/sinks that already export this (BigQuery, 
> PubSub, etc.) for examples. Also look at the DisplayData builder and 
> HasDisplayData interface for how to wire these up.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1344) Uniform metrics step name semantics

2017-03-02 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15892620#comment-15892620
 ] 

Ben Chambers commented on BEAM-1344:


Proposed semantics (implemented in BEAM-1572) are to have a query string match 
as a substring of the actual path, *but* require that each transform name in 
the query is completely matched. So:

"foo/bar" matches "a/foo/bar/b" but *not* "a/afoo/bar/b" or "a/foo/bard/b"

When the query string is the name of a single transform such as "foo", it will 
match any path that contains a step named "foo".
When the query string is a full path, this will match only the specific step.
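
For concreteness, a minimal sketch of this matching rule (illustrative only, 
not the BEAM-1572 implementation):

{code:java}
static boolean matches(String query, String stepPath) {
  // A query matches if its components appear as a contiguous run of
  // *complete* components within the step's full path.
  List<String> q = Arrays.asList(query.split("/"));
  List<String> path = Arrays.asList(stepPath.split("/"));
  return Collections.indexOfSubList(path, q) != -1;
}

// matches("foo/bar", "a/foo/bar/b")  -> true
// matches("foo/bar", "a/afoo/bar/b") -> false ("afoo" is not a complete component match)
// matches("foo/bar", "a/foo/bard/b") -> false ("bard" is not a complete component match)
{code}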

Do these semantics sound reasonable for querying?

> Uniform metrics step name semantics
> ---
>
> Key: BEAM-1344
> URL: https://issues.apache.org/jira/browse/BEAM-1344
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Aviem Zur
>Assignee: Ben Chambers
>
> Agree on and implement uniform metrics step name semantics which runners 
> would adhere to.
> Current discussion seems to point at a string with the pipeline graph path to 
> the step's transform. Something along the lines of: 
> "PBegin/SomeInputTransform/SomeParDo/...MyTransform.#Running_number_for_collisions".
> Also agree on and implement metrics querying semantics. Current discussion 
> seems to point at a substring or regex matching of steps on given string 
> input.
> [Original dev list 
> discussion|https://lists.apache.org/thread.html/476bf8f8b1bd63ec49a9f4f45d87402d49b9c887216f3b54cb748a12@%3Cdev.beam.apache.org%3E]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1289) TextIO (and others) should provide more informative messages when encountering RVP

2017-01-20 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-1289:
--

 Summary: TextIO (and others) should provide more informative 
messages when encountering RVP
 Key: BEAM-1289
 URL: https://issues.apache.org/jira/browse/BEAM-1289
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-extensions
Reporter: Ben Chambers
Assignee: Davor Bonaci
Priority: Minor


https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L301

When reporting a failure to validate because a parameter comes from a 
RuntimeValueProvider (such as when creating templates) we report:

"Cannot validate with a RVP"

We should instead provide a message saying what went wrong and how to fix it. 
Something like:

"Unable to validate parameters that aren't available until runtime. Disable 
validation using .withoutValidation()."



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (BEAM-1261) State API should allow state to be managed in different windows

2017-01-10 Thread Ben Chambers (JIRA)
Ben Chambers created BEAM-1261:
--

 Summary: State API should allow state to be managed in different 
windows
 Key: BEAM-1261
 URL: https://issues.apache.org/jira/browse/BEAM-1261
 Project: Beam
  Issue Type: Bug
  Components: beam-model, sdk-java-core
Reporter: Ben Chambers
Assignee: Kenneth Knowles


For example, even if the elements are being processed in fixed windows of an 
hour, it may be desirable for the state to "roll over" between windows (or be 
available to all windows).

It will also be necessary to figure out when this state should be deleted (TTL? 
maximum retention?)

Another problem is how to deal with out of order data. If data comes in from 
the 10:00 AM window, should its state changes be visible to the data in the 
9:00 AM window? 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (BEAM-370) Remove the .named() methods from PTransforms and sub-classes

2017-01-03 Thread Ben Chambers (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Chambers resolved BEAM-370.
---
   Resolution: Fixed
Fix Version/s: 0.5.0

> Remove the .named() methods from PTransforms and sub-classes
> 
>
> Key: BEAM-370
> URL: https://issues.apache.org/jira/browse/BEAM-370
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Ben Chambers
>Assignee: Ben Chambers
>Priority: Minor
>  Labels: backward-incompatible
> Fix For: 0.5.0
>
>
> 1. Update examples/tests/etc. to use named application instead of `.named()`
> 2. Remove the `.named()` methods from composite PTransforms
> 3. Where appropriate, use the PTransform constructor which takes a string 
> to use as the default name.
> See further discussion in the related thread 
> (http://mail-archives.apache.org/mod_mbox/incubator-beam-dev/201606.mbox/%3ccan-7fgzuz1f_szzd2orfyd2pk2_prymhgwjepjpefp01h7s...@mail.gmail.com%3E).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-370) Remove the .named() methods from PTransforms and sub-classes

2016-12-29 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15785974#comment-15785974
 ] 

Ben Chambers commented on BEAM-370:
---

Not yet -- there are still a few methods called ".named()" that should be 
removed. Specifically, Combine#named still exists, and is used to provide nicer 
default names for composite transforms such as Max.


https://github.com/apache/beam/blob/f62d04e22679ee2ac19e3ae37dec487d953d51c1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Max.java#L55

> Remove the .named() methods from PTransforms and sub-classes
> 
>
> Key: BEAM-370
> URL: https://issues.apache.org/jira/browse/BEAM-370
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Ben Chambers
>Assignee: Ben Chambers
>Priority: Minor
>  Labels: backward-incompatible
>
> 1. Update examples/tests/etc. to use named application instead of `.named()`
> 2. Remove the `.named()` methods from composite PTransforms
> 3. Where appropriate, use the PTransform constructor which takes a string 
> to use as the default name.
> See further discussion in the related thread 
> (http://mail-archives.apache.org/mod_mbox/incubator-beam-dev/201606.mbox/%3ccan-7fgzuz1f_szzd2orfyd2pk2_prymhgwjepjpefp01h7s...@mail.gmail.com%3E).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-773) Implement Metrics support for Flink runner

2016-12-29 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15785789#comment-15785789
 ] 

Ben Chambers commented on BEAM-773:
---

I'm not sure that it is being worked on currently. If you're interested in 
working on this, it would probably be worth having a discussion on the mailing 
list to get some guidance from the Flink experts on how this should be 
implemented. My best guess would be that it should involve connecting Beam 
Metrics to Flink Metrics, although that will only support getting the attempted 
values (eg., it will count all attempts across retries). I'm also happy to 
discuss what has been built so far and how it is wired up to the Direct Runner.

> Implement Metrics support for Flink runner
> --
>
> Key: BEAM-773
> URL: https://issues.apache.org/jira/browse/BEAM-773
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Ben Chambers
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (BEAM-147) Introduce an easy API for pipeline metrics

2016-12-29 Thread Ben Chambers (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15785784#comment-15785784
 ] 

Ben Chambers commented on BEAM-147:
---

See https://issues.apache.org/jira/browse/BEAM-773 for more details on the 
status of the Flink implementation.

> Introduce an easy API for pipeline metrics
> --
>
> Key: BEAM-147
> URL: https://issues.apache.org/jira/browse/BEAM-147
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model, sdk-java-core, sdk-py
>Reporter: Robert Bradshaw
>Assignee: Ben Chambers
>
> The existing Aggregators are confusing both because of their name and because 
> they serve multiple purposes.
> Previous discussions around Aggregators/metrics/etc.
> See discussion at 
> http://mail-archives.apache.org/mod_mbox/incubator-beam-user/201603.mbox/browser
>  and 
> http://mail-archives.apache.org/mod_mbox/incubator-beam-dev/201603.mbox/browser
>  . Exact name still being bikeshedded.
> Design document: http://s.apache.org/beam-metrics-api



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)