Full-time Job Opportunity - Canada

2024-10-03 Thread Evan Galpin
Hi folks!

My name is Evan Galpin. I'm a long-time user of, and committer to, Apache
Beam.  I currently have the opportunity to hire for a role[1] where Beam is
a key component of the day-to-day work, alongside other OSS technologies,
with particular focus on low-latency streaming use cases. This role is a
marriage between Data Platform Engineering and Production Engineering where
our data-customers are end-users, which brings all kinds of compelling
constraints and challenges. As a team, we enjoy being both users and
builders of the OSS technologies we employ, and care deeply about honing
our craft as Software Engineers.

If this sounds like you, and you are based anywhere in *Canada*, I strongly
encourage you to apply!

Special thanks to the PMC for their approval to share this opportunity with
the community!

[1]
https://www.dialpad.com/careers/open-opportunities/apply/?id=7514666002&location=Vancouver-Canada&title=Software-Engineer-Data-Platform

- Evan


Re: Environmental variables not accessible in Dataflow pipeline

2023-12-22 Thread Evan Galpin
I assume from the previous messages that GCP Dataflow is being used as the
pipeline runner.  Even without Flex Templates, the v2 runner can use docker
containers to install all dependencies from various sources[1].  I have
used docker containers to solve the same problem you mention: installing a
python dependency from a private package repository.  The process is
roughly:


   1. Build a docker container from the apache beam base images,
   customizing as you need[2]
   2. Tag and push that image to Google Container Registry
   3. When you deploy your Dataflow job, include the options
   "--experiment=use_runner_v2 --worker_harness_container_image=
   gcr.io/my-project/my-image-name:my-image-tag" (there may be other ways,
   but this is what I have seen working first-hand)
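Concretely, the three steps might look like the following sketch — the image
name, project, region, bucket, and pipeline entrypoint are all placeholders,
and flags beyond the ones quoted above may vary by setup:

```shell
# 1. Build a custom container from the Beam base image (see Dockerfile below)
docker build -t gcr.io/my-project/my-image-name:my-image-tag .

# 2. Tag and push that image to Google Container Registry
docker push gcr.io/my-project/my-image-name:my-image-tag

# 3. Deploy the Dataflow job, pointing the workers at the custom image
python my_pipeline.py \
  --runner=DataflowRunner \
  --project=my-project \
  --region=us-central1 \
  --temp_location=gs://my-bucket/tmp \
  --experiment=use_runner_v2 \
  --worker_harness_container_image=gcr.io/my-project/my-image-name:my-image-tag
```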

Your docker file can be as simple as:

# python:<major>.<minor>-slim must match apache/beam_python<major>.<minor>_sdk
FROM python:3.10-slim

# authenticate with private python package repo, install all various
# dependencies, set env vars, COPY your pipeline code to the container, etc
#
#  ...
#
#

# Copy files from the official SDK image, including the boot script and
# dependencies. The Apache Beam SDK version must match the Python image's
# major.minor version. Based on
# https://cloud.google.com/dataflow/docs/guides/using-custom-containers#python_1
COPY --from=apache/beam_python3.10_sdk:2.52.0 /opt/apache/beam /opt/apache/beam

# Set the entrypoint to Apache Beam SDK launcher.
ENTRYPOINT ["/opt/apache/beam/boot"]
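To make the elided middle section of the Dockerfile concrete, a hedged sketch
of what it might contain — the env var names, file names, and build-arg
handling are all hypothetical, and baked-in credentials should be replaced by
your registry's recommended auth mechanism. ENV values set here exist in the
worker container before the SDK harness launches your pipeline code, which is
what the thread below is after:

```dockerfile
WORKDIR /pipeline

# Env vars set here are visible to the pipeline process on the workers.
ENV MY_APP_ENV=production

# Point pip at a private package index; prefer build secrets over
# hard-coding credentials into image layers.
ARG PIP_EXTRA_INDEX_URL
ENV PIP_EXTRA_INDEX_URL=${PIP_EXTRA_INDEX_URL}

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY my_pipeline.py .
```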

[1]
https://cloud.google.com/dataflow/docs/guides/using-custom-containers#python_1
[2]
https://cloud.google.com/dataflow/docs/guides/build-container-image#python


On Fri, Dec 22, 2023 at 6:32 AM XQ Hu via user  wrote:

> You can use the same docker image for both template launcher and Dataflow
> job. Here is one example:
> https://github.com/google/dataflow-ml-starter/blob/main/tensorflow_gpu.flex.Dockerfile#L60
>
> On Fri, Dec 22, 2023 at 8:04 AM Sumit Desai 
> wrote:
>
>> Yes, I will have to try it out.
>>
>> Regards
>> Sumit Desai
>>
>> On Fri, Dec 22, 2023 at 3:53 PM Sofia’s World 
>> wrote:
>>
>>> I guess so, i am not an expert on using env variables in dataflow
>>> pipelines as any config dependencies i  need, i pass them as job input
>>> params
>>>
>>> But perhaps you can configure variables in your docker file (i am not an
>>> expert in this either),  as  flex templates use Docker?
>>>
>>>
>>> https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates
>>>
>>> hth
>>>   Marco
>>>
>>>
>>>
>>>
>>> On Fri, Dec 22, 2023 at 10:17 AM Sumit Desai 
>>> wrote:
>>>
 We are using an external non-public package which expects environment
 variables only. If the environment variables are not found, it will throw
 an error. We can't change the source of this package.

 Does this mean we will face the same problem with flex templates also?

 On Fri, 22 Dec 2023, 3:39 pm Sofia’s World, 
 wrote:

> The flex template will allow you to pass input params with dynamic
> values to your Dataflow job, so you could replace the env variable with
> that input? That is, unless you have to have env vars... but from your
> snippets it appears you are just using them to configure one of your
> components?
> Hth
>
> On Fri, 22 Dec 2023, 10:01 Sumit Desai, 
> wrote:
>
>> Hi Sofia and XQ,
>>
>> The application is failing because I have loggers defined in every
>> file and the method to create a logger tries to create an object of
>> UplightTelemetry. If I use flex templates, will the environment variables
>> I supply be loaded before the application gets loaded? If not, it would
>> not serve my purpose.
>>
>> Thanks & Regards,
>> Sumit Desai
>>
>> On Thu, Dec 21, 2023 at 10:02 AM Sumit Desai 
>> wrote:
>>
>>> Thank you XQ. Will take a look at this.
>>>
>>> Regards,
>>> Sumit Desai
>>>
>>> On Wed, Dec 20, 2023 at 8:13 PM XQ Hu  wrote:
>>>
 Dataflow VMs cannot know your local env variables. I think you
 should use a custom container:
 https://cloud.google.com/dataflow/docs/guides/using-custom-containers.
 Here is a sample project:
 https://github.com/google/dataflow-ml-starter

 On Wed, Dec 20, 2023 at 4:57 AM Sofia’s World 
 wrote:

> Hello Sumit
>  Thanks. Sorry... I guess if the value of the env variable is
> always the same, you can pass it as a job param? Though it doesn't sound
> like a viable option...
> Hth
>
> On Wed, 20 Dec 2023, 09:49 Sumit Desai, 
> wrote:
>
>> Hi Sofia,
>>
>> Thanks for the response. For now, we have decided not to use flex
>> templates. Is there a way to pass environment variables without using
>> any template?
>>
>> Thanks & Regards,
>> Sumit Desai

Re: [Dataflow][Java][2.52.0] Upgrading to 2.52.0 Surfaces Pubsub Coder Error

2023-12-14 Thread Evan Galpin
The pipeline in question is using Dataflow v1 Runner (Runner v2: Disabled)
in case that's an important detail.

On Tue, Dec 12, 2023 at 4:22 PM Evan Galpin  wrote:

> I've checked the source code and deployment command for cases of setting
> experiments. I don't see "enable_custom_pubsub_source" being used at all,
> no.  I also confirmed that it is not active on the existing/running job.
>
> On Tue, Dec 12, 2023 at 4:11 PM Reuven Lax via user 
> wrote:
>
>> Are you setting the enable_custom_pubsub_source experiment by any chance?
>>
>> On Tue, Dec 12, 2023 at 3:24 PM Evan Galpin  wrote:
>>
>>> Hi all,
>>>
>>> When attempting to upgrade a running Dataflow pipeline from SDK 2.51.0
>>> to 2.52.0, an incompatibility warning is surfaced that prevents pipeline
>>> upgrade:
>>>
>>>
>>>> The Coder or type for step .../PubsubUnboundedSource has changed
>>>
>>>
>>> Was there an intentional coder change introduced for PubsubMessage in
>>> 2.52.0?  I didn't note anything in the release notes
>>> https://beam.apache.org/blog/beam-2.52.0/ nor recent changes in
>>> PubsubMessageWithAttributesCoder[1].  Specifically the step uses
>>> `PubsubMessageWithAttributesCoder` via
>>> `PubsubIO.readMessagesWithAttributes()`
>>>
>>> Thanks!
>>>
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/90e79ae373ab38cf4e48e9854c28aaffb0938458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java#L36
>>>
>>


Re: [Dataflow][Java][2.52.0] Upgrading to 2.52.0 Surfaces Pubsub Coder Error

2023-12-12 Thread Evan Galpin
I've checked the source code and deployment command for cases of setting
experiments. I don't see "enable_custom_pubsub_source" being used at all,
no.  I also confirmed that it is not active on the existing/running job.

On Tue, Dec 12, 2023 at 4:11 PM Reuven Lax via user 
wrote:

> Are you setting the enable_custom_pubsub_source experiment by any chance?
>
> On Tue, Dec 12, 2023 at 3:24 PM Evan Galpin  wrote:
>
>> Hi all,
>>
>> When attempting to upgrade a running Dataflow pipeline from SDK 2.51.0 to
>> 2.52.0, an incompatibility warning is surfaced that prevents pipeline
>> upgrade:
>>
>>
>>> The Coder or type for step .../PubsubUnboundedSource has changed
>>
>>
>> Was there an intentional coder change introduced for PubsubMessage in
>> 2.52.0?  I didn't note anything in the release notes
>> https://beam.apache.org/blog/beam-2.52.0/ nor recent changes in
>> PubsubMessageWithAttributesCoder[1].  Specifically the step uses
>> `PubsubMessageWithAttributesCoder` via
>> `PubsubIO.readMessagesWithAttributes()`
>>
>> Thanks!
>>
>>
>> [1]
>> https://github.com/apache/beam/blob/90e79ae373ab38cf4e48e9854c28aaffb0938458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java#L36
>>
>


[Dataflow][Java][2.52.0] Upgrading to 2.52.0 Surfaces Pubsub Coder Error

2023-12-12 Thread Evan Galpin
Hi all,

When attempting to upgrade a running Dataflow pipeline from SDK 2.51.0 to
2.52.0, an incompatibility warning is surfaced that prevents pipeline
upgrade:


> The Coder or type for step .../PubsubUnboundedSource has changed


Was there an intentional coder change introduced for PubsubMessage in
2.52.0?  I didn't note anything in the release notes
https://beam.apache.org/blog/beam-2.52.0/ nor recent changes in
PubsubMessageWithAttributesCoder[1].  Specifically the step uses
`PubsubMessageWithAttributesCoder` via
`PubsubIO.readMessagesWithAttributes()`

Thanks!


[1]
https://github.com/apache/beam/blob/90e79ae373ab38cf4e48e9854c28aaffb0938458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java#L36


Re: [Dataflow][Stateful] Bypass Dataflow Overrides?

2023-05-25 Thread Evan Galpin
Understood, thanks for the clarification, I'll need to look more in-depth
at my pipeline code then.  I'm definitely observing that all steps
downstream from the Stateful step in my pipeline do not start until steps
upstream of the Stateful step are fully completed.  The Stateful step is a
RateLimit[1] transform, which borrows heavily from GroupIntoBatches.

[1] https://gist.github.com/egalpin/162a04b896dc7be1d0899acf17e676b3

On Thu, May 25, 2023 at 2:25 PM Robert Bradshaw via user <
user@beam.apache.org> wrote:

> The GbkBeforeStatefulParDo is an implementation detail used to send all
> elements with the same key to the same worker (so that they can share
> state, which is itself partitioned by worker). This does cause a global
> barrier in batch pipelines.
>
> On Thu, May 25, 2023 at 2:15 PM Evan Galpin  wrote:
>
>> Hi all,
>>
>> I'm running into a scenario where I feel that Dataflow Overrides
>> (specifically BatchStatefulParDoOverrides.GbkBeforeStatefulParDo ) are
>> unnecessarily causing a batch pipeline to "pause" throughput since a GBK
>> needs to have processed all the data in a window before it can output.
>>
>> Is it strictly required that GbkBeforeStatefulParDo must run before any
>> stateful DoFn? If not, what failure modes is GbkBeforeStatefulParDo trying
>> to protect against, and how can it be bypassed/disabled while still using
>> DataflowRunner?
>>
>> Thanks,
>> Evan
>>
>


[Dataflow][Stateful] Bypass Dataflow Overrides?

2023-05-25 Thread Evan Galpin
Hi all,

I'm running into a scenario where I feel that Dataflow Overrides
(specifically BatchStatefulParDoOverrides.GbkBeforeStatefulParDo ) are
unnecessarily causing a batch pipeline to "pause" throughput since a GBK
needs to have processed all the data in a window before it can output.

Is it strictly required that GbkBeforeStatefulParDo must run before any
stateful DoFn? If not, what failure modes is GbkBeforeStatefulParDo trying
to protect against, and how can it be bypassed/disabled while still using
DataflowRunner?

Thanks,
Evan


Re: [java] Trouble with gradle and using ParquetIO

2023-04-25 Thread Evan Galpin
The root cause was actually "java.lang.ClassNotFoundException:
org.apache.hadoop.io.Writable", which I eventually fixed by including
hadoop-common as a dep for my pipeline (below).  Should hadoop-common be
listed as a dep of ParquetIO in the Beam repo itself?

implementation "org.apache.hadoop:hadoop-common:3.2.4"
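A sketch of the resulting build.gradle dependencies block, combining the
Beam deps from later in this thread with the fix above (version values are
illustrative; substitute your own Beam version):

```groovy
dependencies {
    implementation "org.apache.beam:beam-sdks-java-core:${beamVersion}"
    implementation "org.apache.beam:beam-sdks-java-io-parquet:${beamVersion}"
    // Needed at runtime by ParquetIO for org.apache.hadoop.io.Writable
    implementation "org.apache.hadoop:hadoop-common:3.2.4"
}
```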

On Fri, Apr 21, 2023 at 10:38 AM Evan Galpin  wrote:

> Oops, I was looking at the "bootleg" mvnrepository search engine, which
> shows `compileOnly` in the copy-pastable dependency installation
> prompts[1].  When I received the "ClassNotFound" error, my thought was that
> the dep should be installed in "implementation" mode.  When I tried that, I
> get other more strange errors when I try to run my pipeline:
> "java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.beam.sdk.coders.CoderRegistry".
>
> My deps are like so:
> implementation "org.apache.beam:beam-sdks-java-core:${beamVersion}"
> implementation
> "org.apache.beam:beam-sdks-java-io-parquet:${beamVersion}"
> ...
>
> Not sure why the CoderRegistry error comes up at runtime when both of the
> above deps are included.
>
> [1]
> https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-parquet/2.46.0
>
> On Fri, Apr 21, 2023 at 2:34 AM Alexey Romanenko 
> wrote:
>
>> Just curious. where it was documented like this?
>>
>> I briefly checked it on Maven Central [1] and the provided code snippet
>> for Gradle uses “implementation” scope.
>>
>> —
>> Alexey
>>
>> [1]
>> https://search.maven.org/artifact/org.apache.beam/beam-sdks-java-io-parquet/2.46.0/jar
>>
>> > On 21 Apr 2023, at 01:52, Evan Galpin  wrote:
>> >
>> > Hi all,
>> >
>> > I'm trying to make use of ParquetIO.  Based on what's documented in
>> maven central, I'm including the artifact in "compileOnly" mode (or in
>> maven parlance, 'provided' scope).  I can successfully compile my pipeline,
>> but when I run it I (intuitively?) am met with a ClassNotFound exception
>> for ParquetIO.
>> >
>> > Is 'compileOnly' still the desired way to include ParquetIO as a
>> pipeline dependency?
>> >
>> > Thanks,
>> > Evan
>>
>>


Re: [EXTERNAL] Re: Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), which expects a PBegin input and a means to handle a collection of queries

2023-04-24 Thread Evan Galpin
Is your pipeline a bounded or unbounded pipeline? Are you hoping to run a
job where the queries are streamed into some unbounded pipeline Source,
and in response the pipeline would execute the query and proceed with any
downstream data manipulation? If so, unfortunately the approach I
described, as well as the ReadAll approach in SolrIO, won’t work. Both of
those approaches assume a bounded workload (Alexey please correct me if I'm
wrong).

The approach that fits best with supporting Elasticsearch reads in an
unbounded pipeline is rewriting ElasticsearchIO to be a Splittable DoFn[1].

It might be possible to run multiple jobs to suit your needs:

1. A streaming job that writes queries to a time-partitioned file in some
distributed file system (GCS, S3, HDFS)
2. A batch job (being deployed once every time a new file is “done” having
new things written to it) that reads a given time-partitioned file
containing the input queries, then uses a loop like in my prior example to
bootstrap the Elasticsearch reads for each query in the file.

The big difference here is that each of these batch pipelines is bounded,
whereas it sounds like you’d ideally like to have support in a streaming
pipeline. I think the two options for streaming support are using an es
java client directly in a custom DoFn, or converting the Read transform to a
SplittableDoFn.

[1]
https://beam.apache.org/blog/splittable-do-fn-is-available/
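For illustration, the first streaming option — using an ES Java client
directly in a custom DoFn — might look roughly like the sketch below. The
host, port, and index are placeholders, and retries, error handling, and
response parsing are omitted; treat it as a starting point, not a vetted
implementation:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

// Sketch: one search request per input query string, emitting the raw
// JSON response body downstream for further parsing.
public class QueryEsFn extends DoFn<String, String> {
  private transient RestClient client;

  @Setup
  public void setup() {
    // Hypothetical host; configure TLS/auth as your cluster requires.
    client = RestClient.builder(new HttpHost("my-es-host", 9200, "https")).build();
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    Request request = new Request("GET", "/my-index/_search");
    request.setJsonEntity(c.element()); // the query arrives as an element
    Response response = client.performRequest(request);
    c.output(EntityUtils.toString(response.getEntity()));
  }

  @Teardown
  public void teardown() throws Exception {
    client.close();
  }
}
```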

On Mon, Apr 24, 2023 at 13:36 Murphy, Sean P. via user 
wrote:

> Thank for the reply.
>
>
>
> This would need to build the queries at runtime.   There are incoming
> patient clinics for which there would be a known quantity, but this could
> fluctuate from thousands to hundreds of thousands depending on the size of
> the study.
>
>
>
> From the approach you provided below; couldn’t the esQueryResults still be
> determined at runtime?
>
>
>
> ~Sean
>
>
>
> *From: *Evan Galpin 
>
>
> *Date: *Monday, April 24, 2023 at 2:18 PM
> *To: *user 
> *Cc: *Anthony Samy, Charles , Murphy, Sean
> P. 
> *Subject: *[EXTERNAL] Re: Re: Q: Apache Beam IOElasticsearchIO.read()
> method (Java), which expects a PBegin input and a means to handle a
> collection of queries
>
> Redirecting to the user mailing list as well to hopefully help the
> community if others face similar issues in the future.  All of the
> solutions in the thread so far involve making changes to the OSS Beam
> ElasticsearchIO codebase, which is the best long-term path and the path I
> would encourage.  That said, I understand that doing so is not always
> feasible depending on timelines etc.  Is your set of queries countable? Can
> they be known at pipeline compilation time? Not the most elegant solution,
> but you could potentially iterate over them if they can be known at compile
> time:
>
> List<PCollection<String>> esQueryResults = new ArrayList<>();
> for (String queryString : myKnownQueryStrings) {
>   esQueryResults.add(p
>       .apply(ElasticsearchIO.read()
>           .withConnectionConfiguration(
>               ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
>           .withQuery(queryString))
>   );
> }
>
> PCollectionList<String> resultsList = PCollectionList.empty(p);
>
> // PCollectionList is immutable; and() returns a new list
> for (PCollection<String> qResults : esQueryResults) {
>   resultsList = resultsList.and(qResults);
> }
>
> resultsList
>     .apply(Flatten.pCollections())
>     .apply(...);
>
>
>
> On Mon, Apr 24, 2023 at 10:39 AM Murphy, Sean P. 
> wrote:
>
> Any other thoughts?  I’ve run out of ideas.   Thanks, ~Sean
>
>
>
> *From: *Murphy, Sean P. 
> *Date: *Friday, April 21, 2023 at 11:00 AM
> *To: *Alexey Romanenko , Evan Galpin <
> egal...@apache.org>
> *Cc: *Anthony Samy, Charles ,
> egal...@apache.org 
> *Subject: *Re: [EXTERNAL] Re: Q: Apache Beam IOElasticsearchIO.read()
> method (Java), which expects a PBegin input and a means to handle a
> collection of queries
>
> Thank you, Alexey.
>
>
>
> The issue isn’t with the split itself, but how to introduce Create.of
> (or similar) in a similar fashion to what was described for the FileIO
> approach.   I may have missed something, but I’m not sure I can implement
> the same approach using ElasticsearchIO.  Thanks, ~Sean
>
>
>
> apply(MapElements
>
>  // uses imports from TypeDescriptors
>
>  .into(kvs(strings(), strings()))
>
>  .via((ReadableFile f) -> {
>
>try {
>
>  return KV.of(
>
>  f.getMetadata().resourceId().toString(),
> f.readFullyAsUTF8String());
>
>} catch (IOException ex) {
>
>  throw new RuntimeException("Failed to read the file", ex);

Re: [EXTERNAL] Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), which expects a PBegin input and a means to handle a collection of queries

2023-04-24 Thread Evan Galpin
Redirecting to the user mailing list as well to hopefully help the
community if others face similar issues in the future.  All of the
solutions in the thread so far involve making changes to the OSS Beam
ElasticsearchIO codebase, which is the best long-term path and the path I
would encourage.  That said, I understand that doing so is not always
feasible depending on timelines etc.  Is your set of queries countable? Can
they be known at pipeline compilation time? Not the most elegant solution,
but you could potentially iterate over them if they can be known at compile
time:

List<PCollection<String>> esQueryResults = new ArrayList<>();
for (String queryString : myKnownQueryStrings) {
  esQueryResults.add(p
      .apply(ElasticsearchIO.read()
          .withConnectionConfiguration(
              ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
          .withQuery(queryString))
  );
}

PCollectionList<String> resultsList = PCollectionList.empty(p);

// PCollectionList is immutable; and() returns a new list
for (PCollection<String> qResults : esQueryResults) {
  resultsList = resultsList.and(qResults);
}

resultsList
    .apply(Flatten.pCollections())
    .apply(...);

On Mon, Apr 24, 2023 at 10:39 AM Murphy, Sean P. 
wrote:

> Any other thoughts?  I’ve run out of ideas.   Thanks, ~Sean
>
>
>
> *From: *Murphy, Sean P. 
> *Date: *Friday, April 21, 2023 at 11:00 AM
> *To: *Alexey Romanenko , Evan Galpin <
> egal...@apache.org>
> *Cc: *Anthony Samy, Charles ,
> egal...@apache.org 
> *Subject: *Re: [EXTERNAL] Re: Q: Apache Beam IOElasticsearchIO.read()
> method (Java), which expects a PBegin input and a means to handle a
> collection of queries
>
> Thank you, Alexey.
>
>
>
> The issue isn’t with the split itself, but how to introduce Create.of
> (or similar) in a similar fashion to what was described for the FileIO
> approach.   I may have missed something, but I’m not sure I can implement
> the same approach using ElasticsearchIO.  Thanks, ~Sean
>
>
>
> apply(MapElements
>
>  // uses imports from TypeDescriptors
>
>  .into(kvs(strings(), strings()))
>
>  .via((ReadableFile f) -> {
>
>try {
>
>  return KV.of(
>
>  f.getMetadata().resourceId().toString(),
> f.readFullyAsUTF8String());
>
>} catch (IOException ex) {
>
>  throw new RuntimeException("Failed to read the file", ex);
>
>}
>
>  }));
>
>
>
>
>
> *From: *Alexey Romanenko 
> *Date: *Friday, April 21, 2023 at 5:20 AM
> *To: *Murphy, Sean P. 
> *Cc: *Anthony Samy, Charles ,
> egal...@apache.org 
> *Subject: *[EXTERNAL] Re: Q: Apache Beam IOElasticsearchIO.read() method
> (Java), which expects a PBegin input and a means to handle a collection of
> queries
>
> Yes, “ReadAll” doesn’t exist for ElasticsearchIO, it has to be implemented
> (btw, it would be a good contribution for Beam!). I’d say that "SplitFn()"
> is rather optional and specific for this example of SolrIO, that I showed
> before. The general idea of this is actually to evenly distribute all
> Reads across all workers and split them, if possible, to have an equal load
> on your Elasticsearch cluster.
>
>
>
> I can’t say for sure what the best way to implement it for Elasticsearch
> is, so I’d recommend you discuss it with Evan Galpin, who is a main
> contributor and maintainer of ElasticsearchIO.
>
>
>
> —
>
> Alexey
>
>
>
>
>
>
> On 20 Apr 2023, at 18:52, Murphy, Sean P.  wrote:
>
>
>
> Excuse my question if it’s obvious, but since those methods aren’t
> accessible for Elasticsearch from the same level :
> https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
>
>
>
> Would I need to implement my own versions of SplitFn() and ReadAll()?
>
>
>
> Such as:
>
> class ReadAll extends PTransform<PCollection<Read>, PCollection<String>> {
>   @Override
>   public PCollection<String> expand(PCollection<Read> input) {
>     return input
>         .apply("Split", ParDo.of(new SplitFn()))
>         .apply("Reshuffle", Reshuffle.viaRandomKey())
>         .apply("Read", ParDo.of(new ReadFn()));
>   }
> }
>
>
>
>
>
>
>
> *From: *Alexey Romanenko 
> *Date: *Thursday, April 20, 2023 at 11:13 AM
> *To: *user , Murphy, Sean P. 
> *Subject: *[EXTERNAL] Re: Q: Apache Beam IOElasticsearchIO.read() method
> (Java), which expects a PBegin input and a means to handle a collection of
> queries
>
> Some Java IO-connectors implement a class something like "class ReadAll
> extends PTransform<PCollection<Read>, PCollection<SolrDocument>>”, where
> “Read” is supposed to be configured dynamically. As a simple example, take
> a look at “SolrIO” [1]

Re: [java] Trouble with gradle and using ParquetIO

2023-04-21 Thread Evan Galpin
Oops, I was looking at the "bootleg" mvnrepository search engine, which
shows `compileOnly` in the copy-pastable dependency installation
prompts[1].  When I received the "ClassNotFound" error, my thought was that
the dep should be installed in "implementation" mode.  When I tried that, I
get other more strange errors when I try to run my pipeline:
"java.lang.NoClassDefFoundError: Could not initialize class
org.apache.beam.sdk.coders.CoderRegistry".

My deps are like so:
implementation "org.apache.beam:beam-sdks-java-core:${beamVersion}"
implementation
"org.apache.beam:beam-sdks-java-io-parquet:${beamVersion}"
...

Not sure why the CoderRegistry error comes up at runtime when both of the
above deps are included.

[1]
https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-parquet/2.46.0

On Fri, Apr 21, 2023 at 2:34 AM Alexey Romanenko 
wrote:

> Just curious. where it was documented like this?
>
> I briefly checked it on Maven Central [1] and the provided code snippet
> for Gradle uses “implementation” scope.
>
> —
> Alexey
>
> [1]
> https://search.maven.org/artifact/org.apache.beam/beam-sdks-java-io-parquet/2.46.0/jar
>
> > On 21 Apr 2023, at 01:52, Evan Galpin  wrote:
> >
> > Hi all,
> >
> > I'm trying to make use of ParquetIO.  Based on what's documented in
> maven central, I'm including the artifact in "compileOnly" mode (or in
> maven parlance, 'provided' scope).  I can successfully compile my pipeline,
> but when I run it I (intuitively?) am met with a ClassNotFound exception
> for ParquetIO.
> >
> > Is 'compileOnly' still the desired way to include ParquetIO as a
> pipeline dependency?
> >
> > Thanks,
> > Evan
>
>


[java] Trouble with gradle and using ParquetIO

2023-04-20 Thread Evan Galpin
Hi all,

I'm trying to make use of ParquetIO.  Based on what's documented in maven
central, I'm including the artifact in "compileOnly" mode (or in maven
parlance, 'provided' scope).  I can successfully compile my pipeline, but
when I run it I (intuitively?) am met with a ClassNotFound exception for
ParquetIO.

Is 'compileOnly' still the desired way to include ParquetIO as a pipeline
dependency?

Thanks,
Evan


Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), which expects a PBegin input and a means to handle a collection of queries

2023-04-20 Thread Evan Galpin
For more info on splitable DoFn, there is a good resource on the beam
blog[1].  Alexey has also shown a great alternative!

[1] https://beam.apache.org/blog/splittable-do-fn/

On Thu, Apr 20, 2023 at 9:08 AM Alexey Romanenko 
wrote:

> Some Java IO-connectors implement a class something like "class ReadAll
> extends PTransform<PCollection<Read>, PCollection<SolrDocument>>”, where
> “Read” is supposed to be configured dynamically. As a simple example, take
> a look at “SolrIO” [1]
>
> So, to support what you are looking for, “ReadAll”-pattern should be
> implemented for ElasticsearchIO.
>
> —
> Alexey
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
>
> On 19 Apr 2023, at 19:05, Murphy, Sean P. via user 
> wrote:
>
> I'm running into an issue using the ElasticsearchIO.read() to handle more
> than one instance of a query. My queries are being dynamically built as a
> PCollection based on an incoming group of values. I'm trying to see how
> to load the .withQuery() parameter which could provide this capability or
> any approach that provides flexibility.
>
> The issue is that ElasticsearchIO.read() method expects a PBegin input to
> start a pipeline, but it seems like I need access outside of a pipeline
> context somehow. PBegin represents the beginning of a pipeline, and it's
> required to create a pipeline that can read data from Elasticsearch using
> ElasticsearchIO.read().
>
> Can I wrap the ElasticsearchIO.read() call in a Create transform that
> creates a PCollection with a single element (e.g., PBegin) to simulate the
> beginning of a pipeline or something similar?
>
> Here is my naive attempt without accepting the reality of PBegin:
> PCollection<String> queries = ... // a PCollection of Elasticsearch queries
>
> PCollection<String> queryResults = queries.apply(
>     ParDo.of(new DoFn<String, PCollection<String>>() {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         String query = c.element();
>         PCollection<String> results = c.pipeline()
>             .apply(ElasticsearchIO.read()
>                 .withConnectionConfiguration(
>                     ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
>                 .withQuery(query));
>         c.output(results);
>       }
>     })
>     .apply(Flatten.pCollections()));
>
>
>
> In general I'm wondering, for any of the IO-related classes provided by
> Beam that conform to PBegin input -- if there is a means to introduce a
> collection.
>
>
>
> Here is one approach that might be promising:
> // Define a ValueProvider for a List<String>
> ValueProvider<List<String>> myListProvider =
>     ValueProvider.StaticValueProvider.of(myList);
>
> // Use the ValueProvider to create a PCollection of Strings
> PCollection<String> pcoll =
>     pipeline.apply(Create.ofProvider(myListProvider, ListCoder.of()));
>
> PCollection partitionData = PBegin.in(pipeline)
> .apply("Read data from Elasticsearch", 
> ElasticsearchIO.read().withConnectionConfiguration(connConfig).withQuery(ValueProvider
> pcoll).withScrollKeepalive("1m").withBatchSize(50))
> .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(),
> opt.getMedTagVersion(), opt.getNoteType()));
>
> Any thoughts or ideas would be great.   Thanks, ~Sean
>
>
>


Re: Q: Apache Beam IOElasticsearchIO.read() method (Java), which expects a PBegin input and a means to handle a collection of queries

2023-04-19 Thread Evan Galpin
Yes unfortunately the ES IO connector is not built in a way that can work
by taking inputs from a PCollection to issue Reads. The most scalable way
to support this is to revisit the implementation of Elasticsearch Read
transform and instead implement it as a SplittableDoFn.
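For anyone curious what that rewrite would involve structurally, the
splittable DoFn surface looks roughly like the skeleton below — purely a
sketch; the real work is defining a restriction that meaningfully partitions
an Elasticsearch query (e.g. by slice ID, in the spirit of ES sliced scroll),
and the slice count and output strings here are placeholders:

```java
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

// Skeleton only: treats each incoming query string as splittable into
// a fixed number of slices that the runner may process in parallel.
@DoFn.BoundedPerElement
public class ReadQueryFn extends DoFn<String, String> {

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element String query) {
    return new OffsetRange(0, 10); // e.g. 10 slices per query
  }

  @ProcessElement
  public void processElement(
      @Element String query,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    for (long slice = tracker.currentRestriction().getFrom();
        tracker.tryClaim(slice);
        slice++) {
      // Execute `query` against slice `slice` and emit each hit here.
      out.output("result-for-slice-" + slice);
    }
  }
}
```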

On Wed, Apr 19, 2023 at 10:51 Shahar Frank  wrote:

> Hi Sean,
>
> I'm not an expert, but I think the .withQuery() function takes effect at
> pipeline-build time rather than at runtime.
> This means that, the way ElasticsearchIO was built, you can set the query
> while the pipeline is being constructed, but not at runtime, which means
> you cannot dynamically run a query based on an element processed within
> the pipeline.
>
> To do something like that the transformation must be designed more like
> the FileIO in this example: (From
> https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/FileIO.html
> )
>
>>  PCollection<KV<String, String>> filesAndContents = p
>>  .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
>>  // withCompression can be omitted - by default compression is detected 
>> from the filename.
>>  .apply(FileIO.readMatches().withCompression(GZIP))
>>  .apply(MapElements
>>  // uses imports from TypeDescriptors
>>  .into(kvs(strings(), strings()))
>>  .via((ReadableFile f) -> {
>>try {
>>  return KV.of(
>>  f.getMetadata().resourceId().toString(), 
>> f.readFullyAsUTF8String());
>>} catch (IOException ex) {
>>  throw new RuntimeException("Failed to read the file", ex);
>>}
>>  }));
>>
>>
> If you look at how FileIO.readMatches() works - it doesn't set the
> filename when building the pipeline but rather accepts that within the
> ProcessElement function.
>
> See here
> 
>
> Does that make sense?
>
> Cheers,
> Shahar.
>
> --
>
> Shahar Frank
>
> srf...@gmail.com
>
> +447799561438
>
> --
>
>
>
>
>
>
> On Wed, 19 Apr 2023 at 18:05, Murphy, Sean P. via user <
> user@beam.apache.org> wrote:
>
>> I'm running into an issue using the ElasticsearchIO.read() to handle
>> more than one instance of a query. My queries are being dynamically built
>> as a PCollection based on an incoming group of values. I'm trying to see
>> how to load the .withQuery() parameter which could provide this
>> capability or any approach that provides flexibility.
>>
>>
>>
>> The issue is that ElasticsearchIO.read() method expects a PBegin input
>> to start a pipeline, but it seems like I need access outside of a pipeline
>> context somehow. PBegin represents the beginning of a pipeline, and it's
>> required to create a pipeline that can read data from Elasticsearch using
>> ElasticsearchIO.read().
>>
>>
>>
>> Can I wrap the ElasticsearchIO.read() call in a Create transform that
>> creates a PCollection with a single element (e.g., PBegin) to simulate the
>> beginning of a pipeline or something similar?
>>
>>
>>
>> Here is my naive attempt without accepting the reality of PBegin:
>>
>> PCollection<String> queries = ... // a PCollection of Elasticsearch queries
>>
>> PCollection<String> queryResults = queries.apply(
>>     ParDo.of(new DoFn<String, PCollection<String>>() {
>>       @ProcessElement
>>       public void processElement(ProcessContext c) {
>>         String query = c.element();
>>         PCollection<String> results = c.pipeline()
>>             .apply(ElasticsearchIO.read()
>>                 .withConnectionConfiguration(
>>                     ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
>>                 .withQuery(query));
>>         c.output(results);
>>       }
>>     })
>>     .apply(Flatten.pCollections()));
>>
>>
>>
>>
>>
>> In general I'm wondering for any of IO-related classes proved by Beam
>> that conforms to PBegin input -- if there is a means to introduce a
>> collection.
>>
>>
>>
>> Here is one approach that might be promising:
>>
>> // Define a ValueProvider for a List
>>
>> ValueProvider<List<String>> myListProvider =
>> ValueProvider.StaticValueProvider.of(myList);
>>
>>
>>
>> // Use the ValueProvider to create a PCollection of Strings
>>
>> PCollection<String> pcoll =
>> pipeline.apply(Create.ofProvider(myListProvider, ListCoder.of()));
>>
>>
>>
>> PCollection partitionData = PBegin.in(pipeline)
>> .apply("Read data from Elasticsearch", 
>> ElasticsearchIO.read().withConnectionConfiguration(connConfig).withQuery(ValueProvider
>> pcoll).withScrollKeepalive("1m").withBatchSize(50))
>> .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(),
>> opt.getMedTagVersion(), opt.getNoteType()));
>>
>>
>>
>> Any thoughts or ideas would be great.   Thanks, ~Sean
>>
>>
>>
>
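The pattern Shahar points to with FileIO.readMatches() — accept the query inside the per-element processing function instead of fixing it at pipeline-construction time — can be sketched outside Beam with plain Java. This is a stand-in only: `runQuery` represents an Elasticsearch client call that a real DoFn would make inside @ProcessElement, and all names here are hypothetical.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PerElementQuery {
    // Stand-in for "run the query inside @ProcessElement": each element
    // carries its own query string; nothing is fixed when the graph is built.
    static List<String> expand(List<String> queries,
                               Function<String, List<String>> runQuery) {
        return queries.stream()
                .flatMap(q -> runQuery.apply(q).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // runQuery stands in for an Elasticsearch client call
        List<String> results = expand(
                List.of("q1", "q2"),
                q -> List.of(q + "-hit1", q + "-hit2"));
        System.out.println(results); // [q1-hit1, q1-hit2, q2-hit1, q2-hit2]
    }
}
```

The same shape inside Beam would be a DoFn that opens a client in @Setup and queries per element, rather than nesting ElasticsearchIO.read() inside another pipeline.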


Re: [EXTERNAL] Re: ElasticsearchIO write to trigger a percolator

2023-02-02 Thread Evan Galpin
I believe that 2.41.0 is the "oldest" safe version[1] to use as there were
initially some bugs introduced when migrating from PDone to outputting the
write results.

[1]
https://github.com/apache/beam/commit/2cb2ee2ba3b5efb0f08880a9325f092485b3ccf2
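Conceptually, the change from PDone to a returned PCollection is what makes the [index] -> [percolator] chaining possible. A minimal plain-Java sketch of the difference, using stand-in functions rather than the real ElasticsearchIO API:

```java
import java.util.function.Function;

public class ChainAfterWrite {
    // With a write step that returns a per-document result (as
    // ElasticsearchIO does from 2.41.0 on), a downstream stage such as a
    // percolator trigger can be chained onto it; a void/PDone-style write
    // leaves nothing to chain from.
    static String indexThenPercolate(String doc,
                                     Function<String, String> index,
                                     Function<String, String> percolate) {
        return index.andThen(percolate).apply(doc);
    }

    public static void main(String[] args) {
        String out = indexThenPercolate(
                "doc-1",
                d -> "indexed:" + d,      // stand-in for the Elasticsearch write
                r -> "percolated:" + r);  // stand-in for the percolator query
        System.out.println(out); // percolated:indexed:doc-1
    }
}
```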

On Thu, Feb 2, 2023 at 3:16 PM Kaggal, Vinod C. (Vinod C.), M.S. via user <
user@beam.apache.org> wrote:

> Thank you for the response!
>
>
>
> We are currently using :
>
> 2.37.0
> 2.33.0
>
>
>
> Is there a compatible elasticsearchio version that may work with our beam
> version?
>
>
>
> Thank you!
>
> Vinod
>
>
>
> *From: *Chamikara Jayalath 
> *Date: *Thursday, February 2, 2023 at 6:01 PM
> *To: *user@beam.apache.org , Kaggal, Vinod C.
> (Vinod C), M.S. 
> *Subject: *[EXTERNAL] Re: ElasticsearchIO write to trigger a percolator
>
>
>
>
>
> On Thu, Feb 2, 2023 at 1:56 PM Kaggal, Vinod C. (Vinod C.), M.S. via user <
> user@beam.apache.org> wrote:
>
> Hello! Thank you for all the hard work on implementing these useful
> libraries.
>
>
>
> *Background:* We have been using Apache Storm in production for some time
> (over 8 years) and have recently switched over to Beam. One of the
> topologies that we had in Storm was to ingest data, index to elastic
> (write) and right after the write to elastic we would trigger a percolator
> query for a downstream task to be triggered. The DAGs using Flux allowed us
> to chain these steps ( [read from source] -> [process] -> [index] ->
> [percolator] -> [trigger]).
>
>
>
> *Beam:*
>
> We are able to accomplish much of that using beam. However, using
> ElasticsearchIO, the index step results in a PDone and hence we cannot
> chain the remaining tasks ( [percolator] -> [trigger other events]).
>
>
>
> The question I have is this: is there a way to trigger a percolator query
> after writing a document to elastic using ElasticsearchIO.Write()?
>
>
>
> Which version of Beam are you using ?
>
> Seems like for recent versions of Beam, we actually return
> PCollections from ElasticsearchIO sink so this should not be an issue.
>
>
>
>
> https://github.com/apache/beam/blob/24f40aab7aa4c4f9eea6dc90c0baa22bb17a962e/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L1842
> 
>
>
>
> Thanks,
>
> Cham
>
>
>
>
>
> Your thoughts would be appreciated!
>
> Vinod
>
>
>
>


Re: [Question][Dataflow][Java][pubsub] Streaming Pipeline Stall Scenarios

2022-11-17 Thread Evan Galpin
friendly bump 🙃 Anyone have thoughts or answers?  Thanks!

On Thu, Nov 3, 2022 at 3:07 PM Evan Galpin  wrote:

> Hi folks,
>
> Hoping to get some definitive answers with respect to streaming pipeline
> bundle retry semantics on Dataflow.  I understand that a bundle containing
> a "poison pill" (bad data, let's say it causes a null pointer exception
> when processing in DoFn) will be retried indefinitely.  What I'm not clear
> on are the implications of those retries.
>
>
>1. Is it the case that a worker will continuously retry the same
>"poison pill" bundle, and not be able to work on any other/new bundles
>indefinitely after receiving the first poison pill? I've noticed that a
>small number poison pills can cause all processing to stall, even if the
>bad data represents only a very small percentage of the overall data being
>processed
>2. Is there any implication with windowing and this retry/stall
>scenario?  I've noticed that the scenario where all processing stalls
>entirely is more common for a pipeline where all data is globally
>windowed.  I don't, however, have a solid understanding of how to explain
>that observation; I'd really appreciate any insights that can aid in
>understanding
>
> Thanks,
> Evan
>


Re: [KafkaIO] Use of sinkGroupId with Exactly Once Semantics

2022-11-14 Thread Evan Galpin
Thanks for the input folks! Sounds like a singular value across my
application (despite that application writing to multiple topics) is what I
want.
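Byron's point below about the offset key can be illustrated directly: because Kafka keys committed offsets by the (group, topic, partition) tuple, reusing one group id across several topics never collides. Group and topic names here are hypothetical.

```java
import java.util.Set;

public class OffsetKeys {
    // Kafka keys committed offsets by (group, topic, partition), so a
    // single sinkGroupId reused across topics still yields distinct keys.
    static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    public static void main(String[] args) {
        Set<String> keys = Set.of(
                key("my-job", "topicA", 0),
                key("my-job", "topicB", 0));
        System.out.println(keys.size()); // 2 -- no collision across topics
    }
}
```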

On Fri, Nov 11, 2022 at 1:34 PM Byron Ellis via user 
wrote:

> The Kafka consumer offset key
> <https://github.com/apache/kafka/blob/trunk/core/src/main/resources/common/message/OffsetCommitKey.json>
>  is
> (group, topic, partition)
>
> On Fri, Nov 11, 2022 at 7:58 AM John Casey via user 
> wrote:
>
>> I haven't done this experimentally before, so take this with a grain of
>> salt, but...
>>
>> Kafka Group Ids are essentially used to track where a logical (aka
>> application level, not thread/machine level) producer / consumer is at. As
>> such, I think it would be fine to use just one group id, even when writing
>> to multiple topics
>>
>> On Thu, Nov 10, 2022 at 6:30 PM Evan Galpin  wrote:
>>
>>> Hey folks,
>>>
>>> I can see in the docs for "withEOS"[1] in the KafkaIO#Write section that
>>> the sinkGroupId is recommended to be unique per job.  I'm wondering about a
>>> case where a single job outputs to multiple topics.  Would it be advisable
>>> to have a unique sinkGroupId per instance of KafkaIO#Write transform, or
>>> still only per job even if the job has multiple KafkaIO#Write?
>>>
>>> Thanks in advance!
>>>
>>> [1]
>>> https://beam.apache.org/releases/javadoc/2.41.0/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
>>>
>>


[KafkaIO] Use of sinkGroupId with Exactly Once Semantics

2022-11-10 Thread Evan Galpin
Hey folks,

I can see in the docs for "withEOS"[1] in the KafkaIO#Write section that
the sinkGroupId is recommended to be unique per job.  I'm wondering about a
case where a single job outputs to multiple topics.  Would it be advisable
to have a unique sinkGroupId per instance of KafkaIO#Write transform, or
still only per job even if the job has multiple KafkaIO#Write?

Thanks in advance!

[1]
https://beam.apache.org/releases/javadoc/2.41.0/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-


[Question][Dataflow][Java][pubsub] Streaming Pipeline Stall Scenarios

2022-11-03 Thread Evan Galpin
Hi folks,

Hoping to get some definitive answers with respect to streaming pipeline
bundle retry semantics on Dataflow.  I understand that a bundle containing
a "poison pill" (bad data, let's say it causes a null pointer exception
when processing in DoFn) will be retried indefinitely.  What I'm not clear
on are the implications of those retries.


   1. Is it the case that a worker will continuously retry the same "poison
   pill" bundle, and not be able to work on any other/new bundles indefinitely
   after receiving the first poison pill? I've noticed that a small number
   poison pills can cause all processing to stall, even if the bad data
   represents only a very small percentage of the overall data being processed
   2. Is there any implication with windowing and this retry/stall
   scenario?  I've noticed that the scenario where all processing stalls
   entirely is more common for a pipeline where all data is globally
   windowed.  I don't, however, have a solid understanding of how to explain
   that observation; I'd really appreciate any insights that can aid in
   understanding

Thanks,
Evan
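A toy simulation of question 1 — not Dataflow's actual scheduler, just an in-memory sketch — shows how indefinitely retrying a poison bundle starves everything queued behind it. The retry budget exists only so the demo terminates; a runner retrying indefinitely would never reach "ok-2".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PoisonPillDemo {
    // Process a bundle in order, retrying the failing element on each pass,
    // the way a streaming runner retries the same bundle.
    static List<String> process(Deque<String> bundle, int retryBudget) {
        List<String> done = new ArrayList<>();
        int retries = 0;
        while (!bundle.isEmpty() && retries < retryBudget) {
            String e = bundle.peekFirst();
            try {
                if (e.equals("poison")) {
                    throw new NullPointerException("bad data");
                }
                done.add(bundle.removeFirst());
            } catch (RuntimeException ex) {
                retries++; // same element retried; later elements wait
            }
        }
        return done;
    }

    public static void main(String[] args) {
        Deque<String> bundle = new ArrayDeque<>(List.of("ok-1", "poison", "ok-2"));
        System.out.println(process(bundle, 3)); // [ok-1] -- "ok-2" never runs
    }
}
```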


Re: Help on Apache Beam Pipeline Optimization

2022-10-11 Thread Evan Galpin
If I’m not mistaken you could create a PCollection from the pubsub read
operation, and then apply 3 different windowing strategies in different
“chains” of the graph. Ex

PCollection msgs = PubsubIO.read(…);

msgs.apply(Window.into(FixedWindows.of(1 min))).apply(allMyTransforms);

msgs.apply(Window.into(FixedWindows.of(5 min))).apply(allMyTransforms);

msgs.apply(Window.into(FixedWindows.of(60 min))).apply(allMyTransforms);


Similarly this could be done with a loop if preferred.
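The three branches above differ only in window size. Which 1-, 5-, or 60-minute window a given event timestamp lands in follows the usual fixed-window arithmetic; a sketch mirroring what FixedWindows computes (zero offset assumed, everything in milliseconds):

```java
public class FixedWindowDemo {
    // Start of the fixed window of the given size containing tsMillis --
    // the same modular arithmetic fixed windowing uses, ignoring offsets.
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - (tsMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long ts = 3_723_000L; // 01:02:03 past the epoch
        System.out.println(windowStart(ts, 60_000L));    // 3720000 -> the 01:02 one-minute window
        System.out.println(windowStart(ts, 300_000L));   // 3600000 -> the 01:00 five-minute window
        System.out.println(windowStart(ts, 3_600_000L)); // 3600000 -> the 01:00 sixty-minute window
    }
}
```

The same element is assigned to one window per strategy, which is why each branch can aggregate into its own BigTable column family independently.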

On Tue, Oct 4, 2022 at 14:15 Yi En Ong  wrote:

> Hi,
>
>
> I am trying to optimize my Apache Beam pipeline on Google Cloud Platform
> Dataflow, and I would really appreciate your help and advice.
>
>
> Background information: I am trying to read data from PubSub Messages, and
> aggregate them based on 3 time windows: 1 min, 5 min and 60 min. Such
> aggregations consists of summing, averaging, finding the maximum or
> minimum, etc. For example, for all data collected from 1200 to 1201, I want
> to aggregate them and write the output into BigTable's 1-min column family.
> And for all data collected from 1200 to 1205, I want to similarly aggregate
> them and write the output into BigTable's 5-min column. Same goes for 60min.
>
>
> The current approach I took is to have 3 separate dataflow jobs (i.e. 3
> separate Beam Pipelines), each one having a different window duration
> (1min, 5min and 60min). See
> https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/transforms/windowing/Window.html.
> And the outputs of all 3 dataflow jobs are written to the same BigTable,
> but on different column families. Other than that, the function and
> aggregations of the data are the same for the 3 jobs.
>
>
> However, this seems to be very computationally inefficient, and cost
> inefficient, as the 3 jobs are essentially doing the same function, with
> the only exception being the window time duration and output column family.
>
>
>
> Some challenges and limitations we faced was that from the documentation,
> it seems like we are unable to create multiple windows of different periods
> in a singular dataflow job. Also, when we write the final data into big
> table, we would have to define the table, column family, column, and
> rowkey. And unfortunately, the column family is a fixed property (i.e. it
> cannot be redefined or changed given the window period).
>
>
> Hence, I am writing to ask if there is a way to only use 1 dataflow job
> that fulfils the objective of this project? Which is to aggregate data on
> different window periods, and write them to different column families of
> the same BigTable.
>
>
> Thank you
>


Re: [Question] Beam 2.42.0 Release Date Confirmation

2022-09-28 Thread Evan Galpin
Hi there :-)

You can follow the vote on the first release candidate for 2.42.0 here:
https://lists.apache.org/thread/ho2mvvgc253my1ovqmrxjpql8gvz0285

Thanks,
Evan

On Tue, Sep 27, 2022 at 7:16 AM Varun Chopra via user 
wrote:

> Classification: Public
>
>
>
> Hi Team,
>
>
>
> We at Deutsche Bank are using Apache Beam SDK 2.41.0 and it has
> Vulnerabilities of *netty-codec* under *beam-vendor-grpc* jar.
>
>
>
> We checked the Github fixes and seems like the code is fixed and merged.
>
>
>
> We wanted to understand when can we expect Apache Beam SDK 2.42.0 Version,
> so that we can start using it.
>
>
>
> We have some urgent requirement for this fix, If you can provide some
> dates that will help.
>
>
>
> Kind Regards,
> Varun Chopra
>
>
>
>
> ---
> This e-mail may contain confidential and/or privileged information. If you
> are not the intended recipient (or have received this e-mail in error)
> please notify the sender immediately and destroy this e-mail. Any
> unauthorized copying, disclosure or distribution of the material in this
> e-mail is strictly forbidden.
>
> Privacy of communications
> In order to monitor compliance with legal and regulatory obligations and
> our policies, procedures and compliance programs, we may review emails and
> instant messages passing through our IT systems (including any personal
> data and customer information they contain), and record telephone calls
> routed via our telephone systems. We will only do so in accordance with
> local laws and regulations. In some countries please refer to your local DB
> website for a copy of our Privacy Policy.
>
> Please refer to https://db.com/disclosures for additional EU corporate
> and regulatory disclosures.
>


Re: [troubleshooting] KafkaIO#write gets stuck "since the associated topicId changed from null to "

2022-09-16 Thread Evan Galpin
Following up to close the loop.  I believe the Kafka errors I was seeing
were a red herring.  The actual root cause of the issues was worker nodes
running out of memory, and as a result kafka producers would have
difficulty competing for resources over GC thrashing.  Increasing the
worker node size to where there are no longer OOMKills has removed any
kafka "issue".

Thanks all for your time and willingness to help.

Evan

On Tue, Sep 13, 2022 at 12:33 PM Evan Galpin  wrote:

> There's a few related log lines, but there isn't a full stacktrace as the
> info originates from a logger statement[1] as opposed to thrown exception.
> The related log lines are like so:
>
> org.apache.kafka.clients.NetworkClient [Producer clientId=producer-109]
> Disconnecting from node 10 due to socket connection setup timeout. The
> timeout value is 11436 ms.[2]
>
> and
>
> org.apache.kafka.clients.NetworkClient [Producer clientId=producer-109]
> Node 10 disconnected.[3]
>
> [1]
> https://github.com/apache/kafka/blob/f653cb7b5889fd619ab0e6a25216bd981a9d82bf/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
> [2]
> https://github.com/apache/kafka/blob/1135f22eaf404fdf76489302648199578876c4ac/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L820
> [3]
> https://github.com/apache/kafka/blob/1135f22eaf404fdf76489302648199578876c4ac/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L937
>
> On Tue, Sep 13, 2022 at 12:17 PM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
>
>> Do you have by any chance the full stacktrace of this error?
>>
>> —
>> Alexey
>>
>> On 13 Sep 2022, at 18:05, Evan Galpin  wrote:
>>
>> Ya likewise, I'd expect this to be handled in the Kafka code without the
>> need for special handling by Beam.  I'll reach out to Kafka mailing list as
>> well and try to get a better understanding of the root issue.  Thanks for
>> your time so far John! I'll ping this thread with any interesting findings
>> or insights.
>>
>> Thanks,
>> Evan
>>
>> On Tue, Sep 13, 2022 at 11:38 AM John Casey via user <
>> user@beam.apache.org> wrote:
>>
>>> In principle yes, but I don't see any Beam level code to handle that.
>>> I'm a bit surprised it isn't handled in the Kafka producer layer itself.
>>>
>>> On Tue, Sep 13, 2022 at 11:15 AM Evan Galpin  wrote:
>>>
>>>> I'm not certain based on the logs where the disconnect is starting.  I
>>>> have seen TimeoutExceptions like that mentioned in the SO issue you linked,
>>>> so if we assume it's starting from the kafka cluster side, my concern is
>>>> that the producers don't seem to be able to gracefully recover.  Given that
>>>> restarting the pipeline (in this case, in Dataflow) makes the issue go
>>>> away, I'm under the impression that producer clients in KafkaIO#write can
>>>> get into a state that they're not able to recover from after experiencing a
>>>> disconnect.  Is graceful recovery after cluster unavailability something
>>>> that would be expected to be supported by KafkaIO today?
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>> On Tue, Sep 13, 2022 at 11:07 AM John Casey via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>>> Googling that error message returned
>>>>> https://stackoverflow.com/questions/71077394/kafka-producer-resetting-the-last-seen-epoch-of-partition-resulting-in-timeout
>>>>> and
>>>>> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
>>>>>
>>>>> Which suggests that there is some sort of disconnect happening between
>>>>> your pipeline and your kafka instance.
>>>>>
>>>>> Do you see any logs when this disconnect starts, on the Beam or Kafka
>>>>> side of things?
>>>>>
>>>>> On Tue, Sep 13, 2022 at 10:38 AM Evan Galpin 
>>>>> wrote:
>>>>>
>>>>>> Thanks for the quick reply John!  I should also add that the root
>>>>>> issue is not so much the logging, rather that these log messages seem to 
>>>>>> be
>>>>>> correlated with periods where producers are not able to publish data to
>>>>>> kafka.  The issue of not being able to publish data does not seem to
>>>>>> resolve until restarting or updating the pipeline.

Re: [troubleshooting] KafkaIO#write gets stuck "since the associated topicId changed from null to "

2022-09-13 Thread Evan Galpin
There are a few related log lines, but there isn't a full stack trace, since
the info originates from a logger statement[1] as opposed to a thrown exception.
The related log lines are like so:

org.apache.kafka.clients.NetworkClient [Producer clientId=producer-109]
Disconnecting from node 10 due to socket connection setup timeout. The
timeout value is 11436 ms.[2]

and

org.apache.kafka.clients.NetworkClient [Producer clientId=producer-109]
Node 10 disconnected.[3]

[1]
https://github.com/apache/kafka/blob/f653cb7b5889fd619ab0e6a25216bd981a9d82bf/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
[2]
https://github.com/apache/kafka/blob/1135f22eaf404fdf76489302648199578876c4ac/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L820
[3]
https://github.com/apache/kafka/blob/1135f22eaf404fdf76489302648199578876c4ac/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L937

On Tue, Sep 13, 2022 at 12:17 PM Alexey Romanenko 
wrote:

> Do you have by any chance the full stacktrace of this error?
>
> —
> Alexey
>
> On 13 Sep 2022, at 18:05, Evan Galpin  wrote:
>
> Ya likewise, I'd expect this to be handled in the Kafka code without the
> need for special handling by Beam.  I'll reach out to Kafka mailing list as
> well and try to get a better understanding of the root issue.  Thanks for
> your time so far John! I'll ping this thread with any interesting findings
> or insights.
>
> Thanks,
> Evan
>
> On Tue, Sep 13, 2022 at 11:38 AM John Casey via user 
> wrote:
>
>> In principle yes, but I don't see any Beam level code to handle that. I'm
>> a bit surprised it isn't handled in the Kafka producer layer itself.
>>
>> On Tue, Sep 13, 2022 at 11:15 AM Evan Galpin  wrote:
>>
>>> I'm not certain based on the logs where the disconnect is starting.  I
>>> have seen TimeoutExceptions like that mentioned in the SO issue you linked,
>>> so if we assume it's starting from the kafka cluster side, my concern is
>>> that the producers don't seem to be able to gracefully recover.  Given that
>>> restarting the pipeline (in this case, in Dataflow) makes the issue go
>>> away, I'm under the impression that producer clients in KafkaIO#write can
>>> get into a state that they're not able to recover from after experiencing a
>>> disconnect.  Is graceful recovery after cluster unavailability something
>>> that would be expected to be supported by KafkaIO today?
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Tue, Sep 13, 2022 at 11:07 AM John Casey via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> Googling that error message returned
>>>> https://stackoverflow.com/questions/71077394/kafka-producer-resetting-the-last-seen-epoch-of-partition-resulting-in-timeout
>>>> and
>>>> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
>>>>
>>>> Which suggests that there is some sort of disconnect happening between
>>>> your pipeline and your kafka instance.
>>>>
>>>> Do you see any logs when this disconnect starts, on the Beam or Kafka
>>>> side of things?
>>>>
>>>> On Tue, Sep 13, 2022 at 10:38 AM Evan Galpin 
>>>> wrote:
>>>>
>>>>> Thanks for the quick reply John!  I should also add that the root
>>>>> issue is not so much the logging, rather that these log messages seem to 
>>>>> be
>>>>> correlated with periods where producers are not able to publish data to
>>>>> kafka.  The issue of not being able to publish data does not seem to
>>>>> resolve until restarting or updating the pipeline.
>>>>>
>>>>> Here's my publisher config map:
>>>>>
>>>>> .withProducerConfigUpdates(
>>>>> Map.ofEntries(
>>>>> Map.entry(
>>>>> ProducerConfig.PARTITIONER_CLASS_CONFIG,
>>>>> DefaultPartitioner.class),
>>>>> Map.entry(
>>>>> ProducerConfig.COMPRESSION_TYPE_CONFIG,
>>>>> CompressionType.GZIP.name),
>>>>> Map.entry(
>>>>>
>>>>> CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
>>>>> SecurityProtocol.SASL_SSL.name),

Re: [troubleshooting] KafkaIO#write gets stuck "since the associated topicId changed from null to "

2022-09-13 Thread Evan Galpin
Ya likewise, I'd expect this to be handled in the Kafka code without the
need for special handling by Beam.  I'll reach out to Kafka mailing list as
well and try to get a better understanding of the root issue.  Thanks for
your time so far John! I'll ping this thread with any interesting findings
or insights.

Thanks,
Evan

On Tue, Sep 13, 2022 at 11:38 AM John Casey via user 
wrote:

> In principle yes, but I don't see any Beam level code to handle that. I'm
> a bit surprised it isn't handled in the Kafka producer layer itself.
>
> On Tue, Sep 13, 2022 at 11:15 AM Evan Galpin  wrote:
>
>> I'm not certain based on the logs where the disconnect is starting.  I
>> have seen TimeoutExceptions like that mentioned in the SO issue you linked,
>> so if we assume it's starting from the kafka cluster side, my concern is
>> that the producers don't seem to be able to gracefully recover.  Given that
>> restarting the pipeline (in this case, in Dataflow) makes the issue go
>> away, I'm under the impression that producer clients in KafkaIO#write can
>> get into a state that they're not able to recover from after experiencing a
>> disconnect.  Is graceful recovery after cluster unavailability something
>> that would be expected to be supported by KafkaIO today?
>>
>> Thanks,
>> Evan
>>
>> On Tue, Sep 13, 2022 at 11:07 AM John Casey via user <
>> user@beam.apache.org> wrote:
>>
>>> Googling that error message returned
>>> https://stackoverflow.com/questions/71077394/kafka-producer-resetting-the-last-seen-epoch-of-partition-resulting-in-timeout
>>> and
>>> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
>>>
>>> Which suggests that there is some sort of disconnect happening between
>>> your pipeline and your kafka instance.
>>>
>>> Do you see any logs when this disconnect starts, on the Beam or Kafka
>>> side of things?
>>>
>>> On Tue, Sep 13, 2022 at 10:38 AM Evan Galpin  wrote:
>>>
>>>> Thanks for the quick reply John!  I should also add that the root issue
>>>> is not so much the logging, rather that these log messages seem to be
>>>> correlated with periods where producers are not able to publish data to
>>>> kafka.  The issue of not being able to publish data does not seem to
>>>> resolve until restarting or updating the pipeline.
>>>>
>>>> Here's my publisher config map:
>>>>
>>>> .withProducerConfigUpdates(
>>>> Map.ofEntries(
>>>> Map.entry(
>>>> ProducerConfig.PARTITIONER_CLASS_CONFIG,
>>>> DefaultPartitioner.class),
>>>> Map.entry(
>>>> ProducerConfig.COMPRESSION_TYPE_CONFIG,
>>>> CompressionType.GZIP.name),
>>>> Map.entry(
>>>>
>>>> CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
>>>> SecurityProtocol.SASL_SSL.name),
>>>> Map.entry(
>>>> SaslConfigs.SASL_MECHANISM,
>>>> PlainSaslServer.PLAIN_MECHANISM),
>>>> Map.entry(
>>>> SaslConfigs.SASL_JAAS_CONFIG,
>>>> "org.apache.kafka.common.security.plain.PlainLoginModule required
>>>> username=\"\" password=\"\";")))
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>> On Tue, Sep 13, 2022 at 10:30 AM John Casey 
>>>> wrote:
>>>>
>>>>> Hi Evan,
>>>>>
>>>>> I haven't seen this before. Can you share your Kafka write
>>>>> configuration, and any other stack traces that could be relevant?
>>>>>
>>>>> John
>>>>>
>>>>> On Tue, Sep 13, 2022 at 10:23 AM Evan Galpin 
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I've recently started using the KafkaIO connector as a sink, and am
>>>>>> new to Kafka in general.  My kafka clusters are hosted by Confluent 
>>>>>> Cloud.
>>>>>> I'm using Beam SDK 2.41.0.  At least daily, the producers in my Beam
>>>>>> pipeline are getting stuck in a loop frantically logging this message:
>>>>>>
>>>>>> Node n disconnected.
>>>>>>
>>>>>> Resetting the last seen epoch of partition  to x
>>>>>> since the associated topicId changed from null to 
>>>>>>
>>>>>> Updating the running pipeline "resolves" the issue I believe as a
>>>>>> result of recreating the Kafka producer clients, but it seems that as-is
>>>>>> the KafkaIO producer clients are not resilient to node disconnects.  
>>>>>> Might
>>>>>> I be missing a configuration option, or are there any known issues like
>>>>>> this?
>>>>>>
>>>>>> Thanks,
>>>>>> Evan
>>>>>>
>>>>>


Re: [troubleshooting] KafkaIO#write gets stuck "since the associated topicId changed from null to "

2022-09-13 Thread Evan Galpin
I'm not certain based on the logs where the disconnect is starting.  I have
seen TimeoutExceptions like that mentioned in the SO issue you linked, so
if we assume it's starting from the kafka cluster side, my concern is that
the producers don't seem to be able to gracefully recover.  Given that
restarting the pipeline (in this case, in Dataflow) makes the issue go
away, I'm under the impression that producer clients in KafkaIO#write can
get into a state that they're not able to recover from after experiencing a
disconnect.  Is graceful recovery after cluster unavailability something
that would be expected to be supported by KafkaIO today?

Thanks,
Evan

On Tue, Sep 13, 2022 at 11:07 AM John Casey via user 
wrote:

> Googling that error message returned
> https://stackoverflow.com/questions/71077394/kafka-producer-resetting-the-last-seen-epoch-of-partition-resulting-in-timeout
> and
> https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L402
>
> Which suggests that there is some sort of disconnect happening between
> your pipeline and your kafka instance.
>
> Do you see any logs when this disconnect starts, on the Beam or Kafka side
> of things?
>
> On Tue, Sep 13, 2022 at 10:38 AM Evan Galpin  wrote:
>
>> Thanks for the quick reply John!  I should also add that the root issue
>> is not so much the logging, rather that these log messages seem to be
>> correlated with periods where producers are not able to publish data to
>> kafka.  The issue of not being able to publish data does not seem to
>> resolve until restarting or updating the pipeline.
>>
>> Here's my publisher config map:
>>
>> .withProducerConfigUpdates(
>> Map.ofEntries(
>> Map.entry(
>> ProducerConfig.PARTITIONER_CLASS_CONFIG,
>> DefaultPartitioner.class),
>> Map.entry(
>> ProducerConfig.COMPRESSION_TYPE_CONFIG,
>> CompressionType.GZIP.name),
>> Map.entry(
>> CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
>> SecurityProtocol.SASL_SSL.name),
>> Map.entry(
>> SaslConfigs.SASL_MECHANISM,
>> PlainSaslServer.PLAIN_MECHANISM),
>> Map.entry(
>> SaslConfigs.SASL_JAAS_CONFIG,
>> "org.apache.kafka.common.security.plain.PlainLoginModule required
>> username=\"\" password=\"\";")))
>>
>> Thanks,
>> Evan
>>
>> On Tue, Sep 13, 2022 at 10:30 AM John Casey 
>> wrote:
>>
>>> Hi Evan,
>>>
>>> I haven't seen this before. Can you share your Kafka write
>>> configuration, and any other stack traces that could be relevant?
>>>
>>> John
>>>
>>> On Tue, Sep 13, 2022 at 10:23 AM Evan Galpin  wrote:
>>>
>>>> Hi all,
>>>>
>>>> I've recently started using the KafkaIO connector as a sink, and am new
>>>> to Kafka in general.  My kafka clusters are hosted by Confluent Cloud.  I'm
>>>> using Beam SDK 2.41.0.  At least daily, the producers in my Beam pipeline
>>>> are getting stuck in a loop frantically logging this message:
>>>>
>>>> Node n disconnected.
>>>>
>>>> Resetting the last seen epoch of partition  to x since
>>>> the associated topicId changed from null to 
>>>>
>>>> Updating the running pipeline "resolves" the issue I believe as a
>>>> result of recreating the Kafka producer clients, but it seems that as-is
>>>> the KafkaIO producer clients are not resilient to node disconnects.  Might
>>>> I be missing a configuration option, or are there any known issues like
>>>> this?
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>


Re: [troubleshooting] KafkaIO#write gets stuck "since the associated topicId changed from null to "

2022-09-13 Thread Evan Galpin
Thanks for the quick reply John!  I should also add that the root issue is
not so much the logging, rather that these log messages seem to be
correlated with periods where producers are not able to publish data to
kafka.  The issue of not being able to publish data does not seem to
resolve until restarting or updating the pipeline.

Here's my publisher config map:

.withProducerConfigUpdates(
Map.ofEntries(
Map.entry(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
DefaultPartitioner.class),
Map.entry(
ProducerConfig.COMPRESSION_TYPE_CONFIG,
CompressionType.GZIP.name),
Map.entry(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
SecurityProtocol.SASL_SSL.name),
Map.entry(
SaslConfigs.SASL_MECHANISM,
PlainSaslServer.PLAIN_MECHANISM),
Map.entry(
SaslConfigs.SASL_JAAS_CONFIG,
"org.apache.kafka.common.security.plain.PlainLoginModule required
username=\"\" password=\"\";")))

Thanks,
Evan
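Given the "socket connection setup timeout" disconnects discussed in this thread, the standard Kafka client settings governing reconnect behaviour may be worth tuning. The key names below come from the Kafka producer configuration reference (availability depends on client version; socket.connection.setup.* arrived in Kafka 2.7); the values are illustrative placeholders, not recommendations.

```java
import java.util.Map;

public class ReconnectTuning {
    // Candidate entries to merge into withProducerConfigUpdates(...);
    // key strings are standard Kafka producer config names.
    static Map<String, Object> reconnectUpdates() {
        return Map.of(
                "reconnect.backoff.ms", 1_000,
                "reconnect.backoff.max.ms", 10_000,
                "socket.connection.setup.timeout.ms", 10_000,
                "socket.connection.setup.timeout.max.ms", 30_000);
    }

    public static void main(String[] args) {
        System.out.println(reconnectUpdates().size()); // 4
    }
}
```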

On Tue, Sep 13, 2022 at 10:30 AM John Casey  wrote:

> Hi Evan,
>
> I haven't seen this before. Can you share your Kafka write configuration,
> and any other stack traces that could be relevant?
>
> John
>
> On Tue, Sep 13, 2022 at 10:23 AM Evan Galpin  wrote:
>
>> Hi all,
>>
>> I've recently started using the KafkaIO connector as a sink, and am new
>> to Kafka in general.  My kafka clusters are hosted by Confluent Cloud.  I'm
>> using Beam SDK 2.41.0.  At least daily, the producers in my Beam pipeline
>> are getting stuck in a loop frantically logging this message:
>>
>> Node n disconnected.
>>
>> Resetting the last seen epoch of partition  to x since
>> the associated topicId changed from null to 
>>
>> Updating the running pipeline "resolves" the issue I believe as a result
>> of recreating the Kafka producer clients, but it seems that as-is the
>> KafkaIO producer clients are not resilient to node disconnects.  Might I be
>> missing a configuration option, or are there any known issues like this?
>>
>> Thanks,
>> Evan
>>
>


[troubleshooting] KafkaIO#write gets stuck "since the associated topicId changed from null to "

2022-09-13 Thread Evan Galpin
Hi all,

I've recently started using the KafkaIO connector as a sink, and am new to
Kafka in general.  My kafka clusters are hosted by Confluent Cloud.  I'm
using Beam SDK 2.41.0.  At least daily, the producers in my Beam pipeline
are getting stuck in a loop frantically logging this message:

Node n disconnected.

Resetting the last seen epoch of partition  to x since the
associated topicId changed from null to 

Updating the running pipeline "resolves" the issue I believe as a result of
recreating the Kafka producer clients, but it seems that as-is the KafkaIO
producer clients are not resilient to node disconnects.  Might I be missing
a configuration option, or are there any known issues like this?

Thanks,
Evan


Re: [question] Good Course to learn beam

2022-08-30 Thread Evan Galpin
+dev  for additional visibility/input

On Mon, Aug 29, 2022 at 11:10 AM Leandro Nahabedian via user <
user@beam.apache.org> wrote:

> Hi community!
>
> I'm looking for a good course to learn apache beam and I saw this one
> 
> which I believe is good, since it has very good feedback from students.
> What do you think?
>
> Thanks in advance for your help
>
> Cheers,
> Leandro
>
> --
> [image: dialpad] 
> __
> *Leandro Nahabedian *
> Data Engineer
> O: 908.883.4369
>
>


Re: [JAVA] Handling repeated elements when merging two pcollections

2022-08-10 Thread Evan Galpin
Hi Shivam,

When you say "merge the PCollections" do you mean Flatten, or somehow join?
CoGroupByKey[1] would be a good choice if you need to join based on key.
You would then be able to implement application logic to keep 1 of the 2
records if there is a way to decipher an element from CollectionA vs.
CollectionB by only examining the elements.

If there isn't a natural way of determining which element to keep by only
examining the elements themselves, you could further nest the data in a KV,
e.g. if CollectionA holds data like KV<k1, v1> and CollectionB is KV<k1, v2>,
you could transform these into something like KV<k1, KV<"A", v1>> and
KV<k1, KV<"B", v2>>. Then when you CoGroupByKey, these elements would be
grouped based on both having k1, and the source/origin PCollection could be
deciphered based on the key of the inner KV.

Thanks,
Evan

[1]
https://beam.apache.org/documentation/transforms/java/aggregation/cogroupbykey/

On Wed, Aug 10, 2022 at 3:25 PM Shivam Singhal 
wrote:

> I have two PCollections, CollectionA & CollectionB of type KV<Byte[], Byte[]>.
>
>
> I would like to merge them into one PCollection but CollectionA &
> CollectionB might have some elements with the same key. In those repeated
> cases, I would like to keep the element from CollectionA & drop the
> repeated element from CollectionB.
>
> Does anyone know a simple method to do this?
>
> Thanks,
> Shivam Singhal
>
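[Archive note: a hedged sketch, not from the original thread.] The precedence logic described above (group both inputs by key, keep A's element when both sides are present) can be illustrated in plain Python; this is a single-process sketch of the semantics, not Beam code, and the function name is made up for illustration.

```python
from collections import defaultdict

def merge_prefer_a(collection_a, collection_b):
    """Merge two lists of (key, value) pairs, keeping A's element on key clash.

    Mirrors the CoGroupByKey approach: group both inputs by key, then emit
    the A-side values when present, otherwise the B-side values.
    """
    grouped = defaultdict(lambda: {"a": [], "b": []})
    for k, v in collection_a:
        grouped[k]["a"].append(v)
    for k, v in collection_b:
        grouped[k]["b"].append(v)
    merged = []
    for k, sides in grouped.items():
        # A's elements win whenever the key appears in both collections.
        for v in (sides["a"] if sides["a"] else sides["b"]):
            merged.append((k, v))
    return merged
```

In a real pipeline the same decision would run inside a DoFn over the CoGbkResult for each key.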


Re: [Dataflow][Java][stateful] Workflow Failed when trying to introduce stateful RateLimit

2022-08-05 Thread Evan Galpin
Thanks for the suggestion Luke! Unfortunately it looks like v2 also fails
in the same way (pipeline does not start at all with "Workflow Failed"
error message).

Thanks,
Evan

On Fri, Aug 5, 2022 at 12:52 PM Luke Cwik via user 
wrote:

> You could try Dataflow Runner v2. The difference in the implementation may
> allow you to work around what is impacting the pipelines.
>
> On Fri, Aug 5, 2022 at 9:40 AM Evan Galpin  wrote:
>
>> Thanks Luke, I've opened a support case as well but thought it would be
>> prudent to ask here in case there was something obvious with the code.  Is
>> there any additional/optional validation that I can opt to use when
>> building and deploying the pipeline that might give hints? Otherwise I'll
>> just wait on the support case.
>>
>> Thanks,
>> Evan
>>
>> On Fri, Aug 5, 2022 at 11:22 AM Luke Cwik via user 
>> wrote:
>>
>>> I took a look at the code and nothing obvious stood out to me in the
>>> code as this is a ParDo with OnWindowExpiration. Just to make sure, the
>>> rate limit is per key and would only be a global rate limit if there was a
>>> single key.
>>>
>>> Are the workers trying to start?
>>> * If no, then you would need to open a support case and share some
>>> job ids so that someone could debug internal service logs.
>>> * If yes, then did the workers start successfully?
>>> ** If no, logs should have some details as to why the worker couldn't
>>> start.
>>> ** If yes, are the workers getting work items?
>>> *** If no, then you would need to open a support case and share some
>>> job ids so that someone could debug internal service logs.
>>> *** If yes then the logs should have some details as to why the work
>>> items are failing.
>>>
>>>
>>> On Fri, Aug 5, 2022 at 7:36 AM Evan Galpin  wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm trying to create a RateLimit[1] transform that's based fairly
>>>> heavily on GroupIntoBatches[2]. I've been able to run unit tests using
>>>> TestPipeline to verify desired behaviour and have also run successfully
>>>> using DirectRunner.  However, when I submit the same job to Dataflow it
>>>> completely fails to start and only gives the error message "Workflow
>>>> Failed." The job builds/uploads/submits without error, but never starts and
>>>> gives no detail as to why.
>>>>
>>>> Is there anything I can do to gain more insight about what is going
>>>> wrong?  I've included a gist of the RateLimit[1] code in case there is
>>>> anything obvious wrong there.
>>>>
>>>> Thanks in advance,
>>>> Evan
>>>>
>>>> [1] https://gist.github.com/egalpin/162a04b896dc7be1d0899acf17e676b3
>>>> [2]
>>>> https://github.com/apache/beam/blob/c8d92b03b6b2029978dbc2bf824240232c5e61ac/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
>>>>
>>>


Re: [Dataflow][Java][stateful] Workflow Failed when trying to introduce stateful RateLimit

2022-08-05 Thread Evan Galpin
Thanks Luke, I've opened a support case as well but thought it would be
prudent to ask here in case there was something obvious with the code.  Is
there any additional/optional validation that I can opt to use when
building and deploying the pipeline that might give hints? Otherwise I'll
just wait on the support case.

Thanks,
Evan

On Fri, Aug 5, 2022 at 11:22 AM Luke Cwik via user 
wrote:

> I took a look at the code and nothing obvious stood out to me in the code
> as this is a ParDo with OnWindowExpiration. Just to make sure, the rate
> limit is per key and would only be a global rate limit if there was a
> single key.
>
> Are the workers trying to start?
> * If no, then you would need to open a support case and share some job ids
> so that someone could debug internal service logs.
> * If yes, then did the workers start successfully?
> ** If no, logs should have some details as to why the worker couldn't
> start.
> ** If yes, are the workers getting work items?
> *** If no, then you would need to open a support case and share some
> job ids so that someone could debug internal service logs.
> *** If yes then the logs should have some details as to why the work items
> are failing.
>
>
> On Fri, Aug 5, 2022 at 7:36 AM Evan Galpin  wrote:
>
>> Hi all,
>>
>> I'm trying to create a RateLimit[1] transform that's based fairly heavily
>> on GroupIntoBatches[2]. I've been able to run unit tests using TestPipeline
>> to verify desired behaviour and have also run successfully using
>> DirectRunner.  However, when I submit the same job to Dataflow it
>> completely fails to start and only gives the error message "Workflow
>> Failed." The job builds/uploads/submits without error, but never starts and
>> gives no detail as to why.
>>
>> Is there anything I can do to gain more insight about what is going
>> wrong?  I've included a gist of the RateLimit[1] code in case there is
>> anything obvious wrong there.
>>
>> Thanks in advance,
>> Evan
>>
>> [1] https://gist.github.com/egalpin/162a04b896dc7be1d0899acf17e676b3
>> [2]
>> https://github.com/apache/beam/blob/c8d92b03b6b2029978dbc2bf824240232c5e61ac/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
>>
>


[Dataflow][Java][stateful] Workflow Failed when trying to introduce stateful RateLimit

2022-08-05 Thread Evan Galpin
Hi all,

I'm trying to create a RateLimit[1] transform that's based fairly heavily
on GroupIntoBatches[2]. I've been able to run unit tests using TestPipeline
to verify desired behaviour and have also run successfully using
DirectRunner.  However, when I submit the same job to Dataflow it
completely fails to start and only gives the error message "Workflow
Failed." The job builds/uploads/submits without error, but never starts and
gives no detail as to why.

Is there anything I can do to gain more insight about what is going wrong?
I've included a gist of the RateLimit[1] code in case there is anything
obvious wrong there.

Thanks in advance,
Evan

[1] https://gist.github.com/egalpin/162a04b896dc7be1d0899acf17e676b3
[2]
https://github.com/apache/beam/blob/c8d92b03b6b2029978dbc2bf824240232c5e61ac/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
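[Archive note: a hedged sketch, not from the original thread.] For readers unfamiliar with the semantics being built here, below is a single-process token-bucket illustration of per-key rate limiting (N permits per key per interval). It is illustrative only and is not the stateful Beam transform from the gist; the class and method names are invented for the example.

```python
import time
from collections import defaultdict

class PerKeyRateLimiter:
    """Minimal per-key token bucket: `permits` acquisitions per key
    per `interval_s` seconds, refilled at the start of each interval."""

    def __init__(self, permits: int, interval_s: float):
        self.permits = permits
        self.interval_s = interval_s
        self.tokens = defaultdict(lambda: permits)
        self.window_start = defaultdict(float)

    def try_acquire(self, key, now=None) -> bool:
        now = time.monotonic() if now is None else now
        # Refill this key's bucket when its interval has elapsed.
        if now - self.window_start[key] >= self.interval_s:
            self.window_start[key] = now
            self.tokens[key] = self.permits
        if self.tokens[key] > 0:
            self.tokens[key] -= 1
            return True
        return False
```

A stateful DoFn version would keep the token count and window start in per-key state instead of in-memory dicts.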


Re: [Dataflow][Java] Guidance on Transform Mapping Streaming Update

2022-07-08 Thread Evan Galpin
Ya fair enough, makes sense. I’ll reach out to GCP. Thanks Luke!

- Evan

On Fri, Jul 8, 2022 at 11:24 Luke Cwik  wrote:

> I was suggesting GCP support mainly because I don't think you want to
> share the 2.36 and 2.40 version of your job file publicly as someone
> familiar with the layout and format may spot a meaningful difference.
>
> Also, if it turns out that there is no meaningful difference between the
> two then the internal mechanics of how the graph is modified by Dataflow is
> not surfaced back to you in enough depth to debug further.
>
>
>
> On Fri, Jul 8, 2022 at 6:12 AM Evan Galpin  wrote:
>
>> Thanks for your response Luke :-)
>>
>> Updating in 2.36.0 works as expected, but as you alluded to I'm
>> attempting to update to the latest SDK; in this case there are no code
>> changes in the user code, only the SDK version.  Is GCP support the only
>> tool when it comes to deciphering the steps added by Dataflow?  I would
>> love to be able to inspect the complete graph with those extra steps like
>> "Unzipped-2/FlattenReplace" that aren't in the job file.
>>
>> Thanks,
>> Evan
>>
>> On Wed, Jul 6, 2022 at 4:21 PM Luke Cwik via user 
>> wrote:
>>
>>> Does doing a pipeline update in 2.36 work or do you want to do an update
>>> to get the latest version?
>>>
>>> Feel free to share the job files with GCP support. It could be something
>>> internal but the coders for ephemeral steps that Dataflow adds are based
>>> upon existing coders within the graph.
>>>
>>> On Tue, Jul 5, 2022 at 8:03 AM Evan Galpin  wrote:
>>>
>>>> +dev@
>>>>
>>>> Reviving this thread as it has hit me again on Dataflow.  I am trying
>>>> to upgrade an active streaming pipeline from 2.36.0 to 2.40.0.  Originally,
>>>> I received an error that the step "Flatten.pCollections" was missing from
>>>> the new job graph.  I knew from the code that that wasn't true, so I dumped
>>>> the job file via "--dataflowJobFile" for both the running pipeline and for
>>>> the new version I'm attempting to update to.  Both job files showed
>>>> identical data for the Flatten.pCollections step, which raises the question
>>>> of why that would have been reported as missing.
>>>>
>>>> Out of curiosity I then tried mapping the step to the same name, which
>>>> changed the error to:  "The Coder or type for step
>>>> Flatten.pCollections/Unzipped-2/FlattenReplace has changed."  Again, the
>>>> job files show identical coders for the Flatten step (though
>>>> "Unzipped-2/FlattenReplace" is not present in the job file, maybe an
>>>> internal Dataflow thing?), so I'm confident that the coder hasn't actually
>>>> changed.
>>>>
>>>> I'm not sure how to proceed in updating the running pipeline, and I'd
>>>> really prefer not to drain.  Any ideas?
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>>
>>>> On Fri, Oct 22, 2021 at 3:36 PM Evan Galpin 
>>>> wrote:
>>>>
>>>>> Thanks for the ideas Luke. I checked out the json graphs as per your
>>>>> recommendation (thanks for that, was previously unaware), and the
>>>>> "output_info" was identical for both the running pipeline and the pipeline
>>>>> I was hoping to update it with.  I ended up opting to just drain and 
>>>>> submit
>>>>> the updated pipeline as a new job.  Thanks for the tips!
>>>>>
>>>>> Thanks,
>>>>> Evan
>>>>>
>>>>> On Thu, Oct 21, 2021 at 7:02 PM Luke Cwik  wrote:
>>>>>
>>>>>> I would suggest dumping the JSON representation (with the
>>>>>> --dataflowJobFile=/path/to/output.json) of the pipeline before and after
>>>>>> and looking to see what is being submitted to Dataflow. Dataflow's JSON
>>>>>> graph representation is a bipartite graph where there are transform nodes
>>>>>> with inputs and outputs and PCollection nodes with no inputs or outputs.
>>>>>> The PCollection nodes typically end with the suffix ".out". This could 
>>>>>> help
>>>>>> find steps that have been added/removed/renamed.
>>>>>>
>>>>>> The PipelineDotRenderer[1] might be of use as well.
>>>>>>

Re: [Dataflow][Java] Guidance on Transform Mapping Streaming Update

2022-07-08 Thread Evan Galpin
Thanks for your response Luke :-)

Updating in 2.36.0 works as expected, but as you alluded to I'm attempting
to update to the latest SDK; in this case there are no code changes in the
user code, only the SDK version.  Is GCP support the only tool when it
comes to deciphering the steps added by Dataflow?  I would love to be able
to inspect the complete graph with those extra steps like
"Unzipped-2/FlattenReplace" that aren't in the job file.

Thanks,
Evan

On Wed, Jul 6, 2022 at 4:21 PM Luke Cwik via user 
wrote:

> Does doing a pipeline update in 2.36 work or do you want to do an update
> to get the latest version?
>
> Feel free to share the job files with GCP support. It could be something
> internal but the coders for ephemeral steps that Dataflow adds are based
> upon existing coders within the graph.
>
> On Tue, Jul 5, 2022 at 8:03 AM Evan Galpin  wrote:
>
>> +dev@
>>
>> Reviving this thread as it has hit me again on Dataflow.  I am trying to
>> upgrade an active streaming pipeline from 2.36.0 to 2.40.0.  Originally, I
>> received an error that the step "Flatten.pCollections" was missing from the
>> new job graph.  I knew from the code that that wasn't true, so I dumped the
>> job file via "--dataflowJobFile" for both the running pipeline and for the
>> new version I'm attempting to update to.  Both job files showed identical
>> data for the Flatten.pCollections step, which raises the question of why
>> that would have been reported as missing.
>>
>> Out of curiosity I then tried mapping the step to the same name, which
>> changed the error to:  "The Coder or type for step
>> Flatten.pCollections/Unzipped-2/FlattenReplace has changed."  Again, the
>> job files show identical coders for the Flatten step (though
>> "Unzipped-2/FlattenReplace" is not present in the job file, maybe an
>> internal Dataflow thing?), so I'm confident that the coder hasn't actually
>> changed.
>>
>> I'm not sure how to proceed in updating the running pipeline, and I'd
>> really prefer not to drain.  Any ideas?
>>
>> Thanks,
>> Evan
>>
>>
>> On Fri, Oct 22, 2021 at 3:36 PM Evan Galpin 
>> wrote:
>>
>>> Thanks for the ideas Luke. I checked out the json graphs as per your
>>> recommendation (thanks for that, was previously unaware), and the
>>> "output_info" was identical for both the running pipeline and the pipeline
>>> I was hoping to update it with.  I ended up opting to just drain and submit
>>> the updated pipeline as a new job.  Thanks for the tips!
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Thu, Oct 21, 2021 at 7:02 PM Luke Cwik  wrote:
>>>
>>>> I would suggest dumping the JSON representation (with the
>>>> --dataflowJobFile=/path/to/output.json) of the pipeline before and after
>>>> and looking to see what is being submitted to Dataflow. Dataflow's JSON
>>>> graph representation is a bipartite graph where there are transform nodes
>>>> with inputs and outputs and PCollection nodes with no inputs or outputs.
>>>> The PCollection nodes typically end with the suffix ".out". This could help
>>>> find steps that have been added/removed/renamed.
>>>>
>>>> The PipelineDotRenderer[1] might be of use as well.
>>>>
>>>> 1:
>>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRenderer.java
>>>>
>>>> On Thu, Oct 21, 2021 at 11:54 AM Evan Galpin 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm looking for any help regarding updating streaming jobs which are
>>>>> already running on Dataflow.  Specifically I'm seeking guidance for
>>>>> situations where Fusion is involved, and trying to decipher which old 
>>>>> steps
>>>>> should be mapped to which new steps.
>>>>>
>>>>> I have a case where I updated the steps which come after the step in
>>>>> question, but when I attempt to update there is an error that "
>>>>> no longer produces data to the steps ". I believe that
>>>>>  is only changed as a result of fusion, and in reality it does 
>>>>> in
>>>>> fact produce data to  (confirmed when deployed as a new
>>>>> job for testing purposes).
>>>>>
>>>>> Is there a guide for how to deal with updates and fusion?
>>>>>
>>>>> Thanks,
>>>>> Evan
>>>>>
>>>>


Re: [Dataflow][Java] Guidance on Transform Mapping Streaming Update

2022-07-05 Thread Evan Galpin
+dev@

Reviving this thread as it has hit me again on Dataflow.  I am trying to
upgrade an active streaming pipeline from 2.36.0 to 2.40.0.  Originally, I
received an error that the step "Flatten.pCollections" was missing from the
new job graph.  I knew from the code that that wasn't true, so I dumped the
job file via "--dataflowJobFile" for both the running pipeline and for the
new version I'm attempting to update to.  Both job files showed identical
data for the Flatten.pCollections step, which raises the question of why
that would have been reported as missing.

Out of curiosity I then tried mapping the step to the same name, which
changed the error to:  "The Coder or type for step
Flatten.pCollections/Unzipped-2/FlattenReplace has changed."  Again, the
job files show identical coders for the Flatten step (though
"Unzipped-2/FlattenReplace" is not present in the job file, maybe an
internal Dataflow thing?), so I'm confident that the coder hasn't actually
changed.

I'm not sure how to proceed in updating the running pipeline, and I'd
really prefer not to drain.  Any ideas?

Thanks,
Evan


On Fri, Oct 22, 2021 at 3:36 PM Evan Galpin  wrote:

> Thanks for the ideas Luke. I checked out the json graphs as per your
> recommendation (thanks for that, was previously unaware), and the
> "output_info" was identical for both the running pipeline and the pipeline
> I was hoping to update it with.  I ended up opting to just drain and submit
> the updated pipeline as a new job.  Thanks for the tips!
>
> Thanks,
> Evan
>
> On Thu, Oct 21, 2021 at 7:02 PM Luke Cwik  wrote:
>
>> I would suggest dumping the JSON representation (with the
>> --dataflowJobFile=/path/to/output.json) of the pipeline before and after
>> and looking to see what is being submitted to Dataflow. Dataflow's JSON
>> graph representation is a bipartite graph where there are transform nodes
>> with inputs and outputs and PCollection nodes with no inputs or outputs.
>> The PCollection nodes typically end with the suffix ".out". This could help
>> find steps that have been added/removed/renamed.
>>
>> The PipelineDotRenderer[1] might be of use as well.
>>
>> 1:
>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRenderer.java
>>
>> On Thu, Oct 21, 2021 at 11:54 AM Evan Galpin 
>> wrote:
>>
>>> Hi all,
>>>
>>> I'm looking for any help regarding updating streaming jobs which are
>>> already running on Dataflow.  Specifically I'm seeking guidance for
>>> situations where Fusion is involved, and trying to decipher which old steps
>>> should be mapped to which new steps.
>>>
>>> I have a case where I updated the steps which come after the step in
>>> question, but when I attempt to update there is an error that "
>>> no longer produces data to the steps ". I believe that
>>>  is only changed as a result of fusion, and in reality it does in
>>> fact produce data to  (confirmed when deployed as a new
>>> job for testing purposes).
>>>
>>> Is there a guide for how to deal with updates and fusion?
>>>
>>> Thanks,
>>> Evan
>>>
>>
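[Archive note: a hedged sketch, not from the original thread.] When comparing two --dataflowJobFile dumps as suggested in this thread, a small script can surface added/removed/renamed steps faster than eyeballing the JSON. This assumes the usual job-file layout where each entry under "steps" has a "name"; as noted above, Dataflow may still add internal ephemeral steps that never appear in these files.

```python
def diff_step_names(old_job: dict, new_job: dict):
    """Compare step names between two Dataflow job-file JSON dicts.

    Returns which step names were removed, added, or kept between the
    running job's graph and the replacement job's graph.
    """
    old_names = {s["name"] for s in old_job.get("steps", [])}
    new_names = {s["name"] for s in new_job.get("steps", [])}
    return {
        "removed": sorted(old_names - new_names),
        "added": sorted(new_names - old_names),
        "common": sorted(old_names & new_names),
    }
```

Feed it the parsed JSON of both dumps (e.g. via json.load) and inspect the "removed"/"added" lists for candidates to pass to --transformNameMapping.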


Re: Chained Job Graph Apache Beam | Dataflow

2022-06-15 Thread Evan Galpin
It may also be helpful to explore CoGroupByKey as a way of joining data,
though depending on the shape of the data doing so may not fit in mem:
https://beam.apache.org/documentation/transforms/java/aggregation/cogroupbykey/

- Evan

On Wed, Jun 15, 2022 at 3:45 PM Bruno Volpato 
wrote:

> Hello,
>
> I am not sure what is the context behind your join, but I just wanted to
> point out that Beam SQL [1] or the Join-library extension [2] may be
> helpful in your scenario to avoid changing semantics or the need to
> orchestrate your jobs outside Beam.
>
> [1] https://beam.apache.org/documentation/dsls/sql/extensions/joins/
> [2] https://beam.apache.org/documentation/sdks/java-extensions/
>
>
> Best,
> Bruno
>
>
>
> On Wed, Jun 15, 2022 at 3:35 PM Jack McCluskey 
> wrote:
>
>> Hey Ravi,
>>
>> The problem you're running into is that the act of writing data to a
>> table and reading from it are not joined actions in the Beam model. There's
>> no connecting PCollection tying those together, so they are split and run
>> in parallel. If you want to do this and need the data written to C, you
>> should re-use the PCollection written to C in your filtering step instead
>> of reading the data from C again. That should produce the graph you're
>> looking for in a batch context.
>>
>> Thanks,
>>
>> Jack McCluskey
>>
>> On Wed, Jun 15, 2022 at 3:30 PM Ravi Kapoor 
>> wrote:
>>
>>> FYI
>>>
>>> On Thu, Jun 16, 2022 at 12:56 AM Ravi Kapoor 
>>> wrote:
>>>
 Hi Daniel,

 I have a use case where I join two tables say A and B and write the
 joined Collection to C.
 Then I would like to filter some records on C and put it to another
 table say D.
 So, the pipeline on Dataflow UI should look like this right?

 A
  \
   C -> D
  /
 B

 However, the pipeline is writing C -> D in parallel.
 How can this pipeline run in parallel as data has not been pushed yet
 to C by the previous pipeline?

 Even when I ran this pipeline, Table D did not get any records inserted
 as well which is apparent.
 Can you help me with this use case?

 Thanks,
 Ravi



 On Tue, Jun 14, 2022 at 9:01 PM Daniel Collins 
 wrote:

> Can you speak to what specifically you want to be different? The job
> graph you see, with the A -> B and B-> C being separate is an accurate
> reflection of your pipeline. table_B is outside of the beam model, by
> pushing your data there, Dataflow has no ability to identify that no
> manipulation of data is happening at table_B.
>
> If you want to just process data from A to destinations D and E, while
> writing an intermediate output to table_B, you should just remove the read
> from table B and modify table_A_records again directly. If that is not 
> what
> you want, you would need to explain more specifically what you want that 
> is
> different. Is it a pure UI change? Is it a functional change?
>
> -Daniel
>
> On Tue, Jun 14, 2022 at 11:12 AM Ravi Kapoor 
> wrote:
>
>> Team,
>> Any update on this?
>>
>> On Mon, Jun 13, 2022 at 8:39 PM Ravi Kapoor 
>> wrote:
>>
>>> Hi Team,
>>>
>>> I am currently using Beam in my project with Dataflow Runner.
>>> I am trying to create a pipeline where the data flows from the
>>> source to staging then to target such as:
>>>
>>> A (Source) -> B(Staging) -> C (Target)
>>>
>>> When I create a pipeline as below:
>>>
>>> PCollection<TableRow> table_A_records = p.apply(BigQueryIO.readTableRows()
>>> .from("project:dataset.table_A"));
>>>
>>> table_A_records.apply(BigQueryIO.writeTableRows()
>>> .to("project:dataset.table_B")
>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>>
>>> PCollection<TableRow> table_B_records = p.apply(BigQueryIO.readTableRows()
>>> .from("project:dataset.table_B"));
>>> table_B_records.apply(BigQueryIO.writeTableRows()
>>> .to("project:dataset.table_C")
>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>>> p.run().waitUntilFinish();
>>>
>>>
>>> It basically creates two parallel job graphs in Dataflow instead of
>>> creating a chained transformation as expected:
>>> A -> B
>>> B -> C
>>> I needed to create a data pipeline which flows the data in a chain like:
>>>            D
>>>           /
>>> A -> B -> C
>>>           \
>>>            E
>>> Is there a way to achieve this transformation in between source and
>>> target tables?
>>>
>>> Thanks,
>>> Ravi
>>>
>>

Re: [Question] Is message order preserved in windows?

2022-05-17 Thread Evan Galpin
Hi Gyorgy,

To my knowledge, ordering of elements is not guaranteed. To order your
elements in each window by time, you would need to sort the iterables
produced by the groupByKey. You might use SortValues[1] to do that.

[1]
https://beam.apache.org/documentation/sdks/java-extensions/#example-usage-of-sortvalues

Thanks,
Evan


On Tue, May 17, 2022 at 04:35 Balogh, György  wrote:

> Hi,
>
> I have video analytic data:
> - time
> - object type (car, person, etc)
> - object bounding box (x,y,width,height)
> - track id
>
> I'd like to do transformations where I need to process detections
> belonging to the same track in time order.
>
> My input data is ordered by time. I do some filtering first (eg.: filter
> cars) than apply sliding windows (eg.: 10 sec window size, with 5 sec step)
> and group by track id.
>
> Can we assume anything about the order of events in the windows? Is the
> order preserved? I need to process them in time order, if the order is not
> preserved what would be the best approach? I can of course collect and sort
> the data but it has significant performance and memory impact. (I need to
> process events on the billion scale).
>
> Thank you,
> Gyorgy
>
>
> --
>
> György Balogh
> CEO
> E gyorgy.bal...@ultinous.com 
> M +36 30 270 8342 <+36%2030%20270%208342>
> A HU, 1117 Budapest, Budafoki út 209
> 
> .
> W www.ultinous.com
>
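[Archive note: a hedged sketch, not from the original thread.] The fix Evan describes, sorting each key's grouped values by timestamp after the GroupByKey (what SortValues does in the Java SDK), looks like this in plain single-process Python; the function name and data shape are assumptions for illustration.

```python
def sort_tracks_by_time(grouped_detections):
    """Sort each track's detections by timestamp.

    Input: dict mapping track_id -> iterable of (timestamp, detection) pairs,
    as a group-by-key would produce; output keeps keys and orders values.
    """
    return {
        track_id: sorted(pairs, key=lambda p: p[0])
        for track_id, pairs in grouped_detections.items()
    }
```

In Beam the sort runs per key per window, so memory pressure depends on the largest single track within one window rather than the whole dataset.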


Re: Trino Runner

2022-03-30 Thread Evan Galpin
Though this might not be what's being asked, might it be possible to
integrate Presto using the JDBC driver[1] along with Beam's JdbcIO[2] ?

[1] https://prestodb.io/docs/current/installation/jdbc.html
[2]
https://beam.apache.org/releases/javadoc/2.37.0/org/apache/beam/sdk/io/jdbc/JdbcIO.html

- Evan

On Wed, Mar 30, 2022 at 1:35 PM Pablo Estrada  wrote:

> Hi Luning!
> I am not aware of any efforts so far to support these. Perhaps worth
> checking in their community if there's interest to work with Beam.
> Thanks!
> -P.
>
> On Wed, Mar 30, 2022 at 5:04 AM LuNing Wang  wrote:
>
>> Hi,
>>
>> Have any plans to support Trino/Pesto runner?
>>
>> Best,
>> LuNing Wang
>>
>


Re: [Python] Heterogeneous TaggedOutput Type Hints

2022-03-23 Thread Evan Galpin
Thanks for the update!  I also was not able to repro, so presumably
something is fixed? :-)

Thanks,
Evan

On Mon, Mar 21, 2022 at 8:40 PM Valentyn Tymofieiev 
wrote:

> I came across this thread and wasn't able to reproduce the `expecting a KV
> coder, but had Strings` error, so hopefully that's fixed now. I had to
> modify the repro to add .with_outputs() to the line 49 in
> https://gist.github.com/egalpin/2d6ad2210cf9f66108ff48a9c7566ebc
>
> On Mon, Sep 27, 2021 at 5:58 PM Robert Bradshaw 
> wrote:
>
>> As a workaround, can you try passing the use_portable_job_submission
>> experiment?
>>
>> On Mon, Sep 27, 2021 at 2:19 PM Luke Cwik  wrote:
>> >
>> > Sorry, I forgot that you had a minimal repro for this issue, I attached
>> details to the internal bug.
>> >
>> > On Mon, Sep 27, 2021 at 2:18 PM Luke Cwik  wrote:
>> >>
>> >> There is an internal bug 195053987 that matches what you're describing
>> but we were unable able to get a minimal repro for it. It would be useful
>> if you had a minimal repro for the issue that I could update the internal
>> bug with details and/or you could reach out to GCP support with job ids
>> and/or minimal repros to get support as well.
>> >>
>> >> On Wed, Sep 22, 2021 at 6:57 AM Evan Galpin 
>> wrote:
>> >>>
>> >>> Thanks for the response Luke :-)
>> >>>
>> >>> I did try setting .element_type for each resulting PCollection
>> using "apache_beam.typehints.typehints.KV" to describe the elements, which
>> passed type checking.  I also ran the full dataset (batch job) without the
>> GBK in question but instead using a dummy DoFn in its place which asserted
>> that every element that would be going into the GBK was a 2-tuple, along
>> with using --runtime_type_check, all of which run successfully without the
>> GBK after the TaggedOutput DoFn.
>> >>>
>> >>> Adding back the GBK also runs end-to-end successfully on the
>> DirectRunner using the identical dataset.  But as soon as I add the GBK and
>> use the DataflowRunner (v2), I get errors as soon as the optimized step
>> involving the GBK is in the "running" status:
>> >>>
>> >>> - "Could not start worker docker container"
>> >>> - "Error syncing pod"
>> >>> - "Check failed: pair_coder Strings" or "Check failed: kv_coder :
>> expecting a KV coder, but had Strings"
>> >>>
>> >>> Anything further to try? I can also provide Job IDs from Dataflow if
>> helpful (and safe to share).
>> >>>
>> >>> Thanks,
>> >>> Evan
>> >>>
>> >>> On Wed, Sep 22, 2021 at 1:09 AM Luke Cwik  wrote:
>> >>>>
>> >>>> Have you tried setting the element_type[1] explicitly on each output
>> PCollection that is returned after applying the multi-output ParDo?
>> >>>> I believe you'll get a DoOutputsTuple[2] returned after applying the
>> mult-output ParDo which allows access to the underlying PCollection objects.
>> >>>>
>> >>>> 1:
>> https://github.com/apache/beam/blob/ebf2aacf37b97fc85b167271f184f61f5b06ddc3/sdks/python/apache_beam/pvalue.py#L99
>> >>>> 2:
>> https://github.com/apache/beam/blob/ebf2aacf37b97fc85b167271f184f61f5b06ddc3/sdks/python/apache_beam/pvalue.py#L234
>> >>>>
>> >>>> On Tue, Sep 21, 2021 at 10:29 AM Evan Galpin 
>> wrote:
>> >>>>>
>> >>>>> This is badly plaguing a pipeline I'm currently developing, where
>> the exact same data set and code runs end-to-end on DirectRunner, but fails
>> on DataflowRunner with either "Check failed: kv_coder : expecting a KV
>> coder, but had Strings" or "Check failed: pair_coder Strings" hidden in the
>> harness logs. It seems to be consistently repeatable with any TaggedOutput
>> + GBK afterwards.
>> >>>>>
>> >>>>> Any advice on how to proceed?
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Evan
>> >>>>>
>> >>>>> On Fri, Sep 17, 2021 at 11:20 AM Evan Galpin 
>> wrote:
>> >>>>>>
>> >>>>>> The Dataflow error logs only showed 1 error which was:  "The job
>> failed because a work item has failed 4 times. Look in previous log entries
>> for the cause of each one of the 4 failures.

Re: Running Query Containing Aggregation Using ElasticsearchIO Read

2022-03-09 Thread Evan Galpin
Great idea Nick, it could definitely work to transform the data within
Elasticsearch first and then effectively run a "match_all" against the
transformed index within Beam.  Agreed that it's not ideal from a
user-experience perspective but hopefully serves as another potential path
to unblocking your development.

I'll also put some thought into how your requested functionality might be
able to be supported within ElasticsearchIO.  Feel free to open a ticket in
Jira[1] to track the idea :-)

[1] https://issues.apache.org/jira/projects/BEAM/summary
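[Archive note: a hedged sketch, not from the original thread.] For option 2 above (a custom source issuing the aggregation query via the REST client), the parsing step might flatten the response's buckets into per-bucket elements for a PCollection. This assumes the standard "aggregations" response layout for a terms aggregation; `agg_name` is whatever name the query gave the aggregation, and the function name is invented for the example.

```python
def buckets_to_elements(agg_response: dict, agg_name: str):
    """Flatten an Elasticsearch terms-aggregation response into
    (key, doc_count) tuples, one per bucket."""
    buckets = agg_response["aggregations"][agg_name]["buckets"]
    return [(b["key"], b["doc_count"]) for b in buckets]
```

Each tuple could then be emitted as one element, giving downstream transforms per-user aggregates to filter on.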

On Wed, Mar 9, 2022 at 3:18 PM Nick Pan  wrote:

> Hi Evan,
>
> Thanks for the info.  Since I need to support many aggregation types,
> implementing all the group-by logic in PTransform can be a lot of work.
> But I will look into the 2nd approach.
> Another approach I am considering is to directly kick off a single batch
> transform to group/aggregate/filter the data in Elasticsearch, so the
> workers will only need to search the result without needing to run
> aggregation.  But this can get ugly too because the workers need to wait
> for the Elasticsearch transform to finish.
>
> Nick
>
> On Wed, Mar 9, 2022 at 6:53 AM Evan Galpin  wrote:
>
>> You're correct in your assessment that ElasticsearchIO does not currently
>> support queries with aggregations.  There's a large difference between
>> scrolling over large sets of documents (which has a common interface
>> provided by ES) Vs aggregations where user-code in the query will impact
>> the output fields.  Another major difference is that queries with aggs
>> effectively have a singular output – the result of the aggs – rather than a
>> potentially huge number of documents as "hits".  However, aggs could
>> produce thousands of buckets where each bucket is desired to be an element
>> in a PCollection, so it could be useful to support this use-case in the
>> future.
>>
>> In order to work around this, there are 2 options that jump to mind:
>>
>>1. Transfer the filtering logic to Beam:  Read all the documents that
>>you might be interested in using ElasticsearchIO with just a search (no
>>aggs) and implement the grouping per user + filtering within Beam using
>>PTransforms
>>2. Write your own Source (SplittableDoFn?) which makes your above
>>query (with aggs) using the Elasticsearch RestClient, parses the response,
>>and outputs the parsed elements one-by-one to a PCollection for further
>>processing. This would effectively be the approach to supporting your
>>desired functionality within ElasticsearchIO as well.
>>
>> - Evan
>>
>> On Wed, Mar 9, 2022 at 3:52 AM Nick Pan  wrote:
>>
>>> Hello,
>>>
>>> I have a use case where I need to first compute an aggregation for each
>>> key, and then filter out the keys based on some criteria.   And finally
>>> feed the matched keys as an input to PCollection using ElasticsearchIO
>>> read.  But ElasticsearchIO does not seem to support query that contains
>>> aggregation:
>>>
>>> Error message from worker: org.elasticsearch.client.ResponseException:
>>> method [GET], host [https://...], URI [...], status line [HTTP/1.1 400
>>> Bad Request]
>>> {"error":{"root_cause":[{"type":"parsing_exception","reason":"request does
>>> not support
>>> [aggregations]","line":1,"col":135}],"type":"parsing_exception","reason":"request
>>> does not support [aggregations]","line":1,"col":135},"status":400}
>>> org.elasticsearch.client.RestClient.convertResponse(RestClient.java:331)
>>> org.elasticsearch.client.RestClient.performRequest(RestClient.java:301)
>>> org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.queryCount(ElasticsearchIO.java:780)
>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getEstimatedSizeBytes(ElasticsearchIO.java:762)
>>> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.split(ElasticsearchIO.java:710)
>>>
>>>
>>> Here is an example of the Elasticsearch query I am trying to do:
>>>
>>> {
>>>
>>>  "aggs": {
>>>
>>> "user_id": {
>>>
>>>   "composite": {
>>>
>>> "sources": [
>>>
>>>   { "user_id

Re: Running Query Containing Aggregation Using ElasticsearchIO Read

2022-03-09 Thread Evan Galpin
You're correct in your assessment that ElasticsearchIO does not currently
support queries with aggregations.  There's a large difference between
scrolling over large sets of documents (which has a common interface
provided by ES) vs. aggregations where user-code in the query will impact
the output fields.  Another major difference is that queries with aggs
effectively have a singular output – the result of the aggs – rather than a
potentially huge number of documents as "hits".  However, aggs could
produce thousands of buckets where each bucket is desired to be an element
in a PCollection, so it could be useful to support this use-case in the
future.

In order to work around this, there are 2 options that jump to mind:

   1. Transfer the filtering logic to Beam:  Read all the documents that
   you might be interested in using ElasticsearchIO with just a search (no
   aggs) and implement the grouping per user + filtering within Beam using
   PTransforms
   2. Write your own Source (SplittableDoFn?) which makes your above query
   (with aggs) using the Elasticsearch RestClient, parses the response, and
   outputs the parsed elements one-by-one to a PCollection for further
   processing. This would effectively be the approach to supporting your
   desired functionality within ElasticsearchIO as well.
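
To make option 1 concrete, here is a rough pure-Python sketch of the per-key
logic that the Beam transforms (a GroupByKey followed by a filtering DoFn)
would implement. The field names mirror the aggregation query quoted below;
the function and variable names are illustrative only, not part of any Beam
or Elasticsearch API:

```python
from collections import defaultdict

def users_with_play_time_spread(docs, threshold=5000):
    # Group play_time values per user_id (what GroupByKey would do),
    # then keep users whose max-min spread exceeds the threshold
    # (what the bucket_selector script in the ES query expresses).
    per_user = defaultdict(list)
    for doc in docs:
        per_user[doc["user_id"]].append(doc["play_time"])
    return {
        user for user, times in per_user.items()
        if max(times) - min(times) > threshold
    }

docs = [
    {"user_id": "a", "play_time": 100},
    {"user_id": "a", "play_time": 9000},
    {"user_id": "b", "play_time": 200},
    {"user_id": "b", "play_time": 300},
]
print(users_with_play_time_spread(docs))  # {'a'}
```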

- Evan

On Wed, Mar 9, 2022 at 3:52 AM Nick Pan  wrote:

> Hello,
>
> I have a use case where I need to first compute an aggregation for each
> key, and then filter out the keys based on some criteria.   And finally
> feed the matched keys as an input to PCollection using ElasticsearchIO
> read.  But ElasticsearchIO does not seem to support query that contains
> aggregation:
>
> Error message from worker: org.elasticsearch.client.ResponseException:
> method [GET], host [https://...], URI [...], status line [HTTP/1.1 400
> Bad Request]
> {"error":{"root_cause":[{"type":"parsing_exception","reason":"request does
> not support
> [aggregations]","line":1,"col":135}],"type":"parsing_exception","reason":"request
> does not support [aggregations]","line":1,"col":135},"status":400}
> org.elasticsearch.client.RestClient.convertResponse(RestClient.java:331)
> org.elasticsearch.client.RestClient.performRequest(RestClient.java:301)
> org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.queryCount(ElasticsearchIO.java:780)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getEstimatedSizeBytes(ElasticsearchIO.java:762)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.split(ElasticsearchIO.java:710)
>
>
> Here is an example of the Elasticsearch query I am trying to do:
>
> {
>
>  "aggs": {
>
> "user_id": {
>
>   "composite": {
>
> "sources": [
>
>   { "user_id": { "terms": { "field": "user_id" } } }
>
> ]
>
>   },
>
>   "aggs": {
>
> "min": {
>
>   "min": {
>
> "field": "play_time"
>
>   }
>
> },
>
> "max": {
>
>   "max": {
>
> "field": "play_time"
>
>   }
>
> },
>
> "diff": {
>
>   "bucket_selector": {
>
> "buckets_path": {
>
>   "min": "min",
>
>   "max": "max"
>
> },
>
> "script": "params.max - params.min > 5000"
>
>   }
>
> }
>
>   }
>
> }
>
>   }
>
> }
>
>
>
> Is Elasticsearch query that contains aggregation not supported in
> ElasticsearchIO?  If not, is there a way to work around this?
>
>
> Thanks,
>
> Nick
>


Re: [Dataflow][Java] Guidance on Transform Mapping Streaming Update

2021-10-22 Thread Evan Galpin
Thanks for the ideas Luke. I checked out the json graphs as per your
recommendation (thanks for that, was previously unaware), and the
"output_info" was identical for both the running pipeline and the pipeline
I was hoping to update it with.  I ended up opting to just drain and submit
the updated pipeline as a new job.  Thanks for the tips!
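
For anyone else comparing two such JSON dumps, a rough sketch of a step-name
diff follows. It assumes the dumped file has a top-level "steps" list whose
entries carry a "name" field; the exact layout can vary between SDK
versions, so treat this as illustrative only:

```python
def step_names(job):
    # job: parsed contents of a --dataflowJobFile JSON dump; assumes a
    # top-level "steps" list whose entries each have a "name".
    return {step["name"] for step in job.get("steps", [])}

def diff_steps(before, after):
    # Report transform steps present in one dump but not the other.
    b, a = step_names(before), step_names(after)
    return {"removed": sorted(b - a), "added": sorted(a - b)}

# Hypothetical minimal dumps for illustration:
old = {"steps": [{"name": "ReadPubsub"}, {"name": "OldTransform"}]}
new = {"steps": [{"name": "ReadPubsub"}, {"name": "NewTransform"}]}
print(diff_steps(old, new))
# {'removed': ['OldTransform'], 'added': ['NewTransform']}
```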

Thanks,
Evan

On Thu, Oct 21, 2021 at 7:02 PM Luke Cwik  wrote:

> I would suggest dumping the JSON representation (with the
> --dataflowJobFile=/path/to/output.json) of the pipeline before and after
> and looking to see what is being submitted to Dataflow. Dataflow's JSON
> graph representation is a bipartite graph where there are transform nodes
> with inputs and outputs and PCollection nodes with no inputs or outputs.
> The PCollection nodes typically end with the suffix ".out". This could help
> find steps that have been added/removed/renamed.
>
> The PipelineDotRenderer[1] might be of use as well.
>
> 1:
> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRenderer.java
>
> On Thu, Oct 21, 2021 at 11:54 AM Evan Galpin 
> wrote:
>
>> Hi all,
>>
>> I'm looking for any help regarding updating streaming jobs which are
>> already running on Dataflow.  Specifically I'm seeking guidance for
>> situations where Fusion is involved, and trying to decipher which old steps
>> should be mapped to which new steps.
>>
>> I have a case where I updated the steps which come after the step in
>> question, but when I attempt to update there is an error that "
>> no longer produces data to the steps ". I believe that
>>  is only changed as a result of fusion, and in reality it does in
>> fact produce data to  (confirmed when deployed as a new
>> job for testing purposes).
>>
>> Is there a guide for how to deal with updates and fusion?
>>
>> Thanks,
>> Evan
>>
>


[Dataflow][Java] Guidance on Transform Mapping Streaming Update

2021-10-21 Thread Evan Galpin
Hi all,

I'm looking for any help regarding updating streaming jobs which are
already running on Dataflow.  Specifically I'm seeking guidance for
situations where Fusion is involved, and trying to decipher which old steps
should be mapped to which new steps.

I have a case where I updated the steps which come after the step in
question, but when I attempt to update there is an error that "
no longer produces data to the steps ". I believe that
 is only changed as a result of fusion, and in reality it does in
fact produce data to  (confirmed when deployed as a new
job for testing purposes).

Is there a guide for how to deal with updates and fusion?

Thanks,
Evan


Re: coder in ReadFromBigQuery doesn't do "anything"

2021-10-17 Thread Evan Galpin
Is “the result” being printed or viewed via debugger? Is there a chance
that the __repr__ or similar method for proto produces a dict strictly for
printing/serialization?

Thanks,
Evan

On Sun, Oct 17, 2021 at 01:50 Mark Striebeck 
wrote:

> Hi,
>
> I have the following BigQuery table:
>
> Name: DailyVwap
> Field name Type
> TIMESTAMP   DATE
> SYMBOL_NAME STRING
> DAILY_VWAP  STRING
> inserted_time   TIMESTAMP
>
> I want to read it into the following proto:
> message DailyVwap {
> google.protobuf.Timestamp TIMESTAMP = 1;
> string SYMBOL_NAME = 2;
> float DAILY_VWAP = 3;
> google.protobuf.Timestamp inserted_time = 4;
> }
>
> This is the call that I make in my pipeline:
> ReadFromBigQuery(query='SELECT * FROM `my_project.my_dataset.DailyVwap`',
> use_standard_sql=True,
> project='my_project',
> coder=beam.coders.ProtoCoder(DailyVwap().__class__),
> gcs_location=temp_location)
>
> But the result is always a dictionary in the form:
> {'TIMESTAMP': datetime.date(2021, 9, 17), 'SYMBOL_NAME': 'AACIU',
> 'DAILY_VWAP': 'null', 'inserted_time': datetime.datetime(2021, 9, 27, 16,
> 45, 33, 779000, tzinfo=datetime.timezone.utc)}
>
> With or without the coder in the call. No error message or warning in the
> logs.
>
> Any help or pointers appreciated!
>
> Thanks
>  Mark
>


Re: Limit the concurrency of a Beam Step (or all the steps)

2021-09-24 Thread Evan Galpin
This has been mentioned a few times and seems to me to be a fairly common
requirement.

I think that a rate limit could be accomplished through stateful
processing, using a combination of bagState and Timers.
GroupIntoBatches.java would be a good example.
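
As a plain-Python illustration of the pacing logic such a stateful
transform would need (buffer elements, release them at a bounded rate),
here is a small token-bucket sketch. All names are illustrative; this is
not a Beam API, just the core arithmetic a bag-state-plus-timers DoFn
could implement:

```python
import time

class RateLimiter:
    """Token bucket: allows up to `rate` calls per `period` seconds."""

    def __init__(self, rate, period=60.0, clock=time.monotonic):
        self.rate = rate
        self.period = period
        self.clock = clock
        self.tokens = float(rate)
        self.last = clock()

    def try_acquire(self):
        now = self.clock()
        # Refill proportionally to elapsed time, capped at bucket size.
        self.tokens = min(
            self.rate,
            self.tokens + (now - self.last) * self.rate / self.period,
        )
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

For example, with `RateLimiter(rate=300, period=60.0)` a DoFn could hold
back elements (in bag state, re-fired by a timer) whenever `try_acquire()`
returns False.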

I wonder if this would be a good built-in transform given the number of
times it’s come up 🙂

Thanks,
Evan

On Fri, Sep 24, 2021 at 05:29 Sofia’s World  wrote:

> Hello
>  i was wondering if it's somehow possible to limit the concurrency of a
> beam Step?
> i have a workflow which involves a Webclient that uses an API for which
> my account has a max of 300 requests per minute...
> Alternatively, will i have to go through a combine and custom ParDo ?
>
> Has anyone come across this use case?
>
> kind regards
> Marco
>


Re: [Python] Heterogeneous TaggedOutput Type Hints

2021-09-22 Thread Evan Galpin
Thanks for the response Luke :-)

I did try setting .element_type for each resulting PCollection using
"apache_beam.typehints.typehints.KV" to describe the elements, which passed
type checking.  I also ran the full dataset (batch job) without the GBK in
question but instead using a dummy DoFn in its place which asserted that
every element that would be going into the GBK was a 2-tuple, along with
using --runtime_type_check, all of which run successfully without the GBK
after the TaggedOutput DoFn.

Adding back the GBK also runs end-to-end successfully on the DirectRunner
using the identical dataset.  But as soon as I add the GBK and use the
DataflowRunner (v2), I get errors as soon as the optimized step involving
the GBK is in the "running" status:

- "Could not start worker docker container"
- "Error syncing pod"
- "Check failed: pair_coder Strings" or "Check failed: kv_coder : expecting
a KV coder, but had Strings"

Anything further to try? I can also provide Job IDs from Dataflow if
helpful (and safe to share).

Thanks,
Evan

On Wed, Sep 22, 2021 at 1:09 AM Luke Cwik  wrote:

> Have you tried setting the element_type[1] explicitly on each output
> PCollection that is returned after applying the multi-output ParDo?
> I believe you'll get a DoOutputsTuple[2] returned after applying the
> mult-output ParDo which allows access to the underlying PCollection objects.
>
> 1:
> https://github.com/apache/beam/blob/ebf2aacf37b97fc85b167271f184f61f5b06ddc3/sdks/python/apache_beam/pvalue.py#L99
> 2:
> https://github.com/apache/beam/blob/ebf2aacf37b97fc85b167271f184f61f5b06ddc3/sdks/python/apache_beam/pvalue.py#L234
>
> On Tue, Sep 21, 2021 at 10:29 AM Evan Galpin 
> wrote:
>
>> This is badly plaguing a pipeline I'm currently developing, where the
>> exact same data set and code runs end-to-end on DirectRunner, but fails on
>> DataflowRunner with either "Check failed: kv_coder : expecting a KV
>> coder, but had Strings" or "Check failed: pair_coder Strings" hidden in the
>> harness logs. It seems to be consistently repeatable with any TaggedOutput
>> + GBK afterwards.
>>
>> Any advice on how to proceed?
>>
>> Thanks,
>> Evan
>>
>> On Fri, Sep 17, 2021 at 11:20 AM Evan Galpin 
>> wrote:
>>
>>> The Dataflow error logs only showed 1 error which was:  "The job failed
>>> because a work item has failed 4 times. Look in previous log entries for
>>> the cause of each one of the 4 failures. For more information, see
>>> https://cloud.google.com/dataflow/docs/guides/common-errors. The work
>>> item was attempted on these workers: beamapp--X-kt85-harness-8k2c
>>> Root cause: The worker lost contact with the service."  In "Diagnostics"
>>> there were errors stating "Error syncing pod: Could not start worker docker
>>> container".  The harness logs i.e. "projects/my-project/logs/
>>> dataflow.googleapis.com%2Fharness" finally contained an error that
>>> looked suspect, which was "Check failed: kv_coder : expecting a KV
>>> coder, but had Strings", below[1] is a link to possibly a stacktrace or
>>> extra detail, but is internal to google so I don't have access.
>>>
>>> [1]
>>> https://symbolize.corp.google.com/r/?trace=55a197abcf56,55a197abbe33,55a197abb97e,55a197abd708,55a196d4e22f,55a196d4d8d3,55a196d4da35,55a1967ec247,55a196f62b26,55a1968969b3,55a196886613,55a19696b0e6,55a196969815,55a1969693eb,55a19696916e,55a1969653bc,55a196b0150a,55a196b04e11,55a1979fc8df,7fe7736674e7,7fe7734dc22c&map=13ddc0ac8b57640c29c5016eb26ef88e:55a1956e7000-55a197bd5010,f1c96c67b57b74a4d7050f34aca016eef674f765:7fe77366-7fe773676dac,76b955c7af655a4c1e53b8d4aaa0255f3721f95f:7fe7734a5000-7fe7736464c4
>>>
>>> On Thu, Sep 9, 2021 at 6:46 PM Robert Bradshaw 
>>> wrote:
>>>
>>>> Huh, that's strange. Yes, the exact error on the service would be
>>>> helpful.
>>>>
>>>> On Wed, Sep 8, 2021 at 10:12 AM Evan Galpin 
>>>> wrote:
>>>> >
>>>> > Thanks for the response. I've created a gist here to demonstrate a
>>>> minimal repro:
>>>> https://gist.github.com/egalpin/2d6ad2210cf9f66108ff48a9c7566ebc
>>>> >
>>>> > It seemed to run fine both on DirectRunner and PortableRunner (embed
>>>> mode), but Dataflow v2 runner raised an error at runtime seemingly
>>>> associated with the Shuffle service?  I have job IDs and trace links if
>>>> those are helpful as well.
>>>> >
>>>> > Thanks,
>>>>

Re: [Python] Heterogeneous TaggedOutput Type Hints

2021-09-21 Thread Evan Galpin
This is badly plaguing a pipeline I'm currently developing, where the exact
same data set and code runs end-to-end on DirectRunner, but fails on
DataflowRunner with either "Check failed: kv_coder : expecting a KV coder,
but had Strings" or "Check failed: pair_coder Strings" hidden in the
harness logs. It seems to be consistently repeatable with any TaggedOutput
+ GBK afterwards.

Any advice on how to proceed?

Thanks,
Evan

On Fri, Sep 17, 2021 at 11:20 AM Evan Galpin  wrote:

> The Dataflow error logs only showed 1 error which was:  "The job failed
> because a work item has failed 4 times. Look in previous log entries for
> the cause of each one of the 4 failures. For more information, see
> https://cloud.google.com/dataflow/docs/guides/common-errors. The work
> item was attempted on these workers: beamapp--X-kt85-harness-8k2c
> Root cause: The worker lost contact with the service."  In "Diagnostics"
> there were errors stating "Error syncing pod: Could not start worker docker
> container".  The harness logs i.e. "projects/my-project/logs/
> dataflow.googleapis.com%2Fharness" finally contained an error that looked
> suspect, which was "Check failed: kv_coder : expecting a KV coder, but
> had Strings", below[1] is a link to possibly a stacktrace or extra detail,
> but is internal to google so I don't have access.
>
> [1]
> https://symbolize.corp.google.com/r/?trace=55a197abcf56,55a197abbe33,55a197abb97e,55a197abd708,55a196d4e22f,55a196d4d8d3,55a196d4da35,55a1967ec247,55a196f62b26,55a1968969b3,55a196886613,55a19696b0e6,55a196969815,55a1969693eb,55a19696916e,55a1969653bc,55a196b0150a,55a196b04e11,55a1979fc8df,7fe7736674e7,7fe7734dc22c&map=13ddc0ac8b57640c29c5016eb26ef88e:55a1956e7000-55a197bd5010,f1c96c67b57b74a4d7050f34aca016eef674f765:7fe77366-7fe773676dac,76b955c7af655a4c1e53b8d4aaa0255f3721f95f:7fe7734a5000-7fe7736464c4
>
> On Thu, Sep 9, 2021 at 6:46 PM Robert Bradshaw 
> wrote:
>
>> Huh, that's strange. Yes, the exact error on the service would be helpful.
>>
>> On Wed, Sep 8, 2021 at 10:12 AM Evan Galpin 
>> wrote:
>> >
>> > Thanks for the response. I've created a gist here to demonstrate a
>> minimal repro:
>> https://gist.github.com/egalpin/2d6ad2210cf9f66108ff48a9c7566ebc
>> >
>> > It seemed to run fine both on DirectRunner and PortableRunner (embed
>> mode), but Dataflow v2 runner raised an error at runtime seemingly
>> associated with the Shuffle service?  I have job IDs and trace links if
>> those are helpful as well.
>> >
>> > Thanks,
>> > Evan
>> >
>> > On Tue, Sep 7, 2021 at 4:35 PM Robert Bradshaw 
>> wrote:
>> >>
>> >> This is not yet supported. Using a union for now is the way to go. (If
>> >> only the last value of the union was used, that sounds like a bug. Do
>> >> you have a minimal repro?)
>> >>
>> >> On Tue, Sep 7, 2021 at 1:23 PM Evan Galpin 
>> wrote:
>> >> >
>> >> > Hi all,
>> >> >
>> >> > What is the recommended way to write type hints for a tagged output
>> DoFn where the outputs to different tags have different types?
>> >> >
>> >> > I tried using a Union to describe each of the possible output types,
>> but that resulted in mismatched coder errors where only the last entry in
>> the Union was used as the assumed type.  Is there a way to associate a type
>> hint to a tag or something like that?
>> >> >
>> >> > Thanks,
>> >> > Evan
>>
>


Re: [Python] Heterogeneous TaggedOutput Type Hints

2021-09-17 Thread Evan Galpin
The Dataflow error logs only showed 1 error which was:  "The job failed
because a work item has failed 4 times. Look in previous log entries for
the cause of each one of the 4 failures. For more information, see
https://cloud.google.com/dataflow/docs/guides/common-errors. The work item
was attempted on these workers: beamapp--X-kt85-harness-8k2c Root
cause: The worker lost contact with the service."  In "Diagnostics" there
were errors stating "Error syncing pod: Could not start worker docker
container".  The harness logs i.e. "projects/my-project/logs/
dataflow.googleapis.com%2Fharness" finally contained an error that looked
suspect, which was "Check failed: kv_coder : expecting a KV coder, but had
Strings", below[1] is a link to possibly a stacktrace or extra detail, but
is internal to google so I don't have access.

[1]
https://symbolize.corp.google.com/r/?trace=55a197abcf56,55a197abbe33,55a197abb97e,55a197abd708,55a196d4e22f,55a196d4d8d3,55a196d4da35,55a1967ec247,55a196f62b26,55a1968969b3,55a196886613,55a19696b0e6,55a196969815,55a1969693eb,55a19696916e,55a1969653bc,55a196b0150a,55a196b04e11,55a1979fc8df,7fe7736674e7,7fe7734dc22c&map=13ddc0ac8b57640c29c5016eb26ef88e:55a1956e7000-55a197bd5010,f1c96c67b57b74a4d7050f34aca016eef674f765:7fe77366-7fe773676dac,76b955c7af655a4c1e53b8d4aaa0255f3721f95f:7fe7734a5000-7fe7736464c4

On Thu, Sep 9, 2021 at 6:46 PM Robert Bradshaw  wrote:

> Huh, that's strange. Yes, the exact error on the service would be helpful.
>
> On Wed, Sep 8, 2021 at 10:12 AM Evan Galpin  wrote:
> >
> > Thanks for the response. I've created a gist here to demonstrate a
> minimal repro:
> https://gist.github.com/egalpin/2d6ad2210cf9f66108ff48a9c7566ebc
> >
> > It seemed to run fine both on DirectRunner and PortableRunner (embed
> mode), but Dataflow v2 runner raised an error at runtime seemingly
> associated with the Shuffle service?  I have job IDs and trace links if
> those are helpful as well.
> >
> > Thanks,
> > Evan
> >
> > On Tue, Sep 7, 2021 at 4:35 PM Robert Bradshaw 
> wrote:
> >>
> >> This is not yet supported. Using a union for now is the way to go. (If
> >> only the last value of the union was used, that sounds like a bug. Do
> >> you have a minimal repro?)
> >>
> >> On Tue, Sep 7, 2021 at 1:23 PM Evan Galpin 
> wrote:
> >> >
> >> > Hi all,
> >> >
> >> > What is the recommended way to write type hints for a tagged output
> DoFn where the outputs to different tags have different types?
> >> >
> >> > I tried using a Union to describe each of the possible output types,
> but that resulted in mismatched coder errors where only the last entry in
> the Union was used as the assumed type.  Is there a way to associate a type
> hint to a tag or something like that?
> >> >
> >> > Thanks,
> >> > Evan
>


Re: [Python] Heterogeneous TaggedOutput Type Hints

2021-09-08 Thread Evan Galpin
Thanks for the response. I've created a gist here to demonstrate a minimal
repro: https://gist.github.com/egalpin/2d6ad2210cf9f66108ff48a9c7566ebc

It seemed to run fine both on DirectRunner and PortableRunner (embed mode),
but Dataflow v2 runner raised an error at runtime seemingly associated with
the Shuffle service?  I have job IDs and trace links if those are helpful
as well.

Thanks,
Evan

On Tue, Sep 7, 2021 at 4:35 PM Robert Bradshaw  wrote:

> This is not yet supported. Using a union for now is the way to go. (If
> only the last value of the union was used, that sounds like a bug. Do
> you have a minimal repro?)
>
> On Tue, Sep 7, 2021 at 1:23 PM Evan Galpin  wrote:
> >
> > Hi all,
> >
> > What is the recommended way to write type hints for a tagged output DoFn
> where the outputs to different tags have different types?
> >
> > I tried using a Union to describe each of the possible output types, but
> that resulted in mismatched coder errors where only the last entry in the
> Union was used as the assumed type.  Is there a way to associate a type
> hint to a tag or something like that?
> >
> > Thanks,
> > Evan
>


[Python] Heterogeneous TaggedOutput Type Hints

2021-09-07 Thread Evan Galpin
Hi all,

What is the recommended way to write type hints for a tagged output DoFn
where the outputs to different tags have different types?

I tried using a Union to describe each of the possible output types, but
that resulted in mismatched coder errors where only the last entry in the
Union was used as the assumed type.  Is there a way to associate a type
hint to a tag or something like that?

Thanks,
Evan


Re: Processing historical data

2021-08-24 Thread Evan Galpin
Hi Kishore,

You may be able to introduce a new pipeline which reads from BQ and
publishes to PubSub like you mentioned. By default, elements read via
PubSub will have a timestamp which is equal to the publish time of the
message (internally established by PubSub service).

Are you using a custom withTimestampAttribute? If not, I believe there
should be no issue with watermark and late data. As per javadoc for PubSub
Read [1]:

> Note that the system can guarantee that no late data will ever be seen
when it assigns timestamps by arrival time (i.e. timestampAttribute is not
provided).

[1]
https://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withTimestampAttribute-java.lang.String-
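
A tiny simulation of why arrival-time timestamps can never yield late data:
if each element's timestamp equals its publish/arrival time, the watermark
(modeled here, simplistically, as the maximum timestamp seen so far) never
passes an element before it arrives. Pure Python, illustrative only:

```python
def late_elements(arrivals, timestamp_fn):
    # arrivals: list of (arrival_time, event_time) in arrival order.
    # timestamp_fn picks the element timestamp used for watermarking.
    late, watermark = [], float("-inf")
    for arrival_time, event_time in arrivals:
        ts = timestamp_fn(arrival_time, event_time)
        if ts < watermark:
            late.append((arrival_time, event_time))
        watermark = max(watermark, ts)
    return late

arrivals = [(1, 100), (2, 50), (3, 200)]  # historical event times, out of order
# Event-time stamps: the event-time-50 element arrives after 100 -> late.
print(late_elements(arrivals, lambda a, e: e))  # [(2, 50)]
# Arrival-time stamps: arrival times only increase -> never late.
print(late_elements(arrivals, lambda a, e: a))  # []
```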

Hope that helps.

Thanks,
Evan

On Tue, Aug 24, 2021 at 11:16 KV 59  wrote:

> Hi Ankur,
>
> Thanks for the response. I have a follow up to option 1 & 2
>
> If I were able to stream the historical data to PubSub and be able to
> restart the job, even then I believe it will not work because of the
> watermarks, am I right?
>
> As for option 2, if I were able to make an incompatible change (by doing a
> hard restart) even then it would be a challenge because of the watermarks.
>
> Thanks
> Kishore
>
>
>
> On Tue, Aug 24, 2021 at 8:00 AM Ankur Goenka  wrote:
>
>> I suppose historical data processing will be a one-time activity, so it
>> will be best to use a batch job to process the historical data.
>> As for the options you mentioned,
>> Option 1 is not feasible as you will have to update the pipeline and I
>> believe the update will not be compatible because of the source change.
>> Option 2 also requires changes to the 1st job, which can be done using
>> update or drain-and-restart, so it has the same problem while being more
>> complicated.
>>
>> Thanks,
>> Ankur
>>
>> On Tue, Aug 24, 2021 at 7:44 AM KV 59  wrote:
>>
>>> Hi,
>>>
>>> I have a Beam streaming pipeline processing live data from PubSub using
>>> sliding windows on event timestamps. I want to recompute the metrics for
>>> historical data in BigQuery. What are my options?
>>>
>>> I have looked at
>>> https://stackoverflow.com/questions/56702878/how-to-use-apache-beam-to-process-historic-time-series-data
>>> and I have a couple of questions
>>>
>>> 1. Can I use the same instance of the streaming pipeline? I don't think
>>> so as the watermark would be way past the historical event timestamps.
>>>
>>> 2. Could I possibly split the pipeline and use one branch for historical
>>> data and one for the live streaming data?
>>>
>>> I am trying hard not to raise parallel infrastructure to process
>>> historical data.
>>>
>>> Any inputs would be very much appreciated
>>>
>>> Thanks
>>> Kishore
>>>
>>


Re: [Dataflow][Java][2.30.0] Best practice for clearing stuck data in streaming pipeline

2021-08-10 Thread Evan Galpin
>
> It is likely that the incorrect transform was edited...
>

It appears you're right; I tried to reproduce, but this time I was able to
clear the issue by making "the same" code change and updating the
pipeline.  I believe the earlier change was simply made in the wrong place
in the code.

Good to know this works! Thanks Luke 🙏

On Tue, Aug 10, 2021 at 1:19 PM Luke Cwik  wrote:

>
>
> On Tue, Aug 10, 2021 at 10:11 AM Evan Galpin 
> wrote:
>
>> Thanks for your responses Luke. One point I have confusion over:
>>
>> * Modify the sink implementation to do what you want with the bad data
>>> and update the pipeline.
>>>
>>
>>  I modified the sink implementation to ignore the specific error that was
>> the problem and updated the pipeline. The update succeeded.  However, the
>> problem persisted.  How might that happen?  Is there caching involved?
>> Checkpointing? I changed the very last method called in the pipeline in
>> order to ensure the validation would apply, but the problem persisted.
>>
>
> It is likely that the incorrect transform was edited. You should take a
> look at the worker logs and find the exception that is being thrown and
> find the step name it is associated with (e.g.
> BigQueryIO/Write/StreamingInserts) and browse the source for a
> "StreamingInserts" transform that is applied from the "Write" transform
> which is applied from the BigQueryIO transform.
>
>
>>
>> And in the case where one is using a Sink which is a built-in IO module
>> of Beam, modification of the Sink may not be immediately feasible. Is the
>> only recourse in that case to drain a job an start a new one?
>>
>> The Beam IOs are open source allowing you to edit the code and build a
> new version locally which you would consume in your project. Dataflow does
> have an optimization and replaces the pubsub source/sink but all others to
> my knowledge should be based upon the Apache Beam source.
>
>
>> On Tue, Aug 10, 2021 at 12:54 PM Luke Cwik  wrote:
>>
>>>
>>>
>>> On Tue, Aug 10, 2021 at 8:54 AM Evan Galpin 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I recently had an experience where a streaming pipeline became
>>>> "clogged" due to invalid data reaching the final step in my pipeline such
>>>> that the data was causing a non-transient error when writing to my Sink.
>>>> Since the job is a streaming job, the element (bundle) was continuously
>>>> retrying.
>>>>
>>>> What options are there for getting out of this state when it occurs?
>>>>
>>>
>>> * Modify the sink implementation to do what you want with the bad data
>>> and update the pipeline.
>>> * Cancel the pipeline and update the implementation to handle the bad
>>> records and rerun from last known good position reprocessing whatever was
>>> necessary.
>>>
>>> I attempted to add validation and update the streaming job to remove the
>>>> bad entity; though the update was successful, I believe the bad entity was
>>>> already checkpointed (?) further downstream in the pipeline. What then?
>>>>
>>> And for something like a database schema and evolving it over time, what
>>>> is the typical solution?
>>>>
>>>
>>> * Pipeline update containing newly updated schema before data with new
>>> schema starts rolling in.
>>> * Use a format and encoding that is agnostic to changes with a
>>> source/sink that can consume this agnostic format. See this thread[1] and
>>> others like it in the user and dev mailing lists.
>>>
>>>
>>>> - Should pipelines mirror a DB schema and do validation of all data
>>>> types in the pipeline?
>>>>
>>>
>>> Perform validation at critical points in the pipeline like data ingress
>>> and egress. Insertion of the data into the DB failing via a dead letter
>>> queue works for the cases that are loud and throw exceptions but for the
>>> cases where they are inserted successfully but are still invalid from a
>>> business logic standpoint won't be caught without validation.
>>>
>>>
>>>> - Should all sinks implement a way to remove non-transient failures
>>>> from retrying and output them via PCollectionTuple (such as with BigQuery
>>>> failed inserts)?
>>>>
>>>
>>> Yes, dead letter queues (DLQs) are quite a good solution for this since
>>> it provides a lot of flexibility and allows for a process to fix it up
>>> (typically a manual process).
>>>
>>> 1:
>>> https://lists.apache.org/thread.html/r4b31c8b76fa81dcb130397077b981ab6429f2999b9d864c815214c5a%40%3Cuser.beam.apache.org%3E
>>>
>>


Re: [Dataflow][Java][2.30.0] Best practice for clearing stuck data in streaming pipeline

2021-08-10 Thread Evan Galpin
Thanks for your responses, Luke. One point I'm still unclear on:

* Modify the sink implementation to do what you want with the bad data and
> update the pipeline.
>

 I modified the sink implementation to ignore the specific error that was
the problem and updated the pipeline. The update succeeded.  However, the
problem persisted.  How might that happen?  Is there caching involved?
Checkpointing? I changed the very last method called in the pipeline in
order to ensure the validation would apply, but the problem persisted.

And in the case where one is using a Sink which is a built-in IO module of
Beam, modification of the Sink may not be immediately feasible. Is the only
recourse in that case to drain a job and start a new one?

On Tue, Aug 10, 2021 at 12:54 PM Luke Cwik  wrote:

>
>
> On Tue, Aug 10, 2021 at 8:54 AM Evan Galpin  wrote:
>
>> Hi all,
>>
>> I recently had an experience where a streaming pipeline became "clogged"
>> due to invalid data reaching the final step in my pipeline such that the
>> data was causing a non-transient error when writing to my Sink.  Since the
>> job is a streaming job, the element (bundle) was continuously retrying.
>>
>> What options are there for getting out of this state when it occurs?
>>
>
> * Modify the sink implementation to do what you want with the bad data and
> update the pipeline.
> * Cancel the pipeline and update the implementation to handle the bad
> records and rerun from last known good position reprocessing whatever was
> necessary.
>
> I attempted to add validation and update the streaming job to remove the
>> bad entity; though the update was successful, I believe the bad entity was
>> already checkpointed (?) further downstream in the pipeline. What then?
>>
> And for something like a database schema and evolving it over time, what
>> is the typical solution?
>>
>
> * Pipeline update containing newly updated schema before data with new
> schema starts rolling in.
> * Use a format and encoding that is agnostic to changes with a source/sink
> that can consume this agnostic format. See this thread[1] and others like
> it in the user and dev mailing lists.
>
>
>> - Should pipelines mirror a DB schema and do validation of all data types
>> in the pipeline?
>>
>
> Perform validation at critical points in the pipeline, like data ingress
> and egress. Insertion of the data into the DB failing via a dead letter
> queue works for the cases that are loud and throw exceptions, but the
> cases where records are inserted successfully yet are still invalid from a
> business-logic standpoint won't be caught without validation.
>
>
>> - Should all sinks implement a way to remove non-transient failures from
>> retrying and output them via PCollectionTuple (such as with BigQuery failed
>> inserts)?
>>
>
> Yes, dead letter queues (DLQs) are quite a good solution for this since they
> provide a lot of flexibility and allow for a process to fix bad records
> (typically a manual process).
>
> 1:
> https://lists.apache.org/thread.html/r4b31c8b76fa81dcb130397077b981ab6429f2999b9d864c815214c5a%40%3Cuser.beam.apache.org%3E
>


[Dataflow][Java][2.30.0] Best practice for clearing stuck data in streaming pipeline

2021-08-10 Thread Evan Galpin
Hi all,

I recently had an experience where a streaming pipeline became "clogged"
due to invalid data reaching the final step in my pipeline such that the
data was causing a non-transient error when writing to my Sink.  Since the
job is a streaming job, the element (bundle) was continuously retrying.

What options are there for getting out of this state when it occurs? I
attempted to add validation and update the streaming job to remove the bad
entity; though the update was successful, I believe the bad entity was
already checkpointed (?) further downstream in the pipeline. What then?

And for something like a database schema and evolving it over time, what is
the typical solution?

- Should pipelines mirror a DB schema and do validation of all data types
in the pipeline?
- Should all sinks implement a way to remove non-transient failures from
retrying and output them via PCollectionTuple (such as with BigQuery failed
inserts)?
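
The dead-letter routing the post asks about can be illustrated without Beam. Below is a minimal, stdlib-only Python sketch of the idea: attempt each write, and divert non-transient failures to a dead-letter collection instead of retrying them forever. In Beam this logic would live in a DoFn with a main output and a dead-letter output (two TupleTags, as BigQueryIO does for failed inserts); the names write_with_dead_letters, fake_sink, and DeadLetter here are illustrative, not Beam APIs.

```python
from dataclasses import dataclass


@dataclass
class DeadLetter:
    """A failed element plus the failure reason, kept for later repair."""
    element: dict
    error: str


def write_with_dead_letters(elements, write_record):
    """Attempt each write; divert non-transient failures instead of retrying.

    In Beam this would be a DoFn emitting to two TupleTags; here it is
    plain Python for illustration only.
    """
    written, dead = [], []
    for el in elements:
        try:
            write_record(el)
            written.append(el)
        except ValueError as e:  # stand-in for a non-transient sink error
            dead.append(DeadLetter(element=el, error=str(e)))
    return written, dead


def fake_sink(record):
    # Hypothetical sink that rejects records missing an "id" field.
    if "id" not in record:
        raise ValueError("missing required field: id")


ok, dlq = write_with_dead_letters(
    [{"id": 1}, {"name": "no-id"}, {"id": 2}], fake_sink
)
print(len(ok), len(dlq))  # prints: 2 1
```

The dead-letter side can then be written somewhere inspectable (a table, a bucket) for the typically manual fix-up step mentioned above.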


Re: How to test ReadFromPubSub

2021-07-27 Thread Evan Galpin
This SO question is a bit dated, but there's also a pubsub emulator[1] that
you might find useful.

[1]
https://stackoverflow.com/questions/43354599/dataflow-pipeline-and-pubsub-emulator

On Tue, Jul 27, 2021 at 1:00 PM Ning Kang  wrote:

> In the example test, the `mock_pubsub` is from `patch`:
> @mock.patch('google.cloud.pubsub.SubscriberClient') for `class
> TestReadFromPubSub(unittest.TestCase)`, then in the test functions, the
> second arguments are the patched `google.cloud.pubsub.SubscriberClient`.
> For example, "def test_read_messages_success(self, mock_pubsub)".
> See the usage of the patch decorator here.
>
>
>
> You can also use TestStream to simulate a streaming source:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/testing/test_stream_test.py
>
> On Mon, Jul 26, 2021 at 11:21 PM Bergmeier, Andreas <
> andreas.bergme...@otto.de> wrote:
>
>> We have a PTransform, which uses ReadFromPubSub and now would like to
>> test it.
>> Looking at
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/pubsub_test.py
>> it seems like we would need something similar to TestReadFromPubSub.
>> I do, however, not understand how this works, since e.g. a mock_pubsub
>> seems to appear out of thin air.
>> Can someone please give a rough sketch which parts need to work together
>> how in order to test ReadFromPubSub in a TestPipeline?
>>
>
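
The "out of thin air" mock_pubsub mechanics can be shown with a tiny, self-contained example that uses a toy function instead of google.cloud.pubsub.SubscriberClient: mock.patch replaces the named attribute for the duration of the test and injects the resulting MagicMock as an extra positional argument to the test method. The names fetch_greeting and greet below are invented for illustration.

```python
import unittest
from unittest import mock


def fetch_greeting():
    # Imagine this calls a real client, e.g. google.cloud.pubsub.
    raise RuntimeError("would hit the network")


def greet(name):
    return f"{fetch_greeting()}, {name}!"


class TestGreet(unittest.TestCase):
    # patch() replaces fetch_greeting in this module for the duration of
    # the test and passes the MagicMock as an extra positional argument,
    # which is exactly how mock_pubsub arrives in Beam's pubsub_test.py.
    @mock.patch(f"{__name__}.fetch_greeting")
    def test_greet(self, mock_fetch):
        mock_fetch.return_value = "Hello"
        self.assertEqual(greet("Beam"), "Hello, Beam!")
        mock_fetch.assert_called_once_with()


suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestGreet)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

With multiple stacked @mock.patch decorators, the mocks are injected bottom-up, which is why the decorator order in pubsub_test.py matters.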


Re: Can I write to disk (using Dataflow)? Handling large files?

2021-07-16 Thread Evan Galpin
Thanks Luke for the info regarding GCS compose. Do you know if the GCS
FileIO implementation supports the "compose" api that you mentioned?

Thanks,
Evan

On Fri, Jul 16, 2021 at 12:17 PM Luke Cwik  wrote:

>
>
> On Fri, Jul 16, 2021 at 6:53 AM Evan Galpin  wrote:
>
>> Hi there,
>>
>> I imagine the answer to this question might depend on the underlying
>> runner, but simply put: can I write files, temporarily, to disk? I'm
>> currently using the DataflowRunner if that's a helpful detail.
>>
>
> For all runners that use containers (e.g. Dataflow), you're limited to the
> amount of free space in the container, typically a few GBs. Dataflow
> does also mount, I believe, /tmp as a persistent directory from the VM (only
> valid for the lifetime of the VM), and then you're limited to the amount of
> disk space on the VM itself.
>
>
>> Relatedly, how does Beam handle large files? Say that my pipeline reads
>> files from a distributed file system like AWS S3 or GCP Cloud Storage. If a
>> file is 10 GB and I read its contents, those contents will be held in
>> memory, correct?
>>
>
> Beam processes ranges/partitions/... of sources at a time. Many file
> formats are actually split at byte offsets and each byte range is processed
> independently and in parallel. As these records are processed they are
> typically output to something like a sink or a GroupByKey so the only
> memory necessary for processing is the working set size for all the
> elements currently being processed. Note that depending on the runners
> implementation of GroupByKey, some runners may do the grouping operation in
> memory and hence you might be limited by the amount of memory some set of
> workers have. Dataflow does its grouping operation in a separate service
> which is backed by durable storage so the size of a GroupByKey isn't
> limited by the VMs available memory.
>
> To be concrete, if your files each contain one record which is 10gbs in
> size then yes you will need 10gbs+ of memory since that is the working set
> size but if your files contain 10k records and each record is about 1kb
> then your working set size is much smaller.
>
> As a somewhat contrived example, what would be the recommended approach if
>> I wanted to read a set of large files, tar them, and upload them elsewhere?
>>
>
> Create.of(listOfFileNames) -> ParDo(TarFiles)
>
> In your TarFiles DoFn you would have:
> processElement(List<String> files) {
>   OutputStream tarOutput = // initialize output stream for tar file
>   for (String file : files) {
> InputStream input = // open file
> // copy input stream to output stream
>   }
> }
> This will consume memory relative to the buffer sizes used in the input
> and output streams and will process in parallel based upon how many output
> tar files you want to create.
>
> Conveniently tar is a format which allows you to concatenate together
> pretty easily so if the underlying file system has a concatenate operation
> you could design a better pipeline. For example GCS allows you to
> compose[1] up to 32 files allowing you to write a pipeline where you split
> up your list of files into 32 partitions and have each write one tar file
> and then have a transform after all those are complete which composes the
> 32 files together and then deletes the 32 pieces.
>
>
>>
>> Thanks!
>> Evan
>>
>
> 1: https://cloud.google.com/storage/docs/json_api/v1/objects/compose
>
>
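
Luke's point that "tar is a format which allows you to concatenate together pretty easily" can be checked locally with Python's tarfile module: two independently written tar shards, concatenated byte-for-byte (which is effectively what a GCS compose of per-partition shards produces), read back as a single archive as long as the reader skips the end-of-archive padding between them. This is a sketch of the format property only, not the Beam pipeline itself.

```python
import io
import tarfile


def make_tar(files):
    """Write a name -> bytes mapping into an in-memory, uncompressed tar."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for name, data in files.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    return buf.getvalue()


# Two shards, as if written by two partitions of the pipeline.
part_a = make_tar({"a.txt": b"alpha"})
part_b = make_tar({"b.txt": b"beta"})

# Concatenating the raw bytes mimics composing the two objects in GCS.
combined = part_a + part_b

# ignore_zeros=True tells the reader to skip the end-of-archive padding
# of the first shard and keep reading into the second one.
with tarfile.open(fileobj=io.BytesIO(combined), ignore_zeros=True) as tar:
    names = tar.getnames()
print(names)  # prints: ['a.txt', 'b.txt']
```

Note this only works for uncompressed tar; gzip-compressed shards would need the same ignore_zeros-style multi-stream handling on the gzip layer.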


Can I write to disk (using Dataflow)? Handling large files?

2021-07-16 Thread Evan Galpin
Hi there,

I imagine the answer to this question might depend on the underlying
runner, but simply put: can I write files, temporarily, to disk? I'm
currently using the DataflowRunner if that's a helpful detail.

Relatedly, how does Beam handle large files? Say that my pipeline reads
files from a distributed file system like AWS S3 or GCP Cloud Storage. If a
file is 10 GB and I read its contents, those contents will be held in
memory, correct?

As a somewhat contrived example, what would be the recommended approach if
I wanted to read a set of large files, tar them, and upload them elsewhere?

Thanks!
Evan
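
The reply in the thread above explains that file-based sources are split at byte offsets and each range is processed with a bounded buffer, so a 10 GB file is never held in memory whole. A stdlib-only sketch of that idea (iter_byte_ranges and process_range are illustrative names, not Beam APIs):

```python
import os
import tempfile


def iter_byte_ranges(path, range_size):
    """Yield (start, end) offsets covering the file in fixed-size ranges."""
    total = os.path.getsize(path)
    for start in range(0, total, range_size):
        yield start, min(start + range_size, total)


def process_range(path, start, end, buffer_size=4):
    """Read one range with a small buffer; the whole file is never in memory."""
    out = bytearray()
    with open(path, "rb") as f:
        f.seek(start)
        remaining = end - start
        while remaining > 0:
            piece = f.read(min(buffer_size, remaining))
            out.extend(piece)
            remaining -= len(piece)
    return bytes(out)


with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"0123456789" * 3)  # 30-byte stand-in for a large file
    path = tmp.name

ranges = list(iter_byte_ranges(path, 8))
rebuilt = b"".join(process_range(path, s, e) for s, e in ranges)
os.unlink(path)
print(len(ranges), rebuilt == b"0123456789" * 3)  # prints: 4 True
```

In a real runner each range would go to a different worker, so the working set is the in-flight records, not the file size.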


Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-07-09 Thread Evan Galpin
I wanted to circle back here and state that using release 2.31.0, with _or_
without --experiments=use_deprecated_read, seems to resolve the slowness
in my case. It's still on my radar to get a minimal pipeline extracted from
my previously problematic pipeline so as to hopefully aid in debugging
efforts.

Thanks,
Evan

On Mon, Jun 14, 2021 at 8:39 PM Robert Bradshaw  wrote:

> Awesome, thanks!
>
> On Mon, Jun 14, 2021 at 5:36 PM Evan Galpin  wrote:
> >
> > I’ll try to create something as small as possible from the pipeline I
> mentioned 👍 I should have time this week to do so.
> >
> > Thanks,
> > Evan
> >
> > On Mon, Jun 14, 2021 at 18:09 Robert Bradshaw 
> wrote:
> >>
> >> Is it possible to post the code? (Or the code of a similar, but
> >> minimal, pipeline that exhibits the same issues?)
> >>
> >> On Mon, Jun 14, 2021 at 2:15 PM Evan Galpin 
> wrote:
> >> >
> >> > @robert I have a pipeline which consistently shows a major slowdown
> (10 seconds Vs 10 minutes) between version <=2.23.0 and >=2.25.0 that can
> be boiled down to:
> >> >
> >> > - Read GCS file patterns from PubSub
> >> > - Window into Fixed windows (repeating every 15 seconds)
> >> > - Deduplicate/distinct (have tried both)
> >> > - Read GCS blobs via patterns from the first step
> >> > - Write file contents to sink
> >> >
> >> > It doesn't seem to matter if there are 0 messages in a subscription
> or 50k messages at startup. The rate of new messages however is very low.
> Not sure if those are helpful details, let me know if there's anything else
> specific which would help.
> >> >
> >> > On Mon, Jun 14, 2021 at 12:44 PM Robert Bradshaw 
> wrote:
> >> >>
> >> >> +1, we'd really like to get to the bottom of this, so clear
> >> >> instructions on a pipeline/conditions that can reproduce it would be
> >> >> great.
> >> >>
> >> >> On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský 
> wrote:
> >> >> >
> >> >> > Hi Eddy,
> >> >> >
> >> >> > you are probably hitting a not-yet discovered bug in SDF
> implementation in FlinkRunner that (under some currently unknown
> conditions) seems to stop advancing the watermark. This has been observed
> in one other instance (that I'm aware of). I think we don't yet have a
> tracking JIRA for that, would you mind filing it? It would be awesome if
> you could include estimations of messages per sec throughput that causes
> the issue in your case.
> >> >> >
> >> >> > +Tobias Kaymak
> >> >> >
> >> >> > Tobias, could you please confirm that the case you had with Flink
> stopping progressing watermark resembled this one?
> >> >> >
> >> >> > Thanks.
> >> >> >
> >> >> >  Jan
> >> >> >
> >> >> > On 6/14/21 4:11 PM, Eddy G wrote:
> >> >> >
> >> >> > Hi Jan,
> >> >> >
> >> >> > I've added --experiments=use_deprecated_read and it seems to work
> flawlessly (with my current Window and the one proposed by Evan).
> >> >> >
> >> >> > Why is this? Do Splittable DoFn now break current implementations?
> Are there any posts of possible breaking changes?
> >> >> >
> >> >> > On 2021/06/14 13:19:39, Jan Lukavský  wrote:
> >> >> >
> >> >> > Hi Eddy,
> >> >> >
> >> >> > answers inline.
> >> >> >
> >> >> > On 6/14/21 3:05 PM, Eddy G wrote:
> >> >> >
> >> >> > Hi Jan,
> >> >> >
> >> >> > Thanks for replying so fast!
> >> >> >
> >> >> > Regarding your questions,
> >> >> >
> >> >> > - "Does your data get buffered in a state?"
> >> >> > Yes, I do have a state within a stage prior ParquetIO writing
> together with a Timer with PROCESSING_TIME.
> >> >> >
> >> >> > The stage which contains the state does send bytes to the next one
> which is the ParquetIO writing. Seems the @OnTimer doesn't get triggered
> and it's not clearing the state. This however does work under normal
> circumstances without having too much data queued waiting to be processed.
> >> >> >
> >> >>

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Evan Galpin
I’ll try to create something as small as possible from the pipeline I
mentioned 👍 I should have time this week to do so.

Thanks,
Evan

On Mon, Jun 14, 2021 at 18:09 Robert Bradshaw  wrote:

> Is it possible to post the code? (Or the code of a similar, but
> minimal, pipeline that exhibits the same issues?)
>
> On Mon, Jun 14, 2021 at 2:15 PM Evan Galpin  wrote:
> >
> > @robert I have a pipeline which consistently shows a major slowdown (10
> seconds Vs 10 minutes) between version <=2.23.0 and >=2.25.0 that can be
> boiled down to:
> >
> > - Read GCS file patterns from PubSub
> > - Window into Fixed windows (repeating every 15 seconds)
> > - Deduplicate/distinct (have tried both)
> > - Read GCS blobs via patterns from the first step
> > - Write file contents to sink
> >
> > It doesn't seem to matter if there are 0 messages in a subscription or
> 50k messages at startup. The rate of new messages however is very low. Not
> sure if those are helpful details, let me know if there's anything else
> specific which would help.
> >
> > On Mon, Jun 14, 2021 at 12:44 PM Robert Bradshaw 
> wrote:
> >>
> >> +1, we'd really like to get to the bottom of this, so clear
> >> instructions on a pipeline/conditions that can reproduce it would be
> >> great.
> >>
> >> On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský  wrote:
> >> >
> >> > Hi Eddy,
> >> >
> >> > you are probably hitting a not-yet discovered bug in SDF
> implementation in FlinkRunner that (under some currently unknown
> conditions) seems to stop advancing the watermark. This has been observed
> in one other instance (that I'm aware of). I think we don't yet have a
> tracking JIRA for that, would you mind filing it? It would be awesome if
> you could include estimations of messages per sec throughput that causes
> the issue in your case.
> >> >
> >> > +Tobias Kaymak
> >> >
> >> > Tobias, could you please confirm that the case you had with Flink
> stopping progressing watermark resembled this one?
> >> >
> >> > Thanks.
> >> >
> >> >  Jan
> >> >
> >> > On 6/14/21 4:11 PM, Eddy G wrote:
> >> >
> >> > Hi Jan,
> >> >
> >> > I've added --experiments=use_deprecated_read and it seems to work
> flawlessly (with my current Window and the one proposed by Evan).
> >> >
> >> > Why is this? Do Splittable DoFn now break current implementations?
> Are there any posts of possible breaking changes?
> >> >
> >> > On 2021/06/14 13:19:39, Jan Lukavský  wrote:
> >> >
> >> > Hi Eddy,
> >> >
> >> > answers inline.
> >> >
> >> > On 6/14/21 3:05 PM, Eddy G wrote:
> >> >
> >> > Hi Jan,
> >> >
> >> > Thanks for replying so fast!
> >> >
> >> > Regarding your questions,
> >> >
> >> > - "Does your data get buffered in a state?"
> >> > Yes, I do have a state within a stage prior ParquetIO writing
> together with a Timer with PROCESSING_TIME.
> >> >
> >> > The stage which contains the state does send bytes to the next one
> which is the ParquetIO writing. Seems the @OnTimer doesn't get triggered
> and it's not clearing the state. This however does work under normal
> circumstances without having too much data queued waiting to be processed.
> >> >
> >> > OK, this suggests that the watermark is for some reason "stuck". If you
> >> > have checkpointing enabled, you should see the size of the checkpoint
> >> > grow over time.
> >> >
> >> > - "Do you see watermark being updated in your Flink WebUI?"
> >> > The stages that do have a watermark don't get updated. The same
> watermark value has been constant since the pipeline started.
> >> >
> >> > If no lateness is set, any late data should be admitted right?
> >> >
> >> > If no lateness is set, it means allowed lateness of Duration.ZERO,
> which
> >> > means that data that arrive after end-of-window will be dropped.
> >> >
> >> > Regarding 'droppedDueToLateness' metric, can't see it exposed
> anywhere, neither in Flink UI or Prometheus. I've seen it in Dataflow but
> seems to be a Dataflow specific metric right?
> >> >
> >> > Should not be Dataflow specific. But if you don't see it, it means it
> >> > could be zero. So, we can rule this out.

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Evan Galpin
@robert I have a pipeline which consistently shows a major slowdown (10
seconds Vs 10 minutes) between version <=2.23.0 and >=2.25.0 that can be
boiled down to:

- Read GCS file patterns from PubSub
- Window into Fixed windows (repeating every 15 seconds)
- Deduplicate/distinct (have tried both)
- Read GCS blobs via patterns from the first step
- Write file contents to sink

It doesn't seem to matter if there are 0 messages in a subscription or 50k
messages at startup. The rate of new messages however is very low. Not sure
if those are helpful details, let me know if there's anything else specific
which would help.

On Mon, Jun 14, 2021 at 12:44 PM Robert Bradshaw 
wrote:

> +1, we'd really like to get to the bottom of this, so clear
> instructions on a pipeline/conditions that can reproduce it would be
> great.
>
> On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský  wrote:
> >
> > Hi Eddy,
> >
> > you are probably hitting a not-yet discovered bug in SDF implementation
> in FlinkRunner that (under some currently unknown conditions) seems to stop
> advancing the watermark. This has been observed in one other instance (that
> I'm aware of). I think we don't yet have a tracking JIRA for that, would
> you mind filing it? It would be awesome if you could include estimations
> of messages per sec throughput that causes the issue in your case.
> >
> > +Tobias Kaymak
> >
> > Tobias, could you please confirm that the case you had with Flink
> stopping progressing watermark resembled this one?
> >
> > Thanks.
> >
> >  Jan
> >
> > On 6/14/21 4:11 PM, Eddy G wrote:
> >
> > Hi Jan,
> >
> > I've added --experiments=use_deprecated_read and it seems to work
> flawlessly (with my current Window and the one proposed by Evan).
> >
> > Why is this? Do Splittable DoFn now break current implementations? Are
> there any posts of possible breaking changes?
> >
> > On 2021/06/14 13:19:39, Jan Lukavský  wrote:
> >
> > Hi Eddy,
> >
> > answers inline.
> >
> > On 6/14/21 3:05 PM, Eddy G wrote:
> >
> > Hi Jan,
> >
> > Thanks for replying so fast!
> >
> > Regarding your questions,
> >
> > - "Does your data get buffered in a state?"
> > Yes, I do have a state within a stage prior ParquetIO writing together
> with a Timer with PROCESSING_TIME.
> >
> > The stage which contains the state does send bytes to the next one which
> is the ParquetIO writing. Seems the @OnTimer doesn't get triggered and it's
> not clearing the state. This however does work under normal circumstances
> without having too much data queued waiting to be processed.
> >
> > OK, this suggests that the watermark is for some reason "stuck". If you
> > have checkpointing enabled, you should see the size of the checkpoint
> > grow over time.
> >
> > - "Do you see watermark being updated in your Flink WebUI?"
> > The stages that do have a watermark don't get updated. The same
> watermark value has been constant since the pipeline started.
> >
> > If no lateness is set, any late data should be admitted right?
> >
> > If no lateness is set, it means allowed lateness of Duration.ZERO, which
> > means that data that arrive after end-of-window will be dropped.
> >
> > Regarding 'droppedDueToLateness' metric, can't see it exposed anywhere,
> neither in Flink UI or Prometheus. I've seen it in Dataflow but seems to be
> a Dataflow specific metric right?
> >
> > Should not be Dataflow specific. But if you don't see it, it means it
> > could be zero. So, we can rule this out.
> >
> > We're using KinesisIO for reading messages.
> >
> > Kinesis uses UnboundedSource, which is expanded to SDF starting from
> > Beam 2.25.0. The flag should change that as well. Can you try the
> > --experiments=use_deprecated_read and see if your Pipeline DAG changes
> > (should not contain Impulse transform at the beginning) and if it solves
> > your issues?
> >
> > On 2021/06/14 12:48:58, Jan Lukavský  wrote:
> >
> > Hi Eddy,
> >
> > does your data get buffered in a state - e.g. does the size of the state
> > grow over time? Do you see watermark being updated in your Flink WebUI?
> > When a stateful operation (and GroupByKey is a stateful operation) does
> > not output any data, the first place to look at is if watermark
> > correctly progresses. If it does not progress, then the input data must
> > be buffered in state and the size of the state should grow over time. If
> > it progresses, then it might be the case, that the data is too late
> > after the watermark (the watermark estimator might need tuning) and the
> > data gets dropped (note you don't set any allowed lateness, which
> > _might_ cause issues). You could see if your pipeline drops data in
> > "droppedDueToLateness" metric. The size of your state would not grow much
> > in that situation.
> >
> > Another hint - If you use KafkaIO, try to disable SDF wrapper for it
> > using "--experiments=use_deprecated_read" on command line (which you
> > then must pass to PipelineOptionsFactory). There is some suspicion that
> > SDF wrapper for Kafka might not work as expected in certain situations
> > with Flink.

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Evan Galpin
There have been varied reports of slowness loosely attributed to the SDF
default wrapper change from 2.25.0. For example:

https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E

https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-10670/comment/17316858

http://mail-archives.apache.org/mod_mbox/beam-dev/202105.mbox/%3ccae7uba_v0vfl9ck7n06n2zf6e+xcebdircez7yftlfwuvch...@mail.gmail.com%3e



On Mon, Jun 14, 2021 at 10:11 Eddy G  wrote:

> Hi Jan,
>
> I've added --experiments=use_deprecated_read and it seems to work
> flawlessly (with my current Window and the one proposed by Evan).
>
> Why is this? Do Splittable DoFn now break current implementations? Are
> there any posts of possible breaking changes?
>
> On 2021/06/14 13:19:39, Jan Lukavský  wrote:
> > Hi Eddy,
> >
> > answers inline.
> >
> > On 6/14/21 3:05 PM, Eddy G wrote:
> > > Hi Jan,
> > >
> > > Thanks for replying so fast!
> > >
> > > Regarding your questions,
> > >
> > > - "Does your data get buffered in a state?"
> > > Yes, I do have a state within a stage prior ParquetIO writing together
> with a Timer with PROCESSING_TIME.
> > >
> > > The stage which contains the state does send bytes to the next one
> which is the ParquetIO writing. Seems the @OnTimer doesn't get triggered
> and it's not clearing the state. This however does work under normal
> circumstances without having too much data queued waiting to be processed.
> > OK, this suggests that the watermark is for some reason "stuck". If you
> > have checkpointing enabled, you should see the size of the checkpoint
> > grow over time.
> > >
> > > - "Do you see watermark being updated in your Flink WebUI?"
> > > The stages that do have a watermark don't get updated. The same
> watermark value has been constant since the pipeline started.
> > >
> > > If no lateness is set, any late data should be admitted right?
> > If no lateness is set, it means allowed lateness of Duration.ZERO, which
> > means that data that arrive after end-of-window will be dropped.
> > >
> > > Regarding 'droppedDueToLateness' metric, can't see it exposed
> anywhere, neither in Flink UI or Prometheus. I've seen it in Dataflow but
> seems to be a Dataflow specific metric right?
> > Should not be Dataflow specific. But if you don't see it, it means it
> > could be zero. So, we can rule this out.
> > >
> > > We're using KinesisIO for reading messages.
> > Kinesis uses UnboundedSource, which is expanded to SDF starting from
> > Beam 2.25.0. The flag should change that as well. Can you try the
> > --experiments=use_deprecated_read and see if your Pipeline DAG changes
> > (should not contain Impulse transform at the beginning) and if it solves
> > your issues?
> > >
> > > On 2021/06/14 12:48:58, Jan Lukavský  wrote:
> > >> Hi Eddy,
> > >>
> > >> does your data get buffered in a state - e.g. does the size of the
> state
> > >> grow over time? Do you see watermark being updated in your Flink
> WebUI?
> > >> When a stateful operation (and GroupByKey is a stateful operation)
> does
> > >> not output any data, the first place to look at is if watermark
> > >> correctly progresses. If it does not progress, then the input data
> must
> > >> be buffered in state and the size of the state should grow over time.
> If
> > >> it progresses, then it might be the case, that the data is too late
> > >> after the watermark (the watermark estimator might need tuning) and
> the
> > >> data gets dropped (note you don't set any allowed lateness, which
> > >> _might_ cause issues). You could see if your pipeline drops data in
> > >> "droppedDueToLateness" metric. The size of your state would not grow
> > >> much in that situation.
> > >>
> > >> Another hint - If you use KafkaIO, try to disable SDF wrapper for it
> > >> using "--experiments=use_deprecated_read" on command line (which you
> > >> then must pass to PipelineOptionsFactory). There is some suspicion
> that
> > >> SDF wrapper for Kafka might not work as expected in certain situations
> > >> with Flink.
> > >>
> > >> Please feel free to share any results,
> > >>
> > >> Jan
> > >>
> > >> On 6/14/21 1:39 PM, Eddy G wrote:
> > >>> As seen in this image https://imgur.com/a/wrZET97, I'm trying to
> deal with late data (intentionally stopped my consumer so data has been
> accumulating for several days now). Now, with the following Window... I'm
> using Beam 2.27 and Flink 1.12.
> > >>>
> > >>>
>  Window.into(FixedWindows.of(Duration.standardMinutes(10)))
> > >>>
> > >>> And several parsing stages after, once it's time to write within the
> ParquetIO stage...
> > >>>
> > >>>   FileIO
> > >>>   .writeDynamic()
> > >>>   .by(...)
> > >>>   .via(...)
> > >>>   .to(...)
> > >>>   .withNaming(...)
> > >>>
>  .withDestinationCoder(StringUtf8Coder.of())

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Evan Galpin
I believe that by default windows will only trigger one time [1]. This has
definitely caught me by surprise before.

I think that default strategy might be fine for a batch pipeline, but it
typically is not for streaming (which I assume you're using because you
mentioned Flink).

I believe you’ll want to add a non-default triggering mechanism to the
window strategy that you mentioned. I would recommend reading through the
triggering docs[2] for background. The Repeatedly.forever[3] function may
work for your use case. Something like:

Window.into(FixedWindows.of(Duration.standardMinutes(10)))
    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes();


[1] https://beam.apache.org/documentation/programming-guide/#default-trigger
[2] https://beam.apache.org/documentation/programming-guide/#triggers
[3] https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/transforms/windowing/Repeatedly.html

On Mon, Jun 14, 2021 at 07:39 Eddy G  wrote:

> As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal
> with late data (intentionally stopped my consumer so data has been
> accumulating for several days now). Now, with the following Window... I'm
> using Beam 2.27 and Flink 1.12.
>
>
> Window.into(FixedWindows.of(Duration.standardMinutes(10)))
>
> And several parsing stages after, once it's time to write within the
> ParquetIO stage...
>
> FileIO
> .writeDynamic()
> .by(...)
> .via(...)
> .to(...)
> .withNaming(...)
> .withDestinationCoder(StringUtf8Coder.of())
> .withNumShards(options.getNumShards())
>
> it won't send bytes across all stages so no data is being written, still
> it accumulates in the first stage seen in the image and won't go further
> than that.
>
> Any reason why this may be happening? Wrong windowing strategy?
>
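
The default-trigger and allowed-lateness behavior discussed in this thread can be modeled with a toy, stdlib-only simulation. This is not Beam's trigger machinery: the "watermark" here is simply the maximum event time seen so far, each window fires exactly once when the watermark passes its end, and with allowed lateness of Duration.ZERO, later arrivals for that window are dropped, as Jan describes.

```python
from collections import defaultdict


def simulate(events, window_size, allowed_lateness=0):
    """Toy model of Beam's default trigger with event-time fixed windows.

    Each window fires exactly once when the (toy) watermark passes its
    end; elements arriving after end + allowed_lateness are dropped.
    """
    watermark = float("-inf")
    buffered = defaultdict(list)
    fired, dropped = {}, []
    for ts, value in events:  # (event_time, value) in arrival order
        start = (ts // window_size) * window_size
        end = start + window_size
        if watermark >= end + allowed_lateness:
            dropped.append(value)          # window already closed: drop
        else:
            buffered[(start, end)].append(value)
        watermark = max(watermark, ts)     # toy watermark: max event time seen
        for key in list(buffered):
            if watermark >= key[1]:        # watermark passed window end: fire
                fired[key] = buffered.pop(key)
    return fired, dropped


# "late" has event time 3 but arrives after the watermark reached 12,
# so with allowed lateness of zero it is dropped.
fired, dropped = simulate([(1, "a"), (4, "b"), (12, "c"), (3, "late")], 10)
print(fired, dropped)  # prints: {(0, 10): ['a', 'b']} ['late']
```

Swapping in a repeating trigger (as in the Repeatedly.forever suggestion above) would correspond to firing a pane on each arrival instead of once at window end.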


Re: Extremely Slow DirectRunner

2021-05-14 Thread Evan Galpin
Any further thoughts here? Or tips on profiling Beam DirectRunner?

Thanks,
Evan

On Wed, May 12, 2021 at 6:22 PM Evan Galpin  wrote:

> Ok gotcha. In my tests, all sdk versions 2.25.0 and higher exhibit slow
> behaviour regardless of use_deprecated_reads. Not sure if that points to
> something different then.
>
> Thanks,
> Evan
>
> On Wed, May 12, 2021 at 18:16 Steve Niemitz  wrote:
>
>> I think it was only broken in 2.29.
>>
>> On Wed, May 12, 2021 at 5:53 PM Evan Galpin 
>> wrote:
>>
>>> Ah ok thanks for that. Do you mean use_deprecated_reads is broken
>>> specifically in 2.29.0 (regression) or broken in all versions up to and
>>> including 2.29.0 (ie never worked)?
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Wed, May 12, 2021 at 17:12 Steve Niemitz  wrote:
>>>
>>>> Yeah, sorry my email was confusing.  use_deprecated_reads is broken on
>>>> the DirectRunner in 2.29.
>>>>
>>>> The behavior you describe is exactly the behavior I ran into as well
>>>> when reading from pubsub with the new read method.  I believe that soon the
>>>> default is being reverted back to the old read method, not using SDFs,
>>>> which will fix your performance issue.
>>>>
>>>> On Wed, May 12, 2021 at 4:40 PM Boyuan Zhang 
>>>> wrote:
>>>>
>>>>> Hi Evan,
>>>>>
>>>>> It seems like the slow step is not the read that use_deprecated_read
> >>>>> targets. Would you like to share your pipeline code if possible?
>>>>>
>>>>> On Wed, May 12, 2021 at 1:35 PM Evan Galpin 
>>>>> wrote:
>>>>>
>>>>>> I just tried with v2.29.0 and use_deprecated_read but unfortunately I
>>>>>> observed slow behavior again. Is it possible that use_deprecated_read is
>>>>>> broken in 2.29.0 as well?
>>>>>>
>>>>>> Thanks,
>>>>>> Evan
>>>>>>
>>>>>> On Wed, May 12, 2021 at 3:21 PM Steve Niemitz 
>>>>>> wrote:
>>>>>>
>>>>>>> oops sorry I was off by 10...I meant 2.29 not 2.19.
>>>>>>>
>>>>>>> On Wed, May 12, 2021 at 2:55 PM Evan Galpin 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the
>>>>>>>> "faster" behavior, as did v2.23.0. But that "fast" behavior stopped at
>>>>>>>> v2.25.0 (for my use case at least) regardless of use_deprecated_read
>>>>>>>> setting.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Evan
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 12, 2021 at 2:47 PM Steve Niemitz 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> use_deprecated_read was broken in 2.19 on the direct runner and
>>>>>>>>> didn't do anything. [1]  I don't think the fix is in 2.20 either, but 
>>>>>>>>> will
>>>>>>>>> be in 2.21.
>>>>>>>>>
>>>>>>>>> [1] https://github.com/apache/beam/pull/14469
>>>>>>>>>
>>>>>>>>> On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I forgot to also mention that in all tests I was setting
>>>>>>>>>> --experiments=use_deprecated_read
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Evan
>>>>>>>>>>
>>>>>>>>>> On Wed, May 12, 2021 at 1:39 PM Evan Galpin <
>>>>>>>>>> evan.gal...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hmm, I think I spoke too soon. I'm still seeing an issue of
>>>>>>>>>>> overall DirectRunner slowness, not just pubsub. I have a pipeline 
>>>>>>>>>>> like so:
>>>>>>>>>>>
>>>>>>>>>>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()
>>>>>>>>>>> |  FileIO.readMatches()  |  Read file contents

Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
Ok, gotcha. In my tests, all SDK versions 2.25.0 and higher exhibit slow
behaviour regardless of the use_deprecated_read setting. Not sure if that
points to something different, then.

Thanks,
Evan

On Wed, May 12, 2021 at 18:16 Steve Niemitz  wrote:

> I think it was only broken in 2.29.
>
> On Wed, May 12, 2021 at 5:53 PM Evan Galpin  wrote:
>
>> Ah ok thanks for that. Do you mean use_deprecated_reads is broken
>> specifically in 2.29.0 (regression) or broken in all versions up to and
>> including 2.29.0 (ie never worked)?
>>
>> Thanks,
>> Evan
>>
>> On Wed, May 12, 2021 at 17:12 Steve Niemitz  wrote:
>>
>>> Yeah, sorry my email was confusing.  use_deprecated_reads is broken on
>>> the DirectRunner in 2.29.
>>>
>>> The behavior you describe is exactly the behavior I ran into as well
>>> when reading from pubsub with the new read method.  I believe that soon the
>>> default is being reverted back to the old read method, not using SDFs,
>>> which will fix your performance issue.
>>>
>>> On Wed, May 12, 2021 at 4:40 PM Boyuan Zhang  wrote:
>>>
>>>> Hi Evan,
>>>>
>>>> It seems like the slow step is not the read that use_deprecated_read
>>>> targets for. Would you like to share your pipeline code if possible?
>>>>
>>>> On Wed, May 12, 2021 at 1:35 PM Evan Galpin 
>>>> wrote:
>>>>
>>>>> I just tried with v2.29.0 and use_deprecated_read but unfortunately I
>>>>> observed slow behavior again. Is it possible that use_deprecated_read is
>>>>> broken in 2.29.0 as well?
>>>>>
>>>>> Thanks,
>>>>> Evan
>>>>>
>>>>> On Wed, May 12, 2021 at 3:21 PM Steve Niemitz 
>>>>> wrote:
>>>>>
>>>>>> oops sorry I was off by 10...I meant 2.29 not 2.19.
>>>>>>
>>>>>> On Wed, May 12, 2021 at 2:55 PM Evan Galpin 
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the
>>>>>>> "faster" behavior, as did v2.23.0. But that "fast" behavior stopped at
>>>>>>> v2.25.0 (for my use case at least) regardless of use_deprecated_read
>>>>>>> setting.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Evan
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 12, 2021 at 2:47 PM Steve Niemitz 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> use_deprecated_read was broken in 2.19 on the direct runner and
>>>>>>>> didn't do anything. [1]  I don't think the fix is in 2.20 either, but 
>>>>>>>> will
>>>>>>>> be in 2.21.
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/beam/pull/14469
>>>>>>>>
>>>>>>>> On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I forgot to also mention that in all tests I was setting
>>>>>>>>> --experiments=use_deprecated_read
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Evan
>>>>>>>>>
>>>>>>>>> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hmm, I think I spoke too soon. I'm still seeing an issue of
>>>>>>>>>> overall DirectRunner slowness, not just pubsub. I have a pipeline 
>>>>>>>>>> like so:
>>>>>>>>>>
>>>>>>>>>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()
>>>>>>>>>> |  FileIO.readMatches()  |  Read file contents  |  etc
>>>>>>>>>>
>>>>>>>>>> I have temporarily set up a transform between each step to log
>>>>>>>>>> what's going on and illustrate timing issues.  I ran a series of 
>>>>>>>>>> tests
>>>>>>>>>> changing only the SDK version each time since I hadn't noticed this
>>>>>>>>>> performance issue with 2.19.0 (effectively git-bisect). Before each 
>>>>>>>>>> test, I seeded the pubsub subscription with the exact same contents.

Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
Ah ok, thanks for that. Do you mean use_deprecated_read is broken
specifically in 2.29.0 (a regression), or broken in all versions up to and
including 2.29.0 (i.e. never worked)?

Thanks,
Evan

On Wed, May 12, 2021 at 17:12 Steve Niemitz  wrote:

> Yeah, sorry my email was confusing.  use_deprecated_reads is broken on the
> DirectRunner in 2.29.
>
> The behavior you describe is exactly the behavior I ran into as well when
> reading from pubsub with the new read method.  I believe that soon the
> default is being reverted back to the old read method, not using SDFs,
> which will fix your performance issue.
>
> On Wed, May 12, 2021 at 4:40 PM Boyuan Zhang  wrote:
>
>> Hi Evan,
>>
>> It seems like the slow step is not the read that use_deprecated_read
>> targets for. Would you like to share your pipeline code if possible?
>>
>> On Wed, May 12, 2021 at 1:35 PM Evan Galpin 
>> wrote:
>>
>>> I just tried with v2.29.0 and use_deprecated_read but unfortunately I
>>> observed slow behavior again. Is it possible that use_deprecated_read is
>>> broken in 2.29.0 as well?
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Wed, May 12, 2021 at 3:21 PM Steve Niemitz 
>>> wrote:
>>>
>>>> oops sorry I was off by 10...I meant 2.29 not 2.19.
>>>>
>>>> On Wed, May 12, 2021 at 2:55 PM Evan Galpin 
>>>> wrote:
>>>>
>>>>> Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the "faster"
>>>>> behavior, as did v2.23.0. But that "fast" behavior stopped at v2.25.0 (for
>>>>> my use case at least) regardless of use_deprecated_read setting.
>>>>>
>>>>> Thanks,
>>>>> Evan
>>>>>
>>>>>
>>>>> On Wed, May 12, 2021 at 2:47 PM Steve Niemitz 
>>>>> wrote:
>>>>>
>>>>>> use_deprecated_read was broken in 2.19 on the direct runner and
>>>>>> didn't do anything. [1]  I don't think the fix is in 2.20 either, but 
>>>>>> will
>>>>>> be in 2.21.
>>>>>>
>>>>>> [1] https://github.com/apache/beam/pull/14469
>>>>>>
>>>>>> On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
>>>>>> wrote:
>>>>>>
>>>>>>> I forgot to also mention that in all tests I was setting
>>>>>>> --experiments=use_deprecated_read
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Evan
>>>>>>>
>>>>>>> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
>>>>>>>> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>>>>>>>>
>>>>>>>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
>>>>>>>> FileIO.readMatches()  |  Read file contents  |  etc
>>>>>>>>
>>>>>>>> I have temporarily set up a transform between each step to log
>>>>>>>> what's going on and illustrate timing issues.  I ran a series of tests
>>>>>>>> changing only the SDK version each time since I hadn't noticed this
>>>>>>>> performance issue with 2.19.0 (effectively git-bisect). Before each 
>>>>>>>> test, I
>>>>>>>> seeded the pubsub subscription with the exact same contents.
>>>>>>>>
>>>>>>>> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't
>>>>>>>> seem to resolve) and onward show a significant slowdown.
>>>>>>>>
>>>>>>>> Here is a snippet of logging from v2.25.0:
>>>>>>>>
>>>>>>>> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>>>>>>> processElement
>>>>>>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>>>>>>>> May 12, 2021 11:16:59 A.M.
>>>>>>>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>>>>>>>> INFO: Matched 2 files for pattern
>>>>>>>> gs://my-bucket/my-dir/5004728247517184/**
>>>>>>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline

Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
I'd be happy to share what I can. The applicable portion is this `expand`
method of a PTransform (which does nothing more complex than group these
other transforms together for re-use). The input to this PTransform is
pubsub message bodies as strings. I'll paste it as plain-text.

@Override
public PCollection<KV<String, String>> expand(PCollection<String> input) {
    return input
        .apply(
            "Convert OCNs to wildcard paths",
            ParDo.of(new BlobPathToRootPath()))
        .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
        .apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED))
        .apply(
            MapElements
                .into(kvs(strings(), strings()))
                .via(
                    (ReadableFile f) -> {
                      try {
                        String[] blobUri =
                            f.getMetadata().resourceId().toString().split("/");
                        String doc_id = blobUri[4];
                        return KV.of(doc_id, f.readFullyAsUTF8String());
                      } catch (IOException ex) {
                        throw new RuntimeException(
                            "Failed to read the file", ex);
                      }
                    }));
}
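
As an aside on the snippet above: the blobUri[4] lookup assumes every matched
object sits at a fixed path depth, and it will throw
ArrayIndexOutOfBoundsException for shallower paths. Here is a minimal,
Beam-free sketch of a more defensive extraction; the DocIdExtractor name and
the segment index are illustrative, not taken from the pipeline above:

```java
import java.util.Optional;

public class DocIdExtractor {
    // Returns the path segment at the given index, if present and non-empty.
    // For "gs://bucket/dir/5004728247517184/file.json", split("/") yields
    // ["gs:", "", "bucket", "dir", "5004728247517184", "file.json"],
    // so index 4 is the document id.
    public static Optional<String> docIdFrom(String resourceUri, int segmentIndex) {
        String[] segments = resourceUri.split("/");
        if (segmentIndex < segments.length && !segments[segmentIndex].isEmpty()) {
            return Optional.of(segments[segmentIndex]);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(
            docIdFrom("gs://bucket/dir/5004728247517184/file.json", 4).orElse("?"));
        // 5004728247517184
        System.out.println(docIdFrom("gs://bucket/file.json", 4).orElse("?"));
        // ?
    }
}
```

In the MapElements lambda, an empty Optional could be mapped to a dead-letter
output instead of letting the exception fail the bundle.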

Thanks,
Evan

On Wed, May 12, 2021 at 4:40 PM Boyuan Zhang  wrote:

> Hi Evan,
>
> It seems like the slow step is not the read that use_deprecated_read
> targets for. Would you like to share your pipeline code if possible?
>
> On Wed, May 12, 2021 at 1:35 PM Evan Galpin  wrote:
>
>> I just tried with v2.29.0 and use_deprecated_read but unfortunately I
>> observed slow behavior again. Is it possible that use_deprecated_read is
>> broken in 2.29.0 as well?
>>
>> Thanks,
>> Evan
>>
>> On Wed, May 12, 2021 at 3:21 PM Steve Niemitz 
>> wrote:
>>
>>> oops sorry I was off by 10...I meant 2.29 not 2.19.
>>>
>>> On Wed, May 12, 2021 at 2:55 PM Evan Galpin 
>>> wrote:
>>>
>>>> Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the "faster"
>>>> behavior, as did v2.23.0. But that "fast" behavior stopped at v2.25.0 (for
>>>> my use case at least) regardless of use_deprecated_read setting.
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>>
>>>> On Wed, May 12, 2021 at 2:47 PM Steve Niemitz 
>>>> wrote:
>>>>
>>>>> use_deprecated_read was broken in 2.19 on the direct runner and didn't
>>>>> do anything. [1]  I don't think the fix is in 2.20 either, but will be in
>>>>> 2.21.
>>>>>
>>>>> [1] https://github.com/apache/beam/pull/14469
>>>>>
>>>>> On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
>>>>> wrote:
>>>>>
>>>>>> I forgot to also mention that in all tests I was setting
>>>>>> --experiments=use_deprecated_read
>>>>>>
>>>>>> Thanks,
>>>>>> Evan
>>>>>>
>>>>>> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
>>>>>> wrote:
>>>>>>
>>>>>>> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
>>>>>>> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>>>>>>>
>>>>>>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
>>>>>>> FileIO.readMatches()  |  Read file contents  |  etc
>>>>>>>
>>>>>>> I have temporarily set up a transform between each step to log
>>>>>>> what's going on and illustrate timing issues.  I ran a series of tests
>>>>>>> changing only the SDK version each time since I hadn't noticed this
>>>>>>> performance issue with 2.19.0 (effectively git-bisect). Before each 
>>>>>>> test, I
>>>>>>> seeded the pubsub subscription with the exact same contents.
>>>>>>>
>>>>>>> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't
>>>>>>> seem to resolve) and onward show a significant slowdown.
>>>>>>>
>>>>>>> Here is a snippet of logging from v2.25.0:
>>>>>>>
>>>>>>> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>>>>>> processElement
>>>>>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**

Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
I just tried with v2.29.0 and use_deprecated_read but unfortunately I
observed slow behavior again. Is it possible that use_deprecated_read is
broken in 2.29.0 as well?

Thanks,
Evan

On Wed, May 12, 2021 at 3:21 PM Steve Niemitz  wrote:

> oops sorry I was off by 10...I meant 2.29 not 2.19.
>
> On Wed, May 12, 2021 at 2:55 PM Evan Galpin  wrote:
>
>> Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the "faster"
>> behavior, as did v2.23.0. But that "fast" behavior stopped at v2.25.0 (for
>> my use case at least) regardless of use_deprecated_read setting.
>>
>> Thanks,
>> Evan
>>
>>
>> On Wed, May 12, 2021 at 2:47 PM Steve Niemitz 
>> wrote:
>>
>>> use_deprecated_read was broken in 2.19 on the direct runner and didn't
>>> do anything. [1]  I don't think the fix is in 2.20 either, but will be in
>>> 2.21.
>>>
>>> [1] https://github.com/apache/beam/pull/14469
>>>
>>> On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
>>> wrote:
>>>
>>>> I forgot to also mention that in all tests I was setting
>>>> --experiments=use_deprecated_read
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
>>>> wrote:
>>>>
>>>>> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
>>>>> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>>>>>
>>>>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
>>>>> FileIO.readMatches()  |  Read file contents  |  etc
>>>>>
>>>>> I have temporarily set up a transform between each step to log what's
>>>>> going on and illustrate timing issues.  I ran a series of tests changing
>>>>> only the SDK version each time since I hadn't noticed this performance
>>>>> issue with 2.19.0 (effectively git-bisect). Before each test, I seeded the
>>>>> pubsub subscription with the exact same contents.
>>>>>
>>>>> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't
>>>>> seem to resolve) and onward show a significant slowdown.
>>>>>
>>>>> Here is a snippet of logging from v2.25.0:
>>>>>
>>>>> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>>>> processElement
>>>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>>>>> May 12, 2021 11:16:59 A.M.
>>>>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>>>>> INFO: Matched 2 files for pattern
>>>>> gs://my-bucket/my-dir/5004728247517184/**
>>>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>>>> processElement
>>>>> INFO: Got ReadableFile: my-file1.json
>>>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>>>> processElement
>>>>> INFO: Got ReadableFile: my-file2.json
>>>>> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
>>>>> processElement
>>>>> INFO: Got file contents for document_id my-file1.json
>>>>> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
>>>>> processElement
>>>>> INFO: Got file contents for document_id my-file2.json
>>>>>
>>>>> Note that end-to-end, these steps took about *13 minutes*. With SDK
>>>>> 2.23.0 and identical user code, the same section of the pipeline took *2
>>>>> seconds*:
>>>>>
>>>>> *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>>>> processElement
>>>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>>>>> May 12, 2021 11:03:40 A.M.
>>>>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>>>>> INFO: Matched 2 files for pattern
>>>>> gs://my-bucket/my-dir/5004728247517184/**
>>>>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>>>> processElement
>>>>> INFO: Got ReadableFile: my-file1.json
>>>>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>>>> processElement
>>>>> INFO: Got ReadableFile: my-file2.json
>>>>> May 12, 2021 11:03:41 A.M. com.myOrg.myPipeline.PipelineLeg$4
>>>>> processElement
>>>>> INFO: Got file contents for document_id my-file1.json

Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the "faster"
behavior, as did v2.23.0. But that "fast" behavior stopped at v2.25.0 (for
my use case at least) regardless of use_deprecated_read setting.

Thanks,
Evan


On Wed, May 12, 2021 at 2:47 PM Steve Niemitz  wrote:

> use_deprecated_read was broken in 2.19 on the direct runner and didn't do
> anything. [1]  I don't think the fix is in 2.20 either, but will be in 2.21.
>
> [1] https://github.com/apache/beam/pull/14469
>
> On Wed, May 12, 2021 at 1:41 PM Evan Galpin  wrote:
>
>> I forgot to also mention that in all tests I was setting
>> --experiments=use_deprecated_read
>>
>> Thanks,
>> Evan
>>
>> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
>> wrote:
>>
>>> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
>>> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>>>
>>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
>>> FileIO.readMatches()  |  Read file contents  |  etc
>>>
>>> I have temporarily set up a transform between each step to log what's
>>> going on and illustrate timing issues.  I ran a series of tests changing
>>> only the SDK version each time since I hadn't noticed this performance
>>> issue with 2.19.0 (effectively git-bisect). Before each test, I seeded the
>>> pubsub subscription with the exact same contents.
>>>
>>> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't seem
>>> to resolve) and onward show a significant slowdown.
>>>
>>> Here is a snippet of logging from v2.25.0:
>>>
>>> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>> processElement
>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>>> May 12, 2021 11:16:59 A.M.
>>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>>> INFO: Matched 2 files for pattern
>>> gs://my-bucket/my-dir/5004728247517184/**
>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>> processElement
>>> INFO: Got ReadableFile: my-file1.json
>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>> processElement
>>> INFO: Got ReadableFile: my-file2.json
>>> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
>>> processElement
>>> INFO: Got file contents for document_id my-file1.json
>>> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
>>> processElement
>>> INFO: Got file contents for document_id my-file2.json
>>>
>>> Note that end-to-end, these steps took about *13 minutes*. With SDK
>>> 2.23.0 and identical user code, the same section of the pipeline took *2
>>> seconds*:
>>>
>>> *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>> processElement
>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>>> May 12, 2021 11:03:40 A.M.
>>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>>> INFO: Matched 2 files for pattern
>>> gs://my-bucket/my-dir/5004728247517184/**
>>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>> processElement
>>> INFO: Got ReadableFile: my-file1.json
>>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>> processElement
>>> INFO: Got ReadableFile: my-file2.json
>>> May 12, 2021 11:03:41 A.M. com.myOrg.myPipeline.PipelineLeg$4
>>> processElement
>>> INFO: Got file contents for document_id my-file1.json
>>> *May 12, 2021 11:03:41 A.M.* com.myOrg.myPipeline.PipelineLeg$4
>>> processElement
>>> INFO: Got file contents for document_id my-file2.json
>>>
>>> Any thoughts on what could be causing this?
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Wed, May 12, 2021 at 9:53 AM Evan Galpin 
>>> wrote:
>>>
>>>>
>>>>
>>>> On Mon, May 10, 2021 at 2:09 PM Boyuan Zhang 
>>>> wrote:
>>>>
>>>>> Hi Evan,
>>>>>
>>>>> What do you mean startup delay? Is it the time that from you start the
>>>>> pipeline to the time that you notice the first output record from PubSub?
>>>>>
>>>>
>>>> Yes that's what I meant, the seemingly idle system waiting for pubsub
>>>> output despite data being in the subscription at pipeline start time.
>>>>

Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
I forgot to also mention that in all tests I was setting
--experiments=use_deprecated_read

Thanks,
Evan

On Wed, May 12, 2021 at 1:39 PM Evan Galpin  wrote:

> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>
> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
> FileIO.readMatches()  |  Read file contents  |  etc
>
> I have temporarily set up a transform between each step to log what's
> going on and illustrate timing issues.  I ran a series of tests changing
> only the SDK version each time since I hadn't noticed this performance
> issue with 2.19.0 (effectively git-bisect). Before each test, I seeded the
> pubsub subscription with the exact same contents.
>
> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't seem
> to resolve) and onward show a significant slowdown.
>
> Here is a snippet of logging from v2.25.0:
>
> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
> processElement
> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:16:59 A.M. org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn
> process
> INFO: Matched 2 files for pattern gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
> processElement
> INFO: Got ReadableFile: my-file1.json
> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
> processElement
> INFO: Got ReadableFile: my-file2.json
> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
> processElement
> INFO: Got file contents for document_id my-file1.json
> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
> processElement
> INFO: Got file contents for document_id my-file2.json
>
> Note that end-to-end, these steps took about *13 minutes*. With SDK
> 2.23.0 and identical user code, the same section of the pipeline took *2
> seconds*:
>
> *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
> processElement
> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:03:40 A.M. org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn
> process
> INFO: Matched 2 files for pattern gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
> processElement
> INFO: Got ReadableFile: my-file1.json
> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
> processElement
> INFO: Got ReadableFile: my-file2.json
> May 12, 2021 11:03:41 A.M. com.myOrg.myPipeline.PipelineLeg$4
> processElement
> INFO: Got file contents for document_id my-file1.json
> *May 12, 2021 11:03:41 A.M.* com.myOrg.myPipeline.PipelineLeg$4
> processElement
> INFO: Got file contents for document_id my-file2.json
>
> Any thoughts on what could be causing this?
>
> Thanks,
> Evan
>
> On Wed, May 12, 2021 at 9:53 AM Evan Galpin  wrote:
>
>>
>>
>> On Mon, May 10, 2021 at 2:09 PM Boyuan Zhang  wrote:
>>
>>> Hi Evan,
>>>
>>> What do you mean startup delay? Is it the time that from you start the
>>> pipeline to the time that you notice the first output record from PubSub?
>>>
>>
>> Yes that's what I meant, the seemingly idle system waiting for pubsub
>> output despite data being in the subscription at pipeline start time.
>>
>> On Sat, May 8, 2021 at 12:50 AM Ismaël Mejía  wrote:
>>>
>>>> Can you try running direct runner with the option
>>>> `--experiments=use_deprecated_read`
>>>>
>>>
>> This seems to work for me, thanks for this! 👍
>>
>>
>>>> Seems like an instance of
>>>> https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17316858&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17316858
>>>> also reported in
>>>> https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E
>>>>
>>>> We should rollback using the SDF wrapper by default because of the
>>>> usability and performance issues reported.
>>>>
>>>>
>>>> On Sat, May 8, 2021 at 12:57 AM Evan Galpin 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I’m experiencing very slow performance and startup delay when testing
>>>>> a pipeline locally. I’m reading data from a Google PubSub subscription as
>>>>> the data source, and before each pipeline execution I ensure that data is
>>>>> present in the subscription (readable from GCP console).
>>>>>
>>>>> I’m seeing startup delay on the order of minutes with DirectRunner
>>>>> (5-10 min). Is that expected? I did find a Jira ticket[1] that at first
>>>>> seemed related, but I think it has more to do with BQ than DirectRunner.
>>>>>
>>>>> I’ve run the pipeline with a debugger connected and confirmed that
>>>>> it’s minutes before the first DoFn in my pipeline receives any data. Is
>>>>> there a way I can profile the direct runner to see what it’s churning on?
>>>>>
>>>>> Thanks,
>>>>> Evan
>>>>>
>>>>> [1]
>>>>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-4548
>>>>>
>>>>


Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
Hmm, I think I spoke too soon. I'm still seeing an issue of overall
DirectRunner slowness, not just pubsub. I have a pipeline like so:

Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
FileIO.readMatches()  |  Read file contents  |  etc

I have temporarily set up a transform between each step to log what's going
on and illustrate timing issues.  I ran a series of tests changing only the
SDK version each time since I hadn't noticed this performance issue with
2.19.0 (effectively git-bisect). Before each test, I seeded the pubsub
subscription with the exact same contents.

SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't seem to
resolve) and onward show a significant slowdown.

Here is a snippet of logging from v2.25.0:

*May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
processElement
INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
May 12, 2021 11:16:59 A.M. org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn
process
INFO: Matched 2 files for pattern gs://my-bucket/my-dir/5004728247517184/**
May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3 processElement
INFO: Got ReadableFile: my-file1.json
May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3 processElement
INFO: Got ReadableFile: my-file2.json
May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4 processElement
INFO: Got file contents for document_id my-file1.json
*May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
processElement
INFO: Got file contents for document_id my-file2.json

Note that end-to-end, these steps took about *13 minutes*. With SDK 2.23.0
and identical user code, the same section of the pipeline took *2 seconds*:

*May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
processElement
INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
May 12, 2021 11:03:40 A.M. org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn
process
INFO: Matched 2 files for pattern gs://my-bucket/my-dir/5004728247517184/**
May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3 processElement
INFO: Got ReadableFile: my-file1.json
May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3 processElement
INFO: Got ReadableFile: my-file2.json
May 12, 2021 11:03:41 A.M. com.myOrg.myPipeline.PipelineLeg$4 processElement
INFO: Got file contents for document_id my-file1.json
*May 12, 2021 11:03:41 A.M.* com.myOrg.myPipeline.PipelineLeg$4
processElement
INFO: Got file contents for document_id my-file2.json

Any thoughts on what could be causing this?

Thanks,
Evan

On Wed, May 12, 2021 at 9:53 AM Evan Galpin  wrote:

>
>
> On Mon, May 10, 2021 at 2:09 PM Boyuan Zhang  wrote:
>
>> Hi Evan,
>>
>> What do you mean startup delay? Is it the time that from you start the
>> pipeline to the time that you notice the first output record from PubSub?
>>
>
> Yes that's what I meant, the seemingly idle system waiting for pubsub
> output despite data being in the subscription at pipeline start time.
>
> On Sat, May 8, 2021 at 12:50 AM Ismaël Mejía  wrote:
>>
>>> Can you try running direct runner with the option
>>> `--experiments=use_deprecated_read`
>>>
>>
> This seems to work for me, thanks for this! 👍
>
>
>>> Seems like an instance of
>>> https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17316858&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17316858
>>> also reported in
>>> https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E
>>>
>>> We should rollback using the SDF wrapper by default because of the
>>> usability and performance issues reported.
>>>
>>>
>>> On Sat, May 8, 2021 at 12:57 AM Evan Galpin 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I’m experiencing very slow performance and startup delay when testing a
>>>> pipeline locally. I’m reading data from a Google PubSub subscription as the
>>>> data source, and before each pipeline execution I ensure that data is
>>>> present in the subscription (readable from GCP console).
>>>>
>>>> I’m seeing startup delay on the order of minutes with DirectRunner
>>>> (5-10 min). Is that expected? I did find a Jira ticket[1] that at first
>>>> seemed related, but I think it has more to do with BQ than DirectRunner.
>>>>
>>>> I’ve run the pipeline with a debugger connected and confirmed that it’s
>>>> minutes before the first DoFn in my pipeline receives any data. Is there a
>>>> way I can profile the direct runner to see what it’s churning on?
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>> [1]
>>>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-4548
>>>>
>>>


Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
On Mon, May 10, 2021 at 2:09 PM Boyuan Zhang  wrote:

> Hi Evan,
>
> What do you mean by startup delay? Is it the time from when you start the
> pipeline to the time that you notice the first output record from PubSub?
>

Yes that's what I meant, the seemingly idle system waiting for pubsub
output despite data being in the subscription at pipeline start time.

On Sat, May 8, 2021 at 12:50 AM Ismaël Mejía  wrote:
>
>> Can you try running direct runner with the option
>> `--experiments=use_deprecated_read`
>>
>
This seems to work for me, thanks for this! 👍


>> Seems like an instance of
>> https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17316858&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17316858
>> also reported in
>> https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E
>>
>> We should rollback using the SDF wrapper by default because of the
>> usability and performance issues reported.
>>
>>
>> On Sat, May 8, 2021 at 12:57 AM Evan Galpin 
>> wrote:
>>
>>> Hi all,
>>>
>>> I’m experiencing very slow performance and startup delay when testing a
>>> pipeline locally. I’m reading data from a Google PubSub subscription as the
>>> data source, and before each pipeline execution I ensure that data is
>>> present in the subscription (readable from GCP console).
>>>
>>> I’m seeing startup delay on the order of minutes with DirectRunner (5-10
>>> min). Is that expected? I did find a Jira ticket[1] that at first seemed
>>> related, but I think it has more to do with BQ than DirectRunner.
>>>
>>> I’ve run the pipeline with a debugger connected and confirmed that it’s
>>> minutes before the first DoFn in my pipeline receives any data. Is there a
>>> way I can profile the direct runner to see what it’s churning on?
>>>
>>> Thanks,
>>> Evan
>>>
>>> [1]
>>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-4548
>>>
>>


Re: Batch load with BigQueryIO fails because of a few bad records.

2021-05-06 Thread Evan Galpin
Hey Matthew,

I believe you might also need to use the “ignoreUnknownValues”[1] or
skipInvalidRows[2] options depending on your use case if your goal is to
allow valid entities to succeed even if invalid entities exist and
separately process failed entities via “getFailedResults”. You could also
consider introducing additional validation before the Write step if the
failure modes are predictable.
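
The "additional validation before the Write step" idea can be sketched outside
Beam as a plain predicate; in a pipeline this check would typically run in a
ParDo that routes failing rows to a dead-letter side output. The RowValidator
name and the allowedFields set are illustrative stand-ins for the real table
schema, not part of the BigQueryIO API:

```java
import java.util.Map;
import java.util.Set;

public class RowValidator {
    // Illustrative schema check: a row passes only if every one of its
    // fields is declared in the target table's schema. Rows failing this
    // check would be diverted before the BigQuery write rather than
    // failing the whole load.
    public static boolean matchesSchema(Map<String, Object> row, Set<String> allowedFields) {
        return allowedFields.containsAll(row.keySet());
    }

    public static void main(String[] args) {
        Set<String> schema = Set.of("id", "name", "created_at");
        System.out.println(matchesSchema(Map.of("id", 1, "name", "a"), schema));
        // true
        System.out.println(matchesSchema(Map.of("id", 1, "unknown_col", "x"), schema));
        // false
    }
}
```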

Thanks,
Evan

[1]
https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#ignoreUnknownValues--
[2]
https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html#skipInvalidRows--

On Thu, May 6, 2021 at 18:01 Matthew Ouyang 
wrote:

> I am loading a batch of records with BigQueryIO.Write, but because
> some records don't match the target table schema, the entire write
> step fails and nothing gets written to the table.  Is there a way for
> records that do match the target table schema to be inserted, and the
> records that don't match don't cause the entire step to fail?  I noticed
> BigQueryIO.Write returns a WriteResult that has a method getFailedInserts.
> Will that meet my needs?
>


Re: How avoid blocking when decompressing large GZIP files.

2021-04-23 Thread Evan Galpin
Hmm, in my somewhat limited experience, I was not able to combine state with
a Splittable DoFn. That could definitely be user error on my part, though.

RE sequence numbers, could it work to embed those numbers in the CSV itself?
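Embedding the sequence number as the rows stream out of the (necessarily
sequential) gzip read can be sketched without Beam like this; the
SequencedGzipReader name is illustrative, and in a real pipeline the numbered
rows would then fan out to parallel workers for the expensive transformation:

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class SequencedGzipReader {
    // Reads a gzip stream line by line (gzip is not splittable, so this part
    // is sequential) and tags each line with a 1-based sequence number as it
    // is emitted, so downstream work need not preserve order.
    public static List<String> numberLines(InputStream gzipped) throws IOException {
        List<String> out = new ArrayList<>();
        try (BufferedReader r = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(gzipped), StandardCharsets.UTF_8))) {
            long seq = 0;
            String line;
            while ((line = r.readLine()) != null) {
                out.add(++seq + "," + line);  // embed the sequence number in the row
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        // Build a tiny gzip payload in memory to demonstrate.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (Writer w = new OutputStreamWriter(new GZIPOutputStream(buf), StandardCharsets.UTF_8)) {
            w.write("a,b\nc,d\n");
        }
        System.out.println(numberLines(new ByteArrayInputStream(buf.toByteArray())));
        // [1,a,b, 2,c,d]
    }
}
```

The point is that only the numbering has to happen inside the sequential read;
everything after it can hand off immediately.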

Thanks,
Evan

On Fri, Apr 23, 2021 at 07:55 Simon Gauld  wrote:

> Thank you, and I will have a look; however, I have some concerns:
>
> - the gzip itself is not splittable as such
> - I need to apply a sequence number 1..n so I believe the read *must* be
> sequential
>
> However, what I am looking to achieve is handing off the newly decorated
> row as soon as the sequence is applied to it.  The issue is that the
> entire step of applying the sequence number appears to be blocking. Also of
> note, I am using a @DoFn.StateId.
>
> I'll look at SplittableDoFns, thanks.
>
>
> On Fri, Apr 23, 2021 at 12:50 PM Evan Galpin 
> wrote:
>
>> I could be wrong but I believe that if your large file is being read by a
>> DoFn, it’s likely that the file is being processed atomically inside that
>> DoFn, which cannot be parallelized further by the runner.
>>
>> One purpose-built way around that constraint is by using Splittable
>> DoFn[1][2] which could be used to allow each split to read a portion of the
>> file. I don’t know, however, how this might (or might not) work with
>> compression.
>>
>> [1]
>> https://beam.apache.org/blog/splittable-do-fn-is-available/
>> [2]
>> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>
>> Thanks,
>> Evan
>>
>> On Fri, Apr 23, 2021 at 07:34 Simon Gauld  wrote:
>>
>>> Hello,
>>>
>>> I am trying to apply a transformation to each row in a reasonably large
>>> (1b row) gzip compressed CSV.
>>>
>>> The first operation is to assign a sequence number, in this case 1,2,3..
>>>
>>> The second operation is the actual transformation.
>>>
>>> I would like to apply the sequence number *as* each row is read from the
>>> compressed source and then hand off the 'real' transformation work in
>>> parallel, using DataFlow to autoscale the workers for the transformation.
>>>
>>> I don't seem to be able to scale *until* all rows have been read; this
>>> appears to be blocking the pipeline until decompression of the entire file
>>> is completed.   At this point DataFlow autoscaling works as expected, it
>>> scales upwards and throughput is then high. The issue is the decompression
>>> appears to block.
>>>
>>> My question: in beam, is it possible to stream records from a compressed
>>> source? without blocking the pipeline?
>>>
>>> thank you
>>>
>>> .s
>>>
>>>


Re: How avoid blocking when decompressing large GZIP files.

2021-04-23 Thread Evan Galpin
I could be wrong but I believe that if your large file is being read by a
DoFn, it’s likely that the file is being processed atomically inside that
DoFn, which cannot be parallelized further by the runner.

One purpose-built way around that constraint is by using Splittable
DoFn[1][2] which could be used to allow each split to read a portion of the
file. I don’t know, however, how this might (or might not) work with
compression.

[1]
https://beam.apache.org/blog/splittable-do-fn-is-available/
[2]
https://beam.apache.org/documentation/programming-guide/#splittable-dofns

Thanks,
Evan
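As a rough skeleton of the Splittable DoFn shape referenced above (class and helper names are hypothetical, and this does not remove the constraint that gzip itself cannot be split):

```java
// Skeleton only: sizeOf() and readRecordAt() are hypothetical stand-ins.
// A runner may split the OffsetRange and process sub-ranges in parallel.
class ReadRanges extends DoFn<String, String> {
  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element String fileName) {
    return new OffsetRange(0, sizeOf(fileName));
  }

  @ProcessElement
  public void process(
      @Element String fileName,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      out.output(readRecordAt(fileName, i));
    }
  }
}
```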

On Fri, Apr 23, 2021 at 07:34 Simon Gauld  wrote:

> Hello,
>
> I am trying to apply a transformation to each row in a reasonably large
> (1b row) gzip compressed CSV.
>
> The first operation is to assign a sequence number, in this case 1,2,3..
>
> The second operation is the actual transformation.
>
> I would like to apply the sequence number *as* each row is read from the
> compressed source and then hand off the 'real' transformation work in
> parallel, using DataFlow to autoscale the workers for the transformation.
>
> I don't seem to be able to scale *until* all rows have been read; this
> appears to be blocking the pipeline until decompression of the entire file
> is completed.   At this point DataFlow autoscaling works as expected, it
> scales upwards and throughput is then high. The issue is the decompression
> appears to block.
>
> My question: in beam, is it possible to stream records from a compressed
> source? without blocking the pipeline?
>
> thank you
>
> .s
>
>


Re: Rate Limiting in Beam

2021-04-15 Thread Evan Galpin
Could you possibly use a side input with fixed interval triggering[1] to
query the Dataflow API to get the most recent log statement of scaling as
suggested here[2]?

[1]
https://beam.apache.org/documentation/patterns/side-inputs/
[2]
https://stackoverflow.com/a/54406878/6432284

On Thu, Apr 15, 2021 at 18:14 Daniel Thevessen  wrote:

> Hi folks,
>
> I've been working on a custom PTransform that makes requests to another
> service, and would like to add a rate limiting feature there. The
> fundamental issue that I'm running into here is that I need a decent
> heuristic to estimate the worker count, so that each worker can
> independently set a limit which globally comes out to the right value. All
> of this is easy if I know how many machines I have, but I'd like to use
> Dataflow's autoscaling, which would easily break any pre-configured value.
> I have seen two main approaches for rate limiting, both for a configurable
> variable x:
>
>- Simply assume worker count is x, then divide by x to figure out the
>"local" limit. The issue I have here is that if we assume x is 500, but it
>is actually 50, I'm now paying for 50 nodes to throttle 10 times as much as
>necessary. I know the pipeline options have a reference to the runner, is
>it possible to get an approximate current worker count from that at bundle
>start (*if* runner is DataflowRunner)?
>- Add another PTransform in front of the API requests, which groups by
>x number of keys, throttles, and keeps forwarding elements with an instant
>trigger. I initially really liked this solution because even if x is
>misconfigured, I will have at most x workers running and throttle
>appropriately. However, I noticed that for batch pipelines, this
>effectively also caps the API request stage at x workers. If I throw in a
>`Reshuffle`, there is another GroupByKey (-> another stage), and nothing
>gets done until every element has passed through the throttler.
>
> Has anyone here tried to figure out rate limiting with Beam before, and
> perhaps run into similar issues? I would love to know if there is a
> preferred solution to this type of problem.
> I know sharing state like that runs a little counter to the Beam pipeline
> paradigm, but really all I need is an approximate worker count with few
> guarantees.
>
> Cheers,
> Daniel
>
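The first option above (dividing a configured global rate by an assumed worker count) can be sketched in plain Java with a minimal token bucket; the class name and the fixed worker-count assumption are illustrative only:

```java
// Minimal token-bucket sketch: divides a global requests/sec budget by an
// assumed worker count to get a per-worker rate. If the assumed count is
// wrong, the global rate is off by the same factor, as noted above.
class LocalRateLimiter {
  private final double tokensPerSecond;
  private double tokens;
  private long lastRefillNanos;

  LocalRateLimiter(double globalRatePerSecond, int assumedWorkers) {
    this.tokensPerSecond = globalRatePerSecond / assumedWorkers;
    this.tokens = this.tokensPerSecond; // start with one second's budget
    this.lastRefillNanos = System.nanoTime();
  }

  /** Returns true if a request may proceed now, consuming one token. */
  synchronized boolean tryAcquire() {
    long now = System.nanoTime();
    double elapsedSeconds = (now - lastRefillNanos) / 1e9;
    lastRefillNanos = now;
    tokens = Math.min(tokensPerSecond, tokens + elapsedSeconds * tokensPerSecond);
    if (tokens >= 1.0) {
      tokens -= 1.0;
      return true;
    }
    return false;
  }
}
```

Each worker instantiates its own limiter (e.g. in @Setup), so no state is shared across workers — which is exactly why the worker-count estimate matters.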


Re: Does Beam DynamoDBIO support DynamoDB Streams?

2021-04-03 Thread Evan Galpin
Maybe others can answer more directly from personal experience about
reading streams using the existing DynamoDBIO connector, but it doesn’t
seem to support streams looking at the source and doc.

I haven’t done this myself, but it looks as though Kinesis is “the
recommended way to consume streams from Amazon DynamoDB”. I also found this
[1] guided example of using Beam with Kinesis Data Analytics. It might not
be exactly what you’re after, but maybe there are some good building
blocks? Hope there’s something helpful in there.

[1]
https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-beam.html

On Sat, Apr 3, 2021 at 13:24 Tao Li  wrote:

> Hi Beam community,
>
>
>
> Does Beam DynamoDBIO support ingesting DynamoDB Streams
> ?
> Thanks!
>


Re: ElasticsearchIO Write Batching Problems

2018-12-07 Thread Evan Galpin
I've actually found that this was just a matter of pipeline processing speed. I 
removed many layers of transforms such that entities flowed through the 
pipeline faster, and saw the batch sizes increase. I think I may make a 
separate pipeline to take full advantage of batch indexing.

Thanks!

On 2018/12/07 14:36:44, Evan Galpin  wrote: 
> I forgot to reiterate that the PCollection on which EsIO operates is of type 
> String, where each element is a valid JSON document serialized _without_ 
> pretty printing (i.e. without line breaks). If the PCollection should be of a 
> different type, please let me know. From the EsIO source code, I believe it 
> is correct to have a PCollection of String
> 
> On 2018/12/07 14:33:17, Evan Galpin  wrote: 
> > Thanks for confirming that this is unexpected behaviour Tim; certainly the 
> > EsIO code looks to handle bundling. For the record, I've also confirmed via 
> > debugger that `flushBatch()` is not being triggered by large document size.
> > 
> > I'm sourcing records from Google's BigQuery.  I have 2 queries which each 
> > create a PCollection. I use a JacksonFactory to convert BigQuery results to 
> > a valid JSON string (confirmed valid via debugger + linter). I have a few 
> > Transforms to group the records from the 2 queries together, and then 
> > convert again to JSON string via Jackson. I do know that the system creates 
> > valid requests to the bulk API, it's just that it's only 1 document per 
> > request.
> > 
> > Thanks for starting the process with this. If there are other specific 
> > details that I can provide to be helpful, please let me know. Here are the 
> > versions of modules I'm using now:
> > 
> > Beam SDK (beam-sdks-java-core): 2.8.0
> > EsIO (beam-sdks-java-io-elasticsearch): 2.8.0
> > BigQuery IO (beam-sdks-java-io-google-cloud-platform): 2.8.0
> > DirectRunner (beam-runners-direct-java): 2.8.0
> > DataflowRunner (beam-runners-google-cloud-dataflow-java): 2.8.0
> > 
> > 
> > On 2018/12/07 06:36:58, Tim  wrote: 
> > > Hi Evan
> > > 
> > > That is definitely not the expected behaviour and I believe is covered in 
> > > tests which use DirectRunner. Are you able to share your pipeline code, 
> > > or describe how you source your records please? It could be that 
> > > something else is causing EsIO to see bundles sized at only one record.
> > > 
> > > I’ll verify ES IO behaviour when I get to a computer too.
> > > 
> > > Tim (on phone)
> > > 
> > > > On 6 Dec 2018, at 22:00, e...@calabs.ca  wrote:
> > > > 
> > > > Hi all,
> > > > 
> > > > I’m having a bit of trouble with ElasticsearchIO Write transform. I’m 
> > > > able to successfully index documents into my elasticsearch cluster, but 
> > > > batching does not seem to work. There ends up being a 1:1 ratio between 
> > > > HTTP requests sent to `/my-index/_doc/_bulk` and the number of 
> > > > documents in my PCollection to which I apply the ElasticsearchIO 
> > > > PTransform. I’ve noticed this specifically under the DirectRunner by 
> > > > utilizing a debugger.
> > > > 
> > > > Am I missing something? Is this possibly a difference between execution 
> > > > environments (Ex. DirectRunner Vs. DataflowRunner)? How can I make sure 
> > > > my program is taking advantage of batching/bulk indexing?
> > > > 
> > > > Thanks,
> > > > Evan
> > > 
> > 
> 
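For reference, the thresholds that control when EsIO flushes a bulk request are configurable; a sketch (cluster address and index/type names are placeholders):

```java
// Sketch only: connection details are placeholders. EsIO flushes a bulk
// request when either threshold is reached or the bundle ends, so small
// bundles (common under DirectRunner) still yield small bulk requests.
ElasticsearchIO.ConnectionConfiguration conn =
    ElasticsearchIO.ConnectionConfiguration.create(
        new String[] {"http://localhost:9200"}, "my-index", "_doc");

jsonDocs.apply(
    "WriteToEs",
    ElasticsearchIO.write()
        .withConnectionConfiguration(conn)
        .withMaxBatchSize(1000L)                      // flush after 1000 docs
        .withMaxBatchSizeBytes(5L * 1024L * 1024L));  // or after ~5 MB
```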


Re: ElasticsearchIO Write Batching Problems

2018-12-07 Thread Evan Galpin
I forgot to reiterate that the PCollection on which EsIO operates is of type 
String, where each element is a valid JSON document serialized _without_ pretty 
printing (i.e. without line breaks). If the PCollection should be of a 
different type, please let me know. From the EsIO source code, I believe it is 
correct to have a PCollection of String

On 2018/12/07 14:33:17, Evan Galpin  wrote: 
> Thanks for confirming that this is unexpected behaviour Tim; certainly the 
> EsIO code looks to handle bundling. For the record, I've also confirmed via 
> debugger that `flushBatch()` is not being triggered by large document size.
> 
> I'm sourcing records from Google's BigQuery.  I have 2 queries which each 
> create a PCollection. I use a JacksonFactory to convert BigQuery results to a 
> valid JSON string (confirmed valid via debugger + linter). I have a few 
> Transforms to group the records from the 2 queries together, and then convert 
> again to JSON string via Jackson. I do know that the system creates valid 
> requests to the bulk API, it's just that it's only 1 document per request.
> 
> Thanks for starting the process with this. If there are other specific 
> details that I can provide to be helpful, please let me know. Here are the 
> versions of modules I'm using now:
> 
> Beam SDK (beam-sdks-java-core): 2.8.0
> EsIO (beam-sdks-java-io-elasticsearch): 2.8.0
> BigQuery IO (beam-sdks-java-io-google-cloud-platform): 2.8.0
> DirectRunner (beam-runners-direct-java): 2.8.0
> DataflowRunner (beam-runners-google-cloud-dataflow-java): 2.8.0
> 
> 
> On 2018/12/07 06:36:58, Tim  wrote: 
> > Hi Evan
> > 
> > That is definitely not the expected behaviour and I believe is covered in 
> > tests which use DirectRunner. Are you able to share your pipeline code, or 
> > describe how you source your records please? It could be that something 
> > else is causing EsIO to see bundles sized at only one record.
> > 
> > I’ll verify ES IO behaviour when I get to a computer too.
> > 
> > Tim (on phone)
> > 
> > > On 6 Dec 2018, at 22:00, e...@calabs.ca  wrote:
> > > 
> > > Hi all,
> > > 
> > > I’m having a bit of trouble with ElasticsearchIO Write transform. I’m 
> > > able to successfully index documents into my elasticsearch cluster, but 
> > > batching does not seem to work. There ends up being a 1:1 ratio between 
> > > HTTP requests sent to `/my-index/_doc/_bulk` and the number of documents 
> > > in my PCollection to which I apply the ElasticsearchIO PTransform. I’ve 
> > > noticed this specifically under the DirectRunner by utilizing a debugger.
> > > 
> > > Am I missing something? Is this possibly a difference between execution 
> > > environments (Ex. DirectRunner Vs. DataflowRunner)? How can I make sure 
> > > my program is taking advantage of batching/bulk indexing?
> > > 
> > > Thanks,
> > > Evan
> > 
> 


Re: ElasticsearchIO Write Batching Problems

2018-12-07 Thread Evan Galpin
Thanks for confirming that this is unexpected behaviour Tim; certainly the EsIO 
code looks to handle bundling. For the record, I've also confirmed via debugger 
that `flushBatch()` is not being triggered by large document size.

I'm sourcing records from Google's BigQuery.  I have 2 queries which each 
create a PCollection. I use a JacksonFactory to convert BigQuery results to a 
valid JSON string (confirmed valid via debugger + linter). I have a few 
Transforms to group the records from the 2 queries together, and then convert 
again to JSON string via Jackson. I do know that the system creates valid 
requests to the bulk API, it's just that it's only 1 document per request.

Thanks for starting the process with this. If there are other specific details 
that I can provide to be helpful, please let me know. Here are the versions of 
modules I'm using now:

Beam SDK (beam-sdks-java-core): 2.8.0
EsIO (beam-sdks-java-io-elasticsearch): 2.8.0
BigQuery IO (beam-sdks-java-io-google-cloud-platform): 2.8.0
DirectRunner (beam-runners-direct-java): 2.8.0
DataflowRunner (beam-runners-google-cloud-dataflow-java): 2.8.0


On 2018/12/07 06:36:58, Tim  wrote: 
> Hi Evan
> 
> That is definitely not the expected behaviour and I believe is covered in 
> tests which use DirectRunner. Are you able to share your pipeline code, or 
> describe how you source your records please? It could be that something else 
> is causing EsIO to see bundles sized at only one record.
> 
> I’ll verify ES IO behaviour when I get to a computer too.
> 
> Tim (on phone)
> 
> > On 6 Dec 2018, at 22:00, e...@calabs.ca  wrote:
> > 
> > Hi all,
> > 
> > I’m having a bit of trouble with ElasticsearchIO Write transform. I’m able 
> > to successfully index documents into my elasticsearch cluster, but batching 
> > does not seem to work. There ends up being a 1:1 ratio between HTTP 
> > requests sent to `/my-index/_doc/_bulk` and the number of documents in my 
> > PCollection to which I apply the ElasticsearchIO PTransform. I’ve noticed 
> > this specifically under the DirectRunner by utilizing a debugger.
> > 
> > Am I missing something? Is this possibly a difference between execution 
> > environments (Ex. DirectRunner Vs. DataflowRunner)? How can I make sure my 
> > program is taking advantage of batching/bulk indexing?
> > 
> > Thanks,
> > Evan
>