Re: [Discuss] Propose Calcite Vendor Release

2019-08-21 Thread Kai Jiang
Thanks Rui! For sure, any objections should be resolved before releasing.

On Wed, Aug 21, 2019 at 10:24 PM Rui Wang  wrote:

> I can be the release manager to help release vendored Calcite. Per [1],
> we have to reach consensus before starting a release.
>
>
> [1]: https://s.apache.org/beam-release-vendored-artifacts
>
> -Rui
>
> On Wed, Aug 21, 2019 at 5:00 PM Kai Jiang  wrote:
>
>> Hi Community,
>>
>> As part of the effort to unblock vendoring Calcite in the SQL module, we
>> broke it out into pull/9333 so the vendored dependencies release process
>> can go through separately.
>>
>> I want to propose a Calcite vendor release and am looking for a release
>> manager to help with the release process.
>>
>> Best,
>> Kai
>>
>


Re: [Discuss] Propose Calcite Vendor Release

2019-08-21 Thread Rui Wang
I can be the release manager to help release vendored Calcite. Per [1], we
have to reach consensus before starting a release.


[1]: https://s.apache.org/beam-release-vendored-artifacts

-Rui

On Wed, Aug 21, 2019 at 5:00 PM Kai Jiang  wrote:

> Hi Community,
>
> As part of the effort to unblock vendoring Calcite in the SQL module, we
> broke it out into pull/9333 so the vendored dependencies release process
> can go through separately.
>
> I want to propose a Calcite vendor release and am looking for a release
> manager to help with the release process.
>
> Best,
> Kai
>


Re: Python Beam pipelines on Flink on Kubernetes

2019-08-21 Thread Thomas Weise
The changes to containerize the Python SDK worker pool are nearly complete.
I also updated the document with the next implementation steps.

The favored approach (initially targeted option) for pipeline submission is
support for the (externally created) fat jar. It will keep changes to the
operator to a minimum and is applicable to any other Flink job as well.

Regarding fine grained resource scheduling, that would happen within the
pods scheduled by the k8s operator (or other external orchestration tool)
or, further down the road, in a completely elastic/dynamic fashion with
active execution mode (where Flink would request resources directly from
k8s, similar to how it would work on YARN).


On Tue, Aug 13, 2019 at 10:59 AM Chad Dombrova  wrote:

> Hi Thomas,
> Nice work!  It's really clearly presented.
>
> What's the current favored approach for pipeline submission?
>
> I'm also interested to know how this plan overlaps (if at all) with the
> work on Fine-Grained Resource Scheduling [1][2] that's being done for Flink
> 1.9+, which has implications for creation of task managers in kubernetes.
>
> [1]
> https://docs.google.com/document/d/1h68XOG-EyOFfcomd2N7usHK1X429pJSMiwZwAXCwx1k/edit#heading=h.72szmms7ufza
> [2] https://issues.apache.org/jira/browse/FLINK-12761
>
> -chad
>
>
> On Tue, Aug 13, 2019 at 9:14 AM Thomas Weise  wrote:
>
>> There have been a few comments in the doc and I'm going to start working
>> on the worker execution part.
>>
>> Instead of running a Docker container for each worker, the preferred
>> option is to use the worker pool:
>>
>>
>> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.bzs6y6ms0898
>>
>> There are some notes at the end of the doc regarding the implementation.
>> It would be great if those interested in the environments and Python docker
>> container could take a look. In a nutshell, the proposal is to make a few
>> changes to the Python container so that it (optionally) can be used to run
>> the worker pool.
>>
>> Thanks,
>> Thomas
>>
>>
>> On Wed, Jul 24, 2019 at 9:42 PM Pablo Estrada  wrote:
>>
>>> I am very happy to see this. I'll take a look, and leave my comments.
>>>
>>> I think this is something we'd been needing, and it's great that you
>>> guys are putting thought into it. Thanks!<3
>>>
>>> On Wed, Jul 24, 2019 at 9:01 PM Thomas Weise  wrote:
>>>
 Hi,

 Recently Lyft open sourced *FlinkK8sOperator,* a Kubernetes operator
 to manage Flink deployments on Kubernetes:

 https://github.com/lyft/flinkk8soperator/

 We are now discussing how to extend this operator to also support
 deployment of Python Beam pipelines with the Flink runner. I would like to
 share the proposal with the Beam community to enlist feedback as well as
 explore opportunities for collaboration:


 https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/

 Looking forward to your comments and suggestions!

 Thomas




[Discuss] Propose Calcite Vendor Release

2019-08-21 Thread Kai Jiang
Hi Community,

As part of the effort to unblock vendoring Calcite in the SQL module, we
broke it out into pull/9333 so the vendored dependencies release process
can go through separately.

I want to propose a Calcite vendor release and am looking for a release
manager to help with the release process.

Best,
Kai


Re: [DISCUSS] Making consistent use of Optionals

2019-08-21 Thread Kenneth Knowles
As mentioned on the PR, I'm not convinced by Flink's discussion, and all
evidence I know of has shown Optional to have a non-measurable performance
impact.

I'm OK either way, though, at this point.

 - Whatever the consensus, let us set up checkstyle/analysis so that we
maintain consistency across the codebase.
 - We should see what we can do to enforce not using non-serializable
Optional in fields.
 - In special cases where you really do want an Optional stored or
transmitted, we could make our own SerializableOptional to keep it
serializable (example: the common Map<Key, Optional<Value>> representing the
three states of --foo=v, --foo=null, and no foo specified); a minimal
sketch follows below
 - Always using null & @Nullable for possible-empty fields will make the
codebase poorer overall, and NPEs are still a user-facing problem that hits
us pretty often. We should redouble our efforts to have a correct and fully
strict analysis of these.
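
For illustration, here is a minimal sketch of what such a wrapper could look
like. This is hypothetical code, not an existing Beam class; the bridge method
name and the Serializable bound on T are choices made for the example:

  import java.io.Serializable;
  import java.util.Optional;

  /** Hypothetical Serializable counterpart to java.util.Optional. */
  public final class SerializableOptional<T extends Serializable>
      implements Serializable {
    private static final long serialVersionUID = 1L;

    private final T value; // null encodes "absent"

    private SerializableOptional(T value) {
      this.value = value;
    }

    public static <T extends Serializable> SerializableOptional<T> of(T value) {
      if (value == null) {
        throw new NullPointerException("value");
      }
      return new SerializableOptional<>(value);
    }

    public static <T extends Serializable> SerializableOptional<T> empty() {
      return new SerializableOptional<>(null);
    }

    /** Bridge back to the standard library type at use sites. */
    public Optional<T> toOptional() {
      return Optional.ofNullable(value);
    }
  }

With such a type, the Map<Key, Optional<Value>> above could be stored in
serialized state as Map<Key, SerializableOptional<Value>>.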

Kenn

On Wed, Aug 21, 2019 at 1:09 PM Jan Lukavský  wrote:

> Sorry, forgot to add link to the Flink discussion [1].
>
> [1]
>
> https://lists.apache.org/thread.html/f5f8ce92f94c9be6774340fbd7ae5e4afe07386b6765ad3cfb13aec0@%3Cdev.flink.apache.org%3E
>
> On 8/21/19 10:08 PM, Jan Lukavský wrote:
> > Hi,
> >
> > sorry if this discussion has already taken place, but I'd like to know
> > others' opinions about how we use Optionals. The state in current
> > master is as follows:
> >
> > $ git grep "import" | grep "java.util.Optional" | wc -l
> > 85
> > $ git grep "import" | grep "Optional" | grep guava | wc -l
> > 45
> >
> > I'd like to propose that we use only one Optional, for consistency.
> > There are arguments towards using each one of them, if I try to sum
> > these up:
> >
> > Pros for java.util:
> >
> >  * Part of standard lib
> >
> >  * Will (in the future) probably better integrate with other standard
> > APIs (e.g. Optional.stream in JDK 9, but probably more to come)
> >
> > Pros for guava:
> >
> >  * Guava's Optional is Serializable
> >
> > There was recently a discussion on Flink's mailing list [1], which
> > arrived at the conclusion that using Optional as a field should be
> > discouraged (in favor of @Nullable). That would imply that the
> > non-serializability of java.util.Optional is not an issue. But maybe
> > we can arrive at a different conclusion.
> >
> > WDYT?
> >
> >  Jan
> >
>


Re: Brief of interactive Beam

2019-08-21 Thread GMAIL
Thanks for the input, Robert!

> On Aug 21, 2019, at 11:49 AM, Robert Bradshaw  wrote:
> 
> On Wed, Aug 14, 2019 at 11:29 AM Ning Kang wrote:
> Ahmet, thanks for forwarding!
>  
> My main concern at this point is the introduction of new concepts, even 
> though these are not changing other parts of the Beam SDKs. It would be good 
> to see at least an alternative option covered in the design document. The 
> reason is each additional concept adds to the mental load of users. And also 
> concepts from interactive Beam will shift users' expectations of Beam even 
> though there are no direct SDK modifications.
> 
> Hi Robert. About the concern, I think I have a few points:
> Interactive Beam (or Interactive Runner) is already an existing "new concept" 
> that normal Beam users can opt into if they want an interactive Beam 
> experience. They need to do lots of setup steps and learn new things such as 
> Jupyter notebooks and at least the interactive_runner module to make it work 
> and make use of it.
> I think we should start with the perspective that most users interested in 
> using Beam interactively already know about Jupyter notebooks, or at least 
> ipython, and would want to use it to learn (and more effectively use) Beam. 
Yes, I agree with that perspective for users who are familiar with notebooks.
Yet it doesn’t prevent us from creating ready-to-use containers (such as
Binder) for users who want to try Beam interactively without setting up an
environment with all the dependencies interactive Beam introduces. I agree
that experienced users understand how to set up additional dependencies and
read examples; it’s just that we are also targeting entry-level audiences.
But back to the original topic: the design is not trying to add a new concept
but to fix some rough edges of existing Interactive Beam features. We can
discuss whether a create_pipeline() factory is really desired and decide
whether to expose it later. We hope the interactive_beam module will be the
only module an Interactive Beam user directly invokes in their notebook.
> The behavior of existing interactive Beam is different from normal Beam 
> because of the interactive nature and the users would expect that. And the 
> users wouldn't shift their expectation of normal Beam. Just like running 
> Python scripts might result in different behavior than running all of them in 
> an interactive Python session.
> I'm not quite following this. One of the strengths of Python is the lack of 
> difference between interactive and non-interactive behavior. (The fact that 
> a script's execution is always in top-to-bottom order, unlike a notebook, is 
> the primary difference.)
Sorry for the confusion. What I’m saying is about hidden states. Running
several Python scripts from top to bottom in an IPython session might produce
different effects than running them in the same order normally. Say you have
an in-memory global configuration that is shared among all the scripts, and if
it’s missing, a script initializes one. Running the scripts in IPython will
carry the initialization and modification of the configuration along from
script to script, while running the scripts one by one will initialize a fresh
configuration each time. Running cells in a notebook is equivalent to
appending the cells to a script and running it. The interactivity is not about
the order, but about whether hidden state is preserved between each statement
or each script execution. And users should expect that there might be hidden
state when they are in an interactive environment, because that is exactly the
interactivity they expect. However, they don’t hold the hidden state; the
session does it for them. A user wouldn’t need to explicitly say “preserve the
variable x I’ve defined in this cell because I want to reuse it in some other
cells I’m going to execute”. The user can directly access variable x once the
cell defining x is executed. And even if the user deletes the cell defining x,
x still exists. At that stage, no one would know there is a variable x in
memory by just looking at the notebook. One would see a missing execution
sequence number (at the top left of each executed cell) and wonder where the
executed code went.
The above is just about interactivity, not Beam.

Another interesting thing is that a named PTransform cannot be applied to the
same pipeline more than once. It means a cell with a named PTransform, p |
“Name” >> NamedPTransform(), cannot be re-executed. We might want to support
such re-execution if the pipeline is in an interactive mode. In that case, the
Beam behavior might differ from non-interactive Beam.
> Or if a user runs a Beam pipeline with the direct runner, they should expect
> the behavior to be different from running it on Dataflow, where a user needs
> a GCP account. I think the users are aware of the difference when they choose
> to use Interactive Beam.

Re: Query about JdbcIO.readRows()

2019-08-21 Thread Kenneth Knowles
Hi Kishor,

If you could not find a Jira, would you file one? Your contribution would
be much appreciated.

Kenn

On Tue, Aug 20, 2019 at 10:04 PM Kishor Joshi  wrote:

> Hi,
>
> This fix is still not available in Beam 2.15.0. Has a Jira been created for
> this issue? I am interested in contributing to that.
>
> Thanks & Regards,
> Kishor
>
> On Friday, August 2, 2019, 10:19:17 PM GMT+5:30, Jean-Baptiste Onofré <
> j...@nanthrax.net> wrote:
>
>
> Agree. I will fix that.
>
> Regards
> JB
> On Aug 2, 2019, at 17:15, Vishwas Bm wrote:
>
> Hi Kishor,
>
> + dev ( dev@beam.apache.org)
>
> This looks like a bug. The attribute statementPreparator is nullable.
> It should have been handled in the same way as in the expand method of the
> Read class.
>
>
> *Thanks & Regards,*
>
> *Vishwas *
>
>
> On Fri, Aug 2, 2019 at 2:48 PM Kishor Joshi < joshi...@yahoo.com> wrote:
>
> Hi,
>
> I am using the just released 2.14 version for JdbcIO with the newly added
> "readRows" functionality.
>
> I want to read table data with a query without parameters (select * from
> table_name).
> As per my understanding, this should not require a "StatementPreparator".
> However, if I use the newly added "readRows" function, I get an exception
> that seems to force me to use the "StatementPreparator".
> Stacktrace below.
>
> java.lang.IllegalArgumentException: statementPreparator can not be null
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
>
> at
> org.apache.beam.sdk.io.jdbc.JdbcIO$Read.withStatementPreparator(JdbcIO.java:600)
>
> at
> org.apache.beam.sdk.io.jdbc.JdbcIO$ReadRows.expand(JdbcIO.java:499)
> at
> org.apache.beam.sdk.io.jdbc.JdbcIO$ReadRows.expand(JdbcIO.java:410)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
> at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
> at
> com.nokia.csf.dfle.transforms.DfleRdbmsSource.expand(DfleRdbmsSource.java:34)
>
> at
> com.nokia.csf.dfle.transforms.DfleRdbmsSource.expand(DfleRdbmsSource.java:10)
>
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
> at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
> at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:182)
> at
> com.nokia.csf.dfle.dsl.DFLEBeamMain.dagWireUp(DFLEBeamMain.java:49)
> at com.nokia.csf.dfle.dsl.DFLEBeamMain.main(DFLEBeamMain.java:120)
>
>
>
> The test added in JdbcIOTest.java for this functionality only tests
> queries with parameters.
> Is this new function supported only in the above case and not for a normal
> "withQuery" (without parameters)?
>
>
> Thanks & regards,
> Kishor
>
>
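
For context, the stack trace above shows JdbcIO.ReadRows.expand forwarding a
possibly-null statementPreparator into Read.withStatementPreparator, which
rejects null. Here is a minimal sketch of the kind of guard Vishwas describes,
mirroring how Read.expand treats the nullable attribute. This is illustrative
only, not the actual patch, and the getter names are assumed:

  // Hypothetical excerpt from JdbcIO.ReadRows.expand(PBegin input): only
  // forward the statementPreparator when the user actually supplied one.
  JdbcIO.Read<Row> read =
      JdbcIO.<Row>read()
          .withDataSourceConfiguration(getDataSourceConfiguration())
          .withQuery(getQuery());
  if (getStatementPreparator() != null) {
    read = read.withStatementPreparator(getStatementPreparator());
  }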


Re: [VOTE] Release 2.15.0, release candidate #2

2019-08-21 Thread Kenneth Knowles
The website build does still depend on being within a git repository.

On Wed, Aug 21, 2019 at 3:13 PM Kenneth Knowles  wrote:

> +1
>
> My mistake. I was on the wrong git commit and read the wrong gradle
> version from the spec for the wrapper.
>
> On Wed, Aug 21, 2019 at 3:08 PM Kenneth Knowles  wrote:
>
>> Trying to build from the actual source release, and having challenges.
>> The gradle wrapper is absent due to licensing, so I built with fresh Gradle
>> 4.10.3. This did not succeed, with errors that I am still looking into. I
>> copied in the wrapper from the git repo and confirmed the build does
>> succeed with the gradle wrapper.
>>
>> Kenn
>>
>> On Wed, Aug 21, 2019 at 9:17 AM Lukasz Cwik  wrote:
>>
>>> +1 (binding)
>>>
>>> I validated the signatures against the key dist/release/KEYS and hashes
>>> of the source distributions and release artifacts.
>>> I also ran some of the quickstarts for Java.
>>>
>>> On Tue, Aug 20, 2019 at 3:59 PM Pablo Estrada 
>>> wrote:
>>>
 +1

 I've installed from the source in  apache/dist.
 I've run unit tests in Python 3.6, and wordcount in Python 3.6 in
 Direct and Dataflow runners.

 Thanks!
 -P.

 On Tue, Aug 20, 2019 at 11:41 AM Hannah Jiang 
 wrote:

> Yes, I agree this is a separate topic and shouldn't block 2.15 release.
> There is already a JIRA ticket, I will update it with more details.
>
> On Tue, Aug 20, 2019 at 11:32 AM Ahmet Altay  wrote:
>
>>
>>
>> On Tue, Aug 20, 2019 at 10:18 AM Yifan Zou 
>> wrote:
>>
>>> Hi all,
>>>
>>> This is a friendly reminder. Please help to review, verify and vote
>>> on the release candidate #2 for the version 2.15.0.
>>> [ ] +1, Approve the release
>>> [ ] -1, Do not approve the release (please provide specific comments)
>>>
>>> I've verified Java quickstart & mobile games, and Python (both tar
>>> and wheel) quickstart with Py27, 35, 36, 37. They worked well.
>>>
>>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit?pli=1#gid=1036192804
>>>
>>> Thanks.
>>> Yifan
>>>
>>>
>>>
>>> On Tue, Aug 20, 2019 at 9:33 AM Hannah Jiang 
>>> wrote:
>>>
 A side note about this test:
 Now we only have py2 and py35, so it only fails with py35. I am
 introducing minor versions, which will add py36 and py37, and all py3 are
 flaky.
 It's really difficult to pass Portable Precommit with minor
 versions, the chance of passing the test is around 15%.

>>>
>> Hannah, let's separate this from the release thread. Is there a JIRA
>> for this, could you update it? And perhaps we need different precommits
>> for different versions so that flakes do not stack up. Even if a suite is
>> >90% reliable, if we stack up with 4 versions, the reliability will get
>> much lower.
>>
>>
>>>
 On Mon, Aug 19, 2019 at 5:27 PM Ahmet Altay 
 wrote:

> Thank you. Unless there are any objections, let's continue with
> validating RC2.
>
> On Mon, Aug 19, 2019 at 5:21 PM Kyle Weaver 
> wrote:
>
>> I'm not sure if it's worth blocking the release, since I can't
>> reproduce the issue on my machine and a fix would be hard to verify.
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib |
>> kcwea...@google.com
>>
>>
>> On Mon, Aug 19, 2019 at 4:56 PM Ahmet Altay 
>> wrote:
>>
>>> Kyle, are you currently working on this to decide whether it is
>>> the blocking case or not? Also is this affecting both release 
>>> branch and
>>> master branch?
>>>
>>> On Mon, Aug 19, 2019 at 4:49 PM Kyle Weaver 
>>> wrote:
>>>
 Re BEAM-7993: For some context, there are two possible causes
 here. The pessimistic take is that Dockerized SDK workers are taking
 forever to start. The optimistic take is that the Docker containers are
 just taking longer than normal (but not forever) to start on Jenkins, in
 which case this issue is nothing new to this release. (The halting
 problem!) If it's the second, it's safe to wait to fix it in the next
 release.

 Kyle Weaver | Software Engineer | github.com/ibzib |
 kcwea...@google.com


 On Mon, Aug 19, 2019 at 4:42 PM Yifan Zou 
 wrote:

> Mark and Kyle found a py35 portable test which is flaky:
> https://issues.apache.org/jira/browse/BEAM-7993.
> I plan to finalize the release this week. Would that be a
> blocker? Could we include the fix in 2.16?
>
>>>

Re: [VOTE] Release 2.15.0, release candidate #2

2019-08-21 Thread Kenneth Knowles
+1

My mistake. I was on the wrong git commit and read the wrong gradle version
from the spec for the wrapper.

On Wed, Aug 21, 2019 at 3:08 PM Kenneth Knowles  wrote:

> Trying to build from the actual source release, and having challenges. The
> gradle wrapper is absent due to licensing, so I built with fresh Gradle
> 4.10.3. This did not succeed, with errors that I am still looking into. I
> copied in the wrapper from the git repo and confirmed the build does
> succeed with the gradle wrapper.
>
> Kenn
>
> On Wed, Aug 21, 2019 at 9:17 AM Lukasz Cwik  wrote:
>
>> +1 (binding)
>>
>> I validated the signatures against the key dist/release/KEYS and hashes
>> of the source distributions and release artifacts.
>> I also ran some of the quickstarts for Java.
>>
>> On Tue, Aug 20, 2019 at 3:59 PM Pablo Estrada  wrote:
>>
>>> +1
>>>
>>> I've installed from the source in  apache/dist.
>>> I've run unit tests in Python 3.6, and wordcount in Python 3.6 in Direct
>>> and Dataflow runners.
>>>
>>> Thanks!
>>> -P.
>>>
>>> On Tue, Aug 20, 2019 at 11:41 AM Hannah Jiang 
>>> wrote:
>>>
 Yes, I agree this is a separate topic and shouldn't block 2.15 release.
 There is already a JIRA ticket, I will update it with more details.

 On Tue, Aug 20, 2019 at 11:32 AM Ahmet Altay  wrote:

>
>
> On Tue, Aug 20, 2019 at 10:18 AM Yifan Zou 
> wrote:
>
>> Hi all,
>>
>> This is a friendly reminder. Please help to review, verify and vote
>> on the release candidate #2 for the version 2.15.0.
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>>
>> I've verified Java quickstart & mobile games, and Python (both tar
>> and wheel) quickstart with Py27, 35, 36, 37. They worked well.
>>
>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit?pli=1#gid=1036192804
>>
>> Thanks.
>> Yifan
>>
>>
>>
>> On Tue, Aug 20, 2019 at 9:33 AM Hannah Jiang 
>> wrote:
>>
>>> A side note about this test:
>>> Now we only have py2 and py35, so it only fails with py35. I am
>>> introducing minor versions, which will add py36 and py37, and all py3 are
>>> flaky.
>>> It's really difficult to pass Portable Precommit with minor
>>> versions, the chance of passing the test is around 15%.
>>>
>>
> Hannah, let's separate this from the release thread. Is there a JIRA
> for this, could you update it? And perhaps we need different precommits
> for different versions so that flakes do not stack up. Even if a suite is
> >90% reliable, if we stack up with 4 versions, the reliability will get
> much lower.
>
>
>>
>>> On Mon, Aug 19, 2019 at 5:27 PM Ahmet Altay 
>>> wrote:
>>>
 Thank you. Unless there are any objections, let's continue with
 validating RC2.

 On Mon, Aug 19, 2019 at 5:21 PM Kyle Weaver 
 wrote:

> I'm not sure if it's worth blocking the release, since I can't
> reproduce the issue on my machine and a fix would be hard to verify.
>
> Kyle Weaver | Software Engineer | github.com/ibzib |
> kcwea...@google.com
>
>
> On Mon, Aug 19, 2019 at 4:56 PM Ahmet Altay 
> wrote:
>
>> Kyle, are you currently working on this to decide whether it is
>> the blocking case or not? Also is this affecting both release branch 
>> and
>> master branch?
>>
>> On Mon, Aug 19, 2019 at 4:49 PM Kyle Weaver 
>> wrote:
>>
>>> Re BEAM-7993: For some context, there are two possible causes
>>> here. The pessimistic take is that Dockerized SDK workers are taking
>>> forever to start. The optimistic take is that the Docker containers are
>>> just taking longer than normal (but not forever) to start on Jenkins, in
>>> which case this issue is nothing new to this release. (The halting
>>> problem!) If it's the second, it's safe to wait to fix it in the next
>>> release.
>>>
>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>>> kcwea...@google.com
>>>
>>>
>>> On Mon, Aug 19, 2019 at 4:42 PM Yifan Zou 
>>> wrote:
>>>
 Mark and Kyle found a py35 portable test which is flaky:
 https://issues.apache.org/jira/browse/BEAM-7993.
 I plan to finalize the release this week. Would that be a
 blocker? Could we include the fix in 2.16?

 Thanks.


 On Mon, Aug 19, 2019 at 4:38 PM Yifan Zou 
 wrote:

> I've run most of validations and they're all good.
> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit?pli=1#gid=1036192804

Re: [VOTE] Release 2.15.0, release candidate #2

2019-08-21 Thread Kenneth Knowles
Trying to build from the actual source release, and having challenges. The
gradle wrapper is absent due to licensing, so I built with fresh Gradle
4.10.3. This did not succeed, with errors that I am still looking into. I
copied in the wrapper from the git repo and confirmed the build does
succeed with the gradle wrapper.

Kenn

On Wed, Aug 21, 2019 at 9:17 AM Lukasz Cwik  wrote:

> +1 (binding)
>
> I validated the signatures against the key dist/release/KEYS and hashes of
> the source distributions and release artifacts.
> I also ran some of the quickstarts for Java.
>
> On Tue, Aug 20, 2019 at 3:59 PM Pablo Estrada  wrote:
>
>> +1
>>
>> I've installed from the source in  apache/dist.
>> I've run unit tests in Python 3.6, and wordcount in Python 3.6 in Direct
>> and Dataflow runners.
>>
>> Thanks!
>> -P.
>>
>> On Tue, Aug 20, 2019 at 11:41 AM Hannah Jiang 
>> wrote:
>>
>>> Yes, I agree this is a separate topic and shouldn't block 2.15 release.
>>> There is already a JIRA ticket, I will update it with more details.
>>>
>>> On Tue, Aug 20, 2019 at 11:32 AM Ahmet Altay  wrote:
>>>


 On Tue, Aug 20, 2019 at 10:18 AM Yifan Zou  wrote:

> Hi all,
>
> This is a friendly reminder. Please help to review, verify and vote
> on the release candidate #2 for the version 2.15.0.
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> I've verified Java quickstart & mobile games, and Python (both tar and
> wheel) quickstart with Py27, 35, 36, 37. They worked well.
>
> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit?pli=1#gid=1036192804
>
> Thanks.
> Yifan
>
>
>
> On Tue, Aug 20, 2019 at 9:33 AM Hannah Jiang 
> wrote:
>
>> A side note about this test:
>> Now we only have py2 and py35, so it only fails with py35. I am
>> introducing minor versions, which will add py36 and py37, and all py3 are
>> flaky.
>> It's really difficult to pass Portable Precommit with minor versions,
>> the chance of passing the test is around 15%.
>>
>
 Hannah, let's separate this from the release thread. Is there a JIRA
 for this, could you update it? And perhaps we need different precommits
 for different versions so that flakes do not stack up. Even if a suite is
 >90% reliable, if we stack up with 4 versions, the reliability will get much
 lower.


>
>> On Mon, Aug 19, 2019 at 5:27 PM Ahmet Altay  wrote:
>>
>>> Thank you. Unless there are any objections, let's continue with
>>> validating RC2.
>>>
>>> On Mon, Aug 19, 2019 at 5:21 PM Kyle Weaver 
>>> wrote:
>>>
 I'm not sure if it's worth blocking the release, since I can't
 reproduce the issue on my machine and a fix would be hard to verify.

 Kyle Weaver | Software Engineer | github.com/ibzib |
 kcwea...@google.com


 On Mon, Aug 19, 2019 at 4:56 PM Ahmet Altay 
 wrote:

> Kyle, are you currently working on this to decide whether it is
> the blocking case or not? Also is this affecting both release branch 
> and
> master branch?
>
> On Mon, Aug 19, 2019 at 4:49 PM Kyle Weaver 
> wrote:
>
>> Re BEAM-7993: For some context, there are two possible causes
>> here. The pessimistic take is that Dockerized SDK workers are taking
>> forever to start. The optimistic take is that the Docker containers are
>> just taking longer than normal (but not forever) to start on Jenkins, in
>> which case this issue is nothing new to this release. (The halting
>> problem!) If it's the second, it's safe to wait to fix it in the next
>> release.
>>
>> Kyle Weaver | Software Engineer | github.com/ibzib |
>> kcwea...@google.com
>>
>>
>> On Mon, Aug 19, 2019 at 4:42 PM Yifan Zou 
>> wrote:
>>
>>> Mark and Kyle found a py35 portable test which is flaky:
>>> https://issues.apache.org/jira/browse/BEAM-7993.
>>> I plan to finalize the release this week. Would that be a
>>> blocker? Could we include the fix in 2.16?
>>>
>>> Thanks.
>>>
>>>
>>> On Mon, Aug 19, 2019 at 4:38 PM Yifan Zou 
>>> wrote:
>>>
 I've run most of validations and they're all good.
 https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit?pli=1#gid=1036192804

 On Mon, Aug 19, 2019 at 10:59 AM Hannah Jiang <
 hannahji...@google.com> wrote:

> (resending it to dev@)
> +1, I tested some test cases as well as customized test cases
> and all looks good. I updated validation sheet.
>>

Re: [Discuss] Retractions in Beam

2019-08-21 Thread Rui Wang
Thanks Kenn.

Some points to answer some concerns:
1. Adding retractions won't break existing users (because it is a new
accumulation mode).
2. Adding retractions won't affect existing pipelines' performance (this can
be achieved by having the existing modes avoid calling retracting mode's core
components, e.g. ReduceFn).
3. If we keep going in the direction of "retracting and discarding", the
performance decrease should be minimal: a small input change that leads to a
small output change will be cheap to compute. For example, for Combining, a
retraction should never require a full recompute over every combined element.
It should be the same for Join. (A toy sketch follows below.)
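
To make point 3 concrete, here is a toy model — not Beam API — of a retracting
Sum combine for a single key: on each new input only the previous pane is
retracted and the updated pane emitted, with no recompute over all elements
seen so far. The class and method names are invented for the example:

  /** Toy model of a retracting Sum combine for one key (illustration only). */
  public class RetractingSumSketch {
    private long currentSum = 0;
    private boolean emittedBefore = false;

    /** Processes one input; prints what a downstream consumer would see. */
    void onInput(long value) {
      if (emittedBefore) {
        System.out.println("retract: " + currentSum); // undo the previous pane
      }
      currentSum += value;
      emittedBefore = true;
      System.out.println("emit:    " + currentSum); // updated pane, O(1) work
    }

    public static void main(String[] args) {
      RetractingSumSketch sum = new RetractingSumSketch();
      sum.onInput(1); // emit: 1
      sum.onInput(2); // retract: 1, emit: 3
      sum.onInput(4); // retract: 3, emit: 7
    }
  }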


Please feel free to combine or add other pieces, even if there are overlaps.

-Rui


On Wed, Aug 21, 2019 at 11:20 AM Kenneth Knowles  wrote:

> I reviewed your PR (https://github.com/apache/beam/pull/9199) and Anton's
> as another reference (https://github.com/apache/beam/pull/4742). Nice
> work. I thought I would summarize for the list a little bit. I think we
> have not done too much with retractions because it seems like a big job.
> You both have shown that it is maybe not that hard to implement the core.
> But it will have a lot of user-facing things that we have to test very
> carefully.
>
>  - the technical changes are primarily to ReduceFn aka GroupAlsoByWindow
> which is the core of stateful aggregation and is straightforward, which is
> cool
>  - the boilerplate through the codebase is a lot (most of the ~1000 lines
> of both PRs) but it could have been a lot worse, so we are lucky :-)
>
> Here are steps forward that I can think of:
>
>  - we need backwards compatibility, which is trivial because it is a new
> accumulation mode
>  - we need a little more mathematical analysis (at least personally to
> have more confidence there are no bad surprises)
>  - we need more description of the user-facing impact and API changes
> (same reason)
>  - lots and lots of @ValidatesRunner tests
>  - some opinions here from runner authors about efficiency in their system
>  - and merging/unmerging window support matters since that is a key
> retractions use case too, but I would save it for later in my opinion (if
> you've seen Tyler's hack to do Validity Windows then that is even harder)
>  - we also need protections so that things that do not work with
> retractions are rejected, which will be all existing user DoFns and all
> sinks
>
> I've got an old doc made w/ Anton, Ben, and a couple others that I can try
> to find time to edit and share that deals a little bit with mathematics
> (messy and incomplete) and the API/compatibility questions (more useful,
> probably). You've seen it offline but the list has not seen a public
> version. I was going to try to merge it with yours but I can get it out
> quicker if I just allow for the overlaps.
>
> Kenn
>
> On Mon, Aug 12, 2019 at 9:47 PM Rui Wang  wrote:
>
>> Hello!
>>
>> I have also been building a proof of concept (PR), which implements the
>> streaming wordcount example in the design doc.
>>
>> What is missing in the PoC is ordering guarantee implementation in sink
>> (which I am working on).
>>
>>
>> -Rui
>>
>> On Wed, Jul 24, 2019 at 1:37 PM Rui Wang  wrote:
>>
>>> Hello!
>>>
>>> In case you are not aware of, I have added a modified streaming
>>> wordcount example at the end of the doc to illustrate retractions.
>>>
>>>
>>> -Rui
>>>
>>> On Wed, Jul 10, 2019 at 10:58 AM Rui Wang  wrote:
>>>
 Hi Community,

 Retractions is a part of core Beam model [1]. I come up with a doc to
 discuss retractions about use cases, model and API (see the link below).
 This is a very beginning discussion on retractions but I do hope we can
 have a consensus and make retractions implemented in a useful way
 eventually.


 doc link:
 https://docs.google.com/document/d/14WRfxwk_iLUHGPty3C6ZenddPsp_d6jhmx0vuafXqmE/edit?usp=sharing


 [1]: https://issues.apache.org/jira/browse/BEAM-91


 -Rui

>>>


Re: [DISCUSS] Multiple-triggering SQL Join with retractions support

2019-08-21 Thread Rui Wang
Kenn - Yep, totally agree the first phase should not include EMIT. Although
it would be interesting to explore EMIT support in Calcite as R&D work.

Mingmin - Thanks for your example query, which is an interesting use case,
in which two inputs are aggregations with different modes. Retraction won't
be a concern. The more interesting question is how we can allow setting up
different modes in a SQL query. It is certainly beyond the scope of this
thread. We might add more syntax to EMIT to allow controlling
accumulating/discarding/retracting. However, it hasn't been discussed yet, so
I don't have a clear idea.


-Rui

On Wed, Aug 21, 2019 at 12:22 PM Mingmin Xu  wrote:

> @Rui In my cases, we have some complex queries like
> SELECT ...
> FROM ( SELECT ... FROM PRE_A GROUP BY id, TUMBLE(1 HOUR) ) A
> JOIN ( SELECT ... FROM PRE_B GROUP BY id, TUMBLE(1 HOUR) ) B
> ON A.id=B.id
> //A emits every minute in accumulating mode and B emits every minute in
> discarding mode.
>
> I would be interested to know how this can be supported with retraction in
> SQL; currently this operation is simply blocked.
>
> Mingmin
>
> On Wed, Aug 21, 2019 at 11:21 AM Kenneth Knowles  wrote:
>
>> These all sound useful. One thing is that the EMIT syntax is an
>> earlier-stage idea, and more likely subject to change. The problem with
>> EMIT anywhere except the top level is that it is not very composable. It
>> really belongs as part of an INSERT statement, just like sink triggers.
>>
>> Maybe a first step is to do the basics for retractions in Beam itself.
>> This is already a lot of work (I just reviewed your prototype and Anton's
>> together so I have a very good idea where it is at). Once we have the
>> basics, then SqlTransform can have triggers set on its input and still work
>> with grouping and joins. That will let us explore retractions in SQL
>> without depending on EMIT.
>>
>> Kenn
>>
>> On Mon, Aug 19, 2019 at 7:02 PM Rui Wang  wrote:
>>
>>> I am also asking TVF windowing and EMIT syntax support in dev@calcite.
>>> See [1].
>>>
>>>
>>>
>>> [1]:
>>> https://lists.apache.org/thread.html/71724f8a9079be11c04c70c64097491822323f560a79a7fa1321711d@%3Cdev.calcite.apache.org%3E
>>>
>>> -Rui
>>>
>>> On Mon, Aug 19, 2019 at 4:40 PM Rui Wang  wrote:
>>>
 Hi Mingmin,

 Thanks for adding "INSERT INTO" (which I missed from the example)

 I am not sure if I understand the question:

 1. multiple GBK with retraction is solved by [1].
 2. In terms of SQL and its view, the output are defined by the last GBK.

 [1]:
 https://docs.google.com/document/d/14WRfxwk_iLUHGPty3C6ZenddPsp_d6jhmx0vuafXqmE/edit?usp=sharing


 -Rui

 On Mon, Aug 19, 2019 at 4:02 PM Mingmin Xu  wrote:

> +1 to supporting EMIT on the Beam side first if we cannot include it in
> Calcite in a short time (see #1, #2). I'm open to using any format, the one
> above or something as below. The tricky question is: what's the expected
> behavior for a complex query with more than one GBK operator?
>
> EMIT   |  [ACCUMULATE|DISCARD]
> [INSERT INTO ...]
> SELECT ...
>
> #1.
> https://sematext.com/opensee/m/Calcite/FR3K9JVAl32VULr6?subj=Towards+a+spec+for+robust+streaming+SQL+Part+1
> #2
> https://sematext.com/opensee/m/Beam/gfKHFFDd4i1I3nZc2?subj=Towards+a+spec+for+robust+streaming+SQL+Part+2
>
> On Mon, Aug 19, 2019 at 12:02 PM Rui Wang  wrote:
>
>> To update this idea, I think we can go a step further and support the EMIT
>> syntax from the one-sql-to-rule-them-all paper [1].
>>
>> EMIT will allow periodic, delayed stream materialization. For a stream
>> view, it means we will add support to sinks to keep generating a changelog
>> table. For a view only, it means we will add support to sinks to generate a
>> compacted table from the changelog table periodically.
>>
>> Regarding to SQL, a typical query like the following should run:
>>
>>
>> *WITH joined_table AS (SELECT * FROM S1 JOIN S2)*
>> *SELECT XX FROM HOP(joined_table)*
>> *EMIT [STREAM] AFTER DELAY INTERVAL '1' HOUR*
>>
>>
>> By doing so, retractions will be much useful for SQL from a product
>> scenario, in which we can have a meaningful end to end SQL pipeline.
>>
>> [1]: https://arxiv.org/pdf/1905.12133.pdf
>>
>> -Rui
>>
>> On Mon, Aug 12, 2019 at 11:30 PM Rui Wang  wrote:
>>
>>> Hi Community,
>>>
>>> BeamSQL currently does not support unbounded-unbounded join with
>>> non-default trigger. It is because:
>>>
>>> - Discarding mode does not work for outer joins because it lacks the
>>> ability to retract pre-emitted values. You can think about an example in
>>> which a tuple of (left_row, null) needs to be retracted if the matching
>>> right_row appears after the last trigger fired.
>>> - Accumulating mode *theoretically* can support unbounded-unbounded
>>> join because it's supposed to always "overwrite" previous results.

Re: [DISCUSS] Making consistent use of Optionals

2019-08-21 Thread Jan Lukavský

Sorry, forgot to add link to the Flink discussion [1].

[1] 
https://lists.apache.org/thread.html/f5f8ce92f94c9be6774340fbd7ae5e4afe07386b6765ad3cfb13aec0@%3Cdev.flink.apache.org%3E


On 8/21/19 10:08 PM, Jan Lukavský wrote:

Hi,

sorry if this discussion has already taken place, but I'd like to know
others' opinions about how we use Optionals. The state on current
master is as follows:


$ git grep "import" | grep "java.util.Optional" | wc -l
85
$ git grep "import" | grep "Optional" | grep guava | wc -l
45

I'd like to propose that we use only one Optional, for consistency.
There are arguments for each of them; let me try to sum
these up:


Pros for java.util:

 * Part of standard lib

 * Will (in the future) probably better integrate with other standard 
APIs (e.g. Optional.stream in JDK 9, but probably more to come)


Pros for guava:

 * Guava's Optional is Serializable

There was recently a discussion on Flink's mailing list [1], which
arrived at the conclusion that using Optional as a field should be
discouraged (in favor of @Nullable). That would imply that the
non-serializability of java.util.Optional is not an issue. But maybe
we can arrive at a different conclusion.


WDYT?

 Jan



[DISCUSS] Making consistent use of Optionals

2019-08-21 Thread Jan Lukavský

Hi,

sorry if this discussion has already taken place, but I'd like to know
others' opinions about how we use Optionals. The state on current master
is as follows:


$ git grep "import" | grep "java.util.Optional" | wc -l
85
$ git grep "import" | grep "Optional" | grep guava | wc -l
45

I'd like to propose that we use only one Optional, for consistency.
There are arguments for each of them; let me try to sum
these up:


Pros for java.util:

 * Part of standard lib

 * Will (in the future) probably better integrate with other standard 
APIs (e.g. Optional.stream in JDK 9, but probably more to come)


Pros for guava:

 * Guava's Optional is Serializable

There was recently a discussion on Flink's mailing list [1], which
arrived at the conclusion that using Optional as a field should be
discouraged (in favor of @Nullable). That would imply that the
non-serializability of java.util.Optional is not an issue. But maybe we
can arrive at a different conclusion.
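
To make the serializability difference concrete, here is a small sketch of
where it bites in Beam: Java serialization of a function object (as happens to
DoFns when a pipeline is shipped to workers). The class names are invented for
the example:

  import java.io.ByteArrayOutputStream;
  import java.io.ObjectOutputStream;
  import java.io.Serializable;
  import java.util.Optional;

  public class OptionalFieldDemo {
    /** Stand-in for a DoFn: Beam serializes these to ship them to workers. */
    static class MyFn implements Serializable {
      // java.util.Optional is not Serializable, so this field breaks Java
      // serialization; Guava's Optional (or a @Nullable field) would not.
      private final Optional<String> prefix = Optional.of("beam");
    }

    public static void main(String[] args) throws Exception {
      try (ObjectOutputStream out =
          new ObjectOutputStream(new ByteArrayOutputStream())) {
        out.writeObject(new MyFn()); // throws java.io.NotSerializableException
      }
    }
  }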


WDYT?

 Jan



Re: [DISCUSS] Multiple-triggering SQL Join with retractions support

2019-08-21 Thread Mingmin Xu
@Rui In my cases, we have some complex queries like
SELECT ...
FROM ( SELECT ... FROM PRE_A GROUP BY id, TUMBLE(1 HOUR) ) A
JOIN ( SELECT ... FROM PRE_B GROUP BY id, TUMBLE(1 HOUR) ) B
ON A.id=B.id
//A emits every minute in accumulating mode and B emits every minute in
discarding mode.

I would be interested to know how this can be supported with retraction in
SQL; currently this operation is simply blocked.

Mingmin

On Wed, Aug 21, 2019 at 11:21 AM Kenneth Knowles  wrote:

> These all sound useful. One thing is that the EMIT syntax is an
> earlier-stage idea, and more likely subject to change. The problem with EMIT
> anywhere except the top level is that it is not very composable. It really
> belongs as part of an INSERT statement, just like sink triggers.
>
> Maybe a first step is to do the basics for retractions in Beam itself.
> This is already a lot of work (I just reviewed your prototype and Anton's
> together so I have a very good idea where it is at). Once we have the
> basics, then SqlTransform can have triggers set on its input and still work
> with grouping and joins. That will let us explore retractions in SQL
> without depending on EMIT.
>
> Kenn
>
> On Mon, Aug 19, 2019 at 7:02 PM Rui Wang  wrote:
>
>> I am also asking TVF windowing and EMIT syntax support in dev@calcite.
>> See [1].
>>
>>
>>
>> [1]:
>> https://lists.apache.org/thread.html/71724f8a9079be11c04c70c64097491822323f560a79a7fa1321711d@%3Cdev.calcite.apache.org%3E
>>
>> -Rui
>>
>> On Mon, Aug 19, 2019 at 4:40 PM Rui Wang  wrote:
>>
>>> Hi Mingmin,
>>>
>>> Thanks for adding "INSERT INTO" (which I missed from the example)
>>>
>>> I am not sure if I understand the question:
>>>
>>> 1. multiple GBK with retraction is solved by [1].
>>> 2. In terms of SQL and its view, the output are defined by the last GBK.
>>>
>>> [1]:
>>> https://docs.google.com/document/d/14WRfxwk_iLUHGPty3C6ZenddPsp_d6jhmx0vuafXqmE/edit?usp=sharing
>>>
>>>
>>> -Rui
>>>
>>> On Mon, Aug 19, 2019 at 4:02 PM Mingmin Xu  wrote:
>>>
 +1 to supporting EMIT on the Beam side first if we cannot include it in
 Calcite in a short time (see #1, #2). I'm open to using any format, the one
 above or something as below. The tricky question is: what's the expected
 behavior for a complex query with more than one GBK operator?

 EMIT   |  [ACCUMULATE|DISCARD]
 [INSERT INTO ...]
 SELECT ...

 #1.
 https://sematext.com/opensee/m/Calcite/FR3K9JVAl32VULr6?subj=Towards+a+spec+for+robust+streaming+SQL+Part+1
 #2
 https://sematext.com/opensee/m/Beam/gfKHFFDd4i1I3nZc2?subj=Towards+a+spec+for+robust+streaming+SQL+Part+2

 On Mon, Aug 19, 2019 at 12:02 PM Rui Wang  wrote:

> To update this idea, I think we can go a step further and support the EMIT
> syntax from the one-sql-to-rule-them-all paper [1].
>
> EMIT will allow periodic, delayed stream materialization. For a stream
> view, it means we will add support to sinks to keep generating a changelog
> table. For a view only, it means we will add support to sinks to generate a
> compacted table from the changelog table periodically.
>
> Regarding to SQL, a typical query like the following should run:
>
>
> *WITH joined_table AS (SELECT * FROM S1 JOIN S2)*
> *SELECT XX FROM HOP(joined_table)*
> *EMIT [STREAM] AFTER DELAY INTERVAL '1' HOUR*
>
>
> By doing so, retractions will be much useful for SQL from a product
> scenario, in which we can have a meaningful end to end SQL pipeline.
>
> [1]: https://arxiv.org/pdf/1905.12133.pdf
>
> -Rui
>
> On Mon, Aug 12, 2019 at 11:30 PM Rui Wang  wrote:
>
>> Hi Community,
>>
>> BeamSQL currently does not support unbounded-unbounded join with a
>> non-default trigger. It is because:
>>
>> - Discarding mode does not work for outer joins because it lacks the
>> ability to retract pre-emitted values. You can think about an example in
>> which a tuple of (left_row, null) needs to be retracted if the matching
>> right_row appears after the last trigger fired.
>> - Accumulating mode *theoretically* can support unbounded-unbounded
>> join because it's supposed to always "overwrite" previous results. However,
>> in practice, for join use cases such overwriting is too expensive. It would
>> be much more efficient if small changes in the inputs of a join only caused
>> small changes for downstream to compute.
>> - Both discarding mode and accumulating mode are insufficient to
>> refine materialized data.
>> - Both discarding mode and accumulating mode are not sufficient to
>> refine materialized data.
>>
>> Meanwhile, [1] has kicked off a discussion on retractions in the Beam
>> model. I have been collecting people's feedback, and generally speaking
>> people agree that retractions are useful for some use cases.
>>
>> Thus I propose to combine SQL join with retractions to
>> support multiple-triggering SQL Join.
>>
>> I think SQL join is a good start for supporting retraction in Beam
>> with the 

Re: Support ZetaSQL as a new SQL dialect in BeamSQL

2019-08-21 Thread Rui Wang
Thanks everyone! Beam ZetaSQL is now merged into the Beam repo!


-Rui

On Mon, Aug 19, 2019 at 8:36 AM Ahmet Altay  wrote:

> Thank you both!
>
> On Mon, Aug 19, 2019 at 8:01 AM Kenneth Knowles  wrote:
>
>> The i.p. clearance is complete:
>> https://lists.apache.org/thread.html/239be048e7748f079dc34b06020e0c8f094859cb4a558b361f6b8eb5@
>>
>> Kenn
>>
>> On Mon, Aug 12, 2019 at 4:25 PM Rui Wang  wrote:
>>
>>> Thanks Kenneth.
>>>
>>> I will start a vote for Beam ZetaSQL contribution.
>>>
>>> -Rui
>>>
>>> On Mon, Aug 12, 2019 at 4:11 PM Kenneth Knowles  wrote:
>>>
 Nice explanations of the reasoning. I think two things will stay
 approximately the same even as the ecosystem develops: (1) ZetaSQL has
 pretty clear semantics so we will have a compliant parser, whether it is
 the official one or another like Calcite Babel, and (2) we will need a way
 to implement all the standard ZetaSQL functions and this will be the same
 no matter the frontend.

 For a contribution this large where i.p. clearance is necessary, a vote
 is appropriate. It can happen at the same time or even after i.p. 
 clearance.

 Kenn

 On Wed, Aug 7, 2019 at 1:08 PM Mingmin Xu  wrote:

> Thanks to highlight the parts of types/operators/functions/..., that
> does make things more complicated. +1 that as a short/middle term 
> solution,
> the proposal is reasonable. We could follow up in future to handle it in
> Calcite Babel if possible.
>
> Mingmin
>
> On Tue, Aug 6, 2019 at 3:57 PM Rui Wang  wrote:
>
>> Hi Mingmin,
>>
>> Honestly I don't have an answer to it: a SQL dialect is complicated
>> and I don't have enough understanding on Calcite (Calcite has a big 
>> repo).
>> Based on my read from CALCITE-2280
>> , the closer to
>> standard sql that a dialect is, the less blockers that we will have to
>> support this dialect in Calcite babel parser.
>>
>> However, this is a good question, which raises a good aspect that I
>> found people usually ignore: supporting a SQL dialect is not only 
>> support a
>> type of syntax. It also includes data types, built-in sql functions,
>> operators and many other stuff.
>>
>> I especially found the following incompatibilities between Calcite
>> and ZetaSQL during the development:
>> 1. Calcite does not support Struct/Row type well because Calcite
>> flattens Rows when reading from tables by adding an extra Projection on 
>> top
>> of tables.
>> 2. I had trouble in supporting DATETIME(or timestamp without
>> time zone) type.
>> 3. Huge incompatibilities on SQL functions. E.g. return type is
>> different for AVG(long), and many many more.
>> 4. I am not sure if Calcite has the same set of type casting rules as
>> BigQuery(my impression is there are differences).
>>
>>
>> I would say in the short/mid term, it's much easier to use logical
>> plan as IR to implement another SQL dialect for BeamSQL (Linkedin has
>> similar practice, see their blog post
>> 
>> ).
>>
>> For the longer term, it would be interesting to see how we can add
>> BigQuery syntax (plus its data types and sql functions) to Calcite babel
>> parser.
>>
>>
>>
>> -Rui
>>
>>
>> On Tue, Aug 6, 2019 at 2:49 PM Mingmin Xu  wrote:
>>
>>> Just take a look at
>>> https://issues.apache.org/jira/browse/CALCITE-2280 which introduced
>>> Babel parser in Calcite to support varied dialects, this may be an 
>>> easier
>>> way to support BigQuery syntax. @Rui do you notice any big difference
>>> between Calcite engine and ZetaSQL, like parsing, optimization? If 
>>> that's
>>> the case, it make sense to build the alternative switch in Beam side.
>>>
>>> On Sun, Aug 4, 2019 at 4:47 PM Rui Wang  wrote:
>>>
 Mingmin - it sounds like an awesome idea to translate from
 SparkSQL. It's even more exciting to know if we could translate Spark
 Structured Streaming code by a similar way, which enables existing 
 Spark
 SQL/Structure Streaming pipelines run on Beam.

 Reuven - Thanks for bringing it up. I tried to search dev@calcite
 and only found[1]. From that thread, I see that adding ZetaSQL to 
 Calcite
 itself is still a discussion. I am also looking for if anyone knows 
 more
 progress on this work than the thread.


 [1]:
 http://mail-archives.apache.org/mod_mbox/calcite-dev/201905.mbox/%3CCAMj=j=-sPWgxzAgusnx8OYvYDYDcDY=dupe6poytrxhjri9...@mail.gmail.com%3E

 -Rui

 On Sun, Aug 4, 2019 at 3:54 PM Reuven Lax  wrote:

>

Re: Brief of interactive Beam

2019-08-21 Thread Robert Bradshaw
On Wed, Aug 14, 2019 at 11:29 AM Ning Kang  wrote:

> Ahmet, thanks for forwarding!
>
>
>> My main concern at this point is the introduction of new concepts, even
>> though these are not changing other parts of the Beam SDKs. It would be
>> good to see at least an alternative option covered in the design document.
>> The reason is each additional concept adds to the mental load of users. And
>> also concepts from interactive Beam will shift users' expectations of Beam
>> even though there are no direct SDK modifications.
>
>
> Hi Robert. About the concern, I think I have a few points:
>
>    1. *Interactive Beam (or Interactive Runner) is already an existing
>    "new concept" that normal Beam users can opt into if they want an
>    interactive Beam experience.* They need to do lots of setup steps and
>    learn new things such as Jupyter notebooks and at least the
>    interactive_runner module to make it work and make use of it.
>
> I think we should start with the perspective that most users interested in
using Beam interactively already know about Jupyter notebooks, or at least
ipython, and would want to use it to learn (and more effectively use) Beam.

>
>1. *The behavior of existing interactive Beam is different from normal
>Beam because of the interactive nature and the users would expect that.* 
> And
>the users wouldn't shift their expectation of normal Beam. Just like
>running Python scripts might result in different behavior than running all
>of them in an interactive Python session.
>
> I'm not quite following this. One of the strengths of Python is the lack of
difference between interactive and non-interactive behavior. (The fact that a
script's execution is always in top-to-bottom order, unlike a notebook, is the
primary difference.)

>
>    1. Or if a user runs a Beam pipeline with the direct runner, they should
>    expect the behavior to be different from running it on Dataflow, where a
>    user needs a GCP account. I think the users are aware of the difference
>    when they choose to use Interactive Beam.
>
 The central, defining tenet of Beam is that behavior should be
consistent across different runners. Of course there are operational
details that are difficult, or perhaps even undesirable, to align (like, as
you mention, needing a GCP account for running on Dataflow, or providing
the location of the master when running Flink). But even these should be
minimized (see the recent efforts to make the temp location a standard
rather than dataflow-specific option).

We should, however, attempt to minimize gratuitous differences. In
particular, we should make it as easy as possible to transition (in terms
of code, docs, and developers) between interactive and non-interactive.

>
>1. *Our design actually reduces the mental load of interactive Beam
>users with intuitive interactive features*: create pipeline, visualize
>intermediate PCollection, run pipeline at some point with other runners and
>etc. For example, right now, the user needs to use a more complicated set
>of libraries, like creating a Beam pipeline with interactive runner that
>needs an underlying runner fed in.  We are getting rid of it. An
>interactive Beam user shouldn't be concerned about the underlying
>interactive magic.
>
> I agree a user shouldn't be concerned about the implementation details,
but I fail to see how

p = interactive_module.create_pipeline()

is significantly simpler than, or preferable to,

   p = Pipeline(interactive_module.InteractiveRunner())

especially as the latter is in line with non-interactive pipelines (and all
our examples, docs, etc.) Now, perhaps an argument could be made that
interactivity is not a property of the runner, but something orthogonal to
that, e.g. one should write

p = InteractivePipeline()

or

p = Pipeline(options, interactive=True)

or similar. (It was introduced as a runner because, conceptually, a runner
is something that takes a pipeline and executes it.)

Similarly, why introduce a special run_pipeline(p) rather than p.run()?


>    1. The interactive experience should be tailored for different
>    underlying runners. There is no portability of interactivity, and users
>    opting into interactive Beam using a notebook would naturally expect
>    something similar to the direct runner.
>
> This concerns me a lot. Again, the core tenet of Beam is that one can
choose the execution environment (runner) completely independently of how
one writes the pipeline. We should figure out what, if anything, needs to
be supported by a runner to support interactivity (the only thing that
comes to mind is a place to read and write temporary/cached data), but I
very strongly feel we should not go down the road of having different
interactive APIs/experiences for different runners. In particular, the many
references to the DirectRunner are worrisome--what's special about the
DirectRunner that other runners cannot provide th

Re: Java 11 compatibility question

2019-08-21 Thread Kenneth Knowles
On Tue, Aug 20, 2019 at 8:37 AM Elliotte Rusty Harold 
wrote:

>
>
> On Tue, Aug 20, 2019 at 7:51 AM Ismaël Mejía  wrote:
>
>> a per case approach (the exception could be portable runners not based on
>> Java).
>>
>> Of course other definitions of being Java 11 compatible are interesting
>> but probably not part of our current scope: actions like changing the
>> codebase to use Java 11 specific APIs / idioms, publishing Java 11 specific
>> artifacts, or using the Java Platform Module System (JPMS). All of these may
>> be nice to have but are probably less important for end users who may just
>> want to be able to use Beam in its current form on Java 11 VMs.
>>
>> What do others think? Is this enough to announce Java 11 compatibility
>> and add the documentation to the webpage?
>>
>
> No, it isn't, I fear. We don't have to use JPMS in Beam, but Beam really
> does need to be compatible with JPMS-using apps. The bare minimum here is
> avoiding split packages, and that needs to include all transitive
> dependencies, not just Beam itself. I don't think we meet that bar now.
>

We definitely don't meet the basic bar ourselves, unless someone has done a
lot of clean up. We've had classes shuffled from jar to jar quite a lot
without changing their namespace appropriately. It may be mostly limited to
runner-facing pieces, but I expect for a number of runners (notably the
Direct Runner) that is enough to bite users.
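
For readers unfamiliar with the constraint: under JPMS, a package may live in
only one module on the module path. A hypothetical illustration of how a split
package surfaces — the module names, the package, and the exact error text are
invented/approximate for the example:

  // module-info.java of an application using two Beam jars as automatic
  // modules. If both jars happened to contain classes in the package
  // org.apache.beam.sdk.util, the module system rejects the configuration
  // before anything runs.
  module my.app {
    requires beam.sdks.java.core;       // contains org.apache.beam.sdk.util
    requires beam.runners.direct.java;  // hypothetically also contains it
  }
  // javac/java then fails with an error along the lines of:
  //   error: module my.app reads package org.apache.beam.sdk.util
  //          from both beam.sdks.java.core and beam.runners.direct.java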

Kenn


>
> --
> Elliotte Rusty Harold
> elh...@ibiblio.org
>


Re: Write-through-cache in State logic

2019-08-21 Thread Maximilian Michels
> There is probably a misunderstanding here: I'm suggesting to use a worker ID 
> instead of cache tokens, not additionally.

Ah! Misread that. We need a changing token to indicate that the cache is
stale, e.g. a checkpoint has failed or we are restoring from an old
checkpoint. If the _Runner_ generates a new unique token/id for workers which
outlast the Runner, then this should work fine. I don't think it is safe for
the worker to supply the id. The Runner should be in control of cache tokens
to avoid invalid tokens.

> In the PR the token is modified as part of updating the state. Doesn't the 
> SDK need the new token to update its cache entry also? That's where it would 
> help the SDK to know the new token upfront.

If the state is updated in the Runner, a new token has to be generated.
The old one is not valid anymore. The SDK will use the updated token to
store the new value in the cache. I understand that it would be nice to
know the token upfront. That could be possible with some token
generation scheme. On the other hand, writes can be asynchronous and
thus not block the UDF.

> But I believe there is no need to change the token in first place, unless 
> bundles for the same key (ranges) can be processed by different workers.

That's certainly possible, e.g. two workers A and B take turn processing
a certain key range, one bundle after another:

You process a bundle with a token T with A, then worker B takes over.
Both have an entry with cache token T. So B goes on to modify the state
and uses the same cache token T. Then A takes over again. A would have a
stale cache entry but T would still be a valid cache token.

> Indeed the fact that Dataflow can dynamically split and merge these ranges is 
> what makes it trickier. If Flink does not repartition the ranges, then things 
> are much easier.

Flink does not dynamically repartition key ranges (yet). If it started
to support that, we would invalidate the cache tokens for the changed
partitions.


I'd suggest the following cache token generation scheme:

One cache token per key range for user state and one cache token for
each side input. On writes to user state or changes to a side input, the
associated cache token will be renewed.

On the SDK side, it should be sufficient to let the SDK re-associate all
its cached data belonging to a valid cache token with a new cache token
returned by a successful write. This has to happen in the active scope
(i.e. user state, or a particular side input).

If the key range changes, new cache tokens have to be generated. This
should happen automatically because the Runner does not checkpoint cache
tokens and will generate new ones when it restarts from an earlier
checkpoint.
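
As a rough sketch of the Runner-side bookkeeping this scheme implies (names
are illustrative, not the PR's code; side inputs would get an analogous map
keyed by side input id):

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class CacheTokenRegistry {
  // One token per (state id, key range); a write renews the token,
  // which implicitly invalidates all older cache entries for it.
  private final Map<String, String> tokens = new ConcurrentHashMap<>();

  // Token sent to the SDK with a bundle; stable until the next write.
  String tokenFor(String stateId, String keyRange) {
    return tokens.computeIfAbsent(
        stateId + "/" + keyRange, k -> UUID.randomUUID().toString());
  }

  // Called on a successful write; the previous token becomes stale.
  String renew(String stateId, String keyRange) {
    String fresh = UUID.randomUUID().toString();
    tokens.put(stateId + "/" + keyRange, fresh);
    return fresh;
  }
}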

The current PR needs to be changed to (1) only keep a single cache token
per user state and key range (2) add support for cache tokens for each
side input.

Hope that makes sense.

-Max

On 21.08.19 17:27, Reuven Lax wrote:
> 
> 
> On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels wrote:
> 
> Appreciate all your comments! Replying below.
> 
> 
> @Luke:
> 
> > Having cache tokens per key would be very expensive indeed and I
> believe we should go with a single cache token "per" bundle.
> 
> Thanks for your comments on the PR. I was thinking to propose something
> along these lines of having cache tokens valid for a particular
> checkpointing "epoch". That would require even less token renewal than
> the per-bundle approach.
> 
> 
> @Thomas, thanks for the input. Some remarks:
> 
> > Wouldn't it be simpler to have the runner just track a unique ID
> for each worker and use that to communicate if the cache is valid or
> not?
> 
> We do not need a unique id per worker. If a cache token is valid for a
> particular worker, it is also valid for another worker. That is with the
> assumption that key ranges are always disjoint between the workers.
> 
> > * When the bundle is started, the runner tells the worker if the
> cache has become invalid (since it knows if another worker has
> mutated state)
> 
> This is simply done by not transferring the particular cache token. No
> need to declare it invalid explicitly.
> 
> > * When the worker sends mutation requests to the runner, it
> includes its own ID (or the runner already has it as contextual
> information). No need to wait for a response.
> 
> Mutations of cached values can be freely done as long as the cache token
> associated with the state is valid for a particular bundle. Only the
> first time, the Runner needs to wait on the response to store the cache
> token. This can also be done asynchronously.
> 
> > * When the bundle is finished, the runner records the last writer
> (only if a change occurred)
> 
> I believe this is not necessary because there will only be one writer at
> a time for a particular bundle and key range, hence only one writer
> holds a valid cache token for a particular state and key range.

Re: [DISCUSS] Multiple-triggering SQL Join with retractions support

2019-08-21 Thread Kenneth Knowles
These all sound useful. One thing to note is that the EMIT syntax is an
earlier-stage idea, and more likely subject to changes. The problem with EMIT
anywhere except the top level is that it is not very composable. It really
belongs most naturally as part of an INSERT statement, just like sink triggers.

Maybe a first step is to do the basics for retractions in Beam itself. This
is already a lot of work (I just reviewed your prototype and Anton's
together so I have a very good idea where it is at). Once we have the
basics, then SqlTransform can have triggers set on its input and still work
with grouping and joins. That will let us explore retractions in SQL
without depending on EMIT.
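
For instance, once the basics land, something shaped like this should work
(a sketch; the field names and windowing are made up, and the usual
org.apache.beam.sdk imports are elided):

PCollection<Row> triggered =
    rows.apply(
        Window.<Row>into(FixedWindows.of(Duration.standardHours(1)))
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(1))))
            .withAllowedLateness(Duration.standardHours(1))
            .accumulatingFiredPanes());

PCollection<Row> joined =
    PCollectionTuple.of(new TupleTag<Row>("S1"), triggered)
        .and(new TupleTag<Row>("S2"), otherRows)
        .apply(SqlTransform.query("SELECT * FROM S1 JOIN S2 ON S1.k = S2.k"));

The triggering lives entirely on the input PCollections; SqlTransform itself
is unchanged, which is what keeps this composable.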

Kenn

On Mon, Aug 19, 2019 at 7:02 PM Rui Wang  wrote:

> I am also asking TVF windowing and EMIT syntax support in dev@calcite.
> See [1].
>
>
>
> [1]:
> https://lists.apache.org/thread.html/71724f8a9079be11c04c70c64097491822323f560a79a7fa1321711d@%3Cdev.calcite.apache.org%3E
>
> -Rui
>
> On Mon, Aug 19, 2019 at 4:40 PM Rui Wang  wrote:
>
>> Hi Mingmin,
>>
>> Thanks for adding "INSERT INTO" (which I missed from the example)
>>
>> I am not sure if I understand the question:
>>
>> 1. multiple GBK with retraction is solved by [1].
>> 2. In terms of SQL and its view, the output are defined by the last GBK.
>>
>> [1]:
>> https://docs.google.com/document/d/14WRfxwk_iLUHGPty3C6ZenddPsp_d6jhmx0vuafXqmE/edit?usp=sharing
>>
>>
>> -Rui
>>
>> On Mon, Aug 19, 2019 at 4:02 PM Mingmin Xu  wrote:
>>
>>> +1 to support EMIT in Beam side first if we cannot include it in Calcite
>>> in short time(See #1, #2). I'm open to use any format, the one above or
>>> something as below. The tricky question is, what's the expected behavior
>>> for a complex query with more than 1 GBK operators?
>>>
>>> EMIT   |  [ACCUMULATE|DISCARD]
>>> [INSERT INTO ...]
>>> SELECT ...
>>>
>>> #1.
>>> https://sematext.com/opensee/m/Calcite/FR3K9JVAl32VULr6?subj=Towards+a+spec+for+robust+streaming+SQL+Part+1
>>> #2
>>> https://sematext.com/opensee/m/Beam/gfKHFFDd4i1I3nZc2?subj=Towards+a+spec+for+robust+streaming+SQL+Part+2
>>>
>>> On Mon, Aug 19, 2019 at 12:02 PM Rui Wang  wrote:
>>>
 To update this idea, I think we can go a step further to support EMIT
 syntax from one-sql-to-rule-them-all paper [1].

 EMIT will allow periodically delayed stream materialization. For a stream view,
 it means we will add support to sinks to keep generating a changelog table.
 For a view only, it means we will add support to sinks to periodically generate
 a compacted table from the changelog table.

 Regarding to SQL, a typical query like the following should run:


 *WITH joined_table AS (SELECT * FROM S1 JOIN S2)*
 *SELECT XX FROM HOP(joined_table)*
 *EMIT [STREAM] AFTER DELAY INTERVAL '1' HOUR*


 By doing so, retractions will be much more useful for SQL in a product
 scenario, in which we can have a meaningful end-to-end SQL pipeline.

 [1]: https://arxiv.org/pdf/1905.12133.pdf

 -Rui

 On Mon, Aug 12, 2019 at 11:30 PM Rui Wang  wrote:

> Hi Community,
>
> BeamSQL currently does not support unbounded-unbounded join with
> non-default trigger. It is because:
>
> - Discarding mode does not work for outer joins because it lacks the
> ability to retract pre-emitted values. Think of an example in which a
> tuple of (left_row, null) needs to be retracted if the matching
> right_row appears after the last trigger fired.
> - Accumulating mode *theoretically* can support unbounded-unbounded
> join because it is supposed to always "overwrite" the previous result.
> However, in practice, such overwriting is too expensive for join use
> cases. It would be much more efficient if small changes in the join
> inputs only caused small amounts of downstream recomputation.
> - Both discarding mode and accumulating mode are not sufficient to
> refine materialized data.
>
> Meanwhile, [1] has kicked off a discussion on retractions in Beam
> model. I have been collecting people's feedback and generally speaking
> people agree that retractions are useful for some use cases.
>
> Thus I propose to combine SQL join with retractions to
> support multiple-triggering SQL Join.
>
> I think SQL join is a good start for supporting retraction in Beam
> with the following caveats:
> 1. multiple-triggering SQL Join is a useful feature.
> 2. SQL join is an opportunity for us to figure out implementation
> details of retraction by building it for a well defined use case.
> 3. Supporting retraction should not cause performance regression on
> existing pipelines, or require changes on existing pipelines.
>
>
> What do you think?
>
> [1]:
> https://lists.apache.org/thread.html/bb2d40b1bea8b21fbbb7caf599fabba823da357768ceca8ea2363789@%3Cdev.beam.apache.org%3E
>
>
> -Rui
>

>>>
>>> --
>>> 

Re: [Discuss] Retractions in Beam

2019-08-21 Thread Kenneth Knowles
I reviewed your PR (https://github.com/apache/beam/pull/9199) and Anton's
as another reference (https://github.com/apache/beam/pull/4742). Nice work.
I thought I would summarize for the list a little bit. I think we have not
done too much with retractions because it seems like a big job. You both
have shown that it is maybe not that hard to implement the core. But it
will have a lot of user-facing things that we have to test very carefully.

 - the technical changes are primarily to ReduceFn aka GroupAlsoByWindow
which is the core of stateful aggregation and is straightforward, which is
cool
 - the boilerplate through the codebase is a lot (most of the ~1000 lines
of both PRs) but it could have been a lot worse, so we are lucky :-)

Here are steps forward that I can think of:

 - we need backwards compatibility, which is trivial because it is a new
accumulation mode
 - we need a little more mathematical analysis (at least personally to have
more confidence there are no bad surprises)
 - we need more description of the user-facing impact and API changes (same
reason)
 - lots and lots of @ValidatesRunner tests
 - some opinions here from runner authors about efficiency in their system
 - and merging/unmerging window support matters since that is a key
retractions use case too, but I would save it for later in my opinion (if
you've seen Tyler's hack to do Validity Windows then that is even harder)
 - we also need protections so that things which do not work with
retractions are rejected, which will be all existing user DoFns and all
sinks
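
On the backwards compatibility point: the user-facing opt-in could be as
small as a new accumulation mode. A sketch (the retracting mode below is
hypothetical and does not exist yet; everything else is today's API):

PCollection<KV<String, Long>> counts =
    words
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10)))
            .withAllowedLateness(Duration.ZERO)
            .retractingFiredPanes()) // hypothetical new mode
        .apply(Count.perElement());

Pipelines that keep accumulating/discarding semantics are untouched, which is
what makes the change backwards compatible.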

I've got an old doc made w/ Anton, Ben, and a couple others that I can try
to find time to edit and share that deals a little bit with mathematics
(messy and incomplete) and the API/compatibility questions (more useful,
probably). You've seen it offline but the list has not seen a public
version. I was going to try to merge it with yours but I can get it out
quicker if I just allow for the overlaps.

Kenn

On Mon, Aug 12, 2019 at 9:47 PM Rui Wang  wrote:

> Hello!
>
> I have also been building a proof of concept(PR
> ), which implements the
> streaming wordcount example in the design doc.
>
> What is missing in the PoC is ordering guarantee implementation in sink
> (which I am working on).
>
>
> -Rui
>
> On Wed, Jul 24, 2019 at 1:37 PM Rui Wang  wrote:
>
>> Hello!
>>
>> In case you are not aware of, I have added a modified streaming wordcount
>> example at the end of the doc to illustrate retractions.
>>
>>
>> -Rui
>>
>> On Wed, Jul 10, 2019 at 10:58 AM Rui Wang  wrote:
>>
>>> Hi Community,
>>>
>>> Retractions is a part of core Beam model [1]. I come up with a doc to
>>> discuss retractions about use cases, model and API (see the link below).
>>> This is a very beginning discussion on retractions but I do hope we can
>>> have a consensus and make retractions implemented in a useful way
>>> eventually.
>>>
>>>
>>> doc link:
>>> https://docs.google.com/document/d/14WRfxwk_iLUHGPty3C6ZenddPsp_d6jhmx0vuafXqmE/edit?usp=sharing
>>>
>>>
>>> [1]: https://issues.apache.org/jira/browse/BEAM-91
>>>
>>>
>>> -Rui
>>>
>>


[RESULT] [VOTE] Release 2.15.0, release candidate #2

2019-08-21 Thread Yifan Zou
Hi all,

I'm happy to announce that we have unanimously approved this release.

There are 4 approving votes, 3 of which are binding (in order):
* Ahmet (al...@google.com);
* Pablo (pabl...@google.com);
* Lukasz (lc...@google.com);

There are no disapproving votes.

Thanks everyone!

Next step is to finalize the release (merge the docs/website/blog PRs,
publish artifacts). Please let me know if you have any questions.

Regards,
Yifan


Re: [VOTE] Release 2.15.0, release candidate #2

2019-08-21 Thread Lukasz Cwik
+1 (binding)

I validated the signatures against the key dist/release/KEYS and hashes of
the source distributions and release artifacts.
I also ran some of the quickstarts for Java.

On Tue, Aug 20, 2019 at 3:59 PM Pablo Estrada  wrote:

> +1
>
> I've installed from the source in  apache/dist.
> I've run unit tests in Python 3.6, and wordcount in Python 3.6 in Direct
> and Dataflow runners.
>
> Thanks!
> -P.
>
> On Tue, Aug 20, 2019 at 11:41 AM Hannah Jiang 
> wrote:
>
>> Yes, I agree this is a separate topic and shouldn't block 2.15 release.
>> There is already a JIRA ticket, I will update it with more details.
>>
>> On Tue, Aug 20, 2019 at 11:32 AM Ahmet Altay  wrote:
>>
>>>
>>>
>>> On Tue, Aug 20, 2019 at 10:18 AM Yifan Zou  wrote:
>>>
 Hi all,

 This is a friendly reminder. Please help to review, verify and vote on
 the release candidate #2 for the version 2.15.0.
 [ ] +1, Approve the release
 [ ] -1, Do not approve the release (please provide specific comments)

 I've verified Java quickstart & mobile games, and Python (both tar and
 wheel) quickstart with Py27, 35, 36, 37. They worked well.

 https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit?pli=1#gid=1036192804

 Thanks.
 Yifan



 On Tue, Aug 20, 2019 at 9:33 AM Hannah Jiang 
 wrote:

> A side note about this test:
> Now we only have py2 and py35, so it only fails with py35. I am
> introducing minor versions, which will add py36 and py37, and all py3 are
> flaky.
> It's really difficult to pass Portable Precommit with minor versions,
> the chance of passing the test is around 15%.
>

>>> Hannah, let's separate this from the release thread. Is there a JIRA for
>>> this, could you update it? And perhaps we need different pre commits for
>>> different versions so that flakes do not stack up. Even if a suite is >90%
>>> reliable, if we stack up with 4 version, the reliability will get much
>>> lower.
>>>
>>>

> On Mon, Aug 19, 2019 at 5:27 PM Ahmet Altay  wrote:
>
> >> Thank you. Unless there are any objections, let's continue with
>> validating RC2.
>>
>> On Mon, Aug 19, 2019 at 5:21 PM Kyle Weaver 
>> wrote:
>>
>>> I'm not sure if it's worth blocking the release, since I can't
>>> reproduce the issue on my machine and a fix would be hard to verify.
>>>
>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>>> kcwea...@google.com
>>>
>>>
>>> On Mon, Aug 19, 2019 at 4:56 PM Ahmet Altay 
>>> wrote:
>>>
 Kyle, are you currently working on this to decide whether it is the
 blocking case or not? Also is this affecting both release branch and 
 master
 branch?

 On Mon, Aug 19, 2019 at 4:49 PM Kyle Weaver 
 wrote:

> Re BEAM-7993: For some context, there are two possible causes
> here. The pessimistic take is that Dockerized SDK workers are taking
> forever to start. The optimistic take is that the Docker containers 
> are
> just longer than normal (but not forever) to start on Jenkins, in 
> which
> case this issue is nothing new to this release. (The halting 
> problem!) If
> it's the second, it's safe to wait to fix it in the next release.
>
> Kyle Weaver | Software Engineer | github.com/ibzib |
> kcwea...@google.com
>
>
> On Mon, Aug 19, 2019 at 4:42 PM Yifan Zou 
> wrote:
>
>> Mark and Kyle found a py35 portable test which is flaky:
>> https://issues.apache.org/jira/browse/BEAM-7993.
>> I plan to finalize the release this week. Would that be a
>> blocker? Could we include the fix in 2.16?
>>
>> Thanks.
>>
>>
>> On Mon, Aug 19, 2019 at 4:38 PM Yifan Zou 
>> wrote:
>>
>>> I've run most of validations and they're all good.
>>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit?pli=1#gid=1036192804
>>>
>>> On Mon, Aug 19, 2019 at 10:59 AM Hannah Jiang <
>>> hannahji...@google.com> wrote:
>>>
 (resending it to dev@)
 +1, I tested some test cases as well as customized test cases
 and all looks good. I updated validation sheet.

 On Mon, Aug 19, 2019 at 10:40 AM Hannah Jiang <
 hannahji...@google.com> wrote:

> +1, I tested some test cases as well as customized test cases
> and all looks good. I updated validation sheet.
>
> On Mon, Aug 19, 2019 at 10:28 AM Ahmet Altay 
> wrote:
>
>> Hi all,
>>
>> Please help with validation and voting on the RC2. Let's help
>> Yifan to f

Re: Java 11 compatibility question

2019-08-21 Thread Łukasz Gajowy
I created the other issue, https://issues.apache.org/jira/browse/BEAM-8024,
regarding importing Beam into a Java 11 project that uses JPMS. I confirmed*
in a pet project that this is happening (linked in the issue).

*no shock here, I just wanted to play with it.

Łukasz

On Wed, Aug 21, 2019 at 14:53 Łukasz Gajowy wrote:

> Thank you, Elliotte for showing us the issues with JPMS.
>
> So maybe we should just announce for end users that they can run Beam
> pipelines in Java 11 but that for the moment Beam modules cannot be
> used in Java 11 module style. I know that there is already a lot of
> fear around Java 8 not being maintained so this will probably be
> perceived well even if not perfect.
>
> +1 for sharing this information to the users. IMO this is really valuable
> knowledge.
>
> Łukasz
>
>
>
>
> On Wed, Aug 21, 2019 at 12:22 Ismaël Mejía wrote:
>
>> Thanks again Elliotte for the clear information and references. It
>> seems that being compatible with Java 11 modules will be more elusive
>> than expected considering the transitive dependencies. Do you (or
>> someone else) know if there is a plugin or easy way to discover
>> this?
>>
>> I think that solving this for transitive dependencies will be elusive
>> for a LONG LONG time (I had not even thought about IO modules that
>> have dependencies and commonly live in ‘older’ stable versions). So
>> maybe we should just announce for end users that they can run Beam
>> pipelines in Java 11 but that for the moment Beam modules cannot be
>> used in Java 11 module style. I know that there is already a lot of
>> fear around Java 8 not being maintained so this will probably be
>> perceived well even if not perfect.
>>
>> I filed https://issues.apache.org/jira/browse/BEAM-8021 to set
>> the module names explicitly. We will probably also need to put some
>> validation in place so that the jars always include the module name and
>> new modules don’t forget to do so.
>>
>>
>> On Wed, Aug 21, 2019 at 3:28 AM Elliotte Rusty Harold
>>  wrote:
>> >
>> > If somebody is using JPMS and they attempt to import beam, they get a
>> > compile time error. Some other projects I work on have been getting
>> > user reports about this.
>> >
>> > See
>> https://github.com/GoogleCloudPlatform/cloud-opensource-java/blob/master/library-best-practices/JLBP-19.md
>> > for more details.
>> >
>> > On Tue, Aug 20, 2019 at 5:30 PM Ahmet Altay  wrote:
>> > >
>> > >
>> > >
>> > > On Tue, Aug 20, 2019 at 8:37 AM Elliotte Rusty Harold <
>> elh...@ibiblio.org> wrote:
>> > >>
>> > >>
>> > >>
>> > >> On Tue, Aug 20, 2019 at 7:51 AM Ismaël Mejía 
>> wrote:
>> > >>>
>> > >>> a per case approach (the exception could be portable runners not
>> based on Java).
>> > >>>
>> > >>> Of course other definitions of being Java 11 compatible are
>> interesting but probably not part of our current scope. Actions like change
>> the codebase to use Java 11 specific APIs / idioms, publish Java 11
>> specific artifacts or use Java Platform Modules (JPM). All of these may be
>> nice to have but are probably less important for end users who may just
>> want to be able to use Beam in its current form in Java 11 VMs.
>> > >>>
>> > >>> What do others think? Is this enough to announce Java 11
>> compatibility and add the documentation to the webpage?
>> > >>
>> > >>
>> > >> No, it isn't, I fear. We don't have to use JPMS in Beam, but Beam
>> really does need to be compatible with JPMS-using apps. The bare minimum
>> here is avoiding split packages, and that needs to include all transitive
>> dependencies, not just Beam itself. I don't think we meet that bar now.
>> > >
>> > >
>> > > For my understanding, what would be the limitations of Beam's
>> dependencies having split dependencies? Would it limit Beam users from
>> using 3rd party libraries that require JPMS supports? Would it be in scope
>> for Beam to get its dependencies to meet a certain bar?
>> > >
>> > > Ismaël's definition of being able to run Beam published dependencies
>> in Java 11 VM sounds enough to me "to announce Java 11 compatibility _for
>> Beam_".
>> > >
>> > >>
>> > >>
>> > >> --
>> > >> Elliotte Rusty Harold
>> > >> elh...@ibiblio.org
>> >
>> >
>> >
>> > --
>> > Elliotte Rusty Harold
>> > elh...@ibiblio.org
>>
>


Re: Query about JdbcIO.readRows()

2019-08-21 Thread Kishor Joshi
 Hi,

This fix is still not available in Beam 2.15.0. Is there a Jira that has
been created for this issue? I am interested in contributing to it.

Thanks & Regards,
Kishor

On Friday, August 2, 2019, 10:19:17 PM GMT+5:30, Jean-Baptiste Onofré wrote:

> Agree. I will fix that.
>
> Regards
> JB

On Aug 2, 2019, at 17:15, Vishwas Bm wrote:

> Hi Kishor,
>
> + dev (dev@beam.apache.org)
>
> This looks like a bug. The attribute statementPreparator is nullable.
> It should have been handled in the same way as in the expand method of
> the Read class.
>
> Thanks & Regards,
> Vishwas

On Fri, Aug 2, 2019 at 2:48 PM Kishor Joshi <joshi...@yahoo.com> wrote:

> Hi,
>
> I am using the just-released 2.14 version of JdbcIO with the newly added
> "readRows" functionality.
>
> I want to read table data with a query without parameters (select * from
> table_name). As per my understanding, this should not require a
> "StatementPreparator". However, if I use the newly added "readRows"
> function, I get an exception that seems to force me to use the
> "StatementPreparator". Stacktrace below.
>
> java.lang.IllegalArgumentException: statementPreparator can not be null
>     at org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
>     at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.withStatementPreparator(JdbcIO.java:600)
>     at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadRows.expand(JdbcIO.java:499)
>     at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadRows.expand(JdbcIO.java:410)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
>     at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
>     at com.nokia.csf.dfle.transforms.DfleRdbmsSource.expand(DfleRdbmsSource.java:34)
>     at com.nokia.csf.dfle.transforms.DfleRdbmsSource.expand(DfleRdbmsSource.java:10)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
>     at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
>     at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:182)
>     at com.nokia.csf.dfle.dsl.DFLEBeamMain.dagWireUp(DFLEBeamMain.java:49)
>     at com.nokia.csf.dfle.dsl.DFLEBeamMain.main(DFLEBeamMain.java:120)
>
> The test added in JdbcIOTest.java for this functionality only tests
> queries with parameters. Is this new function supported only in the above
> case and not for a plain "withQuery" (without parameters)?
>
> Thanks & regards,
> Kishor
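
Until a fix lands, one possible workaround (an untested sketch) is to hand
ReadRows a no-op preparator so the null check passes; StatementPreparator is
a single-method interface, so a lambda works:

PCollection<Row> rows =
    pipeline.apply(
        JdbcIO.readRows()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "org.postgresql.Driver", "jdbc:postgresql://host/db")) // example values
            .withQuery("select * from table_name")
            .withStatementPreparator(preparedStatement -> {})); // no-op for a parameterless query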

  

Re: Write-through-cache in State logic

2019-08-21 Thread Reuven Lax
On Wed, Aug 21, 2019 at 2:16 AM Maximilian Michels  wrote:

> Appreciate all your comments! Replying below.
>
>
> @Luke:
>
> > Having cache tokens per key would be very expensive indeed and I believe
> we should go with a single cache token "per" bundle.
>
> Thanks for your comments on the PR. I was thinking to propose something
> along these lines of having cache tokens valid for a particular
> checkpointing "epoch". That would require even less token renewal than
> the per-bundle approach.
>
>
> @Thomas, thanks for the input. Some remarks:
>
> > Wouldn't it be simpler to have the runner just track a unique ID for
> each worker and use that to communicate if the cache is valid or not?
>
> We do not need a unique id per worker. If a cache token is valid for a
> particular worker, it is also valid for another worker. That is with the
> assumption that key ranges are always disjoint between the workers.
>
> > * When the bundle is started, the runner tells the worker if the cache
> has become invalid (since it knows if another worker has mutated state)
>
> This is simply done by not transferring the particular cache token. No
> need to declare it invalid explicitly.
>
> > * When the worker sends mutation requests to the runner, it includes its
> own ID (or the runner already has it as contextual information). No need to
> wait for a response.
>
> Mutations of cached values can be freely done as long as the cache token
> associated with the state is valid for a particular bundle. Only the
> first time, the Runner needs to wait on the response to store the cache
> token. This can also be done asynchronously.
>
> > * When the bundle is finished, the runner records the last writer (only
> if a change occurred)
>
> I believe this is not necessary because there will only be one writer at
> a time for a particular bundle and key range, hence only one writer
> holds a valid cache token for a particular state and key range.
>
>
> @Reuven:
>
> >  Dataflow divides the keyspace up into lexicographic ranges, and creates
> a cache token per range.
>
> State is always processed partitioned by the Flink workers (hash-based,
> not lexicographical). I don't think that matters though because the key
> ranges do not overlap between the workers. Flink does not support
> dynamically repartitioning the key ranges. Even in case of fine-grained
> recovery of workers and their key ranges, we would simply generate new
> cache tokens for a particular worker.
>

Dataflow's ranges are also hash based. When I said lexicographical, I meant
lexicographical based on the hexadecimal hash value.

Indeed the fact that Dataflow can dynamically split and merge these ranges
is what makes it trickier. If Flink does not repartition the ranges, then
things are much easier.


>
> Thanks,
> Max
>
> On 21.08.19 09:33, Reuven Lax wrote:
> > Dataflow does something like this, however since work is
> > load balanced across workers a per-worker id doesn't work very well.
> > Dataflow divides the keyspace up into lexicographic ranges, and creates
> > a cache token per range.
> >
> > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise wrote:
> >
> > Commenting here vs. on the PR since related to the overall approach.
> >
> > Wouldn't it be simpler to have the runner just track a unique ID for
> > each worker and use that to communicate if the cache is valid or not?
> >
> > * When the bundle is started, the runner tells the worker if the
> > cache has become invalid (since it knows if another worker has
> > mutated state)
> > * When the worker sends mutation requests to the runner, it includes
> > its own ID (or the runner already has it as contextual information).
> > No need to wait for a response.
> > * When the bundle is finished, the runner records the last writer
> > (only if a change occurred)
> >
> > Whenever current worker ID and last writer ID doesn't match, cache
> > is invalid.
> >
> > Thomas
> >
> >
> > On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik wrote:
> >
> > Having cache tokens per key would be very expensive indeed and I
> > believe we should go with a single cache token "per" bundle.
> >
> > On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels wrote:
> >
> > Maybe a Beam Python expert can chime in for Rakesh's
> question?
> >
> > Luke, I was assuming cache tokens to be per key and state
> > id. During
> > implementing an initial support on the Runner side, I
> > realized that we
> > probably want cache tokens to only be per state id. Note
> > that if we had
> > per-key cache tokens, the number of cache tokens would
> > approach the
> > total number of keys in an application.
> >
> > If anyone wants to have a look, here is a first version of the Runner
> > side for cache tokens: https://github.com/apache/beam/pull/9374

Re: Write-through-cache in State logic

2019-08-21 Thread Thomas Weise
-->

On Wed, Aug 21, 2019, 2:16 AM Maximilian Michels  wrote:

> Appreciate all your comments! Replying below.
>
>
> @Luke:
>
> > Having cache tokens per key would be very expensive indeed and I believe
> we should go with a single cache token "per" bundle.
>
> Thanks for your comments on the PR. I was thinking to propose something
> along this lines of having cache tokens valid for a particular
> checkpointing "epoch". That would require even less token renewal than
> the per-bundle approach.
>
>
> @Thomas, thanks for the input. Some remarks:
>
> > Wouldn't it be simpler to have the runner just track a unique ID for
> each worker and use that to communicate if the cache is valid or not?
>
> We do not need a unique id per worker. If a cache token is valid for a
> particular worker, it is also valid for another worker. That is with the
> assumption that key ranges are always disjoint between the workers
>

There is probably a misunderstanding here: I'm suggesting to use a worker
ID instead of cache tokens, not additionally.
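
To make that concrete, a minimal sketch of the bookkeeping I have in mind
(all names illustrative):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LastWriterRegistry {
  // Runner-side map: key range -> id of the worker that last mutated it.
  private final Map<String, String> lastWriter = new ConcurrentHashMap<>();

  // Called by the runner when a bundle that mutated state finishes.
  void recordWriter(String keyRange, String workerId) {
    lastWriter.put(keyRange, workerId);
  }

  // Told to the worker at bundle start: its cache is valid only if it
  // was also the last writer for the key range.
  boolean cacheValid(String keyRange, String workerId) {
    return workerId.equals(lastWriter.get(keyRange));
  }
}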


> > * When the bundle is started, the runner tells the worker if the cache
> has become invalid (since it knows if another worker has mutated state)
>
> This is simply done by not transferring the particular cache token. No
> need to declare it invalid explicitly.
>
> > * When the worker sends mutation requests to the runner, it includes its
> own ID (or the runner already has it as contextual information). No need to
> wait for a response.
>
> Mutations of cached values can be freely done as long as the cache token
> associated with the state is valid for a particular bundle. Only the
> first time, the Runner needs to wait on the response to store the cache
> token. This can also be done asynchronously.


In the PR the token is modified as part of updating the state. Doesn't the
SDK need the new token to update its cache entry also?

That's where it would help the SDK to know the new token upfront. But I
believe there is no need to change the token in the first place, unless bundles
for the same key (ranges) can be processed by different workers.



>
>
> > * When the bundle is finished, the runner records the last writer (only
> if a change occurred)
>
> I believe this is not necessary because there will only be one writer at
> a time for a particular bundle and key range, hence only one writer
> holds a valid cache token for a particular state and key range.
>
>
> @Reuven:
>
> >  Dataflow divides the keyspace up into lexicographic ranges, and creates
> a cache token per range.
>
> State is always processed partitioned by the Flink workers (hash-based,
> not lexicographical). I don't think that matters though because the key
> ranges do not overlap between the workers. Flink does not support
> dynamically repartitioning the key ranges. Even in case of fine-grained
> recovery of workers and their key ranges, we would simply generate new
> cache tokens for a particular worker.
>
>
> Thanks,
> Max
>
> On 21.08.19 09:33, Reuven Lax wrote:
> > Dataflow does something like this, however since work is
> > load balanced across workers a per-worker id doesn't work very well.
> > Dataflow divides the keyspace up into lexicographic ranges, and creates
> > a cache token per range.
> >
> > On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise wrote:
> >
> > Commenting here vs. on the PR since related to the overall approach.
> >
> > Wouldn't it be simpler to have the runner just track a unique ID for
> > each worker and use that to communicate if the cache is valid or not?
> >
> > * When the bundle is started, the runner tells the worker if the
> > cache has become invalid (since it knows if another worker has
> > mutated state)
> > * When the worker sends mutation requests to the runner, it includes
> > its own ID (or the runner already has it as contextual information).
> > No need to wait for a response.
> > * When the bundle is finished, the runner records the last writer
> > (only if a change occurred)
> >
> > Whenever current worker ID and last writer ID doesn't match, cache
> > is invalid.
> >
> > Thomas
> >
> >
> > On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik wrote:
> >
> > Having cache tokens per key would be very expensive indeed and I
> > believe we should go with a single cache token "per" bundle.
> >
> > On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels wrote:
> >
> > Maybe a Beam Python expert can chime in for Rakesh's
> question?
> >
> > Luke, I was assuming cache tokens to be per key and state
> > id. During
> > implementing an initial support on the Runner side, I
> > realized that we
> > probably want cache tokens to only be per state id. Note
> > that if we had
> > per-key cache tokens, the number of cache tokens would approach the
> > total number of keys in an application.

Re: Java 11 compatibility question

2019-08-21 Thread Łukasz Gajowy
Thank you, Elliotte for showing us the issues with JPMS.

So maybe we should just announce for end users that they can run Beam
pipelines in Java 11 but that for the moment Beam modules cannot be
used in Java 11 module style. I know that there is already a lot of
fear around Java 8 not being maintained so this will probably be
perceived well even if not perfect.

+1 for sharing this information to the users. IMO this is really valuable
knowledge.

Łukasz




On Wed, Aug 21, 2019 at 12:22 Ismaël Mejía wrote:

> Thanks again Elliotte for the clear information and references. It
> seems that being compatible with Java 11 modules will be more elusive
> than expected considering the transitive dependencies. Do you (or
> someone else) know if there is a plugin or easy way to discover
> this?
>
> I think that solving this for transitive dependencies will be elusive
> for a LONG LONG time (I had not even thought about IO modules that
> have dependencies and commonly live in ‘older’ stable versions). So
> maybe we should just announce for end users that they can run Beam
> pipelines in Java 11 but that for the moment Beam modules cannot be
> used in Java 11 module style. I know that there is already a lot of
> fear around Java 8 not being maintained so this will probably be
> perceived well even if not perfect.
>
> I filed https://issues.apache.org/jira/browse/BEAM-8021 to set
> the module names explicitly. We will probably also need to put some
> validation in place so that the jars always include the module name and
> new modules don’t forget to do so.
>
>
> On Wed, Aug 21, 2019 at 3:28 AM Elliotte Rusty Harold
>  wrote:
> >
> > If somebody is using JPMS and they attempt to import beam, they get a
> > compile time error. Some other projects I work on have been getting
> > user reports about this.
> >
> > See
> https://github.com/GoogleCloudPlatform/cloud-opensource-java/blob/master/library-best-practices/JLBP-19.md
> > for more details.
> >
> > On Tue, Aug 20, 2019 at 5:30 PM Ahmet Altay  wrote:
> > >
> > >
> > >
> > > On Tue, Aug 20, 2019 at 8:37 AM Elliotte Rusty Harold <
> elh...@ibiblio.org> wrote:
> > >>
> > >>
> > >>
> > >> On Tue, Aug 20, 2019 at 7:51 AM Ismaël Mejía 
> wrote:
> > >>>
> > >>> a per case approach (the exception could be portable runners not
> based on Java).
> > >>>
> > >>> Of course other definitions of being Java 11 compatible are
> interesting but probably not part of our current scope. Actions like change
> the codebase to use Java 11 specific APIs / idioms, publish Java 11
> specific artifacts or use Java Platform Modules (JPM). All of these may be
> nice to have but are probably less important for end users who may just
> want to be able to use Beam in its current form in Java 11 VMs.
> > >>>
> > >>> What do others think? Is this enough to announce Java 11
> compatibility and add the documentation to the webpage?
> > >>
> > >>
> > >> No, it isn't, I fear. We don't have to use JPMS in Beam, but Beam
> really does need to be compatible with JPMS-using apps. The bare minimum
> here is avoiding split packages, and that needs to include all transitive
> dependencies, not just Beam itself. I don't think we meet that bar now.
> > >
> > >
> > > For my understanding, what would be the limitations of Beam's
> dependencies having split dependencies? Would it limit Beam users from
> using 3rd party libraries that require JPMS supports? Would it be in scope
> for Beam to get its dependencies to meet a certain bar?
> > >
> > > Ismaël's definition of being able to run Beam published dependencies
> in Java 11 VM sounds enough to me "to announce Java 11 compatibility _for
> Beam_".
> > >
> > >>
> > >>
> > >> --
> > >> Elliotte Rusty Harold
> > >> elh...@ibiblio.org
> >
> >
> >
> > --
> > Elliotte Rusty Harold
> > elh...@ibiblio.org
>


Re: Java 11 compatibility question

2019-08-21 Thread Ismaël Mejía
Thanks again Elliotte for the clear information and references. It
seems that being compatible with Java 11 modules will be more elusive
than expected considering the transitive dependencies. Do you (or
someone else) know if there is a plugin or easy way to discover
this?

I think that solving this for transitive dependencies will be elusive
for a LONG LONG time (I had not even thought about IO modules that
have dependencies and commonly live in ‘older’ stable versions). So
maybe we should just announce for end users that they can run Beam
pipelines in Java 11 but that for the moment Beam modules cannot be
used in Java 11 module style. I know that there is already a lot of
fear around Java 8 not being maintained so this will probably be
perceived well even if not perfect.

I filed https://issues.apache.org/jira/browse/BEAM-8021 to set
the module names explicitly. We will probably also need to put some
validation in place so that the jars always include the module name and
new modules don’t forget to do so.
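
For reference, the breakage shows up on the consumer side. A JPMS-using
application would declare something like the following (the Beam module name
is hypothetical until BEAM-8021 settles it):

// module-info.java of a user's application (sketch)
module com.example.pipeline {
  requires org.apache.beam.sdk; // hypothetical automatic module name
}

If two jars on the module path contain the same package (a split package),
javac rejects this module graph outright, which is why transitive
dependencies matter as much as Beam's own jars.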


On Wed, Aug 21, 2019 at 3:28 AM Elliotte Rusty Harold
 wrote:
>
> If somebody is using JPMS and they attempt to import beam, they get a
> compile time error. Some other projects I work on have been getting
> user reports about this.
>
> See 
> https://github.com/GoogleCloudPlatform/cloud-opensource-java/blob/master/library-best-practices/JLBP-19.md
> for more details.
>
> On Tue, Aug 20, 2019 at 5:30 PM Ahmet Altay  wrote:
> >
> >
> >
> > On Tue, Aug 20, 2019 at 8:37 AM Elliotte Rusty Harold  
> > wrote:
> >>
> >>
> >>
> >> On Tue, Aug 20, 2019 at 7:51 AM Ismaël Mejía  wrote:
> >>>
> >>> a per case approach (the exception could be portable runners not based on 
> >>> Java).
> >>>
> >>> Of course other definitions of being Java 11 compatible are interesting 
> >>> but probably not part of our current scope. Actions like change the 
> >>> codebase to use Java 11 specific APIs / idioms, publish Java 11 specific 
> >>> artifacts or use Java Platform Modules (JPM). All of these may be nice to 
> >>> have but are probably less important for end users who may just want to 
> >>> be able to use Beam in its current form in Java 11 VMs.
> >>>
> >>> What do others think? Is this enough to announce Java 11 compatibility 
> >>> and add the documentation to the webpage?
> >>
> >>
> >> No, it isn't, I fear. We don't have to use JPMS in Beam, but Beam really 
> >> does need to be compatible with JPMS-using apps. The bare minimum here is 
> >> avoiding split packages, and that needs to include all transitive 
> >> dependencies, not just Beam itself. I don't think we meet that bar now.
> >
> >
> > For my understanding, what would be the limitations of Beam's dependencies 
> > having split dependencies? Would it limit Beam users from using 3rd party 
> > libraries that require JPMS supports? Would it be in scope for Beam to get 
> > its dependencies to meet a certain bar?
> >
> > Ismaël's definition of being able to run Beam published dependencies in 
> > Java 11 VM sounds enough to me "to announce Java 11 compatibility _for 
> > Beam_".
> >
> >>
> >>
> >> --
> >> Elliotte Rusty Harold
> >> elh...@ibiblio.org
>
>
>
> --
> Elliotte Rusty Harold
> elh...@ibiblio.org


Re: Write-through-cache in State logic

2019-08-21 Thread Maximilian Michels
Appreciate all your comments! Replying below.


@Luke:

> Having cache tokens per key would be very expensive indeed and I believe we 
> should go with a single cache token "per" bundle.

Thanks for your comments on the PR. I was thinking to propose something
along these lines of having cache tokens valid for a particular
checkpointing "epoch". That would require even less token renewal than
the per-bundle approach.
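
Roughly, as a sketch:

// Illustrative only: derive the token from the last completed
// checkpoint ("epoch"), so restoring from an older checkpoint
// implicitly invalidates every cache entry handed out since then.
String cacheTokenFor(long lastCompletedCheckpointId) {
  return "epoch-" + lastCompletedCheckpointId;
}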


@Thomas, thanks for the input. Some remarks:

> Wouldn't it be simpler to have the runner just track a unique ID for each 
> worker and use that to communicate if the cache is valid or not?

We do not need a unique id per worker. If a cache token is valid for a
particular worker, it is also valid for another worker. That is with the
assumption that key ranges are always disjoint between the workers.

> * When the bundle is started, the runner tells the worker if the cache has 
> become invalid (since it knows if another worker has mutated state)

This is simply done by not transferring the particular cache token. No
need to declare it invalid explicitly.

> * When the worker sends mutation requests to the runner, it includes its own 
> ID (or the runner already has it as contextual information). No need to wait 
> for a response.

Mutations of cached values can be freely done as long as the cache token
associated with the state is valid for a particular bundle. Only the
first time, the Runner needs to wait on the response to store the cache
token. This can also be done asynchronously.

> * When the bundle is finished, the runner records the last writer (only if a 
> change occurred)

I believe this is not necessary because there will only be one writer at
a time for a particular bundle and key range, hence only one writer
holds a valid cache token for a particular state and key range.


@Reuven:

>  Dataflow divides the keyspace up into lexicographic ranges, and creates a 
> cache token per range. 

State is always processed partitioned by the Flink workers (hash-based,
not lexicographical). I don't think that matters though because the key
ranges do not overlap between the workers. Flink does not support
dynamically repartitioning the key ranges. Even in case of fine-grained
recovery of workers and their key ranges, we would simply generate new
cache tokens for a particular worker.


Thanks,
Max

On 21.08.19 09:33, Reuven Lax wrote:
> Dataflow does something like this, however since work is
> load balanced across workers a per-worker id doesn't work very well.
> Dataflow divides the keyspace up into lexicographic ranges, and creates
> a cache token per range. 
> 
> On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise wrote:
> 
> Commenting here vs. on the PR since related to the overall approach.
> 
> Wouldn't it be simpler to have the runner just track a unique ID for
> each worker and use that to communicate if the cache is valid or not?
> 
> * When the bundle is started, the runner tells the worker if the
> cache has become invalid (since it knows if another worker has
> mutated state)
> * When the worker sends mutation requests to the runner, it includes
> its own ID (or the runner already has it as contextual information).
> No need to wait for a response.
> * When the bundle is finished, the runner records the last writer
> (only if a change occurred)
> 
> Whenever current worker ID and last writer ID doesn't match, cache
> is invalid.
> 
> Thomas
> 
> 
> On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik wrote:
> 
> Having cache tokens per key would be very expensive indeed and I
> believe we should go with a single cache token "per" bundle.
> 
> On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels wrote:
> 
> Maybe a Beam Python expert can chime in for Rakesh's question?
> 
> Luke, I was assuming cache tokens to be per key and state
> id. During
> implementing an initial support on the Runner side, I
> realized that we
> probably want cache tokens to only be per state id. Note
> that if we had
> per-key cache tokens, the number of cache tokens would
> approach the
> total number of keys in an application.
> 
> If anyone wants to have a look, here is a first version of
> the Runner
> side for cache tokens. Note that I only implemented cache
> tokens for
> BagUserState for now, but it can be easily added for side
> inputs as well.
> 
> https://github.com/apache/beam/pull/9374
> 
> -Max
> 
> 


Re: SqlTransform Metadata

2019-08-21 Thread Reza Rokni
@Kenn / @Rob, have there been any other discussions on how the timestamp
value can be accessed from within the SQL since this thread in May?

If not, my vote is for a convenience method that exposes the timestamp as a
function call within the SQL statement.
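
That is, something shaped like this (the function name is purely
illustrative; no such built-in exists today):

PCollection<Row> out =
    rows.apply(SqlTransform.query(
        "SELECT f_int, COUNT(*) AS c "
            + "FROM PCOLLECTION "
            + "GROUP BY f_int, TUMBLE(ELEMENT_TIMESTAMP(), INTERVAL '1' HOUR)"));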

Reza

On Wed, 22 May 2019 at 10:06, Reza Rokni  wrote:

> Hi,
>
> Coming back to this do we have enough of a consensus to say that in
> principle this is a good idea? If yes I will raise a Jira for this.
>
> Cheers
>
> Reza
>
> On Thu, 16 May 2019 at 02:58, Robert Bradshaw  wrote:
>
>> On Wed, May 15, 2019 at 8:51 PM Kenneth Knowles  wrote:
>> >
>> > On Wed, May 15, 2019 at 3:05 AM Robert Bradshaw 
>> wrote:
>> >>
>> >> Isn't there an API for concisely computing new fields from old ones?
>> >> Perhaps these expressions could contain references to metadata value
>> >> such as timestamp. Otherwise,
>> >
>> > Even so, being able to refer to the timestamp implies something about
>> its presence in a namespace, shared with other user-decided names.
>>
>> I was thinking that functions may live in a different namespace than
>> fields.
>>
>> > And it may be nice for users to use that API within the composite
>> SqlTransform. I think there are a lot of options.
>> >
>> >> Rather than withMetadata reifying the value as a nested field, with
>> >> the timestamp, window, etc. at the top level, one could let it take a
>> >> field name argument that attaches all the metadata as an extra
>> >> (struct-like) field. This would be like attachX, but without having to
>> >> have a separate method for every X.
>> >
>> > If you leave the input field names at the top level, then any "attach"
>> style API requires choosing a name that doesn't conflict with input field
>> names. You can't write a generic transform that works with all inputs. I
>> think it is much simpler to move the input field all into a nested
>> row/struct. Putting all the metadata in a second nested row/struct is just
>> as good as top-level, perhaps. But moving the input into the struct/row is
>> important.
>>
>> Very good point about writing generic transforms. It does mean a lot
>> of editing if one decides one wants to access the metadata field(s)
>> after-the-fact. (I also don't think we need to put the metadata in a
>> nested struct if the value is.)
>>
>> >> It seems restrictive to only consider this a a special mode for
>> >> SqlTransform rather than a more generic operation. (For SQL, my first
>> >> instinct would be to just make this a special function like
>> >> element_timestamp(), but there is some ambiguity there when there are
>> >> multiple tables in the expression.)
>> >
>> > I would propose it as both: we already have some Reify transforms, and
>> you could make a general operation that does this small data preparation
>> easily. I think the proposal is just to add a convenience build method on
>> SqlTransform to include the underlying functionality as part of the
>> composite, which we really already have.
>> >
>> > I don't think we should extend SQL with built-in functions for
>> element_timestamp() and things like that, because SQL already has TIMESTAMP
>> columns and it is very natural to use SQL on unbounded relations where the
>> timestamp is just part of the data.
>>
>> That's why I was suggesting a single element_metadata() rather than
>> exploding each one out.
>>
>> Do you have a pointer to what the TIMESTAMP columns are? (I'm assuming
>> this is a special field, but distinct from the metadata timestamp?)
>>
>> >> On Wed, May 15, 2019 at 5:03 AM Reza Rokni  wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > One use case would be when dealing with the windowing functions for
>> example:
>> >> >
>> >> > SELECT f_int, COUNT(*) , TUMBLE_START(f_timestamp, INTERVAL '1'
>> HOUR) tumble_start
>> >> >   FROM PCOLLECTION
>> >> >   GROUP BY
>> >> > f_int,
>> >> > TUMBLE(f_timestamp, INTERVAL '1' HOUR)
>> >> >
>> >> > For an element which is using Metadata to inform the EventTime of the
>> element, rather than data within the element itself, I would need to create
>> a new schema which added the timestamp as a field. I think other examples
>> which maybe interesting is getting the value of a row with the max/min
>> timestamp. None of this would be difficult but it does feel a little on the
>> verbose side and also makes the pipeline a little harder to read.
>> >> >
>> >> > Cheers
>> >> > Reza
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > From: Kenneth Knowles 
>> >> > Date: Wed, 15 May 2019 at 01:15
>> >> > To: dev
>> >> >
>> >> >> We have support for nested rows so this should be easy. The
>> .withMetadata would reify the struct, moving from Row to WindowedValue
>> if I understand it...
>> >> >>
>> >> >> SqlTransform.query("SELECT field1 from PCOLLECTION"):
>> >> >>
>> >> >> Schema = {
>> >> >>   field1: type1,
>> >> >>   field2: type2
>> >> >> }
>> >> >>
>> >> >> SqlTransform.query(...)
>> >> >>
>> >> >> SqlTransform.withMetadata().query("SELECT event_timestam

Re: Write-through-cache in State logic

2019-08-21 Thread Reuven Lax
Dataflow does something like this, however since work is
load balanced across workers a per-worker id doesn't work very well.
Dataflow divides the keyspace up into lexicographic ranges, and creates a
cache token per range.

On Tue, Aug 20, 2019 at 8:35 PM Thomas Weise  wrote:

> Commenting here vs. on the PR since related to the overall approach.
>
> Wouldn't it be simpler to have the runner just track a unique ID for each
> worker and use that to communicate if the cache is valid or not?
>
> * When the bundle is started, the runner tells the worker if the cache has
> become invalid (since it knows if another worker has mutated state)
> * When the worker sends mutation requests to the runner, it includes its
> own ID (or the runner already has it as contextual information). No need to
> wait for a response.
> * When the bundle is finished, the runner records the last writer (only if
> a change occurred)
>
> Whenever current worker ID and last writer ID doesn't match, cache is
> invalid.
>
> Thomas
>
>
> On Tue, Aug 20, 2019 at 11:42 AM Lukasz Cwik  wrote:
>
>> Having cache tokens per key would be very expensive indeed and I believe
>> we should go with a single cache token "per" bundle.
>>
>> On Mon, Aug 19, 2019 at 11:36 AM Maximilian Michels 
>> wrote:
>>
>>> Maybe a Beam Python expert can chime in for Rakesh's question?
>>>
>>> Luke, I was assuming cache tokens to be per key and state id. During
>>> implementing an initial support on the Runner side, I realized that we
>>> probably want cache tokens to only be per state id. Note that if we had
>>> per-key cache tokens, the number of cache tokens would approach the
>>> total number of keys in an application.
>>>
>>> If anyone wants to have a look, here is a first version of the Runner
>>> side for cache tokens. Note that I only implemented cache tokens for
>>> BagUserState for now, but it can be easily added for side inputs as well.
>>>
>>> https://github.com/apache/beam/pull/9374
>>>
>>> -Max
>>>
>>>
>>>