[jira] [Commented] (BEAM-1396) GABWVOBDoFn expects grouped values to be ordered by their timestamp but there is no such guarantee

2017-02-05 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853642#comment-15853642
 ] 

Amit Sela commented on BEAM-1396:
-

It was always called {{SparkGroupAlsoByWindow}}, and it actually implements a 
Spark {{FlatMapFunction}} :)

https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java


> GABWVOBDoFn expects grouped values to be ordered by their timestamp but there 
> is no such guarantee
> --
>
> Key: BEAM-1396
> URL: https://issues.apache.org/jira/browse/BEAM-1396
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Amit Sela
>Assignee: Kenneth Knowles
>
> GABWVOBDoFn relies on the grouped values to be ordered by their timestamp but 
> nothing in the SDK guarantees this: 
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java#L86
> If such a chunk of timestamped values is processed out of order, I assume 
> we'd end up with an {{IllegalStateException}} thrown here:
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L191
> I suggest we go ahead and add sorting before processing the bundle in chunks 
> - this might prove expensive in extreme cases where a very large bundle with 
> very few keys is processed, but timestamp order seems to be necessary.
> As for runners that provide an ordering guarantee, since GABW is optional I 
> don't see an issue here, though [~dhalp...@google.com] suggested we add a 
> "shouldSort" flag.
> Also, it's probably worth creating a test for this, though it would prove 
> difficult since we would have to preset the order, which is the problem to 
> begin with :-)
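The failure mode above can be sketched in plain Java. This is a simplified illustration, not the actual Beam code: {{TimestampedValue}} and {{Watermark}} here are stand-ins for Beam's {{TimestampedValue}} and {{InMemoryTimerInternals}}; sorting by timestamp first keeps the watermark monotone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Simplified stand-ins for Beam's TimestampedValue and InMemoryTimerInternals.
public class SortBeforeProcessing {

  static final class TimestampedValue {
    final String value;
    final long timestampMillis;

    TimestampedValue(String value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  // Mimics the invariant behind the linked IllegalStateException:
  // the input watermark may never move backwards.
  static final class Watermark {
    private long current = Long.MIN_VALUE;

    void advanceTo(long t) {
      if (t < current) {
        throw new IllegalStateException("Cannot move input watermark time backwards");
      }
      current = t;
    }
  }

  // Sorting by timestamp first guarantees the watermark only moves forward.
  static void process(List<TimestampedValue> values) {
    List<TimestampedValue> sorted = new ArrayList<>(values);
    sorted.sort(Comparator.comparingLong(tv -> tv.timestampMillis));
    Watermark watermark = new Watermark();
    for (TimestampedValue tv : sorted) {
      watermark.advanceTo(tv.timestampMillis);
    }
  }

  public static void main(String[] args) {
    // Out-of-order input: without the sort, advancing to 20 then 10 would throw.
    process(Arrays.asList(
        new TimestampedValue("b", 20), new TimestampedValue("a", 10)));
  }
}
```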



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1223) Replace public constructors with static factory methods for Sum.[*]Fn classes

2017-02-05 Thread Daniel Halperin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853621#comment-15853621
 ] 

Daniel Halperin commented on BEAM-1223:
---

Hey [~staslev],

While testing the 0.5.0 release, I found that this JIRA also ended up reducing 
the visibility of {{Sum.SumIntegersFn}} and friends. This is a breaking change, 
which we are trying to keep track of for ourselves and for our users using the 
{{backward-incompatible}} label -- I added it.

I hope you don't mind: I also reworded the JIRA in order to show a more obvious 
message in the Release Notes.

Thanks!
Dan

> Replace public constructors with static factory methods for Sum.[*]Fn classes
> -
>
> Key: BEAM-1223
> URL: https://issues.apache.org/jira/browse/BEAM-1223
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Stas Levin
>Assignee: Stas Levin
>Priority: Minor
>  Labels: backward-incompatible
> Fix For: 0.5.0
>
>
> {{Sum.SumDoubleFn}}, {{SumIntegerFn}} and {{SumLongFn}} do not use the 
> {{X.of()}} / {{X.from()}} style of instance creation via static factory 
> methods that is ubiquitous in Beam.
> Following a discussion on the dev list, it would be great to preserve a 
> consistent look and feel and change the creation patterns for these classes 
> to something like {{SumFn.ofLong()}} etc.
> See also the corresponding [dev list 
> thread|https://lists.apache.org/thread.html/5d8e905ee49b116d13811c2a96da65eeb44ab7c002870f50936ee1ad@%3Cdev.beam.apache.org%3E].
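The requested pattern can be illustrated with a minimal, hypothetical class. {{SumOfLongs}} and its method names below are invented for illustration and are not Beam's actual API; they only show the private-constructor-plus-static-factory shape.

```java
// Hypothetical example of the static-factory pattern (not Beam's actual API).
public final class SumOfLongs {

  private long total;

  // Private constructor: instances are created only via the factory method.
  private SumOfLongs() {}

  // Static factory in the X.of() style that is ubiquitous in Beam.
  public static SumOfLongs of() {
    return new SumOfLongs();
  }

  public SumOfLongs add(long value) {
    total += value;
    return this;
  }

  public long total() {
    return total;
  }
}
```

Call sites then read `SumOfLongs.of().add(1).add(2)` rather than `new SumOfLongs()`, and the factory leaves room to later return a cached or specialized instance without breaking callers.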





[jira] [Updated] (BEAM-1223) Replace public constructors with static factory methods for Sum.[*]Fn classes

2017-02-05 Thread Daniel Halperin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Halperin updated BEAM-1223:
--
Summary: Replace public constructors with static factory methods for 
Sum.[*]Fn classes  (was: Add static factory methods for Sum.[*]Fn classes)

> Replace public constructors with static factory methods for Sum.[*]Fn classes
> -
>
> Key: BEAM-1223
> URL: https://issues.apache.org/jira/browse/BEAM-1223
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Stas Levin
>Assignee: Stas Levin
>Priority: Minor
>  Labels: backward-incompatible
> Fix For: 0.5.0
>
>
> {{Sum.SumDoubleFn}}, {{SumIntegerFn}} and {{SumLongFn}} do not use the 
> {{X.of()}} / {{X.from()}} style of instance creation via static factory 
> methods that is ubiquitous in Beam.
> Following a discussion on the dev list, it would be great to preserve a 
> consistent look and feel and change the creation patterns for these classes 
> to something like {{SumFn.ofLong()}} etc.
> See also the corresponding [dev list 
> thread|https://lists.apache.org/thread.html/5d8e905ee49b116d13811c2a96da65eeb44ab7c002870f50936ee1ad@%3Cdev.beam.apache.org%3E].





[jira] [Updated] (BEAM-1223) Add static factory methods for Sum.[*]Fn classes

2017-02-05 Thread Daniel Halperin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Halperin updated BEAM-1223:
--
Labels: backward-incompatible  (was: )

> Add static factory methods for Sum.[*]Fn classes
> 
>
> Key: BEAM-1223
> URL: https://issues.apache.org/jira/browse/BEAM-1223
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Stas Levin
>Assignee: Stas Levin
>Priority: Minor
>  Labels: backward-incompatible
> Fix For: 0.5.0
>
>
> {{Sum.SumDoubleFn}}, {{SumIntegerFn}} and {{SumLongFn}} do not use the 
> {{X.of()}} / {{X.from()}} style of instance creation via static factory 
> methods that is ubiquitous in Beam.
> Following a discussion on the dev list, it would be great to preserve a 
> consistent look and feel and change the creation patterns for these classes 
> to something like {{SumFn.ofLong()}} etc.
> See also the corresponding [dev list 
> thread|https://lists.apache.org/thread.html/5d8e905ee49b116d13811c2a96da65eeb44ab7c002870f50936ee1ad@%3Cdev.beam.apache.org%3E].





Build failed in Jenkins: beam_PostCommit_Java_RunnableOnService_Dataflow #2208

2017-02-05 Thread Apache Jenkins Server
See 


--
[...truncated 26076 lines...]
[INFO] 2017-02-06T06:57:28.688Z: (2526ad5fb5c41518): Fusing consumer 
PAssert$266/GroupGlobally/RemoveActualsTriggering/Identity into 
PAssert$266/GroupGlobally/KeyForDummy/AddKeys/Map
[INFO] 2017-02-06T06:57:28.690Z: (2526ad5fb5c412fa): Fusing consumer 
PAssert$266/GroupGlobally/RewindowActuals into 
PAssert$266/GroupGlobally/GatherAllOutputs/Values/Values/Map
[INFO] 2017-02-06T06:57:28.692Z: (2526ad5fb5c410dc): Fusing consumer 
PAssert$266/GroupGlobally/GatherAllOutputs/ParDo(ReifyTimestampsAndWindows) 
into PAssert$266/GroupGlobally/Window.Into()
[INFO] 2017-02-06T06:57:28.694Z: (2526ad5fb5c41ebe): Fusing consumer 
PAssert$266/GroupGlobally/GatherAllOutputs/Values/Values/Map into 
PAssert$266/GroupGlobally/GatherAllOutputs/GroupByKey/GroupByWindow
[INFO] 2017-02-06T06:57:28.697Z: (2526ad5fb5c41ca0): Fusing consumer 
Reshuffle/GroupByKey/Write into Reshuffle/GroupByKey/Reify
[INFO] 2017-02-06T06:57:28.700Z: (2526ad5fb5c41a82): Fusing consumer 
PAssert$266/GroupGlobally/GatherAllOutputs/GroupByKey/GroupByWindow into 
PAssert$266/GroupGlobally/GatherAllOutputs/GroupByKey/Read
[INFO] 2017-02-06T06:57:28.705Z: (2526ad5fb5c41864): Fusing consumer 
PAssert$266/GroupGlobally/Window.Into() into Reshuffle/ExpandIterable
[INFO] 2017-02-06T06:57:28.707Z: (2526ad5fb5c41646): Fusing consumer 
PAssert$266/GroupGlobally/GatherAllOutputs/GroupByKey/Reify into 
PAssert$266/GroupGlobally/GatherAllOutputs/Window.Into()
[INFO] 2017-02-06T06:57:28.710Z: (2526ad5fb5c41428): Fusing consumer 
Reshuffle/Window.Into() into Create.Values/Read(CreateSource)
[INFO] 2017-02-06T06:57:28.715Z: (2526ad5fb5c4120a): Fusing consumer 
Reshuffle/GroupByKey/Reify into Reshuffle/Window.Into()
[INFO] 2017-02-06T06:57:28.720Z: (2526ad5fb5c41fec): Fusing consumer 
PAssert$266/GroupGlobally/GatherAllOutputs/GroupByKey/Write into 
PAssert$266/GroupGlobally/GatherAllOutputs/GroupByKey/Reify
[INFO] 2017-02-06T06:57:28.723Z: (2526ad5fb5c41dce): Fusing consumer 
Reshuffle/ExpandIterable into Reshuffle/GroupByKey/GroupByWindow
[INFO] 2017-02-06T06:57:28.725Z: (2526ad5fb5c41bb0): Fusing consumer 
PAssert$266/GroupGlobally/GatherAllOutputs/WithKeys/AddKeys/Map into 
PAssert$266/GroupGlobally/GatherAllOutputs/ParDo(ReifyTimestampsAndWindows)
[INFO] 2017-02-06T06:57:28.727Z: (2526ad5fb5c41992): Fusing consumer 
PAssert$266/GroupGlobally/GatherAllOutputs/Window.Into() into 
PAssert$266/GroupGlobally/GatherAllOutputs/WithKeys/AddKeys/Map
[INFO] 2017-02-06T06:57:28.729Z: (2526ad5fb5c41774): Fusing consumer 
Reshuffle/GroupByKey/GroupByWindow into Reshuffle/GroupByKey/Read
[INFO] 2017-02-06T06:57:28.731Z: (2526ad5fb5c41556): Fusing consumer 
PAssert$266/GroupGlobally/GroupDummyAndContents/Write into 
PAssert$266/GroupGlobally/GroupDummyAndContents/Reify
[INFO] 2017-02-06T06:57:28.734Z: (2526ad5fb5c41338): Fusing consumer 
PAssert$266/GroupGlobally/GroupDummyAndContents/Reify into 
PAssert$266/GroupGlobally/NeverTrigger/Identity
[INFO] 2017-02-06T06:57:28.736Z: (2526ad5fb5c4111a): Fusing consumer 
PAssert$266/GroupGlobally/NeverTrigger/Identity into 
PAssert$266/GroupGlobally/RemoveDummyTriggering/Identity
[INFO] 2017-02-06T06:57:28.739Z: (2526ad5fb5c41efc): Fusing consumer 
PAssert$266/GroupGlobally/WindowIntoDummy into 
PAssert$266/GroupGlobally/Create.Values/Read(CreateSource)
[INFO] 2017-02-06T06:57:28.742Z: (2526ad5fb5c41cde): Fusing consumer 
PAssert$266/GroupGlobally/RemoveDummyTriggering/Identity into 
PAssert$266/GroupGlobally/WindowIntoDummy
[INFO] 2017-02-06T06:57:28.885Z: (2526ad5fb5c41d98): Adding StepResource setup 
and teardown to workflow graph.
[INFO] 2017-02-06T06:57:29.396Z: S02: (2526ad5fb5c41b7a): Executing operation 
Create.Values/Read(CreateSource)+Reshuffle/Window.Into()+Reshuffle/GroupByKey/Reify+Reshuffle/GroupByKey/Write
[INFO] 2017-02-06T06:57:28.552Z: S03: (ee5e317269c22790): Executing operation 
PAssert$259/GroupGlobally/GatherAllOutputs/GroupByKey/Close
[INFO] 2017-02-06T06:57:28.578Z: S04: (705611cdf06d6617): Executing operation 
PAssert$259/GroupGlobally/GroupDummyAndContents/Create
[INFO] 2017-02-06T06:57:28.765Z: S05: (705611cdf06d65bf): Executing operation 
PAssert$259/GroupGlobally/Create.Values/Read(CreateSource)+PAssert$259/GroupGlobally/WindowIntoDummy+PAssert$259/GroupGlobally/RemoveDummyTriggering/Identity+PAssert$259/GroupGlobally/NeverTrigger/Identity+PAssert$259/GroupGlobally/GroupDummyAndContents/Reify+PAssert$259/GroupGlobally/GroupDummyAndContents/Write
[INFO] 2017-02-06T06:57:28.767Z: S06: (acbb81d729b839a1): Executing operation 
PAssert$259/GroupGlobally/GatherAllOutputs/GroupByKey/Read+PAssert$259/GroupGlobally/GatherAllOutputs/GroupByKey/GroupByWindow+PAssert$259/GroupGlobally/GatherAllOutputs/Values/Values/Map+PAssert$259/GroupGlobally/RewindowActuals+PAssert$259/GroupGlobally/KeyForDummy/AddKeys/Map+PAssert$259/GroupGlobally/RemoveActualsTriggerin

[jira] [Commented] (BEAM-1396) GABWVOBDoFn expects grouped values to be ordered by their timestamp but there is no such guarantee

2017-02-05 Thread Daniel Halperin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853587#comment-15853587
 ] 

Daniel Halperin commented on BEAM-1396:
---

:+1:

> GABWVOBDoFn expects grouped values to be ordered by their timestamp but there 
> is no such guarantee
> --
>
> Key: BEAM-1396
> URL: https://issues.apache.org/jira/browse/BEAM-1396
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Amit Sela
>Assignee: Kenneth Knowles
>
> GABWVOBDoFn relies on the grouped values to be ordered by their timestamp but 
> nothing in the SDK guarantees this: 
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java#L86
> If such a chunk of timestamped values is processed out of order, I assume 
> we'd end up with an {{IllegalStateException}} thrown here:
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L191
> I suggest we go ahead and add sorting before processing the bundle in chunks 
> - this might prove expensive in extreme cases where a very large bundle with 
> very few keys is processed, but timestamp order seems to be necessary.
> As for runners that provide an ordering guarantee, since GABW is optional I 
> don't see an issue here, though [~dhalp...@google.com] suggested we add a 
> "shouldSort" flag.
> Also, it's probably worth creating a test for this, though it would prove 
> difficult since we would have to preset the order, which is the problem to 
> begin with :-)





[GitHub] beam pull request #1925: Add git .mailmap file

2017-02-05 Thread kennknowles
GitHub user kennknowles opened a pull request:

https://github.com/apache/beam/pull/1925

Add git .mailmap file

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [x] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [x] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

No JIRA, just a little fun `git` hack. Makes the names in the log nice. I 
only used information you could find in the log itself - the classic pattern is 
that someone contributes for a while before setting up their `git` 
configuration to have good metadata. (Or in this case, there was also a tool 
that doesn't preserve good metadata.)
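For context, a `.mailmap` file at the repository root maps author name/email variants onto one canonical identity, so `git log` and `git shortlog` report each contributor once. A minimal sketch with made-up entries (not the actual contents of this PR):

```
# Canonical Name <canonical-email>  <old-or-variant-email>
Jane Doe <jane@example.com> <jdoe@users.noreply.github.com>
# A variant name can also be listed before the variant email:
Jane Doe <jane@example.com> jdoe <jane@old-domain.example>
```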

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kennknowles/beam mailmap

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1925.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1925


commit 9679f13bb5b04d8708bfac13b116b701e3d61b72
Author: Kenneth Knowles 
Date:   2017-01-25T04:50:50Z

Add a .mailmap




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (BEAM-1395) SparkGroupAlsoByWindowFn not sorting grouped elements by timestamp

2017-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853499#comment-15853499
 ] 

ASF GitHub Bot commented on BEAM-1395:
--

GitHub user kennknowles opened a pull request:

https://github.com/apache/beam/pull/1924

[BEAM-1395] Remove needless assumptions and complexity from 
GABWViaOutputBufferDoFn



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kennknowles/beam GABWViaOutputBuffer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1924.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1924


commit 781137fc2d2ce16c99e3294213755dd5645da832
Author: Kenneth Knowles 
Date:   2017-02-06T04:42:03Z

Remove extraneous chunking from GroupAlsoByWindowsViaOutputBufferDoFn

commit d8d74f689d91f5b9252fffec7d64b4f9fbd6bb56
Author: Kenneth Knowles 
Date:   2017-02-06T04:42:51Z

Remove incorrect pluralization from GroupAlsoByWindowsViaOutputBufferDoFn




> SparkGroupAlsoByWindowFn not sorting grouped elements by timestamp
> --
>
> Key: BEAM-1395
> URL: https://issues.apache.org/jira/browse/BEAM-1395
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
>
> SparkGroupAlsoByWindowFn relies on the grouped elements (per key) to be 
> sorted by their timestamp, which is not the case, and so could cause: 
> {code}
> IllegalStateException: Cannot move input watermark time backwards
> {code}
> We should sort the values first, just like with {{Combine}} implementations: 
> https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java#L73





[GitHub] beam pull request #1924: [BEAM-1395] Remove needless assumptions and complex...

2017-02-05 Thread kennknowles
GitHub user kennknowles opened a pull request:

https://github.com/apache/beam/pull/1924

[BEAM-1395] Remove needless assumptions and complexity from 
GABWViaOutputBufferDoFn



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kennknowles/beam GABWViaOutputBuffer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1924.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1924


commit 781137fc2d2ce16c99e3294213755dd5645da832
Author: Kenneth Knowles 
Date:   2017-02-06T04:42:03Z

Remove extraneous chunking from GroupAlsoByWindowsViaOutputBufferDoFn

commit d8d74f689d91f5b9252fffec7d64b4f9fbd6bb56
Author: Kenneth Knowles 
Date:   2017-02-06T04:42:51Z

Remove incorrect pluralization from GroupAlsoByWindowsViaOutputBufferDoFn






[jira] [Created] (BEAM-1400) Transient Apex runner postcommit failure in WindowTest.testOutputTimeFnDefault

2017-02-05 Thread Kenneth Knowles (JIRA)
Kenneth Knowles created BEAM-1400:
-

 Summary: Transient Apex runner postcommit failure in 
WindowTest.testOutputTimeFnDefault
 Key: BEAM-1400
 URL: https://issues.apache.org/jira/browse/BEAM-1400
 Project: Beam
  Issue Type: Bug
  Components: runner-apex
Reporter: Kenneth Knowles


The output timestamp appears to have come back as {{now()}} instead of the end 
of the window.

https://builds.apache.org/job/beam_PostCommit_Java_RunnableOnService_Apex/411/org.apache.beam$beam-runners-apex/testReport/junit/org.apache.beam.sdk.transforms.windowing/WindowTest/testOutputTimeFnDefault/

It is possible that this is a flake in some runners/core-java code. Keeping it 
here for now until we see it happen somewhere else.





[jira] [Comment Edited] (BEAM-1396) GABWVOBDoFn expects grouped values to be ordered by their timestamp but there is no such guarantee

2017-02-05 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853491#comment-15853491
 ] 

Kenneth Knowles edited comment on BEAM-1396 at 2/6/17 4:30 AM:
---

Glad you brought this to me without spending too much time on it. This code is 
actually meant to be used in batch mode by any runner; it takes advantage of 
the batch style to be much more efficient than {{GABWViaActiveWindowSets}}.

For the landscape of GABW:

1. {{GroupAlsoByWindowViaWindowSetDoFn}} / 
{{GroupAlsoByWindowViaWindowSetNewDoFn}} are the "works no matter what" 
implementation.
2. {{GroupAlsoByWindowsViaOutputBufferDoFn}} is the "works if you don't need to 
deal with the watermark, just move it from -infinity to infinity" but then we 
made it weird and added an incidental requirement that we should remove.
3. Any runner-specific hackery that is harder to describe and not generally 
useful (feel free to write {{WorksOnlyForTheSparkRunnerGABW}} :-))

For this particular issue, the fix I will take is to remove the for loop over 
chunks of 1000, which is the only reason sorting mattered. Essentially this 
GABW implementation runs "like" a fully batch-centric version over 1000 
elements at a time. The chunking was added to make batch act somewhat like 
streaming - multiple outputs per key in GBK - to catch bugs etc. But now we 
have a streaming direct runner and we should just focus this on simplicity, 
correctness, performance, and usefulness to all the runners.



was (Author: kenn):
Glad you brought this to me without spending too much time on it. This code is 
actually meant to be used in batch mode by any runner; it takes advantage of 
the batch style to be much more efficient than {{GABWViaActiveWindowSets}}.

For the landscape of GABW:

1. {{GroupAlsoByWindowViaWindowSetDoFn}} / 
{{GroupAlsoByWindowViaWindowSetNewDoFn}} are the "works no matter what" 
implementation.
2. {{GroupAlsoByWindowsViaOutputBufferDoFn}} is the "works if you don't need to 
deal with the watermark, just move it from 0 to infinity, and also the input is 
sorted by timestamp" but then we made it weird and added an incidental 
requirement that we should remove.
3. Any runner-specific hackery that is harder to describe and not generally 
useful (feel free to write {{WorksOnlyForTheSparkRunnerGABW}} :-))

For this particular issue, the fix I will take is to remove the for loop over 
chunks of 1000, which is the only reason sorting mattered. Essentially this 
GABW implementation runs "like" a fully batch-centric version over 1000 
elements at a time. The chunking was added to make batch act somewhat like 
streaming - multiple outputs per key in GBK - to catch bugs etc. But now we 
have a streaming direct runner and we should just focus this on simplicity, 
correctness, performance, and usefulness to all the runners.


> GABWVOBDoFn expects grouped values to be ordered by their timestamp but there 
> is no such guarantee
> --
>
> Key: BEAM-1396
> URL: https://issues.apache.org/jira/browse/BEAM-1396
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Amit Sela
>Assignee: Kenneth Knowles
>
> GABWVOBDoFn relies on the grouped values to be ordered by their timestamp but 
> nothing in the SDK guarantees this: 
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java#L86
> If such a chunk of timestamped values is processed out of order, I assume 
> we'd end up with an {{IllegalStateException}} thrown here:
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L191
> I suggest we go ahead and add sorting before processing the bundle in chunks 
> - this might prove expensive in extreme cases where a very large bundle with 
> very few keys is processed, but timestamp order seems to be necessary.
> As for runners that provide an ordering guarantee, since GABW is optional I 
> don't see an issue here, though [~dhalp...@google.com] suggested we add a 
> "shouldSort" flag.
> Also, it's probably worth creating a test for this, though it would prove 
> difficult since we would have to preset the order, which is the problem to 
> begin with :-)





[jira] [Comment Edited] (BEAM-1396) GABWVOBDoFn expects grouped values to be ordered by their timestamp but there is no such guarantee

2017-02-05 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853491#comment-15853491
 ] 

Kenneth Knowles edited comment on BEAM-1396 at 2/6/17 4:25 AM:
---

Glad you brought this to me without spending too much time on it. This code is 
actually meant to be used in batch mode by any runner; it takes advantage of 
the batch style to be much more efficient than {{GABWViaActiveWindowSets}}.

For the landscape of GABW:

1. {{GroupAlsoByWindowViaWindowSetDoFn}} / 
{{GroupAlsoByWindowViaWindowSetNewDoFn}} are the "works no matter what" 
implementation.
2. {{GroupAlsoByWindowsViaOutputBufferDoFn}} is the "works if you don't need to 
deal with the watermark, just move it from 0 to infinity, and also the input is 
sorted by timestamp" but then we made it weird and added an incidental 
requirement that we should remove.
3. Any runner-specific hackery that is harder to describe and not generally 
useful (feel free to write {{WorksOnlyForTheSparkRunnerGABW}} :-))

For this particular issue, the fix I will take is to remove the for loop over 
chunks of 1000, which is the only reason sorting mattered. Essentially this 
GABW implementation runs "like" a fully batch-centric version over 1000 
elements at a time. The chunking was added to make batch act somewhat like 
streaming - multiple outputs per key in GBK - to catch bugs etc. But now we 
have a streaming direct runner and we should just focus this on simplicity, 
correctness, performance, and usefulness to all the runners.



was (Author: kenn):
Glad you brought this to me without spending too much time on it. This code is 
actually meant to be used in batch mode by any runner; it takes advantage of 
the batch style to be much more efficient than {{GABWViaActiveWindowSets}}.

For the landscape of GABW:

1. {{GroupAlsoByWindowViaWindowSetDoFn}} / 
{{GroupAlsoByWindowViaWindowSetNewDoFn}} are the "works no matter what" 
implementation.
2. {{GroupAlsoByWindowsViaOutputBufferDoFn}} is the "works if you don't need to 
deal with the watermark, just move it from 0 to infinity, and also the input is 
sorted by timestamp" but then we made it weird and added an incidental 
requirement that we should remove.
3. Any runner-specific hackery that is harder to describe and not generally 
useful (feel free to write {{WorksOnlyForTheSparkRunnerGABW}} :-)

For this particular issue, the fix I will take is to remove the for loop over 
chunks of 1000, which is the only reason sorting mattered. Essentially this 
GABW implementation runs "like" a fully batch-centric version over 1000 
elements at a time. The chunking was added to make batch act somewhat like 
streaming - multiple outputs per key in GBK - to catch bugs etc. But now we 
have a streaming direct runner and we should just focus this on simplicity, 
correctness, performance, and usefulness to all the runners.


> GABWVOBDoFn expects grouped values to be ordered by their timestamp but there 
> is no such guarantee
> --
>
> Key: BEAM-1396
> URL: https://issues.apache.org/jira/browse/BEAM-1396
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Amit Sela
>Assignee: Kenneth Knowles
>
> GABWVOBDoFn relies on the grouped values to be ordered by their timestamp but 
> nothing in the SDK guarantees this: 
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java#L86
> If such a chunk of timestamped values is processed out of order, I assume 
> we'd end up with an {{IllegalStateException}} thrown here:
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L191
> I suggest we go ahead and add sorting before processing the bundle in chunks 
> - this might prove expensive in extreme cases where a very large bundle with 
> very few keys is processed, but timestamp order seems to be necessary.
> As for runners that provide an ordering guarantee, since GABW is optional I 
> don't see an issue here, though [~dhalp...@google.com] suggested we add a 
> "shouldSort" flag.
> Also, it's probably worth creating a test for this, though it would prove 
> difficult since we would have to preset the order, which is the problem to 
> begin with :-)





[jira] [Commented] (BEAM-1396) GABWVOBDoFn expects grouped values to be ordered by their timestamp but there is no such guarantee

2017-02-05 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853491#comment-15853491
 ] 

Kenneth Knowles commented on BEAM-1396:
---

Glad you brought this to me without spending too much time on it. This code is 
actually meant to be used in batch mode by any runner; it takes advantage of 
the batch style to be much more efficient than {{GABWViaActiveWindowSets}}.

For the landscape of GABW:

1. {{GroupAlsoByWindowViaWindowSetDoFn}} / 
{{GroupAlsoByWindowViaWindowSetNewDoFn}} are the "works no matter what" 
implementation.
2. {{GroupAlsoByWindowsViaOutputBufferDoFn}} is the "works if you don't need to 
deal with the watermark, just move it from 0 to infinity, and also the input is 
sorted by timestamp" but then we made it weird and added an incidental 
requirement that we should remove.
3. Any runner-specific hackery that is harder to describe and not generally 
useful (feel free to write {{WorksOnlyForTheSparkRunnerGABW}} :-)

For this particular issue, the fix I will take is to remove the for loop over 
chunks of 1000, which is the only reason sorting mattered. Essentially this 
GABW implementation runs "like" a fully batch-centric version over 1000 
elements at a time. The chunking was added to make batch act somewhat like 
streaming - multiple outputs per key in GBK - to catch bugs etc. But now we 
have a streaming direct runner and we should just focus this on simplicity, 
correctness, performance, and usefulness to all the runners.
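The chunking argument can be sketched in plain, simplified Java (not Beam's actual code; the method names are invented for illustration): advancing a watermark after each chunk requires timestamp-sorted input, while buffering everything and advancing the watermark once at the end does not.

```java
import java.util.List;

// Simplified sketch of chunked vs. unchunked grouped-value processing.
public class ChunkingSketch {

  // Chunked style: the watermark is advanced past each chunk, so a later
  // chunk containing an earlier timestamp violates the watermark invariant.
  static long processChunked(List<Long> timestamps, int chunkSize) {
    long watermark = Long.MIN_VALUE;
    for (int i = 0; i < timestamps.size(); i += chunkSize) {
      List<Long> chunk =
          timestamps.subList(i, Math.min(i + chunkSize, timestamps.size()));
      for (long t : chunk) {
        if (t < watermark) {
          throw new IllegalStateException("Cannot move input watermark time backwards");
        }
      }
      // Advance the watermark past everything seen so far.
      for (long t : chunk) {
        watermark = Math.max(watermark, t);
      }
    }
    return watermark;
  }

  // Unchunked style: buffer all elements, then move the watermark once
  // from -infinity to +infinity. Element order no longer matters.
  static long processUnchunked(List<Long> timestamps) {
    long max = Long.MIN_VALUE;
    for (long t : timestamps) {
      max = Math.max(max, t); // just buffering; no intermediate watermark holds
    }
    return max; // the watermark advances once, at the end
  }
}
```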


> GABWVOBDoFn expects grouped values to be ordered by their timestamp but there 
> is no such guarantee
> --
>
> Key: BEAM-1396
> URL: https://issues.apache.org/jira/browse/BEAM-1396
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Amit Sela
>Assignee: Kenneth Knowles
>
> GABWVOBDoFn relies on the grouped values to be ordered by their timestamp but 
> nothing in the SDK guarantees this: 
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java#L86
> If such a chunk of timestamped values is processed out of order, I assume 
> we'd end up with an {{IllegalStateException}} thrown here:
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L191
> I suggest we go ahead and add sorting before processing the bundle in chunks 
> - this might prove expensive in extreme cases where a very large bundle with 
> very few keys is processed, but it seems that timestamp order is necessary.
> As for runners that provide an ordering guarantee, since GABW is optional I don't 
> see an issue here, though [~dhalp...@google.com] suggested we add a 
> "shouldSort" flag.
> Also, probably worth creating a test for this, though it would prove 
> difficult since we would have to preset the order which is the problem to 
> begin with :-)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1369) some unit tests in python take longer than 10s to run

2017-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853489#comment-15853489
 ] 

ASF GitHub Bot commented on BEAM-1369:
--

GitHub user sb2nov opened a pull request:

https://github.com/apache/beam/pull/1923

[BEAM-1369] Reduce test times for two retry based tests

R: @charlesccychen PTAL

- The gcsio test was taking 2s, so reducing the segment override makes it 
faster.
- The retry test now starts at a lower initial delay, speeding it up from 1s 
to 0.1s.
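The second change works because exponential backoff's total sleep time is 
dominated by the initial delay, so lowering it shrinks the test runtime 
proportionally. A minimal sketch (hypothetical helper, not the actual Beam 
retry module):

```python
def retry_with_backoff(fn, max_attempts=4, initial_delay=1.0, factor=2.0,
                       sleep=None):
    """Call fn, retrying on any exception with geometrically growing delays.

    Injecting `sleep` lets tests observe the delays instead of waiting.
    """
    sleep = sleep or (lambda seconds: None)
    delay = initial_delay
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise
            sleep(delay)
            delay *= factor
```

With {{initial_delay=0.1}} instead of {{1.0}}, two retries sleep 0.3s total 
instead of 3s.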

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [x] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sb2nov/beam BEAM-reduce-test-times

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1923.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1923


commit d6b9e554df334acfc1af0ba29d4e591376474ed2
Author: Sourabh Bajaj 
Date:   2017-02-06T04:18:52Z

Reduce test times for two retry based tests




> some unit tests in python take longer than 10s to run
> -
>
> Key: BEAM-1369
> URL: https://issues.apache.org/jira/browse/BEAM-1369
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py
>Reporter: Sourabh Bajaj
>Assignee: Sourabh Bajaj
>Priority: Minor
>
> 4 or 5 tests account for 50% of the total unit test runtime.
> apache_beam.transforms.combiners_test.CombineTest.test_sample: 17.1545s
> apache_beam.io.avroio_test.TestAvro.test_dynamic_work_rebalancing_exhaustive: 
> 16.s
> apache_beam.examples.complete.estimate_pi_test.EstimatePiTest.test_basics: 
> 8.3667s
> apache_beam.io.textio_test.TextSourceTest.test_dynamic_work_rebalancing: 
> 1.9172s
> apache_beam.io.avroio_test.TestAvro.test_read_with_splitting_multiple_blocks: 
> 1.3939s
> apache_beam.io.textio_test.TextSourceTest.test_read_gzip_large: 1.2832s
> apache_beam.io.source_test_utils_test.SourceTestUtilsTest.test_split_at_fraction_exhaustive:
>  1.2787s
> apache_beam.io.bigquery_test.TestBigQueryReader.test_read_from_table_and_job_complete_retry:
>  1.0082s





[GitHub] beam pull request #1923: [BEAM-1369] Reduce test times for two retry based t...

2017-02-05 Thread sb2nov
GitHub user sb2nov opened a pull request:

https://github.com/apache/beam/pull/1923

[BEAM-1369] Reduce test times for two retry based tests

R: @charlesccychen PTAL

- The gcsio test was taking 2s, so reducing the segment override makes it 
faster.
- The retry test now starts at a lower initial delay, speeding it up from 1s 
to 0.1s.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [x] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sb2nov/beam BEAM-reduce-test-times

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1923.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1923


commit d6b9e554df334acfc1af0ba29d4e591376474ed2
Author: Sourabh Bajaj 
Date:   2017-02-06T04:18:52Z

Reduce test times for two retry based tests




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (BEAM-425) Create Elasticsearch IO

2017-02-05 Thread Reza Nouri (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853477#comment-15853477
 ] 

Reza Nouri commented on BEAM-425:
-

Hi [~echauchot],

I am trying to use ElasticsearchIO, but I am getting the following error while 
writing data to Elasticsearch:

threw exception [org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
org.elasticsearch.client.ResponseException: POST 
http://127.0.0.1:9200/namedentities/keyword/_bulk: HTTP/1.1 400 Bad Request
{"error":{"root_cause":[{"type":"parse_exception","reason":"Failed to derive 
xcontent"}],"type":"parse_exception","reason":"Failed to derive 
xcontent"},"status":400}] with root cause
org.elasticsearch.client.ResponseException: POST 
http://127.0.0.1:9200/namedentities/keyword/_bulk: HTTP/1.1 400 Bad Request
{"error":{"root_cause":[{"type":"parse_exception","reason":"Failed to derive 
xcontent"}],"type":"parse_exception","reason":"Failed to derive 
xcontent"},"status":400}
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:310)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:299)
at 
org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:119)
at 
org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:177)
at 
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:436)
at 
org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:326)
at 
org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at 
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at 
org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at 
org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
at 
org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at 
org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at 
org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at 
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)

I wrote something similar to the ElasticsearchIOTest test case, and the data is 
in JSON format, exactly the same as in the test file:

data.add(String.format("{\"scientist\":\"%s\", \"id\":%d}", scientists[index], i));

Also, I am using Elasticsearch 5.2 running on a single node.

Any idea?

Thanks, 
Reza
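[Editorial note: a 400 "Failed to derive xcontent" from the {{_bulk}} endpoint 
usually means the request body is not valid newline-delimited JSON. A minimal 
Python sketch of the bulk body format the endpoint expects - {{build_bulk_body}} 
is a hypothetical helper, with index/type names taken from the error above:]

```python
import json

def build_bulk_body(docs, index="namedentities", doc_type="keyword"):
    """Build an Elasticsearch _bulk request body.

    The bulk API expects newline-delimited JSON: an action line followed by
    the document source, with a trailing newline at the end of the body.
    """
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index, "_type": doc_type}}))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"
```

A body missing the action lines or the trailing newline is a common cause of 
this parse_exception.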



> Create Elasticsearch IO
> ---
>
> Key: BEAM-425
> URL: https://issues.apache.org/jira/browse/BEAM-425
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-extensions
>Reporter: Jean-Baptiste Onofré
>Assignee: Etienne Chauchot
> Fix For: 0.5.0
>
>
> I'm working on a new ElasticsearchIO providing both bounded source and sink.





Jenkins build is back to stable : beam_PostCommit_Java_RunnableOnService_Apex #412

2017-02-05 Thread Apache Jenkins Server
See 




[jira] [Commented] (BEAM-886) Support new DoFn in Python SDK

2017-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853441#comment-15853441
 ] 

ASF GitHub Bot commented on BEAM-886:
-

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/1917


> Support new DoFn in Python SDK
> --
>
> Key: BEAM-886
> URL: https://issues.apache.org/jira/browse/BEAM-886
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py
>Reporter: Ahmet Altay
>Assignee: Sourabh Bajaj
>  Labels: backward-incompatible, sdk-consistency
>
> Figure out what is needed for supporting new DoFns, add support and removed 
> old DoFns.
> Related Docs from Java:
> Original Proposal email:
> https://lists.apache.org/thread.html/2abf32d528dbb64b79853552c5d10c217e2194f0685af21aeb4635dd@%3Cdev.beam.apache.org%3E
> Presentation & Doc (with short Python sections):
> https://s.apache.org/presenting-a-new-dofn
> https://s.apache.org/a-new-dofn





[GitHub] beam pull request #1917: [BEAM-886] Remove the usage of OldDoFn and clean up...

2017-02-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/1917




[1/2] beam git commit: Remove the usage of OldDoFn and clean up function names

2017-02-05 Thread altay
Repository: beam
Updated Branches:
  refs/heads/master 4d0e8ecf1 -> b3d962df2


Remove the usage of OldDoFn and clean up function names


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/137d392e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/137d392e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/137d392e

Branch: refs/heads/master
Commit: 137d392e79080f67a24fa418c474e4f852d1dd74
Parents: 4d0e8ec
Author: Sourabh Bajaj 
Authored: Sun Feb 5 11:11:25 2017 -0800
Committer: Sourabh Bajaj 
Committed: Sun Feb 5 16:01:54 2017 -0800

--
 sdks/python/apache_beam/runners/common.pxd  |  12 +-
 sdks/python/apache_beam/runners/common.py   | 225 +++
 .../runners/direct/transform_evaluator.py   |  15 +-
 sdks/python/apache_beam/transforms/core.py  |  87 +--
 sdks/python/apache_beam/typehints/typecheck.py  | 133 ---
 5 files changed, 92 insertions(+), 380 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/137d392e/sdks/python/apache_beam/runners/common.pxd
--
diff --git a/sdks/python/apache_beam/runners/common.pxd 
b/sdks/python/apache_beam/runners/common.pxd
index dbb08f0..f36fdd0 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -37,22 +37,20 @@ cdef class DoFnRunner(Receiver):
   cdef object tagged_receivers
   cdef LoggingContext logging_context
   cdef object step_name
-  cdef bint is_new_dofn
   cdef list args
   cdef dict kwargs
   cdef ScopedMetricsContainer scoped_metrics_container
   cdef list side_inputs
-  cdef bint has_windowed_side_inputs
+  cdef bint has_windowed_inputs
   cdef list placeholders
-  cdef bint simple_process
+  cdef bint use_simple_invoker
 
   cdef Receiver main_receivers
 
   cpdef process(self, WindowedValue element)
-  cdef old_dofn_process(self, WindowedValue element)
-  cdef new_dofn_process(self, WindowedValue element)
-  cdef new_dofn_simple_process(self, WindowedValue element)
-  cdef _new_dofn_window_process(
+  cdef _dofn_invoker(self, WindowedValue element)
+  cdef _dofn_simple_invoker(self, WindowedValue element)
+  cdef _dofn_window_invoker(
   self, WindowedValue element, list args, dict kwargs, object window)
 
   @cython.locals(windowed_value=WindowedValue)

http://git-wip-us.apache.org/repos/asf/beam/blob/137d392e/sdks/python/apache_beam/runners/common.py
--
diff --git a/sdks/python/apache_beam/runners/common.py 
b/sdks/python/apache_beam/runners/common.py
index 9c942c0..aa6c2dd 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -115,141 +115,89 @@ class DoFnRunner(Receiver):
   assert context is not None
   self.context = context
 
-# TODO(Sourabhbajaj): Remove the usage of OldDoFn
-if isinstance(fn, core.DoFn):
-
-  class ArgPlaceholder(object):
-def __init__(self, placeholder):
-  self.placeholder = placeholder
-
-  self.is_new_dofn = True
+class ArgPlaceholder(object):
+  def __init__(self, placeholder):
+self.placeholder = placeholder
+
+# Stash values for use in dofn_process.
+self.side_inputs = side_inputs
+self.has_windowed_inputs = not all(
+si.is_globally_windowed() for si in self.side_inputs)
+
+self.args = args if args else []
+self.kwargs = kwargs if kwargs else {}
+self.dofn = fn
+self.dofn_process = fn.process
+
+arguments, _, _, defaults = self.dofn.get_function_arguments('process')
+defaults = defaults if defaults else []
+self_in_args = int(self.dofn.is_process_bounded())
+
+self.use_simple_invoker = (
+not side_inputs and not args and not kwargs and not defaults)
+if self.use_simple_invoker:
+  # As we're using the simple invoker we don't need to compute placeholders
+  return
 
-  # Stash values for use in new_dofn_process.
-  self.side_inputs = side_inputs
-  self.has_windowed_side_inputs = not all(
-  si.is_globally_windowed() for si in self.side_inputs)
+self.has_windowed_inputs = (self.has_windowed_inputs or
+core.DoFn.WindowParam in defaults)
 
-  self.args = args if args else []
-  self.kwargs = kwargs if kwargs else {}
-  self.dofn = fn
-  self.dofn_process = fn.process
+# Try to prepare all the arguments that can just be filled in
+# without any additional work. in the process function.
+# Also cache all the placeholders needed in the process function.
 
-  arguments, _, _, defaults = self.dofn.get_function_arguments('process')
-  defaults = defaults if defaults else []
-  self_in_args = int(self.dofn.is_

[2/2] beam git commit: This closes #1917

2017-02-05 Thread altay
This closes #1917


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b3d962df
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b3d962df
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b3d962df

Branch: refs/heads/master
Commit: b3d962df230e5ed72ef04bf0bf410e8f925ff656
Parents: 4d0e8ec 137d392
Author: Ahmet Altay 
Authored: Sun Feb 5 17:57:31 2017 -0800
Committer: Ahmet Altay 
Committed: Sun Feb 5 17:57:31 2017 -0800

--
 sdks/python/apache_beam/runners/common.pxd  |  12 +-
 sdks/python/apache_beam/runners/common.py   | 225 +++
 .../runners/direct/transform_evaluator.py   |  15 +-
 sdks/python/apache_beam/transforms/core.py  |  87 +--
 sdks/python/apache_beam/typehints/typecheck.py  | 133 ---
 5 files changed, 92 insertions(+), 380 deletions(-)
--




Jenkins build became unstable: beam_PostCommit_Java_RunnableOnService_Apex #411

2017-02-05 Thread Apache Jenkins Server
See 




[jira] [Resolved] (BEAM-1387) DatastoreIO: use batch-datastore.googleapis.com as the endpoint

2017-02-05 Thread Daniel Halperin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Halperin resolved BEAM-1387.
---
   Resolution: Fixed
Fix Version/s: 0.6.0

> DatastoreIO: use batch-datastore.googleapis.com as the endpoint
> ---
>
> Key: BEAM-1387
> URL: https://issues.apache.org/jira/browse/BEAM-1387
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-gcp, sdk-py
>Reporter: Daniel Halperin
>Assignee: Daniel Halperin
> Fix For: 0.6.0
>
>
> Google Cloud Datastore has added a new API endpoint for big data processing 
> systems. We should use it.





[jira] [Commented] (BEAM-1387) DatastoreIO: use batch-datastore.googleapis.com as the endpoint

2017-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853396#comment-15853396
 ] 

ASF GitHub Bot commented on BEAM-1387:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/1915


> DatastoreIO: use batch-datastore.googleapis.com as the endpoint
> ---
>
> Key: BEAM-1387
> URL: https://issues.apache.org/jira/browse/BEAM-1387
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-gcp, sdk-py
>Reporter: Daniel Halperin
>Assignee: Daniel Halperin
>
> Google Cloud Datastore has added a new API endpoint for big data processing 
> systems. We should use it.





[GitHub] beam pull request #1915: [BEAM-1387] DatastoreV1: use batch-datastore.google...

2017-02-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/1915




[1/2] beam git commit: [BEAM-1387] DatastoreV1: use batch-datastore.googleapis.com endpoint

2017-02-05 Thread dhalperi
Repository: beam
Updated Branches:
  refs/heads/master c442ef81a -> 4d0e8ecf1


[BEAM-1387] DatastoreV1: use batch-datastore.googleapis.com endpoint

If localhost is not set.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62dab951
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62dab951
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62dab951

Branch: refs/heads/master
Commit: 62dab951009a4df69dd05b281e0e47cc97bc42a9
Parents: c442ef8
Author: Dan Halperin 
Authored: Fri Feb 3 14:58:19 2017 -0800
Committer: Dan Halperin 
Committed: Sun Feb 5 14:31:31 2017 -0800

--
 pom.xml  | 4 ++--
 .../java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java   | 2 ++
 2 files changed, 4 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/62dab951/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 6776db1..4ed676e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,8 +109,8 @@
 v2-rev8-1.22.0
 v1b3-rev43-1.22.0
 0.5.160222
-1.2.0
-1.2.0
+1.4.0
+1.3.0
 1.0-rc2
 1.3
 0.6.0

http://git-wip-us.apache.org/repos/asf/beam/blob/62dab951/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
--
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 4a219aa..85c0744 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -1067,6 +1067,8 @@ public class DatastoreV1 {
 
   if (localhost != null) {
 builder.localHost(localhost);
+  } else {
+builder.host("batch-datastore.googleapis.com");
   }
 
   return DatastoreFactory.get().create(builder.build());
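The endpoint selection in the hunk above is: prefer a configured local 
(emulator) host, otherwise fall back to the new batch endpoint. A minimal 
sketch mirroring the Java logic ({{choose_datastore_host}} is a hypothetical 
helper, not part of DatastoreV1):

```python
def choose_datastore_host(localhost=None):
    """Return the Datastore host to connect to.

    A localhost setting (e.g. an emulator) takes precedence; otherwise use
    the batch endpoint Google added for big-data processing systems.
    """
    if localhost is not None:
        return localhost
    return "batch-datastore.googleapis.com"
```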



[2/2] beam git commit: This closes #1915

2017-02-05 Thread dhalperi
This closes #1915


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4d0e8ecf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4d0e8ecf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4d0e8ecf

Branch: refs/heads/master
Commit: 4d0e8ecf1f0ba8784652839d47c1def72ac6ae24
Parents: c442ef8 62dab95
Author: Dan Halperin 
Authored: Sun Feb 5 14:31:34 2017 -0800
Committer: Dan Halperin 
Committed: Sun Feb 5 14:31:34 2017 -0800

--
 pom.xml  | 4 ++--
 .../java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java   | 2 ++
 2 files changed, 4 insertions(+), 2 deletions(-)
--




[jira] [Updated] (BEAM-1398) KafkaIO metrics

2017-02-05 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur updated BEAM-1398:

Description: 
Add metrics to {{KafkaIO}} using the metrics API.

Metrics (Feel free to add more ideas here) per topic/split (Where applicable):
* Backlog in bytes.
* Backlog in number of messages.
* Messages consumed.
* Bytes consumed.
* Error counts.
* Messages produced.
* Time spent reading.
* Time spent writing.

Add {{RunnableOnService}} test which creates a pipeline and asserts metrics 
values.

  was:
Add metrics to {{KafkaIO}} using the metrics API.

Metrics (Feel free to add more ideas here) per topic/split (Where applicable):
* Backlog in bytes.
* Backlog in number of messages.
* Messages consumed.
* Bytes consumed.
* Error counts.
* Messages produced.
* Time spent reading.
* Time spent writing.


> KafkaIO metrics
> ---
>
> Key: BEAM-1398
> URL: https://issues.apache.org/jira/browse/BEAM-1398
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-extensions
>Reporter: Aviem Zur
>Assignee: Aviem Zur
>
> Add metrics to {{KafkaIO}} using the metrics API.
> Metrics (Feel free to add more ideas here) per topic/split (Where applicable):
> * Backlog in bytes.
> * Backlog in number of messages.
> * Messages consumed.
> * Bytes consumed.
> * Error counts.
> * Messages produced.
> * Time spent reading.
> * Time spent writing.
> Add {{RunnableOnService}} test which creates a pipeline and asserts metrics 
> values.





[jira] [Created] (BEAM-1399) Code coverage numbers are not accurate

2017-02-05 Thread Daniel Halperin (JIRA)
Daniel Halperin created BEAM-1399:
-

 Summary: Code coverage numbers are not accurate
 Key: BEAM-1399
 URL: https://issues.apache.org/jira/browse/BEAM-1399
 Project: Beam
  Issue Type: Bug
  Components: build-system, sdk-java-core, testing
Reporter: Daniel Halperin


We've started adding Java Code Coverage numbers to PRs using the jacoco plugin. 
However, we are getting very low coverage reported for things like the Java SDK 
core.

My belief is that this is happening because we test the bulk of the SDK not in 
the SDK module itself, but in the DirectRunner and other similar modules.

JaCoCo has a {{report:aggregate}} target that might do the trick, but with a 
few minutes of playing with it I wasn't able to make it work satisfactorily. 
Basic work in https://github.com/apache/beam/pull/1800

This is a good "random improvement" issue for anyone to pick up.
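For anyone picking this up: the JaCoCo Maven plugin exposes aggregation as the 
{{report-aggregate}} goal, typically bound in a dedicated reporting module that 
declares dependencies on the modules under test. A hedged configuration sketch 
(untested against the Beam build):

```xml
<!-- Sketch only: jacoco-maven-plugin in a reporting module that depends on
     sdks/java/core, runners/direct-java, etc., so coverage generated by
     runner-module tests is attributed back to SDK classes. -->
<plugin>
  <groupId>org.jacoco</groupId>
  <artifactId>jacoco-maven-plugin</artifactId>
  <executions>
    <execution>
      <id>report-aggregate</id>
      <phase>verify</phase>
      <goals>
        <goal>report-aggregate</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```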





[jira] [Assigned] (BEAM-1398) KafkaIO metrics

2017-02-05 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur reassigned BEAM-1398:
---

Assignee: Daniel Halperin  (was: Davor Bonaci)

> KafkaIO metrics
> ---
>
> Key: BEAM-1398
> URL: https://issues.apache.org/jira/browse/BEAM-1398
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-extensions
>Reporter: Aviem Zur
>Assignee: Daniel Halperin
>
> Add metrics to {{KafkaIO}} using the metrics API.
> Metrics (Feel free to add more ideas here) per topic/split (Where applicable):
> * Backlog in bytes.
> * Backlog in number of messages.
> * Messages consumed.
> * Bytes consumed.
> * Error counts.
> * Messages produced.
> * Time spent reading.
> * Time spent writing.





[jira] [Created] (BEAM-1398) KafkaIO metrics

2017-02-05 Thread Aviem Zur (JIRA)
Aviem Zur created BEAM-1398:
---

 Summary: KafkaIO metrics
 Key: BEAM-1398
 URL: https://issues.apache.org/jira/browse/BEAM-1398
 Project: Beam
  Issue Type: New Feature
  Components: sdk-java-extensions
Reporter: Aviem Zur
Assignee: Davor Bonaci


Add metrics to {{KafkaIO}} using the metrics API.

Metrics (Feel free to add more ideas here) per topic/split (Where applicable):
* Backlog in bytes.
* Backlog in number of messages.
* Messages consumed.
* Bytes consumed.
* Error counts.
* Messages produced.
* Time spent reading.
* Time spent writing.
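A minimal sketch of what per-topic/split counters could look like 
(hypothetical names; Beam's actual Metrics API differs):

```python
from collections import defaultdict

class KafkaSplitMetricsSketch:
    """Counters keyed by (topic, split, metric name), covering the kinds of
    metrics listed above: messages/bytes consumed, errors, backlog, etc."""

    def __init__(self):
        self._counters = defaultdict(int)

    def inc(self, topic, split, name, value=1):
        self._counters[(topic, split, name)] += value

    def get(self, topic, split, name):
        return self._counters[(topic, split, name)]
```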





[jira] [Assigned] (BEAM-1398) KafkaIO metrics

2017-02-05 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur reassigned BEAM-1398:
---

Assignee: Aviem Zur  (was: Daniel Halperin)

> KafkaIO metrics
> ---
>
> Key: BEAM-1398
> URL: https://issues.apache.org/jira/browse/BEAM-1398
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-extensions
>Reporter: Aviem Zur
>Assignee: Aviem Zur
>
> Add metrics to {{KafkaIO}} using the metrics API.
> Metrics (Feel free to add more ideas here) per topic/split (Where applicable):
> * Backlog in bytes.
> * Backlog in number of messages.
> * Messages consumed.
> * Bytes consumed.
> * Error counts.
> * Messages produced.
> * Time spent reading.
> * Time spent writing.





[jira] [Created] (BEAM-1397) Introduce IO metrics

2017-02-05 Thread Aviem Zur (JIRA)
Aviem Zur created BEAM-1397:
---

 Summary: Introduce IO metrics
 Key: BEAM-1397
 URL: https://issues.apache.org/jira/browse/BEAM-1397
 Project: Beam
  Issue Type: New Feature
  Components: sdk-java-core
Reporter: Aviem Zur
Assignee: Davor Bonaci


Introduce the usage of metrics API in IOs.

POC using {{CountingInput}}:
* Add metrics to {{CountingInput}}
* A {{RunnableOnService}} test that creates a pipeline and asserts these 
metrics.
* Close any gaps in Direct runner and Spark runner to support these metrics.





[jira] [Assigned] (BEAM-1397) Introduce IO metrics

2017-02-05 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur reassigned BEAM-1397:
---

Assignee: Daniel Halperin  (was: Davor Bonaci)

> Introduce IO metrics
> 
>
> Key: BEAM-1397
> URL: https://issues.apache.org/jira/browse/BEAM-1397
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core
>Reporter: Aviem Zur
>Assignee: Daniel Halperin
>
> Introduce the usage of metrics API in IOs.
> POC using {{CountingInput}}:
> * Add metrics to {{CountingInput}}
> * {{RunnableOnService}} test which creates a pipeline which asserts these 
> metrics.
> * Close any gaps in Direct runner and Spark runner to support these metrics.





[jira] [Assigned] (BEAM-1397) Introduce IO metrics

2017-02-05 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur reassigned BEAM-1397:
---

Assignee: Aviem Zur  (was: Daniel Halperin)

> Introduce IO metrics
> 
>
> Key: BEAM-1397
> URL: https://issues.apache.org/jira/browse/BEAM-1397
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core
>Reporter: Aviem Zur
>Assignee: Aviem Zur
>
> Introduce the usage of metrics API in IOs.
> POC using {{CountingInput}}:
> * Add metrics to {{CountingInput}}
> * {{RunnableOnService}} test which creates a pipeline which asserts these 
> metrics.
> * Close any gaps in Direct runner and Spark runner to support these metrics.





[jira] [Resolved] (BEAM-1126) Expose UnboundedSource split backlog in number of events

2017-02-05 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur resolved BEAM-1126.
-
   Resolution: Invalid
Fix Version/s: Not applicable

> Expose UnboundedSource split backlog in number of events
> 
>
> Key: BEAM-1126
> URL: https://issues.apache.org/jira/browse/BEAM-1126
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Aviem Zur
>Assignee: Daniel Halperin
>Priority: Minor
> Fix For: Not applicable
>
>
> Today {{UnboundedSource}} exposes split backlog in bytes via 
> {{getSplitBacklogBytes()}}
> There is value in exposing backlog in number of events as well, since this 
> number can be more human comprehensible than bytes. something like 
> {{getSplitBacklogEvents()}} or {{getSplitBacklogCount()}}.
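[Editorial note: the proposal amounts to a count-based twin of the existing 
byte-based backlog. A minimal sketch - {{UnboundedReaderSketch}} and 
{{get_split_backlog_count}} are hypothetical, the latter being one of the 
names suggested above:]

```python
class UnboundedReaderSketch:
    """Sketch of an unbounded reader tracking pending records per split."""

    def __init__(self, pending):
        self._pending = list(pending)  # raw payload bytes not yet emitted

    def get_split_backlog_bytes(self):
        # Existing style of backlog: total size of pending payloads.
        return sum(len(payload) for payload in self._pending)

    def get_split_backlog_count(self):
        # Proposed addition: number of pending records, which is often
        # easier for humans to reason about than bytes.
        return len(self._pending)
```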





[jira] [Resolved] (BEAM-1379) Shade Guava in beam-sdks-java-io-kafka module

2017-02-05 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur resolved BEAM-1379.
-
   Resolution: Fixed
Fix Version/s: 0.6.0

> Shade Guava in beam-sdks-java-io-kafka module
> -
>
> Key: BEAM-1379
> URL: https://issues.apache.org/jira/browse/BEAM-1379
> Project: Beam
>  Issue Type: Task
>  Components: sdk-java-extensions
>Reporter: Aviem Zur
>Assignee: Davor Bonaci
> Fix For: 0.6.0
>
>
> Shade Guava in Kafka IO module to avoid collisions with Guava versions 
> supplied in different clusters.





[jira] [Resolved] (BEAM-1334) Split UsesMetrics category and tests into UsesCommittedMetrics and UsesAttemptedMetrics

2017-02-05 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur resolved BEAM-1334.
-
   Resolution: Done
Fix Version/s: 0.6.0

> Split UsesMetrics category and tests into UsesCommittedMetrics and 
> UsesAttemptedMetrics
> ---
>
> Key: BEAM-1334
> URL: https://issues.apache.org/jira/browse/BEAM-1334
> Project: Beam
>  Issue Type: Task
>  Components: sdk-java-core
>Reporter: Aviem Zur
>Assignee: Aviem Zur
>Priority: Minor
> Fix For: 0.6.0
>
>
> Some runners may not be able to implement both {{committed}} and 
> {{attempted}} results in {{MetricResult}}.
> Seeing this, split the current {{RunnableOnService}} test 
> {{org.apache.beam.sdk.metrics.MetricsTest#metricsReportToQuery}} into two 
> tests, which test attempted and committed results separately with categories 
> {{UsesCommittedMetrics}} and {{UsesAttemptedMetrics}} instead of the current 
> category {{UsesMetrics}}.
> Discussion that led to this can be seen in the this 
> [PR|https://github.com/apache/beam/pull/1750#issuecomment-275412984]





[jira] [Commented] (BEAM-1395) SparkGroupAlsoByWindowFn not sorting grouped elements by timestamp

2017-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853309#comment-15853309
 ] 

ASF GitHub Bot commented on BEAM-1395:
--

GitHub user amitsela opened a pull request:

https://github.com/apache/beam/pull/1922

[BEAM-1395] SparkGroupAlsoByWindowFn not sorting grouped elements by 
timestamp

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/beam BEAM-1395

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1922.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1922


commit d2b1c678a45dd740c4ec4dbdf508564dd78e3abe
Author: Sela 
Date:   2017-02-05T18:17:24Z

[BEAM-1395] SparkGroupAlsoByWindowFn now sorts grouped elements by 
timestamp.




> SparkGroupAlsoByWindowFn not sorting grouped elements by timestamp
> --
>
> Key: BEAM-1395
> URL: https://issues.apache.org/jira/browse/BEAM-1395
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
>
> SparkGroupAlsoByWindowFn relies on the grouped elements (per key) to be 
> sorted by their timestamp, which is not the case, and so could cause: 
> {code}
> IllegalStateException: Cannot move input watermark time backwards
> {code}
> We should sort the values first, just like with {{Combine}} implementations: 
> https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java#L73





[GitHub] beam pull request #1922: [BEAM-1395] SparkGroupAlsoByWindowFn not sorting gr...

2017-02-05 Thread amitsela
GitHub user amitsela opened a pull request:

https://github.com/apache/beam/pull/1922

[BEAM-1395] SparkGroupAlsoByWindowFn not sorting grouped elements by 
timestamp

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/amitsela/beam BEAM-1395

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1922.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1922


commit d2b1c678a45dd740c4ec4dbdf508564dd78e3abe
Author: Sela 
Date:   2017-02-05T18:17:24Z

[BEAM-1395] SparkGroupAlsoByWindowFn now sorts grouped elements by 
timestamp.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (BEAM-1396) GABWVOBDoFn expects grouped values to be ordered by their timestamp but there is no such guarantee

2017-02-05 Thread Daniel Halperin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853298#comment-15853298
 ] 

Daniel Halperin edited comment on BEAM-1396 at 2/5/17 5:54 PM:
---

To add a little more flavor: I'm not sure what the right thing to do with unused or 
little-used code in {{runners-core}} is. These classes are meant to be illustrative 
and/or useful for showing runners how key functionality can be implemented. So, among 
many possible courses of action:

1. More clearly document expectations in {{GroupAlsoByWindowsViaOutputBufferDoFn}}.
2. Possibly add another version ({{GroupAlsoByWindowsUsingSortedInputDoFn}}), or 
a flag that indicates whether input is already sorted ({{ViaSortedOutputBuffer}}?).
3. ...


was (Author: dhalp...@google.com):
To add a little more flavor: I'm not sure what right thing to do with unused or 
little-used code in {{runners-core}}. These are meant to be illustrative and/or 
useful to show runners how key functionality can be implemented. So, among many 
courses of action:

1. More clearly document expectations in 
{{GroupAlsoByWindowsViaOutputBufferDoFn}}
1. Possibly add another version ({{GroupAlsoByWindowsUsingSortedInputDoFn}} or 
a flag that indicates whether input is already sorted. (ViaSortedOutputBuffer)?
1 .

> GABWVOBDoFn expects grouped values to be ordered by their timestamp but there 
> is no such guarantee
> --
>
> Key: BEAM-1396
> URL: https://issues.apache.org/jira/browse/BEAM-1396
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Amit Sela
>Assignee: Kenneth Knowles
>
> GABWVOBDoFn relies on the grouped values to be ordered by their timestamp but 
> nothing in the SDK guarantees this: 
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java#L86
> If such a chunk of timestamped values is processed out-of-order, I assume 
> we'd end up with an {{IllegalStateException}} thrown here:
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L191
> I suggest we go ahead and add sorting before processing the bundle in chunks 
> - this might prove expensive in extreme cases where a very large bundle with 
> very few keys is processed, but it seems that timestamp order is necessary.
> As for runners that provide an ordering guarantee, since GABW is optional I don't 
> see an issue here, though [~dhalp...@google.com] suggested we add a 
> "shouldSort" flag.
> Also, probably worth creating a test for this, though it would prove 
> difficult since we would have to preset the order which is the problem to 
> begin with :-)





[jira] [Commented] (BEAM-1396) GABWVOBDoFn expects grouped values to be ordered by their timestamp but there is no such guarantee

2017-02-05 Thread Daniel Halperin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853298#comment-15853298
 ] 

Daniel Halperin commented on BEAM-1396:
---

To add a little more flavor: I'm not sure what the right thing to do with unused or 
little-used code in {{runners-core}} is. These classes are meant to be illustrative 
and/or useful for showing runners how key functionality can be implemented. So, among 
many possible courses of action:

1. More clearly document expectations in {{GroupAlsoByWindowsViaOutputBufferDoFn}}.
1. Possibly add another version ({{GroupAlsoByWindowsUsingSortedInputDoFn}}), or 
a flag that indicates whether input is already sorted ({{ViaSortedOutputBuffer}}?).
1. ...

> GABWVOBDoFn expects grouped values to be ordered by their timestamp but there 
> is no such guarantee
> --
>
> Key: BEAM-1396
> URL: https://issues.apache.org/jira/browse/BEAM-1396
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Amit Sela
>Assignee: Kenneth Knowles
>
> GABWVOBDoFn relies on the grouped values to be ordered by their timestamp but 
> nothing in the SDK guarantees this: 
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java#L86
> If such a chunk of timestamped values is processed out-of-order, I assume 
> we'd end up with an {{IllegalStateException}} thrown here:
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L191
> I suggest we go ahead and add sorting before processing the bundle in chunks 
> - this might prove expensive in extreme cases where a very large bundle with 
> very few keys is processed, but it seems that timestamp order is necessary.
> As for runners that provide an ordering guarantee, since GABW is optional I don't 
> see an issue here, though [~dhalp...@google.com] suggested we add a 
> "shouldSort" flag.
> Also, probably worth creating a test for this, though it would prove 
> difficult since we would have to preset the order which is the problem to 
> begin with :-)





[jira] [Created] (BEAM-1396) GABWVOBDoFn expects grouped values to be ordered by their timestamp but there is no such guarantee

2017-02-05 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1396:
---

 Summary: GABWVOBDoFn expects grouped values to be ordered by their 
timestamp but there is no such guarantee
 Key: BEAM-1396
 URL: https://issues.apache.org/jira/browse/BEAM-1396
 Project: Beam
  Issue Type: Bug
  Components: sdk-java-core
Reporter: Amit Sela
Assignee: Kenneth Knowles


GABWVOBDoFn relies on the grouped values to be ordered by their timestamp but 
nothing in the SDK guarantees this: 
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java#L86

If such a chunk of timestamped values is processed out-of-order, I assume 
we'd end up with an {{IllegalStateException}} thrown here:
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L191

I suggest we go ahead and add sorting before processing the bundle in chunks - 
this might prove expensive in extreme cases where a very large bundle with very 
few keys is processed, but it seems that timestamp order is necessary.
As for runners that provide an ordering guarantee, since GABW is optional I don't see 
an issue here, though [~dhalp...@google.com] suggested we add a "shouldSort" 
flag.

Also, probably worth creating a test for this, though it would prove difficult 
since we would have to preset the order which is the problem to begin with :-)





[jira] [Created] (BEAM-1395) SparkGroupAlsoByWindowFn not sorting grouped elements by timestamp

2017-02-05 Thread Amit Sela (JIRA)
Amit Sela created BEAM-1395:
---

 Summary: SparkGroupAlsoByWindowFn not sorting grouped elements by 
timestamp
 Key: BEAM-1395
 URL: https://issues.apache.org/jira/browse/BEAM-1395
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Amit Sela
Assignee: Amit Sela


SparkGroupAlsoByWindowFn relies on the grouped elements (per key) to be sorted 
by their timestamp, which is not the case, and so could cause: 
{code}
IllegalStateException: Cannot move input watermark time backwards
{code}

We should sort the values first, just like with {{Combine}} implementations: 
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkKeyedCombineFn.java#L73
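The proposed fix, sorting each key's grouped values by timestamp before they are replayed through GroupAlsoByWindow, can be sketched with plain collections. The `TimestampedValue` stand-in and `sortByTimestamp` helper below are illustrative assumptions, not the actual Spark runner code:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortByTimestampDemo {
  // Minimal stand-in for Beam's WindowedValue: a payload plus an event timestamp.
  static final class TimestampedValue<T> {
    final T value;
    final long timestampMillis;

    TimestampedValue(T value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  // Sort a key's grouped values by timestamp so the downstream watermark
  // never has to move backwards while the values are replayed.
  static <T> List<TimestampedValue<T>> sortByTimestamp(Iterable<TimestampedValue<T>> values) {
    List<TimestampedValue<T>> sorted = new ArrayList<>();
    values.forEach(sorted::add);
    sorted.sort(Comparator.comparingLong(tv -> tv.timestampMillis));
    return sorted;
  }
}
```

As the ticket notes, the sort is O(n log n) per key, which matters mainly in the extreme case of a very large bundle with very few keys.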





[jira] [Resolved] (BEAM-1304) NPE if trying to get value of Aggregator that does not exist.

2017-02-05 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin resolved BEAM-1304.
--
   Resolution: Fixed
Fix Version/s: 0.6.0

> NPE if trying to get value of Aggregator that does not exist.
> -
>
> Key: BEAM-1304
> URL: https://issues.apache.org/jira/browse/BEAM-1304
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Stas Levin
> Fix For: 0.6.0
>
>
> Querying a specific aggregator value in {{SparkPipelineResult}} could throw 
> an NPE if the requested aggregator name doesn't exist.
> This is the root of the problem: 
> https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java#L74
> {{Map#get()}} will return {{null}}.  
> We could either return {{null}}, though this could simply propagate the 
> problem upstream, or use {{Optional}}.





[jira] [Commented] (BEAM-1304) NPE if trying to get value of Aggregator that does not exist.

2017-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853232#comment-15853232
 ] 

ASF GitHub Bot commented on BEAM-1304:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/1921


> NPE if trying to get value of Aggregator that does not exist.
> -
>
> Key: BEAM-1304
> URL: https://issues.apache.org/jira/browse/BEAM-1304
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Stas Levin
>
> Querying a specific aggregator value in {{SparkPipelineResult}} could throw 
> an NPE if the requested aggregator name doesn't exist.
> This is the root of the problem: 
> https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java#L74
> {{Map#get()}} will return {{null}}.  
> We could either return {{null}}, though this could simply propagate the 
> problem upstream, or use {{Optional}}.





[GitHub] beam pull request #1921: [BEAM-1304] Checking for nullity before trying to o...

2017-02-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/1921




[2/2] beam git commit: This closes #1921

2017-02-05 Thread staslevin
This closes #1921


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c442ef81
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c442ef81
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c442ef81

Branch: refs/heads/master
Commit: c442ef81aa5bfbe84c0e3344ed8dc1d15d6e9a36
Parents: e5afbb2 882c654
Author: Stas Levin 
Authored: Sun Feb 5 15:51:27 2017 +0200
Committer: Stas Levin 
Committed: Sun Feb 5 15:51:27 2017 +0200

--
 .../runners/spark/aggregators/NamedAggregators.java|  6 --
 .../aggregators/metrics/sink/NamedAggregatorsTest.java | 13 +
 2 files changed, 17 insertions(+), 2 deletions(-)
--




[1/2] beam git commit: [BEAM-1304] Checking for nullity before trying to obtain an aggregator's value.

2017-02-05 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master e5afbb27f -> c442ef81a


[BEAM-1304] Checking for nullity before trying to obtain an aggregator's value.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/882c654b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/882c654b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/882c654b

Branch: refs/heads/master
Commit: 882c654b1a8aefd2e4281d786448734731db7816
Parents: e5afbb2
Author: Stas Levin 
Authored: Sun Feb 5 12:17:35 2017 +0200
Committer: Stas Levin 
Committed: Sun Feb 5 15:51:18 2017 +0200

--
 .../runners/spark/aggregators/NamedAggregators.java|  6 --
 .../aggregators/metrics/sink/NamedAggregatorsTest.java | 13 +
 2 files changed, 17 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/882c654b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index b5aec32..c876c07 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -68,10 +68,12 @@ public class NamedAggregators implements Serializable {
    * @param name  Name of aggregator to retrieve.
    * @param typeClass Type class to cast the value to.
    * @param <T>   Type to be returned.
-   * @return the value of the aggregator associated with the specified name
+   * @return the value of the aggregator associated with the specified name,
+   *     or null if the specified aggregator could not be found.
    */
   public <T> T getValue(String name, Class<T> typeClass) {
-    return typeClass.cast(mNamedAggregators.get(name).render());
+    final State state = mNamedAggregators.get(name);
+    return state != null ? typeClass.cast(state.render()) : null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/882c654b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
--
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
index 3b5dd21..8646510 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
@@ -28,10 +28,13 @@ import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.ClearAggregatorsRule;
+import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import 
org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -95,4 +98,14 @@ public class NamedAggregatorsTest {
 assertThat(InMemoryMetrics.valueOf("emptyLines"), is(1d));
 
   }
+
+  @Test
+  public void testNonExistingAggregatorName() throws Exception {
+    final SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    final Long valueOf =
+        SparkAggregators.valueOf(
+            "myMissingAggregator", Long.class, SparkContextFactory.getSparkContext(options));
+
+    assertThat(valueOf, is(nullValue()));
+  }
 }



[jira] [Assigned] (BEAM-1393) Update Flink Runner to Flink 1.2.0

2017-02-05 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1393:
--

Assignee: Jingsong Lee

> Update Flink Runner to Flink 1.2.0
> --
>
> Key: BEAM-1393
> URL: https://issues.apache.org/jira/browse/BEAM-1393
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Jingsong Lee
>
> When we update to 1.2.0 we can use the new internal Timer API that is 
> available to Flink operators: {{InternalTimerService}} and also use broadcast 
> state to store side-input data.





[jira] [Assigned] (BEAM-1394) Use Flink InternalTimerService for TimerInternals

2017-02-05 Thread Jingsong Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned BEAM-1394:
--

Assignee: Jingsong Lee

> Use Flink InternalTimerService for TimerInternals
> -
>
> Key: BEAM-1394
> URL: https://issues.apache.org/jira/browse/BEAM-1394
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Jingsong Lee
>
> When updating our Flink version to 1.2 we can use the new internal timer API 
> for both the windowing and for wiring in the Beam user-facing Timer API.
> By using the internal timer API we make operators rescalable, that is, we can 
> change the parallelism of a running Beam on Flink job by performing a 
> savepoint and then restarting with a different parallelism.
> An {{InternalTimerService}} can be retrieved by a Flink operator (mostly in 
> {{open()}}) using:
> {code}
> /**
>  * Returns a {@link InternalTimerService} that can be used to query current 
> processing time
>  * and event time and to set timers. An operator can have several timer 
> services, where
>  * each has its own namespace serializer. Timer services are differentiated 
> by the string
>  * key that is given when requesting them, if you call this method with the 
> same key
>  * multiple times you will get the same timer service instance in subsequent 
> requests.
>  *
>  * Timers are always scoped to a key, the currently active key of a keyed 
> stream operation.
>  * When a timer fires, this key will also be set as the currently active key.
>  *
>  * Each timer has attached metadata, the namespace. Different timer 
> services
>  * can have a different namespace type. If you don't need namespace 
> differentiation you
>  * can use {@link VoidNamespaceSerializer} as the namespace serializer.
>  *
>  * @param name The name of the requested timer service. If no service exists 
> under the given
>  * name a new one will be created and returned.
>  * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
>  * @param triggerable The {@link Triggerable} that should be invoked when 
> timers fire
>  *
>  * @param <N> The type of the timer namespace.
>  */
> public <K, N> InternalTimerService<N> getInternalTimerService(
>     String name,
>     TypeSerializer<N> namespaceSerializer,
>     Triggerable<K, N> triggerable);
> {code}
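The javadoc's guarantee that requesting the same name twice returns the same timer-service instance is essentially per-name caching. A stdlib sketch of that lookup contract (illustrative names only, not Flink's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class TimerServiceRegistryDemo {
  // Stand-in for a per-operator timer service; real Flink services also carry
  // a namespace serializer and a Triggerable callback.
  static final class TimerService {
    final String name;
    TimerService(String name) { this.name = name; }
  }

  private final Map<String, TimerService> services = new HashMap<>();

  // Same name -> same instance; an unseen name creates and caches a new
  // service, matching the lookup contract described in the javadoc above.
  public TimerService getInternalTimerService(String name) {
    return services.computeIfAbsent(name, TimerService::new);
  }
}
```

This per-name identity is what lets an operator request its timer service in `open()` on every (re)start and still resume the timers registered under that name.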





[jira] [Assigned] (BEAM-1304) NPE if trying to get value of Aggregator that does not exist.

2017-02-05 Thread Stas Levin (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stas Levin reassigned BEAM-1304:


Assignee: Stas Levin  (was: Amit Sela)

> NPE if trying to get value of Aggregator that does not exist.
> -
>
> Key: BEAM-1304
> URL: https://issues.apache.org/jira/browse/BEAM-1304
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Stas Levin
>
> Querying a specific aggregator value in {{SparkPipelineResult}} could throw 
> an NPE if the requested aggregator name doesn't exist.
> This is the root of the problem: 
> https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java#L74
> {{Map#get()}} will return {{null}}.  
> We could either return {{null}}, though this could simply propagate the 
> problem upstream, or use {{Optional}}.





[jira] [Assigned] (BEAM-1393) Update Flink Runner to Flink 1.2.0

2017-02-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-1393:
--

Assignee: (was: Aljoscha Krettek)

> Update Flink Runner to Flink 1.2.0
> --
>
> Key: BEAM-1393
> URL: https://issues.apache.org/jira/browse/BEAM-1393
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>
> When we update to 1.2.0 we can use the new internal Timer API that is 
> available to Flink operators: {{InternalTimerService}} and also use broadcast 
> state to store side-input data.





[jira] [Assigned] (BEAM-1394) Use Flink InternalTimerService for TimerInternals

2017-02-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned BEAM-1394:
--

Assignee: (was: Aljoscha Krettek)

> Use Flink InternalTimerService for TimerInternals
> -
>
> Key: BEAM-1394
> URL: https://issues.apache.org/jira/browse/BEAM-1394
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>
> When updating our Flink version to 1.2 we can use the new internal timer API 
> for both the windowing and for wiring in the Beam user-facing Timer API.
> By using the internal timer API we make operators rescalable, that is, we can 
> change the parallelism of a running Beam on Flink job by performing a 
> savepoint and then restarting with a different parallelism.
> An {{InternalTimerService}} can be retrieved by a Flink operator (mostly in 
> {{open()}}) using:
> {code}
> /**
>  * Returns a {@link InternalTimerService} that can be used to query current 
> processing time
>  * and event time and to set timers. An operator can have several timer 
> services, where
>  * each has its own namespace serializer. Timer services are differentiated 
> by the string
>  * key that is given when requesting them, if you call this method with the 
> same key
>  * multiple times you will get the same timer service instance in subsequent 
> requests.
>  *
>  * Timers are always scoped to a key, the currently active key of a keyed 
> stream operation.
>  * When a timer fires, this key will also be set as the currently active key.
>  *
>  * Each timer has attached metadata, the namespace. Different timer 
> services
>  * can have a different namespace type. If you don't need namespace 
> differentiation you
>  * can use {@link VoidNamespaceSerializer} as the namespace serializer.
>  *
>  * @param name The name of the requested timer service. If no service exists 
> under the given
>  * name a new one will be created and returned.
>  * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
>  * @param triggerable The {@link Triggerable} that should be invoked when 
> timers fire
>  *
>  * @param <N> The type of the timer namespace.
>  */
> public <K, N> InternalTimerService<N> getInternalTimerService(
>     String name,
>     TypeSerializer<N> namespaceSerializer,
>     Triggerable<K, N> triggerable);
> {code}





[jira] [Updated] (BEAM-1394) Use Flink InternalTimerService for TimerInternals

2017-02-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated BEAM-1394:
---
Description: 
When updating our Flink version to 1.2 we can use the new internal timer API 
for both the windowing and for wiring in the Beam user-facing Timer API.

By using the internal timer API we make operators rescalable, that is, we can 
change the parallelism of a running Beam on Flink job by performing a savepoint 
and then restarting with a different parallelism.

An {{InternalTimerService}} can be retrieved by a Flink operator (mostly in 
{{open()}}) using:
{code}
/**
 * Returns a {@link InternalTimerService} that can be used to query current 
processing time
 * and event time and to set timers. An operator can have several timer 
services, where
 * each has its own namespace serializer. Timer services are differentiated by 
the string
 * key that is given when requesting them, if you call this method with the 
same key
 * multiple times you will get the same timer service instance in subsequent 
requests.
 *
 * Timers are always scoped to a key, the currently active key of a keyed 
stream operation.
 * When a timer fires, this key will also be set as the currently active key.
 *
 * Each timer has attached metadata, the namespace. Different timer services
 * can have a different namespace type. If you don't need namespace 
differentiation you
 * can use {@link VoidNamespaceSerializer} as the namespace serializer.
 *
 * @param name The name of the requested timer service. If no service exists 
under the given
 * name a new one will be created and returned.
 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
 * @param triggerable The {@link Triggerable} that should be invoked when 
timers fire
 *
 * @param <N> The type of the timer namespace.
 */
public <K, N> InternalTimerService<N> getInternalTimerService(
    String name,
    TypeSerializer<N> namespaceSerializer,
    Triggerable<K, N> triggerable);
{code}

  was:
When updating our Flink version to 1.2 we can use the new internal timer API 
for both the windowing and for wiring in the Beam user-facing Timer API.

An {{InternalTimerService}} can be retrieved by a Flink operator (mostly in 
{{open()}}) using:
{code}
/**
 * Returns a {@link InternalTimerService} that can be used to query current 
processing time
 * and event time and to set timers. An operator can have several timer 
services, where
 * each has its own namespace serializer. Timer services are differentiated by 
the string
 * key that is given when requesting them, if you call this method with the 
same key
 * multiple times you will get the same timer service instance in subsequent 
requests.
 *
 * Timers are always scoped to a key, the currently active key of a keyed 
stream operation.
 * When a timer fires, this key will also be set as the currently active key.
 *
 * Each timer has attached metadata, the namespace. Different timer services
 * can have a different namespace type. If you don't need namespace 
differentiation you
 * can use {@link VoidNamespaceSerializer} as the namespace serializer.
 *
 * @param name The name of the requested timer service. If no service exists 
under the given
 * name a new one will be created and returned.
 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
 * @param triggerable The {@link Triggerable} that should be invoked when 
timers fire
 *
 * @param <N> The type of the timer namespace.
 */
public <K, N> InternalTimerService<N> getInternalTimerService(
    String name,
    TypeSerializer<N> namespaceSerializer,
    Triggerable<K, N> triggerable);
{code}


> Use Flink InternalTimerService for TimerInternals
> -
>
> Key: BEAM-1394
> URL: https://issues.apache.org/jira/browse/BEAM-1394
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> When updating our Flink version to 1.2 we can use the new internal timer API 
> for both the windowing and for wiring in the Beam user-facing Timer API.
> By using the internal timer API we make operators rescalable, that is, we can 
> change the parallelism of a running Beam on Flink job by performing a 
> savepoint and then restarting with a different parallelism.
> An {{InternalTimerService}} can be retrieved by a Flink operator (mostly in 
> {{open()}}) using:
> {code}
> /**
>  * Returns a {@link InternalTimerService} that can be used to query current
>  * processing time and event time and to set timers. An operator can have
>  * several timer services, where each has its own namespace serializer. Timer
>  * services are differentiated by the string key that is given when
>  * requesting them; if you call this method with the same key
>  *

[jira] [Commented] (BEAM-1394) Use Flink InternalTimerService for TimerInternals

2017-02-05 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853190#comment-15853190
 ] 

Aljoscha Krettek commented on BEAM-1394:


The new timer API is only available starting from Flink 1.2.

> Use Flink InternalTimerService for TimerInternals
> -
>
> Key: BEAM-1394
> URL: https://issues.apache.org/jira/browse/BEAM-1394
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> When updating our Flink version to 1.2 we can use the new internal timer API 
> for both the windowing and for wiring in the Beam user-facing Timer API.
> An {{InternalTimerService}} can be retrieved by a Flink operator (mostly in 
> {{open()}}) using:
> {code}
> /**
>  * Returns a {@link InternalTimerService} that can be used to query current
>  * processing time and event time and to set timers. An operator can have
>  * several timer services, where each has its own namespace serializer. Timer
>  * services are differentiated by the string key that is given when
>  * requesting them; if you call this method with the same key multiple times
>  * you will get the same timer service instance in subsequent requests.
>  *
>  * Timers are always scoped to a key, the currently active key of a keyed
>  * stream operation. When a timer fires, this key will also be set as the
>  * currently active key.
>  *
>  * Each timer has attached metadata, the namespace. Different timer services
>  * can have a different namespace type. If you don't need namespace
>  * differentiation you can use {@link VoidNamespaceSerializer} as the
>  * namespace serializer.
>  *
>  * @param name The name of the requested timer service. If no service exists
>  *     under the given name a new one will be created and returned.
>  * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
>  * @param triggerable The {@link Triggerable} that should be invoked when
>  *     timers fire.
>  *
>  * @param <N> The type of the timer namespace.
>  */
> public <K, N> InternalTimerService<N> getInternalTimerService(
>     String name,
>     TypeSerializer<N> namespaceSerializer,
>     Triggerable<K, N> triggerable);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1394) Use Flink InternalTimerService for TimerInternals

2017-02-05 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-1394:
--

 Summary: Use Flink InternalTimerService for TimerInternals
 Key: BEAM-1394
 URL: https://issues.apache.org/jira/browse/BEAM-1394
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


When updating our Flink version to 1.2 we can use the new internal timer API 
for both the windowing and for wiring in the Beam user-facing Timer API.

An {{InternalTimerService}} can be retrieved by a Flink operator (mostly in 
{{open()}}) using:
{code}
/**
 * Returns a {@link InternalTimerService} that can be used to query current
 * processing time and event time and to set timers. An operator can have
 * several timer services, where each has its own namespace serializer. Timer
 * services are differentiated by the string key that is given when requesting
 * them; if you call this method with the same key multiple times you will get
 * the same timer service instance in subsequent requests.
 *
 * Timers are always scoped to a key, the currently active key of a keyed
 * stream operation. When a timer fires, this key will also be set as the
 * currently active key.
 *
 * Each timer has attached metadata, the namespace. Different timer services
 * can have a different namespace type. If you don't need namespace
 * differentiation you can use {@link VoidNamespaceSerializer} as the
 * namespace serializer.
 *
 * @param name The name of the requested timer service. If no service exists
 *     under the given name a new one will be created and returned.
 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
 * @param triggerable The {@link Triggerable} that should be invoked when
 *     timers fire.
 *
 * @param <N> The type of the timer namespace.
 */
public <K, N> InternalTimerService<N> getInternalTimerService(
    String name,
    TypeSerializer<N> namespaceSerializer,
    Triggerable<K, N> triggerable);
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (BEAM-1393) Update Flink Runner to Flink 1.2.0

2017-02-05 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created BEAM-1393:
--

 Summary: Update Flink Runner to Flink 1.2.0
 Key: BEAM-1393
 URL: https://issues.apache.org/jira/browse/BEAM-1393
 Project: Beam
  Issue Type: Improvement
  Components: runner-flink
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek


When we update to 1.2.0 we can use the new internal Timer API that is available 
to Flink operators: {{InternalTimerService}} and also use broadcast state to 
store side-input data.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1304) NPE if trying to get value of Aggregator that does not exist.

2017-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853185#comment-15853185
 ] 

ASF GitHub Bot commented on BEAM-1304:
--

GitHub user staslev opened a pull request:

https://github.com/apache/beam/pull/1921

[BEAM-1304] Checking for nullity before trying to obtain an aggregator's 
value.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

R: @amitsela 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/staslev/beam 
BEAM-1304-fixing-NPE-for-values-of-missing-aggregators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1921.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1921


commit 658044932eea5e8566fd2166356d05302b4fd79b
Author: Stas Levin 
Date:   2017-02-05T10:17:35Z

[BEAM-1304] Checking for nullity before trying to obtain an aggregator's 
value.




> NPE if trying to get value of Aggregator that does not exist.
> -
>
> Key: BEAM-1304
> URL: https://issues.apache.org/jira/browse/BEAM-1304
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Amit Sela
>Assignee: Amit Sela
>
> Querying a specific aggregator value in {{SparkPipelineResult}} could throw 
> an NPE if the requested aggregator name doesn't exist.
> This is the root of the problem: 
> https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java#L74
> {{Map#get()}} will return {{null}}.
> We could either return {{null}}, though this could simply propagate the 
> problem upstream, or use {{Optional}}.
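The {{Optional}}-based alternative suggested above can be sketched as follows; {{NamedAggregators}} here is a simplified stand-in for the real class, not Beam's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch of the proposed fix: wrap the Map#get result in Optional instead of
// returning null, so callers must handle the missing-aggregator case.
class NamedAggregators {
  private final Map<String, Long> aggregators = new HashMap<>();

  void put(String name, long value) {
    aggregators.put(name, value);
  }

  // Returns an empty Optional rather than null for unknown aggregator names.
  Optional<Long> getValue(String name) {
    return Optional.ofNullable(aggregators.get(name));
  }
}

public class Main {
  public static void main(String[] args) {
    NamedAggregators aggs = new NamedAggregators();
    aggs.put("records", 42L);
    System.out.println(aggs.getValue("records").orElse(-1L));  // 42
    System.out.println(aggs.getValue("missing").orElse(-1L));  // -1
  }
}
```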



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] beam pull request #1921: [BEAM-1304] Checking for nullity before trying to o...

2017-02-05 Thread staslev
GitHub user staslev opened a pull request:

https://github.com/apache/beam/pull/1921

[BEAM-1304] Checking for nullity before trying to obtain an aggregator's 
value.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

R: @amitsela 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/staslev/beam 
BEAM-1304-fixing-NPE-for-values-of-missing-aggregators

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1921.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1921


commit 658044932eea5e8566fd2166356d05302b4fd79b
Author: Stas Levin 
Date:   2017-02-05T10:17:35Z

[BEAM-1304] Checking for nullity before trying to obtain an aggregator's 
value.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (BEAM-1392) DoFn teardown not called on empty partitions

2017-02-05 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela resolved BEAM-1392.
-
   Resolution: Fixed
Fix Version/s: 0.6.0

> DoFn teardown not called on empty partitions
> 
>
> Key: BEAM-1392
> URL: https://issues.apache.org/jira/browse/BEAM-1392
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Aviem Zur
> Fix For: 0.6.0
>
>
> DoFn teardown is not called on empty partitions.
> Discovered in a long-running Kafka pipeline (read from Kafka -> write to 
> Kafka) where some partitions did not have data and Kafka producers were 
> being created but not closed, eventually resulting in a {{Too many open 
> files}} exception.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1392) DoFn teardown not called on empty partitions

2017-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853183#comment-15853183
 ] 

ASF GitHub Bot commented on BEAM-1392:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/1920


> DoFn teardown not called on empty partitions
> 
>
> Key: BEAM-1392
> URL: https://issues.apache.org/jira/browse/BEAM-1392
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Aviem Zur
>
> DoFn teardown is not called on empty partitions.
> Discovered in a long-running Kafka pipeline (read from Kafka -> write to 
> Kafka) where some partitions did not have data and Kafka producers were 
> being created but not closed, eventually resulting in a {{Too many open 
> files}} exception.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[1/2] beam git commit: [BEAM-1392] DoFn teardown not called on empty partitions

2017-02-05 Thread amitsela
Repository: beam
Updated Branches:
  refs/heads/master 6e220bb37 -> e5afbb27f


[BEAM-1392] DoFn teardown not called on empty partitions


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/25f91353
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/25f91353
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/25f91353

Branch: refs/heads/master
Commit: 25f913536019aff80d189cf7136f2acc3b6db7c2
Parents: 6e220bb
Author: Aviem Zur 
Authored: Sun Feb 5 11:22:00 2017 +0200
Committer: Sela 
Committed: Sun Feb 5 11:35:31 2017 +0200

--
 .../apache/beam/runners/spark/translation/SparkProcessContext.java | 2 ++
 1 file changed, 2 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/25f91353/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
--
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 9957bf3..60c9d4d 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -62,8 +62,10 @@ class SparkProcessContext {
 
 // skip if partition is empty.
 if (!partition.hasNext()) {
+  DoFnInvokers.invokerFor(doFn).invokeTeardown();
   return Lists.newArrayList();
 }
+
 // call startBundle() before beginning to process the partition.
 doFnRunner.startBundle();
 // process the partition; finishBundle() is called from within the output 
iterator.



[GitHub] beam pull request #1920: [BEAM-1392] DoFn teardown not called on empty parti...

2017-02-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/1920


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] beam git commit: This closes #1920

2017-02-05 Thread amitsela
This closes #1920


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e5afbb27
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e5afbb27
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e5afbb27

Branch: refs/heads/master
Commit: e5afbb27f4a5d22a4bb29a6a336b92fa593c89ce
Parents: 6e220bb 25f9135
Author: Sela 
Authored: Sun Feb 5 11:36:28 2017 +0200
Committer: Sela 
Committed: Sun Feb 5 11:36:28 2017 +0200

--
 .../apache/beam/runners/spark/translation/SparkProcessContext.java | 2 ++
 1 file changed, 2 insertions(+)
--




[jira] [Assigned] (BEAM-1354) TextIO should comply with PTransform style guide

2017-02-05 Thread Aviem Zur (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-1354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur reassigned BEAM-1354:
---

Assignee: Aviem Zur

> TextIO should comply with PTransform style guide
> 
>
> Key: BEAM-1354
> URL: https://issues.apache.org/jira/browse/BEAM-1354
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Eugene Kirpichov
>Assignee: Aviem Zur
>  Labels: backward-incompatible
>
> * TextIO.Read/Write.Bound/Unbound - Bound/Unbound are banned names. Instead, 
> TextIO should make the user specify the type parameters explicitly, and have 
> simply TextIO.Read and TextIO.Write themselves be the transform types.
> * Both should take simply String, and not use Coder as a general-purpose 
> serialization mechanism.
> ** The Javadoc should be changed to reflect this.
> * Should perhaps use AutoValue for parameter builders.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (BEAM-1392) DoFn teardown not called on empty partitions

2017-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853176#comment-15853176
 ] 

ASF GitHub Bot commented on BEAM-1392:
--

GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/1920

[BEAM-1392] DoFn teardown not called on empty partitions

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/incubator-beam 
invoke-teardown-on-empty-partition

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1920.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1920


commit cf1ea054cedf29ef3bb804410eb56f17f8ff1b37
Author: Aviem Zur 
Date:   2017-02-05T09:22:00Z

[BEAM-1392] DoFn teardown not called on empty partitions




> DoFn teardown not called on empty partitions
> 
>
> Key: BEAM-1392
> URL: https://issues.apache.org/jira/browse/BEAM-1392
> Project: Beam
>  Issue Type: Bug
>  Components: runner-spark
>Reporter: Aviem Zur
>Assignee: Aviem Zur
>
> DoFn teardown is not called on empty partitions.
> Discovered in a long-running Kafka pipeline (read from Kafka -> write to 
> Kafka) where some partitions did not have data and Kafka producers were 
> being created but not closed, eventually resulting in a {{Too many open 
> files}} exception.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] beam pull request #1920: [BEAM-1392] DoFn teardown not called on empty parti...

2017-02-05 Thread aviemzur
GitHub user aviemzur opened a pull request:

https://github.com/apache/beam/pull/1920

[BEAM-1392] DoFn teardown not called on empty partitions

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aviemzur/incubator-beam 
invoke-teardown-on-empty-partition

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/1920.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1920


commit cf1ea054cedf29ef3bb804410eb56f17f8ff1b37
Author: Aviem Zur 
Date:   2017-02-05T09:22:00Z

[BEAM-1392] DoFn teardown not called on empty partitions




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (BEAM-1392) DoFn teardown not called on empty partitions

2017-02-05 Thread Aviem Zur (JIRA)
Aviem Zur created BEAM-1392:
---

 Summary: DoFn teardown not called on empty partitions
 Key: BEAM-1392
 URL: https://issues.apache.org/jira/browse/BEAM-1392
 Project: Beam
  Issue Type: Bug
  Components: runner-spark
Reporter: Aviem Zur
Assignee: Aviem Zur


DoFn teardown is not called on empty partitions.

Discovered in a long-running Kafka pipeline (read from Kafka -> write to Kafka) 
where some partitions did not have data and Kafka producers were being created 
but not closed, eventually resulting in a {{Too many open files}} exception.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)