Re: [VOTE] Release 2.27.0, release candidate #4

2021-01-06 Thread Ahmet Altay
+1 (binding) - validated python quickstarts.

Thank you Pablo.

On Wed, Jan 6, 2021 at 1:57 PM Pablo Estrada  wrote:

> +1 (binding)
> I've built and unit tested existing Dataflow Templates with the new
> version.
> Best
> -P.
>
> On Tue, Jan 5, 2021 at 11:17 PM Pablo Estrada  wrote:
>
>> Hi everyone,
>> Please review and vote on the release candidate #4 for the
>> version 2.27.0, as follows:
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>>
>> *NOTE*. What happened to RC #2? I started building RC2 before completing
>> all the cherry-picks, so the tag for RC2 was created on an incorrect commit.
>>
>> *NOTE*. What happened to RC #3? I started building RC3, but a new bug
>> was discovered (BEAM-11569) that required amending the branch. Thus this is
>> now RC4.
>>
>> Reviewers are encouraged to test their own use cases with the release
>> candidate, and vote +1
>>  if no issues are found.
>>
>> The complete staging area is available for your review, which includes:
>> * JIRA release notes [1],
>> * the official Apache source release to be deployed to dist.apache.org [2],
>> which is signed with the key with fingerprint
>> C79DDD47DAF3808F0B9DDFAC02B2D9F742008494 [3],
>> * all artifacts to be deployed to the Maven Central Repository [4],
>> * source code tag "v2.27.0-RC4" [5],
>> * website pull request listing the release [6], publishing the API
>> reference manual [7], and the blog post [8].
>> * Python artifacts are deployed along with the source release to the
>> dist.apache.org [2].
>> * Validation sheet with a tab for 2.27.0 release to help with validation
>> [9].
>> * Docker images published to Docker Hub [10].
>>
>> The vote will be open for at least 72 hours, but given the holidays, we
>> will likely extend for a few more days. The release will be adopted by
>> majority approval, with at least 3 PMC affirmative votes.
>>
>> Thanks,
>> -P.
>>
>> [1]
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12349380
>>
>> [2] https://dist.apache.org/repos/dist/dev/beam/2.27.0/
>> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
>> [4]
>> https://repository.apache.org/content/repositories/orgapachebeam-1149/
>> [5] https://github.com/apache/beam/tree/v2.27.0-RC4
>> [6] https://github.com/apache/beam/pull/13602
>> [7] https://github.com/apache/beam-site/pull/610
>> [8] https://github.com/apache/beam/pull/13603
>> [9]
>> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=194829106
>>
>> [10] https://hub.docker.com/search?q=apache%2Fbeam&type=image
>>
>


Re: Contributor Permissions

2021-01-06 Thread Ahmet Altay
I added you as a contributor. You can now assign the Jira to yourself.
Welcome and thank you for contributing.

On Wed, Jan 6, 2021 at 3:22 PM John Edmonds 
wrote:

> I'm interested in taking on a small issue I created (
> https://issues.apache.org/jira/browse/BEAM-11580). Could I have
> permission to assign it to myself?
>


Contributor Permissions

2021-01-06 Thread John Edmonds
I'm interested in taking on a small issue I created 
(https://issues.apache.org/jira/browse/BEAM-11580). Could I have permission to 
assign it to myself?


Re: Standardizing the "Runner" concept across website content

2021-01-06 Thread Griselda Cuevas
Thank you all for this productive conversation!

Interestingly enough, a usability study we ran for Apache Beam (more
details coming soon) pointed out that our documentation and website assume
that readers are already familiar with basic data processing concepts such
as engines, pipelines, etc. So introducing a glossary, and even rethinking
how we add these concepts to our new documentation, is a good practice to
have in mind.

In the meantime, I will adopt the suggestion of differentiating between
engine and runner. The first application I made of this is in the copy for
the home page, which you can find as an attached file in this Jira ticket
[1] in case you want to add comments/suggestions.

The home page is the most important page on the website, as it's the one
that explains Beam to the world and markets its features, so I'd appreciate
feedback there too.

Thanks everyone!

[1]
https://issues.apache.org/jira/browse/BEAM-11346?jql=project%20%3D%20beam%20AND%20assignee%20%3D%20gris%20ORDER%20BY%20priority%20DESC

On Wed, 6 Jan 2021 at 13:33, Kenneth Knowles  wrote:

>
>
> On Wed, Jan 6, 2021 at 12:28 PM Robert Burke  wrote:
>
>> +1 on consolidating and being consistent with our terms.
>>
>> I've always considered them (Runner/Engine) synonymous. From a user
>> perspective, an engine without a runner isn't any good for their beam
>> pipeline. That there's an adapter is an implementation detail in some
>> instances. I do appreciate not using Adapter a term, avoiding confusing
>> descriptions.
>>
>> However, if we make the change and there's a clear glossary of terms
>> somewhere then
>>
>> That puts the lifecycle of a pipeline to be (loosely) something like...
>>
>> A Beam User authors Pipelines by writing DoFns, adding them as
>> PTransforms connected by PCollections into a Pipeline using a Beam SDK. An
>> SDK converts the pipeline into a portable representation, and submit it to
>> the Job Management Service of a Beam Runner. A Beam Runner translates the
>> portable pipeline representation into terms an underlying Engine
>> understands for Execution. The Beam Runner also reverses this translation
>> when the Engine delegates tasks to workers, so that the Beam SDKs can
>> execute the user's DoFns in keeping with the Beam Semantics.
>>
>
> An explicit glossary is a great idea to combine with standardizing
> terminology across the site. I think the important context is that most of
> the engines already existed before Beam and many of them are more
> well-known. In fact, a pretty good way for a user to understand the essence
> of what Beam is about is by taking a look at all the engines for which
> there are Beam runners :-)
>
> Engine: a system/product for doing [big] data processing
> Pipeline: user authors this logic that says what they want to compute (I
> think the fact that it is a DAG of PTransforms is relevant but we can get
> away with omitting it for the high-level view and to avoid introducing the
> term PTransform too early)
> Runner: executes a Beam pipeline on an engine (agree that "adapter" is too
> generic)
>
> I'd say below that level of granularity is getting into things that you
> need to know only after you have started writing pipelines. Possibly you
> need to introduce SDK harness to make clear that Beam pipelines are
> inherently multi-language/multi-runtime, even if the engine isn't (my
> personal opinion is that "UDF server" is the best understood terminology
> for this, and so much better that it is never too late to abandon the
> cryptic term "SDK harness").
>
> Kenn
>
>
>> (Not covered, bundles etc, but you get the idea...)
>>
>> On Wed, Jan 6, 2021, 11:16 AM Robert Bradshaw 
>> wrote:
>>
>>> +1 to keeping the distinction between Runner and Engine as Kenn
>>> described, and cleaning up the site with these in mind (I don't think the
>>> term engine is widely used yet).
>>>
>>> On Wed, Jan 6, 2021 at 11:15 AM Yichi Zhang  wrote:
>>>
 I agree with what kenn said, in most cases I would refer to the term
 runner as the adapter for translating user's pipeline code into a job
 representation and submitting it to the execution engine. Though in some
 cases they may still be used interchangeably such as direct runner?

 On Wed, Jan 6, 2021 at 11:02 AM Kenneth Knowles 
 wrote:

> I personally try to always distinguish two concepts: the thing doing
> the computing (like Spark or Flink), and the adapter for running a Beam
> pipeline (like SparkRunner or FlinkRunner). I use the term "runner" to 
> mean
> the adapter, and have been trying to use the term "engine" to refer to the
> thing doing the computing. Do you think that users will use these two
> interchangeably? Do you have recommendations about if these terms makes
> sense to users?
>
> Kenn
>
> On Wed, Jan 6, 2021 at 10:23 AM Griselda Cuevas 
> wrote:
>
>> Hi dev@ community, Happy New Year!
>>
>> I'm working on updating the copy of 

Re: [VOTE] Release 2.27.0, release candidate #4

2021-01-06 Thread Pablo Estrada
+1 (binding)
I've built and unit tested existing Dataflow Templates with the new version.
Best
-P.

On Tue, Jan 5, 2021 at 11:17 PM Pablo Estrada  wrote:

> Hi everyone,
> Please review and vote on the release candidate #4 for the version 2.27.0,
> as follows:
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> *NOTE*. What happened to RC #2? I started building RC2 before completing
> all the cherry-picks, so the tag for RC2 was created on an incorrect commit.
>
> *NOTE*. What happened to RC #3? I started building RC3, but a new bug was
> discovered (BEAM-11569) that required amending the branch. Thus this is now
> RC4.
>
> Reviewers are encouraged to test their own use cases with the release
> candidate, and vote +1
>  if no issues are found.
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release to be deployed to dist.apache.org [2],
> which is signed with the key with fingerprint
> C79DDD47DAF3808F0B9DDFAC02B2D9F742008494 [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "v2.27.0-RC4" [5],
> * website pull request listing the release [6], publishing the API
> reference manual [7], and the blog post [8].
> * Python artifacts are deployed along with the source release to the
> dist.apache.org [2].
> * Validation sheet with a tab for 2.27.0 release to help with validation
> [9].
> * Docker images published to Docker Hub [10].
>
> The vote will be open for at least 72 hours, but given the holidays, we
> will likely extend for a few more days. The release will be adopted by
> majority approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> -P.
>
> [1]
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12349380
>
> [2] https://dist.apache.org/repos/dist/dev/beam/2.27.0/
> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
> [4] https://repository.apache.org/content/repositories/orgapachebeam-1149/
>
> [5] https://github.com/apache/beam/tree/v2.27.0-RC4
> [6] https://github.com/apache/beam/pull/13602
> [7] https://github.com/apache/beam-site/pull/610
> [8] https://github.com/apache/beam/pull/13603
> [9]
> https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=194829106
>
> [10] https://hub.docker.com/search?q=apache%2Fbeam&type=image
>


Re: Compatibility between Beam v2.23 and Beam v2.26

2021-01-06 Thread Antonio Si
Thanks for the information. Do we have a Jira to track this issue, or do you
want me to create one?

Thanks.

Antonio.

On 2021/01/06 17:59:47, Kenneth Knowles  wrote: 
> Agree with Boyuan & Kyle. That PR is the problem, and we probably do not
> have adequate testing. We have a cultural understanding of not breaking
> encoded data forms but this is the encoded form of the TypeSerializer, and
> actually there are two problems.
> 
> 1. When you have a serialized object that does not have the
> serialVersionUid explicitly set, the UID is generated based on many details
> that are irrelevant for binary compatibility. Any Java-serialized object
> that is intended for anything other than transient transmission *must* have
> a serialVersionUid set and an explicit serialized form. Else it is
> completely normal for it to break due to irrelevant changes. The
> serialVersionUid has no mechanism for upgrade/downgrade so you *must* keep
> it the same forever, and any versioning or compat scheme exists within the
> single serialVersionUid.
> 2. In this case there was an actual change to the fields of the object
> stored, so you need to explicitly add the serialized form and also the
> ability to read from prior serialized forms.
> 
> I believe explicitly setting the serialVersionUid to the original (and
> keeping it that way forever) and adding the ability to decode prior forms
> will regain the ability to read the snapshot. But also this seems like
> something that would be part of Flink best practice documentation since
> naive use of Java serialization often hits this problem.
> 
> Kenn
> 
> On Tue, Jan 5, 2021 at 4:30 PM Kyle Weaver  wrote:
> 
> > This raises a few related questions from me:
> >
> > 1. Do we claim to support resuming Flink checkpoints made with previous
> > Beam versions?
> > 2. Does 1. require full binary compatibility between different versions of
> > runner internals like CoderTypeSerializer?
> >
> 3. Do we have tests for 1.?
> >
> 
> Kenn
> 
> 
> > On Tue, Jan 5, 2021 at 4:05 PM Boyuan Zhang  wrote:
> >
> >> https://github.com/apache/beam/pull/13240 seems suspicious to me.
> >>
> >>  +Maximilian Michels  Any insights here?
> >>
> >> On Tue, Jan 5, 2021 at 8:48 AM Antonio Si  wrote:
> >>
> >>> Hi,
> >>>
> >>> I would like to followup with this question to see if there is a
> >>> solution/workaround for this issue.
> >>>
> >>> Thanks.
> >>>
> >>> Antonio.
> >>>
> >>> On 2020/12/19 18:33:48, Antonio Si  wrote:
> >>> > Hi,
> >>> >
> >>> > We were using Beam v2.23 and recently, we are testing upgrade to Beam
> >>> v2.26. For Beam v2.26, we are passing --experiments=use_deprecated_read 
> >>> and
> >>> --fasterCopy=true.
> >>> >
> >>> > We run into this exception when we resume our pipeline:
> >>> >
> >>> > Caused by: java.io.InvalidClassException:
> >>> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; local
> >>> class incompatible: stream classdesc serialVersionUID =
> >>> 5241803328188007316, local class serialVersionUID = 7247319138941746449
> >>> >   at
> >>> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
> >>> >   at
> >>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
> >>> >   at
> >>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
> >>> >   at
> >>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
> >>> >   at
> >>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
> >>> >   at
> >>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
> >>> >   at
> >>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
> >>> >   at
> >>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
> >>> >   at
> >>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
> >>> >   at
> >>> org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
> >>> >   at
> >>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
> >>> >   at
> >>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
> >>> >   at
> >>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
> >>> >   at
> >>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
> >>> >   at
> >>> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnap

Re: Standardizing the "Runner" concept across website content

2021-01-06 Thread Kenneth Knowles
On Wed, Jan 6, 2021 at 12:28 PM Robert Burke  wrote:

> +1 on consolidating and being consistent with our terms.
>
> I've always considered them (Runner/Engine) synonymous. From a user
> perspective, an engine without a runner isn't any good for their beam
> pipeline. That there's an adapter is an implementation detail in some
> instances. I do appreciate not using Adapter a term, avoiding confusing
> descriptions.
>
> However, if we make the change and there's a clear glossary of terms
> somewhere then
>
> That puts the lifecycle of a pipeline to be (loosely) something like...
>
> A Beam User authors Pipelines by writing DoFns, adding them as PTransforms
> connected by PCollections into a Pipeline using a Beam SDK. An SDK converts
> the pipeline into a portable representation, and submit it to the Job
> Management Service of a Beam Runner. A Beam Runner translates the portable
> pipeline representation into terms an underlying Engine understands for
> Execution. The Beam Runner also reverses this translation when the Engine
> delegates tasks to workers, so that the Beam SDKs can execute the user's
> DoFns in keeping with the Beam Semantics.
>

An explicit glossary is a great idea to combine with standardizing
terminology across the site. I think the important context is that most of
the engines already existed before Beam and many of them are more
well-known. In fact, a pretty good way for a user to understand the essence
of what Beam is about is by taking a look at all the engines for which
there are Beam runners :-)

Engine: a system/product for doing [big] data processing
Pipeline: user authors this logic that says what they want to compute (I
think the fact that it is a DAG of PTransforms is relevant but we can get
away with omitting it for the high-level view and to avoid introducing the
term PTransform too early)
Runner: executes a Beam pipeline on an engine (agree that "adapter" is too
generic)

I'd say below that level of granularity is getting into things that you
need to know only after you have started writing pipelines. Possibly you
need to introduce SDK harness to make clear that Beam pipelines are
inherently multi-language/multi-runtime, even if the engine isn't (my
personal opinion is that "UDF server" is the best understood terminology
for this, and so much better that it is never too late to abandon the
cryptic term "SDK harness").
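
To make the runner/engine split above concrete, here is a minimal Java
sketch (purely illustrative; it assumes the Flink runner dependency is on
the classpath): the runner is what you select in the pipeline options,
while the engine is the Flink cluster that ultimately does the computing.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerVsEngine {
  public static void main(String[] args) {
    // "FlinkRunner" is the Beam runner (the adapter); the Flink cluster it
    // submits to is the engine that actually executes the work.
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs("--runner=FlinkRunner").create();
    Pipeline pipeline = Pipeline.create(options);
    // ... apply PTransforms here ...
    pipeline.run();
  }
}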

Kenn


> (Not covered, bundles etc, but you get the idea...)
>
> On Wed, Jan 6, 2021, 11:16 AM Robert Bradshaw  wrote:
>
>> +1 to keeping the distinction between Runner and Engine as Kenn
>> described, and cleaning up the site with these in mind (I don't think the
>> term engine is widely used yet).
>>
>> On Wed, Jan 6, 2021 at 11:15 AM Yichi Zhang  wrote:
>>
>>> I agree with what kenn said, in most cases I would refer to the term
>>> runner as the adapter for translating user's pipeline code into a job
>>> representation and submitting it to the execution engine. Though in some
>>> cases they may still be used interchangeably such as direct runner?
>>>
>>> On Wed, Jan 6, 2021 at 11:02 AM Kenneth Knowles  wrote:
>>>
 I personally try to always distinguish two concepts: the thing doing
 the computing (like Spark or Flink), and the adapter for running a Beam
 pipeline (like SparkRunner or FlinkRunner). I use the term "runner" to mean
 the adapter, and have been trying to use the term "engine" to refer to the
 thing doing the computing. Do you think that users will use these two
 interchangeably? Do you have recommendations about if these terms makes
 sense to users?

 Kenn

 On Wed, Jan 6, 2021 at 10:23 AM Griselda Cuevas 
 wrote:

> Hi dev@ community, Happy New Year!
>
> I'm working on updating the copy of a few website pages, and something
> that I want to solve is standardize how we refer to runners across the
> site. So far I've identified these definitions:
>
>- Back-end
>- Backend systems
>- Execution environments
>- Runtime
>- Runtime system
>- Runner
>
> Even when the majority of users will understand these concepts
> interchangeably, it's a good idea to be consistent so new users get
> familiar with how Beam works and its components.
>
> I'm going to start using the word "Runner" as I update the copy and
> will ask the team working in the UI revamp to do the same. Let me know if
> you have any questions/concerns.
>



Re: Standardizing the "Runner" concept across website content

2021-01-06 Thread Robert Burke
+1 on consolidating and being consistent with our terms.

I've always considered them (Runner/Engine) synonymous. From a user
perspective, an engine without a runner isn't any good for their Beam
pipeline. That there's an adapter is an implementation detail in some
instances. I do appreciate not using Adapter as a term, which avoids
confusing descriptions.

However, if we make the change and there's a clear glossary of terms
somewhere, then that works for me.

That makes the lifecycle of a pipeline (loosely) something like...

A Beam User authors Pipelines by writing DoFns, adding them as PTransforms
connected by PCollections into a Pipeline using a Beam SDK. An SDK converts
the pipeline into a portable representation, and submits it to the Job
Management Service of a Beam Runner. A Beam Runner translates the portable
pipeline representation into terms an underlying Engine understands for
Execution. The Beam Runner also reverses this translation when the Engine
delegates tasks to workers, so that the Beam SDKs can execute the user's
DoFns in keeping with the Beam Semantics.

(Not covered, bundles etc, but you get the idea...)

On Wed, Jan 6, 2021, 11:16 AM Robert Bradshaw  wrote:

> +1 to keeping the distinction between Runner and Engine as Kenn described,
> and cleaning up the site with these in mind (I don't think the term engine
> is widely used yet).
>
> On Wed, Jan 6, 2021 at 11:15 AM Yichi Zhang  wrote:
>
>> I agree with what kenn said, in most cases I would refer to the term
>> runner as the adapter for translating user's pipeline code into a job
>> representation and submitting it to the execution engine. Though in some
>> cases they may still be used interchangeably such as direct runner?
>>
>> On Wed, Jan 6, 2021 at 11:02 AM Kenneth Knowles  wrote:
>>
>>> I personally try to always distinguish two concepts: the thing doing the
>>> computing (like Spark or Flink), and the adapter for running a Beam
>>> pipeline (like SparkRunner or FlinkRunner). I use the term "runner" to mean
>>> the adapter, and have been trying to use the term "engine" to refer to the
>>> thing doing the computing. Do you think that users will use these two
>>> interchangeably? Do you have recommendations about if these terms makes
>>> sense to users?
>>>
>>> Kenn
>>>
>>> On Wed, Jan 6, 2021 at 10:23 AM Griselda Cuevas  wrote:
>>>
 Hi dev@ community, Happy New Year!

 I'm working on updating the copy of a few website pages, and something
 that I want to solve is standardize how we refer to runners across the
 site. So far I've identified these definitions:

- Back-end
- Backend systems
- Execution environments
- Runtime
- Runtime system
- Runner

 Even when the majority of users will understand these concepts
 interchangeably, it's a good idea to be consistent so new users get
 familiar with how Beam works and its components.

 I'm going to start using the word "Runner" as I update the copy and
 will ask the team working in the UI revamp to do the same. Let me know if
 you have any questions/concerns.

>>>


Re: Standardizing the "Runner" concept across website content

2021-01-06 Thread Robert Bradshaw
+1 to keeping the distinction between Runner and Engine as Kenn described,
and cleaning up the site with these in mind (I don't think the term engine
is widely used yet).

On Wed, Jan 6, 2021 at 11:15 AM Yichi Zhang  wrote:

> I agree with what kenn said, in most cases I would refer to the term
> runner as the adapter for translating user's pipeline code into a job
> representation and submitting it to the execution engine. Though in some
> cases they may still be used interchangeably such as direct runner?
>
> On Wed, Jan 6, 2021 at 11:02 AM Kenneth Knowles  wrote:
>
>> I personally try to always distinguish two concepts: the thing doing the
>> computing (like Spark or Flink), and the adapter for running a Beam
>> pipeline (like SparkRunner or FlinkRunner). I use the term "runner" to mean
>> the adapter, and have been trying to use the term "engine" to refer to the
>> thing doing the computing. Do you think that users will use these two
>> interchangeably? Do you have recommendations about if these terms makes
>> sense to users?
>>
>> Kenn
>>
>> On Wed, Jan 6, 2021 at 10:23 AM Griselda Cuevas  wrote:
>>
>>> Hi dev@ community, Happy New Year!
>>>
>>> I'm working on updating the copy of a few website pages, and something
>>> that I want to solve is standardize how we refer to runners across the
>>> site. So far I've identified these definitions:
>>>
>>>- Back-end
>>>- Backend systems
>>>- Execution environments
>>>- Runtime
>>>- Runtime system
>>>- Runner
>>>
>>> Even when the majority of users will understand these concepts
>>> interchangeably, it's a good idea to be consistent so new users get
>>> familiar with how Beam works and its components.
>>>
>>> I'm going to start using the word "Runner" as I update the copy and will
>>> ask the team working in the UI revamp to do the same. Let me know if you
>>> have any questions/concerns.
>>>
>>


Re: Standardizing the "Runner" concept across website content

2021-01-06 Thread Yichi Zhang
I agree with what Kenn said; in most cases I would refer to the term runner
as the adapter for translating the user's pipeline code into a job
representation and submitting it to the execution engine. Though in some
cases they may still be used interchangeably, such as the direct runner?

On Wed, Jan 6, 2021 at 11:02 AM Kenneth Knowles  wrote:

> I personally try to always distinguish two concepts: the thing doing the
> computing (like Spark or Flink), and the adapter for running a Beam
> pipeline (like SparkRunner or FlinkRunner). I use the term "runner" to mean
> the adapter, and have been trying to use the term "engine" to refer to the
> thing doing the computing. Do you think that users will use these two
> interchangeably? Do you have recommendations about if these terms makes
> sense to users?
>
> Kenn
>
> On Wed, Jan 6, 2021 at 10:23 AM Griselda Cuevas  wrote:
>
>> Hi dev@ community, Happy New Year!
>>
>> I'm working on updating the copy of a few website pages, and something
>> that I want to solve is standardize how we refer to runners across the
>> site. So far I've identified these definitions:
>>
>>- Back-end
>>- Backend systems
>>- Execution environments
>>- Runtime
>>- Runtime system
>>- Runner
>>
>> Even when the majority of users will understand these concepts
>> interchangeably, it's a good idea to be consistent so new users get
>> familiar with how Beam works and its components.
>>
>> I'm going to start using the word "Runner" as I update the copy and will
>> ask the team working in the UI revamp to do the same. Let me know if you
>> have any questions/concerns.
>>
>


Re: Standardizing the "Runner" concept across website content

2021-01-06 Thread Vincent Marquez
+1 to distinguishing between runners and engines (Spark/Flink/Dataflow).
Those terms are clear and make sense to me.

*~Vincent*


On Wed, Jan 6, 2021 at 11:02 AM Kenneth Knowles  wrote:

> I personally try to always distinguish two concepts: the thing doing the
> computing (like Spark or Flink), and the adapter for running a Beam
> pipeline (like SparkRunner or FlinkRunner). I use the term "runner" to mean
> the adapter, and have been trying to use the term "engine" to refer to the
> thing doing the computing. Do you think that users will use these two
> interchangeably? Do you have recommendations about if these terms makes
> sense to users?
>
> Kenn
>
> On Wed, Jan 6, 2021 at 10:23 AM Griselda Cuevas  wrote:
>
>> Hi dev@ community, Happy New Year!
>>
>> I'm working on updating the copy of a few website pages, and something
>> that I want to solve is standardize how we refer to runners across the
>> site. So far I've identified these definitions:
>>
>>- Back-end
>>- Backend systems
>>- Execution environments
>>- Runtime
>>- Runtime system
>>- Runner
>>
>> Even when the majority of users will understand these concepts
>> interchangeably, it's a good idea to be consistent so new users get
>> familiar with how Beam works and its components.
>>
>> I'm going to start using the word "Runner" as I update the copy and will
>> ask the team working in the UI revamp to do the same. Let me know if you
>> have any questions/concerns.
>>
>


Re: Standardizing the "Runner" concept across website content

2021-01-06 Thread Kenneth Knowles
I personally try to always distinguish two concepts: the thing doing the
computing (like Spark or Flink), and the adapter for running a Beam
pipeline (like SparkRunner or FlinkRunner). I use the term "runner" to mean
the adapter, and have been trying to use the term "engine" to refer to the
thing doing the computing. Do you think that users will use these two
interchangeably? Do you have recommendations about whether these terms make
sense to users?

Kenn

On Wed, Jan 6, 2021 at 10:23 AM Griselda Cuevas  wrote:

> Hi dev@ community, Happy New Year!
>
> I'm working on updating the copy of a few website pages, and something
> that I want to solve is standardize how we refer to runners across the
> site. So far I've identified these definitions:
>
>- Back-end
>- Backend systems
>- Execution environments
>- Runtime
>- Runtime system
>- Runner
>
> Even when the majority of users will understand these concepts
> interchangeably, it's a good idea to be consistent so new users get
> familiar with how Beam works and its components.
>
> I'm going to start using the word "Runner" as I update the copy and will
> ask the team working in the UI revamp to do the same. Let me know if you
> have any questions/concerns.
>


Standardizing the "Runner" concept across website content

2021-01-06 Thread Griselda Cuevas
Hi dev@ community, Happy New Year!

I'm working on updating the copy of a few website pages, and one thing I
want to address is standardizing how we refer to runners across the site. So
far I've identified these terms:

   - Back-end
   - Backend systems
   - Execution environments
   - Runtime
   - Runtime system
   - Runner

Even though the majority of users will understand these terms
interchangeably, it's a good idea to be consistent so new users get
familiar with how Beam works and its components.

I'm going to start using the word "Runner" as I update the copy and will
ask the team working in the UI revamp to do the same. Let me know if you
have any questions/concerns.


Re: Compatibility between Beam v2.23 and Beam v2.26

2021-01-06 Thread Kenneth Knowles
Agree with Boyuan & Kyle. That PR is the problem, and we probably do not
have adequate testing. We have a cultural understanding of not breaking
encoded data forms but this is the encoded form of the TypeSerializer, and
actually there are two problems.

1. When you have a serialized object that does not have the
serialVersionUID explicitly set, the UID is generated based on many details
that are irrelevant for binary compatibility. Any Java-serialized object
that is intended for anything other than transient transmission *must* have
a serialVersionUID set and an explicit serialized form. Otherwise it is
completely normal for it to break due to irrelevant changes. The
serialVersionUID has no mechanism for upgrade/downgrade, so you *must* keep
it the same forever, and any versioning or compat scheme exists within the
single serialVersionUID.
2. In this case there was an actual change to the fields of the stored
object, so you need to explicitly add the serialized form and also the
ability to read from prior serialized forms.

I believe explicitly setting the serialVersionUID to the original value (and
keeping it that way forever) and adding the ability to decode prior forms
will restore the ability to read the snapshot. This also seems like
something that should be part of Flink best-practice documentation, since
naive use of Java serialization often hits this problem.
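
A minimal sketch of that pattern (a hypothetical class, not the actual
CoderTypeSerializer code; the pinned value below is just the "stream
classdesc" UID from the exception in this thread, used for illustration):

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;

public class ExampleSerializer implements Serializable {
  // Pinned to the UID of the originally released form and never changed again.
  private static final long serialVersionUID = 5241803328188007316L;

  private String coderName;   // field present since the first release
  private boolean newOption;  // field added in a later release

  private void readObject(ObjectInputStream in)
      throws IOException, ClassNotFoundException {
    // Streams written by the old form simply lack newOption; default
    // deserialization leaves it at false, so old snapshots stay readable.
    in.defaultReadObject();
  }
}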

Kenn

On Tue, Jan 5, 2021 at 4:30 PM Kyle Weaver  wrote:

> This raises a few related questions from me:
>
> 1. Do we claim to support resuming Flink checkpoints made with previous
> Beam versions?
> 2. Does 1. require full binary compatibility between different versions of
> runner internals like CoderTypeSerializer?
>
3. Do we have tests for 1.?
>

Kenn


> On Tue, Jan 5, 2021 at 4:05 PM Boyuan Zhang  wrote:
>
>> https://github.com/apache/beam/pull/13240 seems suspicious to me.
>>
>>  +Maximilian Michels  Any insights here?
>>
>> On Tue, Jan 5, 2021 at 8:48 AM Antonio Si  wrote:
>>
>>> Hi,
>>>
>>> I would like to followup with this question to see if there is a
>>> solution/workaround for this issue.
>>>
>>> Thanks.
>>>
>>> Antonio.
>>>
>>> On 2020/12/19 18:33:48, Antonio Si  wrote:
>>> > Hi,
>>> >
>>> > We were using Beam v2.23 and recently, we are testing upgrade to Beam
>>> v2.26. For Beam v2.26, we are passing --experiments=use_deprecated_read and
>>> --fasterCopy=true.
>>> >
>>> > We run into this exception when we resume our pipeline:
>>> >
>>> > Caused by: java.io.InvalidClassException:
>>> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; local
>>> class incompatible: stream classdesc serialVersionUID =
>>> 5241803328188007316, local class serialVersionUID = 7247319138941746449
>>> >   at
>>> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>>> >   at
>>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
>>> >   at
>>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
>>> >   at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
>>> >   at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>>> >   at
>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
>>> >   at
>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
>>> >   at
>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
>>> >   at
>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
>>> >   at
>>> org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
>>> >   at
>>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
>>> >   at
>>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>>> >   at
>>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
>>> >   at
>>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
>>> >   at
>>> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
>>> >   at
>>> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
>>> >   at
>>> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
>>> >
>>> > It looks like it is not able to deserialize objects from our existi

Re: [Input needed] Capability Matrix Visual Redesign for extended version

2021-01-06 Thread Kenneth Knowles
Very good questions. Answers inline.

On Wed, Jan 6, 2021 at 8:16 AM Agnieszka Sell 
wrote:

> Hi Kenneth,
>
> Thank you for your feedback about the Capability Matrix! I have several
> questions about it:
>
> *Feedback: I think we can also remove rows that are not started or not 
> complete in the Beam Model, and remove the Beam Model column.*
> Question:  If we remove the Beam model column the whole point of making it 
> static and showing the capabilities would be lost. Isn't the point to show 
> capabilities of Beam vs. other tools?
>
>
To clarify the purpose of the capability matrix: it is not comparing Beam
vs other tools. It is comparing adapters that run a Beam pipeline on top of
other tools. For example the "Apache Spark" column describes the
capabilities of Beam's "SparkRunner", not Spark itself. Maybe we need to
adjust the wording above the matrix to make this clear.

So the column with the title "What is being computed?" is already a full
list of the features of the Beam Model. The rows where "Beam Model" has an
"X" or "~" are just ideas for future work, or features still in progress.

> *Feedback: I think Splittable DoFn really just deserves one row for
> bounded, one for unbounded, and any caveats go in the details.*
> Question: How would it look like? All this in one matrix or separate?
>
>
I suggest adding it as a row in "What is being computed?" alongside ParDo,
GroupByKey, ..., Stateful Processing, Splittable DoFn.


>
> *Feedback: All the windowing rows can be condensed into "Basic windowing 
> support" and "Merging windowing support" and any runner that can only run a 
> couple WindowFns can have details in the caveats. At this point any runner 
> that doesn't do Windowing by invoking a user's WindowFn simply doesn't really 
> support windowing in the model.*
> Suggestion: Do we still have a separate matrix for only two(?) rows?
>
>
My opinion may be controversial... I don't care that much about splitting
What/Where/When/How. Especially it is confusing to use "Where" to talk
about event time.

Personally, I would just make all the last three tables into a single table
"Windowing and Triggering" and the rows "Basic windowing support", "Merging
windowing support", "Configurable triggering", "Allowed lateness",
"Discarding mode", "Accumulating mode". I would remove Timers from that
table and rename "Stateful processing" in the table above to "State &
timers" since these are really one feature taken together.

Many of those decisions are not really part of the redesign, but just ideas
to save space. If you need more space savings, I can find more... for
example there is no value to ParDo, GroupByKey, and Flatten being separate,
really. If you don't have those all implemented, you don't have a  Beam
runner at all, so they will never be different. This could be omitted. Or
it could be a single "Baseline runner" row to add caveats. For example the
existing caveats are unnecessary: Spark has a caveat on GroupByKey that is
really about triggers. Structured streaming has "~" but the details are not
actually caveats.

Kenn


> Kind regards,
>
> Agnieszka
>
> On Mon, Dec 21, 2020 at 7:49 PM Griselda Cuevas  wrote:
>
>> Thanks Kenn, this is super helpful.
>>
>>
>>
>> On Mon, 21 Dec 2020 at 09:57, Kenneth Knowles  wrote:
>>
>>> For the capability matrix, part of the problem is that the rows don't
>>> all make that much sense, as we've discussed a couple times.
>>>
>>> But assuming we keep the content identical, maybe we could just have the
>>> collapsed view and make the table selectable where *just* the selected cell
>>> controls content below? You won't be able to do side-by-side comparisons of
>>> the full text of things, but you will be able to keep the overview and
>>> drill in one at a time quickly. Just one idea.
>>>
>>> A couple ways to save space without rearchitecting it:
>>>
>>>  - Apache Hadoop MapReduce and JStorm can be removed as they are on
>>> branches, not released.
>>>  - I think we can also remove rows that are not started or not complete
>>> in the Beam Model, and remove the Beam Model column.
>>>  - I think Splittable DoFn really just deserves one row for bounded, one
>>> for unbounded, and any caveats go in the details.
>>>  - All the windowing rows can be condensed into "Basic windowing
>>> support" and "Merging windowing support" and any runner that can only run a
>>> couple WindowFns can have details in the caveats. At this point any runner
>>> that doesn't do Windowing by invoking a user's WindowFn simply doesn't
>>> really support windowing in the model.
>>>  - "Configurable triggering" can absorb "Event-time triggers",
>>> "Processing-time triggers", "Count triggers", and "Composite triggers".
>>> Same. At this point any runner that doesn't support the whole triggering
>>> language doesn't really support triggers fully.
>>>
>>> Kenn
>>>
>>> On Mon, Dec 14, 2020 at 7:39 PM Griselda Cuevas  wrote:
>>>
 Hi folks, another page that's getting a refresh this time around is the
 C

Re: Making preview (sample) time consistent on Direct runner

2021-01-06 Thread Ismaël Mejía
> Those are good points. Do you know if the Interactive Runner has been tried 
> in those instances? If so, what were the shortcomings?

I am not aware of experiences or shortcomings with the Interactive
Runner. The issue is that the Interactive Runner is based on Python,
and all the tools I mention above are Java-based, so Python probably
won't be a valid alternative.

What is concerning for me is that in other similar systems (e.g.
Spark, Flink) a developer can consistently do a `.take(n)` read from a
data source and have results in constant time, almost independently of
the size of the targeted data. This lets developers iterate faster and
improves the developer experience.

What is not clear to me yet is how we can achieve this in a clean
way, given all the 'wrappings' we already have at translation time. I
don't know if there could be a way to override some default
translation(s) to achieve this. Any ideas?
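
For context, a minimal sketch of the pattern being discussed (the input path
is hypothetical): the Sample transform only runs after the read, so preview
latency grows with the input size even though the sample size is fixed.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Sample;
import org.apache.beam.sdk.values.PCollection;

public class SamplePreview {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    // The read consumes all matching files before Sample trims the output,
    // which is why the preview is not produced in constant time.
    PCollection<Iterable<String>> preview =
        p.apply(TextIO.read().from("/path/to/input/*.txt"))  // hypothetical path
         .apply(Sample.fixedSizeGlobally(100));
    p.run().waitUntilFinish();
  }
}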


On Tue, Jan 5, 2021 at 10:26 PM Sam Rohde  wrote:
>
> Hi Ismael,
>
> Those are good points. Do you know if the Interactive Runner has been tried 
> in those instances? If so, what were the shortcomings?
>
> I can also see the use of sampling for a performance benchmarking reason. We 
> have seen others send in known elements which are tracked throughout the 
> pipeline to generate timings for each transform/stage.
>
> -Sam
>
> On Fri, Dec 18, 2020 at 8:24 AM Ismaël Mejía  wrote:
>>
>> Hello,
>>
>> The use of direct runner for interactive local use cases has increased
>> with the years on Beam due to projects like Scio, Kettle/Hop and our
>> own SQL CLI. All these tools have in common one thing, they show a
>> sample of some source input to the user and interactively apply
>> transforms to it to help users build Pipelines more rapidly.
>>
>> If you build a pipeline today to produce this sample using the Beam’s
>> Sample transform from a set of files, the read of the files happens
>> first and then the sample, so the more files or the bigger they are
>> the longer it takes to produce the sample even if the number of
>> elements expected to read is constant.
>>
>> During Beam Summit last year there were some discussions about how we
>> could improve this scenario (and others) but I have the impression no
>> further discussions happened in the mailing list, so I wanted to know
>> if there are some ideas about how we can get direct runner to improve
>> this case.
>>
>> It seems to me that we can still ‘force’ the count with some static
>> field because it is not a distributed case but I don’t know how we can
>> stop reading once we have the number of sampled elements in a generic
>> way, specially now it seems to me a bit harder to do with pure DoFn
>> (SDF) APIs vs old Source ones, but well that’s just a guess.
>>
>> Does anyone have an idea of how could we generalize this and of course
>> if you see the value of such use case, other ideas for improvements?
>>
>> Regards,
>> Ismaël


Re: [Input needed] Capability Matrix Visual Redesign for extended version

2021-01-06 Thread Agnieszka Sell
Hi Kenneth,

Thank you for your feedback about the Capability Matrix! I have several
questions about it:

*Feedback: I think we can also remove rows that are not started or not
complete in the Beam Model, and remove the Beam Model column.*
Question: If we remove the Beam Model column, the whole point of
making it static and showing the capabilities would be lost. Isn't the
point to show the capabilities of Beam vs. other tools?

*Feedback: I think Splittable DoFn really just deserves one row for
bounded, one for unbounded, and any caveats go in the details.*
Question: What would it look like? Would all of this be in one matrix or a separate one?


*Feedback: All the windowing rows can be condensed into "Basic
windowing support" and "Merging windowing support" and any runner that
can only run a couple WindowFns can have details in the caveats. At
this point any runner that doesn't do Windowing by invoking a user's
WindowFn simply doesn't really support windowing in the model.*
Suggestion: Do we still have a separate matrix for only two(?) rows?


Kind regards,

Agnieszka

On Mon, Dec 21, 2020 at 7:49 PM Griselda Cuevas  wrote:

> Thanks Kenn, this is super helpful.
>
>
>
> On Mon, 21 Dec 2020 at 09:57, Kenneth Knowles  wrote:
>
>> For the capability matrix, part of the problem is that the rows don't all
>> make that much sense, as we've discussed a couple times.
>>
>> But assuming we keep the content identical, maybe we could just have the
>> collapsed view and make the table selectable where *just* the selected cell
>> controls content below? You won't be able to do side-by-side comparisons of
>> the full text of things, but you will be able to keep the overview and
>> drill in one at a time quickly. Just one idea.
>>
>> A couple ways to save space without rearchitecting it:
>>
>>  - Apache Hadoop MapReduce and JStorm can be removed as they are on
>> branches, not released.
>>  - I think we can also remove rows that are not started or not complete
>> in the Beam Model, and remove the Beam Model column.
>>  - I think Splittable DoFn really just deserves one row for bounded, one
>> for unbounded, and any caveats go in the details.
>>  - All the windowing rows can be condensed into "Basic windowing support"
>> and "Merging windowing support" and any runner that can only run a couple
>> WindowFns can have details in the caveats. At this point any runner that
>> doesn't do Windowing by invoking a user's WindowFn simply doesn't really
>> support windowing in the model.
>>  - "Configurable triggering" can absorb "Event-time triggers",
>> "Processing-time triggers", "Count triggers", and "Composite triggers".
>> Same. At this point any runner that doesn't support the whole triggering
>> language doesn't really support triggers fully.
>>
>> Kenn
>>
>> On Mon, Dec 14, 2020 at 7:39 PM Griselda Cuevas  wrote:
>>
>>> Hi folks, another page that's getting a refresh this time around is the
>>> Capability Matrix, which is one of the most critical pages for users as
>>> they evaluate the current support for each of the Beam runners.
>>>
>>> The situation we'd like to get your input on is: How do we optimize the
>>> expanded version of the capability matrix, which explains the level of
>>> support in each of the functions?
>>>
>>> Right now the text gets in the way of analyzing the table and makes
>>> reading hard. You can see a screenshot in the Beam wiki here [1], the file
>>> is titled current_CapMatExt.
>>>
>>> One of the proposed solutions is that after clicking the link "(click to
>>> expand details)", we load a new page that has the corresponding table to
>>> the click (what, where, when, how) at the top, and all the content of each
>>> runner/function gets displayed at the bottom of the page, the file with the
>>> proposed design is also in the Beam wiki here [1] and the file's name is
>>> proposed_CapMatExt. This solution isn't perfect either, since we'd need to
>>> move too much text under the table and reading isn't much easier.
>>>
>>> Do you have suggestions/ideas in how to condense the extended version?
>>>
>>> Share with us your feedback through this week,
>>> Thanks!
>>> G
>>>
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/BEAM/Website+Redesign+Files
>>>
>>

-- 

Agnieszka Sell
Polidea  | Project Manager

M: *+48 504 901 334* <+48504901334>
E: agnieszka.s...@polidea.com


Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2021-01-06 Thread Jan Lukavský

Sorry for the typo in your name. :-)

On 1/6/21 10:11 AM, Jan Lukavský wrote:

Hi Antonie,

yes, for instance. I'd just like to rule out possibility that a single 
DoFn processing multiple partitions (restrictions) brings some 
overhead in your case.


Jan

On 12/31/20 10:36 PM, Antonio Si wrote:

Hi Jan,

Sorry for the late reply. My topic has 180 partitions. Do you mean 
run with a

parallelism set to 900?

Thanks.

Antonio.

On 2020/12/23 20:30:34, Jan Lukavský  wrote:

OK,

could you make an experiment and increase the parallelism to something
significantly higher than the total number of partitions? Say 5 times
higher? Would that have impact on throughput in your case?

Jan

On 12/23/20 7:03 PM, Antonio Si wrote:

Hi Jan,

The performance data that I reported was run with parallelism = 8. 
We also ran with parallelism = 15 and we observed similar behaviors 
although I don't have the exact numbers. I can get you the numbers 
if needed.


Regarding number of partitions, since we have multiple topics, the 
number of partitions varies from 180 to 12. The highest TPS topic 
has 180 partitions, while the lowest TPS topic has 12 partitions.


Thanks.

Antonio.

On 2020/12/23 12:28:42, Jan Lukavský  wrote:

Hi Antonio,

can you please clarify a few things:

    a) what parallelism you use for your sources

    b) how many partitions there is in your topic(s)

Thanks,

    Jan

On 12/22/20 10:07 PM, Antonio Si wrote:

Hi Boyuan,

Let me clarify, I have tried with and without using 
--experiments=beam_fn_api,use_sdf_kafka_read option:


-  with --experiments=use_deprecated_read --fasterCopy=true, I 
am able to achieve 13K TPS
-  with --experiments="beam_fn_api,use_sdf_kafka_read" 
--fasterCopy=true, I am able to achieve 10K

-  with --fasterCopy=true alone, I am only able to achieve 5K TPS

In our testcase, we have multiple topics, checkpoint intervals is 
60s. Some topics have a lot higher traffics than others. We look 
at the case with --experiments="beam_fn_api,use_sdf_kafka_read" 
--fasterCopy=true options a little. Based on our observation, 
each consumer poll() in ReadFromKafkaDoFn.processElement() takes 
about 0.8ms. So for topic with high traffics, it will continue in 
the loop because every poll() will return some records. Every 
poll returns about 200 records. So, it takes about 0.8ms for 
every 200 records. I am not sure if that is part of the reason 
for the performance.


Thanks.

Antonio.

On 2020/12/21 19:03:19, Boyuan Zhang  wrote:

Hi Antonio,

Thanks for the data point. That's very valuable information!

I didn't use DirectRunner. I am using FlinkRunner.
We measured the number of Kafka messages that we can processed 
per second.

With Beam v2.26 with --experiments=use_deprecated_read and
--fasterCopy=true,
we are able to consume 13K messages per second, but with Beam 
v2.26
without the use_deprecated_read option, we are only able to 
process 10K

messages
per second for the same pipeline.
We do have SDF implementation of Kafka Read instead of using the 
wrapper.
Would you like to have a try to see whether it helps you improve 
your
situation?  You can use 
--experiments=beam_fn_api,use_sdf_kafka_read to

switch to the Kafka SDF Read.

On Mon, Dec 21, 2020 at 10:54 AM Boyuan Zhang 
 wrote:



Hi Jan,
it seems that what we would want is to couple the lifecycle of 
the Reader

not with the restriction but with the particular instance of
(Un)boundedSource (after being split). That could be done in 
the processing
DoFn, if it contained a cache mapping instance of the source 
to the
(possibly null - i.e. not yet open) reader. In @NewTracker we 
could assign
(or create) the reader to the tracker, as the tracker is 
created for each

restriction.

WDYT?

I was thinking about this but it seems like it is not 
applicable to the

way how UnboundedSource and UnboundedReader work together.
Please correct me if I'm wrong. The UnboundedReader is created 
from
UnboundedSource per CheckpointMark[1], which means for certain 
sources, the
CheckpointMark could affect some attributes like start position 
of the
reader when resuming. So a single UnboundedSource could be 
mapped to
multiple readers because of different instances of 
CheckpointMarl. That's

also the reason why we use CheckpointMark as the restriction.

Please let me know if I misunderstand your suggestion.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78 



On Mon, Dec 21, 2020 at 9:18 AM Antonio Si 
 wrote:



Hi Boyuan,

Sorry for my late reply. I was off for a few days.

I didn't use DirectRunner. I am using FlinkRunner.

We measured the number of Kafka messages that we can processed 
per second.

With Beam v2.26 with --experiments=use_deprecated_read and
--fasterCopy=true,
we are able to consume 13K messages per second, but with Beam 
v2.26
without the use_deprecated_read option, we are only able to 
process 10K

messages
per second for the same pipeline.

Thanks a

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2021-01-06 Thread Jan Lukavský

Hi Antonie,

yes, for instance. I'd just like to rule out the possibility that a single 
DoFn processing multiple partitions (restrictions) brings some overhead 
in your case.


Jan

On 12/31/20 10:36 PM, Antonio Si wrote:

Hi Jan,

Sorry for the late reply. My topic has 180 partitions. Do you mean run with a
parallelism set to 900?

Thanks.

Antonio.

On 2020/12/23 20:30:34, Jan Lukavský  wrote:

OK,

could you make an experiment and increase the parallelism to something
significantly higher than the total number of partitions? Say 5 times
higher? Would that have impact on throughput in your case?
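
(For reference, a minimal sketch of how that experiment could be configured,
assuming the Flink runner's options class; the value 900 is just 5x the 180
partitions mentioned earlier in the thread:)

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

class ParallelismExperiment {
  static FlinkPipelineOptions options() {
    // Raise the Flink parallelism well above the number of Kafka partitions
    // to see whether throughput changes.
    FlinkPipelineOptions opts =
        PipelineOptionsFactory.create().as(FlinkPipelineOptions.class);
    opts.setParallelism(900);
    return opts;
  }
}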

Jan

On 12/23/20 7:03 PM, Antonio Si wrote:

Hi Jan,

The performance data that I reported was run with parallelism = 8. We also ran 
with parallelism = 15 and we observed similar behaviors although I don't have 
the exact numbers. I can get you the numbers if needed.

Regarding number of partitions, since we have multiple topics, the number of 
partitions varies from 180 to 12. The highest TPS topic has 180 partitions, 
while the lowest TPS topic has 12 partitions.

Thanks.

Antonio.

On 2020/12/23 12:28:42, Jan Lukavský  wrote:

Hi Antonio,

can you please clarify a few things:

    a) what parallelism you use for your sources

    b) how many partitions there is in your topic(s)

Thanks,

    Jan

On 12/22/20 10:07 PM, Antonio Si wrote:

Hi Boyuan,

Let me clarify, I have tried with and without using 
--experiments=beam_fn_api,use_sdf_kafka_read option:

-  with --experiments=use_deprecated_read --fasterCopy=true, I am able to 
achieve 13K TPS
-  with --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true, I am 
able to achieve 10K
-  with --fasterCopy=true alone, I am only able to achieve 5K TPS
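
(As a side note for anyone trying to reproduce these runs, a minimal sketch
of how such flags are typically wired into a pipeline; it assumes the Flink
runner artifacts are on the classpath so that --runner and --fasterCopy
resolve:)

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class FlagsExample {
  public static void main(String[] args) {
    // Same flags as in the first measurement above; swap the experiments
    // value to compare use_deprecated_read against beam_fn_api,use_sdf_kafka_read.
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(
                "--runner=FlinkRunner",
                "--experiments=use_deprecated_read",
                "--fasterCopy=true")
            .withValidation()
            .create();
    Pipeline p = Pipeline.create(options);
    // ... build the Kafka read pipeline here ...
    p.run();
  }
}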

In our testcase, we have multiple topics and the checkpoint interval is 60s. Some topics 
have much higher traffic than others. We looked a little at the case with the 
--experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true options. 
Based on our observation, each consumer poll() in 
ReadFromKafkaDoFn.processElement() takes about 0.8ms. So for topics with high traffic, it 
will continue in the loop because every poll() will return some records. Every poll 
returns about 200 records, so it takes about 0.8ms for every 200 records. I am not sure 
if that is part of the reason for the performance difference.

Thanks.

Antonio.

On 2020/12/21 19:03:19, Boyuan Zhang  wrote:

Hi Antonio,

Thanks for the data point. That's very valuable information!

I didn't use DirectRunner. I am using FlinkRunner.

We measured the number of Kafka messages that we can processed per second.
With Beam v2.26 with --experiments=use_deprecated_read and
--fasterCopy=true,
we are able to consume 13K messages per second, but with Beam v2.26
without the use_deprecated_read option, we are only able to process 10K
messages
per second for the same pipeline.

We do have SDF implementation of Kafka Read instead of using the wrapper.
Would you like to have a try to see whether it helps you improve your
situation?  You can use --experiments=beam_fn_api,use_sdf_kafka_read to
switch to the Kafka SDF Read.

On Mon, Dec 21, 2020 at 10:54 AM Boyuan Zhang  wrote:


Hi Jan,

it seems that what we would want is to couple the lifecycle of the Reader
not with the restriction but with the particular instance of
(Un)boundedSource (after being split). That could be done in the processing
DoFn, if it contained a cache mapping instance of the source to the
(possibly null - i.e. not yet open) reader. In @NewTracker we could assign
(or create) the reader to the tracker, as the tracker is created for each
restriction.

WDYT?


I was thinking about this but it seems like it is not applicable to the
way how UnboundedSource and UnboundedReader work together.
Please correct me if I'm wrong. The UnboundedReader is created from
UnboundedSource per CheckpointMark[1], which means for certain sources, the
CheckpointMark could affect some attributes like start position of the
reader when resuming. So a single UnboundedSource could be mapped to
multiple readers because of different instances of CheckpointMark. That's
also the reason why we use CheckpointMark as the restriction.

Please let me know if I misunderstand your suggestion.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
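
(For reference, the method behind that link has roughly this shape; the
signature is paraphrased from the Beam Java SDK, with generics and details
elided:)

// The reader is constructed per checkpoint mark, which is why one source
// instance can map to many readers when resuming from different checkpoints.
public abstract UnboundedReader<OutputT> createReader(
    PipelineOptions options, @Nullable CheckpointMarkT checkpointMark)
    throws IOException;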

On Mon, Dec 21, 2020 at 9:18 AM Antonio Si  wrote:


Hi Boyuan,

Sorry for my late reply. I was off for a few days.

I didn't use DirectRunner. I am using FlinkRunner.

We measured the number of Kafka messages that we can processed per second.
With Beam v2.26 with --experiments=use_deprecated_read and
--fasterCopy=true,
we are able to consume 13K messages per second, but with Beam v2.26
without the use_deprecated_read option, we are only able to process 10K
messages
per second for the same pipeline.

Thanks and regards,

Antonio.

On 2020/12/11 22:19:40, Boyuan Zhang  wrote:

Hi Antonio,

Thanks for the details! Which version o