[VOTE] Release 2.27.0, release candidate #4

2021-01-05 Thread Pablo Estrada
Hi everyone,
Please review and vote on release candidate #4 for version 2.27.0, as
follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

*NOTE*. What happened to RC #2? I started building RC2 before completing
all the cherry-picks, so the tag for RC2 was created on an incorrect commit.

*NOTE*. What happened to RC #3? I started building RC3, but a new bug was
discovered (BEAM-11569) that required amending the branch. Thus this is now
RC4.

Reviewers are encouraged to test their own use cases with the release
candidate, and vote +1 if no issues are found.

The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release to be deployed to dist.apache.org [2],
which is signed with the key with fingerprint
C79DDD47DAF3808F0B9DDFAC02B2D9F742008494 [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "v2.27.0-RC4" [5],
* website pull request listing the release [6], publishing the API
reference manual [7], and the blog post [8].
* Python artifacts are deployed along with the source release to
dist.apache.org [2].
* Validation sheet with a tab for 2.27.0 release to help with validation
[9].
* Docker images published to Docker Hub [10].

The vote will be open for at least 72 hours, but given the holidays, we
will likely extend for a few more days. The release will be adopted by
majority approval, with at least 3 PMC affirmative votes.

Thanks,
-P.

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12349380

[2] https://dist.apache.org/repos/dist/dev/beam/2.27.0/
[3] https://dist.apache.org/repos/dist/release/beam/KEYS
[4] https://repository.apache.org/content/repositories/orgapachebeam-1149/
[5] https://github.com/apache/beam/tree/v2.27.0-RC4
[6] https://github.com/apache/beam/pull/13602
[7] https://github.com/apache/beam-site/pull/610
[8] https://github.com/apache/beam/pull/13603
[9]
https://docs.google.com/spreadsheets/d/1qk-N5vjXvbcEk68GjbkSZTR8AGqyNUM-oLFo_ZXBpJw/edit#gid=194829106

[10] https://hub.docker.com/search?q=apache%2Fbeam&type=image


Re: Compatibility between Beam v2.23 and Beam v2.26

2021-01-05 Thread Kyle Weaver
This raises a few related questions from me:

1. Do we claim to support resuming Flink checkpoints made with previous
Beam versions?
2. Does 1. require full binary compatibility between different versions of
runner internals like CoderTypeSerializer?
3. Do we have tests for 1.?

On Tue, Jan 5, 2021 at 4:05 PM Boyuan Zhang  wrote:

> https://github.com/apache/beam/pull/13240 seems suspicious to me.
>
>  +Maximilian Michels  Any insights here?
>
> On Tue, Jan 5, 2021 at 8:48 AM Antonio Si  wrote:
>
>> Hi,
>>
>> I would like to follow up on this question to see if there is a
>> solution/workaround for this issue.
>>
>> Thanks.
>>
>> Antonio.
>>
>> On 2020/12/19 18:33:48, Antonio Si  wrote:
>> > Hi,
>> >
>> > We were using Beam v2.23 and recently we have been testing an upgrade to
>> Beam v2.26. For Beam v2.26, we are passing --experiments=use_deprecated_read
>> and --fasterCopy=true.
>> >
>> > We run into this exception when we resume our pipeline:
>> >
>> > Caused by: java.io.InvalidClassException:
>> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; local
>> class incompatible: stream classdesc serialVersionUID =
>> 5241803328188007316, local class serialVersionUID = 7247319138941746449
>> >   at
>> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>> >   at
>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
>> >   at
>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
>> >   at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
>> >   at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>> >   at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
>> >   at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
>> >   at
>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
>> >   at
>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
>> >   at
>> org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
>> >   at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
>> >   at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>> >   at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
>> >   at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
>> >   at
>> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
>> >   at
>> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
>> >   at
>> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
>> >
>> > It looks like it is not able to deserialize objects from our existing
>> checkpoints. Is there any way we could resume our v2.23 checkpoints with
>> v2.26?
>> >
>> > Thanks for any suggestions.
>> >
>> > Antonio.
>> >
>>
>


Re: Compatibility between Beam v2.23 and Beam v2.26

2021-01-05 Thread Boyuan Zhang
https://github.com/apache/beam/pull/13240 seems suspicious to me.

 +Maximilian Michels  Any insights here?

On Tue, Jan 5, 2021 at 8:48 AM Antonio Si  wrote:

> Hi,
>
> I would like to follow up on this question to see if there is a
> solution/workaround for this issue.
>
> Thanks.
>
> Antonio.
>
> On 2020/12/19 18:33:48, Antonio Si  wrote:
> > Hi,
> >
> > We were using Beam v2.23 and recently we have been testing an upgrade to
> Beam v2.26. For Beam v2.26, we are passing --experiments=use_deprecated_read
> and --fasterCopy=true.
> >
> > We run into this exception when we resume our pipeline:
> >
> > Caused by: java.io.InvalidClassException:
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; local
> class incompatible: stream classdesc serialVersionUID =
> 5241803328188007316, local class serialVersionUID = 7247319138941746449
> >   at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
> >   at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
> >   at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
> >   at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
> >   at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
> >   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
> >   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
> >   at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
> >   at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
> >   at
> org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
> >   at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
> >   at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
> >   at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
> >   at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
> >   at
> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
> >   at
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
> >   at
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
> >
> > It looks like it is not able to deserialize objects from our existing
> checkpoints. Is there any way we could resume our v2.23 checkpoints with
> v2.26?
> >
> > Thanks for any suggestions.
> >
> > Antonio.
> >
>


Re: Using dataflow from a notebook via Interactive Runner

2021-01-05 Thread Ning Kang
Hi Sayan,

1. It's not yet officially supported to use the DataflowRunner as the
underlying runner with InteractiveRunner (It's possible to set up GCS
buckets with the underlying source recording and PCollection cache
mechanism to work with the DataflowRunner, but it's not recommended).
You can use the default DirectRunner with a sample of data when creating
the pipeline, then run the pipeline with a DataflowRunner using the full
set of data.

2. Beam Dataframes has been announced. You should be able to use the
Dataframe APIs and convert them to PCollections with
`from apache_beam.dataframe.convert import to_pcollection`.
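
For (1), a minimal sketch of that prototype-then-promote workflow could look
like the following, intended for a notebook; the paths, project, and options
are placeholders and the sampled input is assumed to already exist:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.transforms import combiners


def build(p, input_pattern):
    # The same construction function is reused for both runs.
    lines = p | 'Read' >> beam.io.ReadFromText(input_pattern)
    return lines | 'CountPerLine' >> combiners.Count.PerElement()


# Prototype interactively on a small sample with the default DirectRunner.
p = beam.Pipeline(InteractiveRunner())
counts = build(p, 'gs://my-bucket/sample/*.txt')   # placeholder path
ib.show(counts)

# Later, run the same pipeline over the full data set on Dataflow.
options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',                           # placeholder
    region='us-central1',
    temp_location='gs://my-bucket/tmp')
with beam.Pipeline(options=options) as p:
    build(p, 'gs://my-bucket/full/*.txt')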

On Tue, Jan 5, 2021 at 8:49 AM Sayan Sanyal  wrote:

> Hello team,
>
> As a user of pyspark, I've been following the development of Apache
> Beam with some interest. My interest was specifically piqued when I saw the
> investment in the Dataframe API, as well as the notebook-based Interactive
> Runner.
>
> I had a few questions that I would love to understand better, so any
> pointers would be appreciated.
>
> 1. For the interactive runner, while the default is the direct runner, are
> we able to use Dataflow here instead? I ask because I would love to
> interactively process large amounts of data that won't fit on my notebook's
> machine and then inspect it. Specifically, I'm trying to replicate this
> functionality from Spark in Beam:
>
> # read some data from GCS that won't fit in memory
> df = spark.read.parquet(...)
>
> # groupby and summarize data, shuffle is distributed, because otherwise
> Notebook machine would OOM
> result_df = df.groupby(...).agg(...)
>
> # We interactively inspect a random sample of rows from the dataframe,
> need not be in order
> result_df.show(...)
>
> 2. Are there any demo notebooks planned combining the Interactive Runner
> and the Dataframe API? I ask this more leadingly, as I hope that, given
> the large number of interactive notebook users out there who primarily
> deal in dataframes, this would be a natural audience for you to market the
> APIs to.
>
> I appreciate any discussion and thoughts.
>
> Thanks,
> Sayan
>
> --
>
> Sayan Sanyal
>
> Data Scientist on Notifications
>
>
>


Re: Using dataflow from a notebook via Interactive Runner

2021-01-05 Thread Ning Kang
Sorry, I might have misunderstood the second topic. The idea is that the
Dataframe API "should" naturally work with the InteractiveRunner inside a
notebook by converting the data set between DeferredDataframe and
PCollection.
Still, there is additional work needed to fully support that: how is the
pipeline defined and used in the REPL environment? Are all the APIs and
transforms supported?
We'll have these notebooks out once that work is done.
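
To make the intent concrete, the notebook flow we have in mind is roughly the
following (an untested sketch; whether every step here is fully supported
today is exactly the open question above):

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

p = beam.Pipeline(InteractiveRunner())
rows = p | beam.Create([
    beam.Row(user='a', clicks=3),
    beam.Row(user='b', clicks=7),
])

df = to_dataframe(rows)        # work with a DeferredDataframe in the notebook
active = df[df.clicks > 5]     # deferred, pandas-style filtering
result = to_pcollection(active)

# Materialize and inspect the result interactively; ib.collect returns the
# PCollection's contents as a local pandas DataFrame.
ib.collect(result)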

On Tue, Jan 5, 2021 at 1:55 PM Ning Kang  wrote:

> Hi Sayan,
>
> 1. It's not yet officially supported to use the DataflowRunner as the
> underlying runner with InteractiveRunner (It's possible to set up GCS
> buckets with the underlying source recording and PCollection cache
> mechanism to work with the DataflowRunner, but it's not recommended).
> You can use the default DirectRunner with a sample of data when creating
> the pipeline, then run the pipeline with a DataflowRunner using the full
> set of data.
>
> 2. Beam Dataframes has been announced. You should be able to use the
> Dataframe APIs and convert them to PCollections with
> `from apache_beam.dataframe.convert import to_pcollection`.
>
> On Tue, Jan 5, 2021 at 8:49 AM Sayan Sanyal  wrote:
>
>> Hello team,
>>
>> As a user of pyspark, I've been following the development of Apache
>> Beam with some interest. My interest was specifically piqued when I saw the
>> investment in the Dataframe API, as well as the notebook-based Interactive
>> Runner.
>>
>> I had a few questions that I would love to understand better, so any
>> pointers would be appreciated.
>>
>> 1. For the interactive runner, while the default is the direct runner, are
>> we able to use Dataflow here instead? I ask because I would love to
>> interactively process large amounts of data that won't fit on my notebook's
>> machine and then inspect it. Specifically, I'm trying to replicate this
>> functionality from Spark in Beam:
>>
>> # read some data from GCS that won't fit in memory
>> df = spark.read.parquet(...)
>>
>> # groupby and summarize data, shuffle is distributed, because otherwise
>> Notebook machine would OOM
>> result_df = df.groupby(...).agg(...)
>>
>> # We interactively inspect a random sample of rows from the dataframe,
>> need not be in order
>> result_df.show(...)
>>
>> 2. Are there any demo notebooks planned combining the Interactive Runner
>> and the Dataframe API? I ask this more leadingly, as I hope that, given
>> the large number of interactive notebook users out there who primarily
>> deal in dataframes, this would be a natural audience for you to market the
>> APIs to.
>>
>> I appreciate any discussion and thoughts.
>>
>> Thanks,
>> Sayan
>>
>> --
>>
>> Sayan Sanyal
>>
>> Data Scientist on Notifications
>>
>>
>>


Re: Making preview (sample) time consistent on Direct runner

2021-01-05 Thread Sam Rohde
Hi Ismael,

Those are good points. Do you know if the Interactive Runner has been tried
in those instances? If so, what were the shortcomings?

I can also see sampling being used for performance benchmarking. We have
seen others send in known elements that are tracked throughout the pipeline
to generate timings for each transform/stage.
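
As a rough illustration of that tracer-element idea (the marker values and
the DoFn here are made up for the sketch, not an existing Beam utility):

import logging
import time

import apache_beam as beam

# Hypothetical marker values injected into the input alongside real traffic.
TRACER_IDS = frozenset({'tracer-0001', 'tracer-0002'})


class LogTracers(beam.DoFn):
    """Pass-through DoFn that logs the arrival time of known tracer elements."""

    def __init__(self, stage_name):
        self.stage_name = stage_name

    def process(self, element):
        if element in TRACER_IDS:
            logging.info('tracer %s reached stage %s at %.3f',
                         element, self.stage_name, time.time())
        yield element

# Dropping beam.ParDo(LogTracers('after-parse')) after each stage gives a
# coarse per-stage timing for the tracer elements.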

-Sam

On Fri, Dec 18, 2020 at 8:24 AM Ismaël Mejía  wrote:

> Hello,
>
> The use of the direct runner for interactive local use cases has increased
> over the years in Beam due to projects like Scio, Kettle/Hop and our
> own SQL CLI. All these tools have one thing in common: they show a
> sample of some source input to the user and interactively apply
> transforms to it to help users build pipelines more rapidly.
>
> If you build a pipeline today to produce this sample from a set of files
> using Beam’s Sample transform, the read of the files happens first and
> then the sample, so the more files there are, or the bigger they are, the
> longer it takes to produce the sample even if the number of elements
> expected to be read is constant.
>
> During Beam Summit last year there were some discussions about how we
> could improve this scenario (and others) but I have the impression no
> further discussions happened in the mailing list, so I wanted to know
> if there are some ideas about how we can get direct runner to improve
> this case.
>
> It seems to me that we can still ‘force’ the count with some static
> field because it is not a distributed case, but I don’t know how we can
> stop reading once we have the number of sampled elements in a generic
> way, especially now that it seems a bit harder to do with pure DoFn
> (SDF) APIs vs the old Source ones, but well, that’s just a guess.
>
> Does anyone have an idea of how we could generalize this and, of course,
> if you see the value of such a use case, other ideas for improvements?
>
> Regards,
> Ismaël
>


Re: Contributor permission for Beam Jira tickets

2021-01-05 Thread Ahmet Altay
I added you as a contributor. You can now assign those JIRAs to yourself.
And welcome.

On Sat, Jan 2, 2021 at 7:10 AM Nir Gazit  wrote:

> Hey,
>
> I'm Nir, platform architect from Fiverr, working on integrating Beam into
> our streaming pipelines at the company. I have several features in mind
> that might be useful for us (specifically BEAM-11525 for supporting MemSQL
> I/O and BEAM-2466 for supporting a Kafka Streams runner), so I wonder if
> someone can add me as a contributor on JIRA.
>
> I'd also love to hear if anyone's currently working on the Kafka Streams
> runner (I saw Kyle Winkelman used to, but hasn't been in a while), as I'd
> love to assist in getting this out; and also your opinions about MemSQL
> I/O. It's kind of a niche DB, but our data science group uses it heavily.
>
> Thanks!
> Nir
>


Using dataflow from a notebook via Interactive Runner

2021-01-05 Thread Sayan Sanyal
Hello team,

As a user of pyspark, I've been following the development of Apache
Beam with some interest. My interest was specifically piqued when I saw the
investment in the Dataframe API, as well as the notebook-based Interactive
Runner.

I had a few questions that I would love to understand better, so any
pointers would be appreciated.

1. For the interactive runner, while the default is the direct runner, are we
able to use Dataflow here instead? I ask because I would love to interactively
process large amounts of data that won't fit on my notebook's machine and then
inspect it. Specifically, I'm trying to replicate this functionality from
Spark in Beam (my rough guess at the Beam version is sketched after point 2
below):

# read some data from GCS that won't fit in memory
df = spark.read.parquet(...)

# groupby and summarize data, shuffle is distributed, because otherwise
Notebook machine would OOM
result_df = df.groupby(...).agg(...)

# We interactively inspect a random sample of rows from the dataframe, need
not be in order
result_df.show(...)

2. Are there any demo notebooks planned combining the Interactive Runner and
the Dataframe API? I ask this more leadingly, as I hope that, given the large
number of interactive notebook users out there who primarily deal in
dataframes, this would be a natural audience for you to market the APIs to.
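
Coming back to (1): for what it's worth, my naive guess at what the Beam
DataFrame equivalent might look like is below. It is completely untested; I'm
using read_csv as a stand-in because I don't know which readers are available,
and the path and column names are placeholders:

import apache_beam as beam
from apache_beam.dataframe.io import read_csv
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner

p = beam.Pipeline(InteractiveRunner())

# read some data from GCS that won't fit in memory
df = p | read_csv('gs://some-bucket/events-*.csv')

# groupby and summarize data -- ideally the shuffle would run on Dataflow,
# because otherwise the notebook machine would OOM
result_df = df.groupby('user_id').sum()

# Is interactively inspecting a sample of result_df (something like
# ib.show(result_df)) supported, and can this run against Dataflow?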

I appreciate any discussion and thoughts.

Thanks,
Sayan

-- 

Sayan Sanyal

Data Scientist on Notifications


Re: Compatibility between Beam v2.23 and Beam v2.26

2021-01-05 Thread Antonio Si
Hi,

I would like to follow up on this question to see if there is a
solution/workaround for this issue.

Thanks.

Antonio.

On 2020/12/19 18:33:48, Antonio Si  wrote: 
> Hi,
> 
> We were using Beam v2.23 and recently we have been testing an upgrade to Beam
> v2.26. For Beam v2.26, we are passing --experiments=use_deprecated_read and
> --fasterCopy=true.
> 
> We run into this exception when we resume our pipeline:
> 
> Caused by: java.io.InvalidClassException: 
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; local 
> class incompatible: stream classdesc serialVersionUID = 5241803328188007316, 
> local class serialVersionUID = 7247319138941746449
>   at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
>   at 
> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
>   at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
>   at 
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
> 
> It looks like it is not able to deserialize objects from our existing
> checkpoints. Is there any way we could resume our v2.23 checkpoints with
> v2.26?
> 
> Thanks for any suggestions.
> 
> Antonio.
> 


Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2021-01-05 Thread Antonio Si
Hi Jan,

Sorry for the late reply. My topic has 180 partitions. Do you mean running
with the parallelism set to 900?

Thanks.

Antonio.

On 2020/12/23 20:30:34, Jan Lukavský  wrote: 
> OK,
> 
> could you make an experiment and increase the parallelism to something 
> significantly higher than the total number of partitions? Say 5 times 
> higher? Would that have impact on throughput in your case?
> 
> Jan
> 
> On 12/23/20 7:03 PM, Antonio Si wrote:
> > Hi Jan,
> >
> > The performance data that I reported was run with parallelism = 8. We also 
> > ran with parallelism = 15 and we observed similar behaviors although I 
> > don't have the exact numbers. I can get you the numbers if needed.
> >
> > Regarding number of partitions, since we have multiple topics, the number 
> > of partitions varies from 180 to 12. The highest TPS topic has 180 
> > partitions, while the lowest TPS topic has 12 partitions.
> >
> > Thanks.
> >
> > Antonio.
> >
> > On 2020/12/23 12:28:42, Jan Lukavský  wrote:
> >> Hi Antonio,
> >>
> >> can you please clarify a few things:
> >>
> >>    a) what parallelism you use for your sources
> >>
> >>    b) how many partitions there is in your topic(s)
> >>
> >> Thanks,
> >>
> >>    Jan
> >>
> >> On 12/22/20 10:07 PM, Antonio Si wrote:
> >>> Hi Boyuan,
> >>>
> >>> Let me clarify: I have tried with and without the
> >>> --experiments=beam_fn_api,use_sdf_kafka_read option:
> >>>
> >>> -  with --experiments=use_deprecated_read --fasterCopy=true, I am able
> >>> to achieve 13K TPS
> >>> -  with --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true,
> >>> I am able to achieve 10K
> >>> -  with --fasterCopy=true alone, I am only able to achieve 5K TPS
> >>>
> >>> In our test case, we have multiple topics and the checkpoint interval is
> >>> 60s. Some topics have much higher traffic than others. We looked a little
> >>> at the case with the --experiments="beam_fn_api,use_sdf_kafka_read"
> >>> --fasterCopy=true options. Based on our observation, each consumer poll()
> >>> in ReadFromKafkaDoFn.processElement() takes about 0.8ms. So for topics
> >>> with high traffic, it will continue in the loop because every poll() will
> >>> return some records. Every poll returns about 200 records, so it takes
> >>> about 0.8ms for every 200 records. I am not sure if that is part of the
> >>> reason for the performance difference.
> >>>
> >>> Thanks.
> >>>
> >>> Antonio.
> >>>
> >>> On 2020/12/21 19:03:19, Boyuan Zhang  wrote:
>  Hi Antonio,
> 
>  Thanks for the data point. That's very valuable information!
> 
>  I didn't use DirectRunner. I am using FlinkRunner.
> > We measured the number of Kafka messages that we can process per
> > second.
> > With Beam v2.26 with --experiments=use_deprecated_read and
> > --fasterCopy=true,
> > we are able to consume 13K messages per second, but with Beam v2.26
> > without the use_deprecated_read option, we are only able to process 10K
> > messages
> > per second for the same pipeline.
>  We do have an SDF implementation of Kafka Read instead of using the wrapper.
>  Would you like to give it a try to see whether it helps improve your
>  situation?  You can use --experiments=beam_fn_api,use_sdf_kafka_read to
>  switch to the Kafka SDF Read.
> 
>  On Mon, Dec 21, 2020 at 10:54 AM Boyuan Zhang  wrote:
> 
> > Hi Jan,
> >> it seems that what we would want is to couple the lifecycle of the 
> >> Reader
> >> not with the restriction but with the particular instance of
> >> (Un)boundedSource (after being split). That could be done in the 
> >> processing
> >> DoFn, if it contained a cache mapping instance of the source to the
> >> (possibly null - i.e. not yet open) reader. In @NewTracker we could 
> >> assign
> >> (or create) the reader to the tracker, as the tracker is created for 
> >> each
> >> restriction.
> >>
> >> WDYT?
> >>
> > I was thinking about this but it seems like it is not applicable to the
> > way how UnboundedSource and UnboundedReader work together.
> > Please correct me if I'm wrong. The UnboundedReader is created from
> > UnboundedSource per CheckpointMark[1], which means for certain sources, 
> > the
> > CheckpointMark could affect some attributes like start position of the
> > reader when resuming. So a single UnboundedSource could be mapped to
> > multiple readers because of different instances of CheckpointMark. That's
> > also the reason why we use CheckpointMark as the restriction.
> >
> > Please let me know if I misunderstand your suggestion.
> >
> > [1]
> > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
> >
> > On Mon, Dec 21, 2020 at 9:18 AM Antonio Si  wrote:
> >
> >> Hi Boyuan,
> >>
> >> Sorry for my late reply. I was off for a few days.
> >>
> >> I

Re: Combine with multiple outputs case Sample and the rest

2021-01-05 Thread Kenneth Knowles
Perhaps something based on a stateful DoFn, so there is a simple decision
point at which each element is either sampled or not and can be output to
one PCollection or the other. Without doing a little research, I don't
recall if this is doable in the way you need.
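
Something shaped roughly like the following, maybe. This is an untested
sketch, and it keeps the first n elements per key rather than taking a
uniform sample, so it only shows where the decision point and the two
outputs would live:

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.combiners import CountCombineFn
from apache_beam.transforms.userstate import CombiningValueStateSpec


class SplitOffSample(beam.DoFn):
    """Routes each element to either a 'sample' output or the main output.

    Taking the first n per key is NOT a uniform sample; a real version would
    need a smarter (e.g. reservoir-style) decision at this point.
    """

    TAKEN = CombiningValueStateSpec('taken', VarIntCoder(), CountCombineFn())

    def __init__(self, n):
        self.n = n

    def process(self, element, taken=beam.DoFn.StateParam(TAKEN)):
        key, value = element  # stateful DoFns require keyed input
        if taken.read() < self.n:
            taken.add(1)
            yield pvalue.TaggedOutput('sample', value)
        else:
            yield value  # the main output carries "the rest"


# Usage sketch:
# outputs = keyed | beam.ParDo(SplitOffSample(100)).with_outputs(
#     'sample', main='rest')
# sample, rest = outputs.sample, outputs.rest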

Kenn

On Wed, Dec 23, 2020 at 3:12 PM Ismaël Mejía  wrote:

> Thanks for the answer, Robert. Producing a combiner with two lists as
> outputs was one idea I was considering too, but I was afraid of
> OutOfMemory issues. I had not thought much about the consequences for
> combining state, thanks for pointing that out. For the particular sampling
> use case it might not be an issue, or am I missing something?
>
> I am still curious if for Sampling there could be another approach to
> achieve the same goal of producing the same result (uniform sample +
> the rest) but without the issues of combining.
>
> On Mon, Dec 21, 2020 at 7:23 PM Robert Bradshaw 
> wrote:
> >
> > There are two ways to emit multiple outputs: either to multiple distinct
> PCollections (e.g. withOutputTags) or multiple (including 0) outputs to a
> single PCollection (the difference between Map and FlatMap). In full
> generality, one can always have a CombineFn that outputs lists (say
> <tag, result>*) followed by a DoFn that emits to multiple places based on
> this result.
> >
> > One other con of emitting multiple values from a CombineFn is that
> CombineFns are used in other contexts as well, e.g. combining state, and
> trying to make sense of a multi-outputting CombineFn in that context is
> trickier.
> >
> > Note that for Sample in particular, it works as a CombineFn because we
> throw most of the data away. If we kept most of the data, it likely
> wouldn't fit into one machine to do the final sampling. The idea of using a
> side input to filter after the fact should work well (unless there's
> duplicate elements, in which case you'd have to uniquify them somehow to
> filter out only the "right" copies).
> >
> > - Robert
> >
> >
> >
> > On Fri, Dec 18, 2020 at 8:20 AM Ismaël Mejía  wrote:
> >>
> >> I had a question today from one of our users about Beam’s Sample
> >> transform (a Combine with an internal top-like function to produce a
> >> uniform sample of size n of a PCollection). They wanted to also obtain
> >> the rest of the PCollection as an output (the non-sampled elements).
> >>
> >> My suggestion was to use the sample (since it is small) as a side
> >> input and then reprocess the collection to filter its elements;
> >> however, I wonder if this is the ‘best’ solution.
> >>
> >> I was also thinking: if Combine is essentially GBK + ParDo, why don’t we
> >> have a Combine function with multiple outputs (maybe an evolution of
> >> CombineWithContext)? I know this sounds weird and I have probably not
> >> thought much about the issues or the performance of the translation, but I
> >> wanted to see what others thought. Does this make sense, and do you see
> >> some pros/cons or other ideas?
> >>
> >> Thanks,
> >> Ismaël
>