Re: OpenJDK8 / OpenJDK11 container deprecation

2023-02-16 Thread Luke Cwik via user
I upgraded the docker version on Jenkins workers and the tests passed.
(also installed Python 3.11 so we are ready for that)

On Tue, Feb 14, 2023 at 3:21 PM Kenneth Knowles  wrote:

> SGTM. I asked on the PR if this could impact users, but having read the
> docker release calendar I am not concerned. The last update to the old
> version was in 2019, and the introduction of compatible versions was 2020.
>
> On Tue, Feb 14, 2023 at 3:01 PM Byron Ellis via user 
> wrote:
>
>> FWIW I am Team Upgrade Docker :-)
>>
>> On Tue, Feb 14, 2023 at 2:53 PM Luke Cwik via user 
>> wrote:
>>
>>> I made some progress in testing the container and did hit an issue where
>>> Ubuntu 22.04 "Jammy" is dependent on the version of Docker installed. It
>>> turns out that our boot.go crashes with "runtime/cgo: pthread_create
>>> failed: Operation not permitted" because the Ubuntu 22.04 is using new
>>> syscalls that Docker 18.09.4 doesn't have a seccomp policy for (and uses a
>>> default of deny). We have a couple of choices here:
>>> 1) upgrade the version of docker on Jenkins and require users to
>>> similarly use a new enough version of Docker so that this isn't an issue
>>> for them
>>> 2) use Ubuntu 20.04 "Focal" as the docker container
>>>
>>> I was using Docker 20.10.21 which is why I didn't hit this issue when
>>> testing the change locally.
>>>
>>> We could also do these, but they seem strictly worse than either of the
>>> two options discussed above:
>>> A) disable the seccomp policy on Jenkins
>>> B) use a custom seccomp policy on Jenkins
>>>
>>> My suggestion is to upgrade Docker versions on Jenkins and use Ubuntu
>>> 22.04 as it will have LTS releases till 2027 and then security patches till
>>> 2032 which gives everyone the longest runway till we need to swap OS
>>> versions again for users of Apache Beam. Any concerns or ideas?
>>>
>>>
>>>
>>> On Thu, Feb 9, 2023 at 10:20 AM Luke Cwik  wrote:
>>>
>>>> Our current Java 8 container is 262 MiBs and layers on top of
>>>> openjdk:8-bullseye which is 226 MiBs compressed while eclipse-temurin:8 is
>>>> 92 MiBs compressed and eclipse-temurin:8-alpine is 65 MiBs compressed.
>>>>
>>>> I would rather not get into issues with C library differences caused by
>>>> the alpine project so I would stick with the safer option and let users
>>>> choose alpine when building their custom container if they feel it provides
>>>> a large win for them. We can always swap to alpine in the future as well if
>>>> the C library differences become a non-issue.
>>>>
>>>> So swapping to eclipse-temurin will save us a bunch on the container
>>>> size which should help with container transfer and hopefully with startup
>>>> times as well.
>>>>
>>>> On Tue, Feb 7, 2023 at 5:41 PM Andrew Pilloud 
>>>> wrote:
>>>>
>>>>> This sounds reasonable to me as well.
>>>>>
>>>>> I've made swaps like this in the past; the base image of each is
>>>>> probably a bigger factor than the JDK. The openjdk images were based on
>>>>> Debian 11. The default eclipse-temurin images are based on Ubuntu 22.04
>>>>> with an alpine option. Ubuntu is a Debian derivative but the versions and
>>>>> package names aren't exact matches and Ubuntu tends to update a little
>>>>> faster. For most users I don't think this will matter but users building
>>>>> custom containers may need to make minor changes. The alpine option will
>>>>> be much smaller (which could be a significant improvement) but would be
>>>>> a more significant change to the environment.
>>>>>
>>>>> On Tue, Feb 7, 2023 at 5:18 PM Robert Bradshaw via dev <
>>>>> d...@beam.apache.org> wrote:
>>>>>
>>>>>> Seems reasonable to me.
>>>>>>
>>>>>> On Tue, Feb 7, 2023 at 4:19 PM Luke Cwik via user <
>>>>>> user@beam.apache.org> wrote:
>>>>>> >
>>>>>> > As per [1], the JDK8 and JDK11 containers that Apache Beam uses
>>>>>> have stopped being built and supported since July 2022. I have filed [2]
>>>>>> to track the resolution of this issue.

Re: OpenJDK8 / OpenJDK11 container deprecation

2023-02-14 Thread Luke Cwik via user
I made some progress in testing the container and did hit an issue where
Ubuntu 22.04 "Jammy" is dependent on the version of Docker installed. It
turns out that our boot.go crashes with "runtime/cgo: pthread_create
failed: Operation not permitted" because the Ubuntu 22.04 is using new
syscalls that Docker 18.09.4 doesn't have a seccomp policy for (and uses a
default of deny). We have a couple of choices here:
1) upgrade the version of docker on Jenkins and require users to similarly
use a new enough version of Docker so that this isn't an issue for them
2) use Ubuntu 20.04 "Focal" as the docker container

I was using Docker 20.10.21 which is why I didn't hit this issue when
testing the change locally.

We could also do these, but they seem strictly worse than either of the two
options discussed above:
A) disable the seccomp policy on Jenkins
B) use a custom seccomp policy on Jenkins

My suggestion is to upgrade Docker versions on Jenkins and use Ubuntu 22.04
as it will have LTS releases till 2027 and then security patches till 2032
which gives everyone the longest runway till we need to swap OS versions
again for users of Apache Beam. Any concerns or ideas?



On Thu, Feb 9, 2023 at 10:20 AM Luke Cwik  wrote:

> Our current Java 8 container is 262 MiBs and layers on top of
> openjdk:8-bullseye which is 226 MiBs compressed while eclipse-temurin:8 is
> 92 MiBs compressed and eclipse-temurin:8-alpine is 65 MiBs compressed.
>
> I would rather not get into issues with C library differences caused by
> the alpine project so I would stick with the safer option and let users
> choose alpine when building their custom container if they feel it provides
> a large win for them. We can always swap to alpine in the future as well if
> the C library differences become a non-issue.
>
> So swapping to eclipse-temurin will save us a bunch on the container size
> which should help with container transfer and hopefully with startup times
> as well.
>
> On Tue, Feb 7, 2023 at 5:41 PM Andrew Pilloud  wrote:
>
>> This sounds reasonable to me as well.
>>
>> I've made swaps like this in the past; the base image of each is probably
>> a bigger factor than the JDK. The openjdk images were based on Debian 11.
>> The default eclipse-temurin images are based on Ubuntu 22.04 with an alpine
>> option. Ubuntu is a Debian derivative but the versions and package names
>> aren't exact matches and Ubuntu tends to update a little faster. For most
>> users I don't think this will matter but users building custom containers
>> may need to make minor changes. The alpine option will be much smaller
>> (which could be a significant improvement) but would be a more significant
>> change to the environment.
>>
>> On Tue, Feb 7, 2023 at 5:18 PM Robert Bradshaw via dev <
>> d...@beam.apache.org> wrote:
>>
>>> Seems reasonable to me.
>>>
>>> On Tue, Feb 7, 2023 at 4:19 PM Luke Cwik via user 
>>> wrote:
>>> >
>>> > As per [1], the JDK8 and JDK11 containers that Apache Beam uses have
>>> stopped being built and supported since July 2022. I have filed [2] to
>>> track the resolution of this issue.
>>> >
>>> > Based upon the issues linked from the deprecation notice[1], almost
>>> everyone is swapping to the eclipse-temurin container[3] as their base.
>>> The eclipse-temurin container is released under
>>> these licenses:
>>> > Apache License, Version 2.0
>>> > Eclipse Distribution License 1.0 (BSD)
>>> > Eclipse Public License 2.0
>>> > - (Secondary) GNU General Public License, version 2 with OpenJDK
>>> Assembly Exception
>>> > - (Secondary) GNU General Public License, version 2 with the GNU
>>> Classpath Exception
>>> >
>>> > I propose that we swap all our containers to the eclipse-temurin
>>> containers[3].
>>> >
>>> > Open to other ideas and also would be great to hear about your
>>> experience in any other projects that you have had to make a similar
>>> decision.
>>> >
>>> > 1: https://github.com/docker-library/openjdk/issues/505
>>> > 2: https://github.com/apache/beam/issues/25371
>>> > 3: https://hub.docker.com/_/eclipse-temurin
>>>
>>


Re: OpenJDK8 / OpenJDK11 container deprecation

2023-02-09 Thread Luke Cwik via user
Our current Java 8 container is 262 MiBs and layers on top of
openjdk:8-bullseye which is 226 MiBs compressed while eclipse-temurin:8 is
92 MiBs compressed and eclipse-temurin:8-alpine is 65 MiBs compressed.

I would rather not get into issues with C library differences caused by the
alpine project so I would stick with the safer option and let users choose
alpine when building their custom container if they feel it provides a
large win for them. We can always swap to alpine in the future as well if
the C library differences become a non-issue.

So swapping to eclipse-temurin will save us a bunch on the container size
which should help with container transfer and hopefully with startup times
as well.

On Tue, Feb 7, 2023 at 5:41 PM Andrew Pilloud  wrote:

> This sounds reasonable to me as well.
>
> I've made swaps like this in the past; the base image of each is probably
> a bigger factor than the JDK. The openjdk images were based on Debian 11.
> The default eclipse-temurin images are based on Ubuntu 22.04 with an alpine
> option. Ubuntu is a Debian derivative but the versions and package names
> aren't exact matches and Ubuntu tends to update a little faster. For most
> users I don't think this will matter but users building custom containers
> may need to make minor changes. The alpine option will be much smaller
> (which could be a significant improvement) but would be a more significant
> change to the environment.
>
> On Tue, Feb 7, 2023 at 5:18 PM Robert Bradshaw via dev <
> d...@beam.apache.org> wrote:
>
>> Seems reasonable to me.
>>
>> On Tue, Feb 7, 2023 at 4:19 PM Luke Cwik via user 
>> wrote:
>> >
>> > As per [1], the JDK8 and JDK11 containers that Apache Beam uses have
>> stopped being built and supported since July 2022. I have filed [2] to
>> track the resolution of this issue.
>> >
>> > Based upon the issues linked from the deprecation notice[1], almost
>> everyone is swapping to the eclipse-temurin container[3] as their base.
>> The eclipse-temurin container is released under
>> these licenses:
>> > Apache License, Version 2.0
>> > Eclipse Distribution License 1.0 (BSD)
>> > Eclipse Public License 2.0
>> > - (Secondary) GNU General Public License, version 2 with OpenJDK
>> Assembly Exception
>> > - (Secondary) GNU General Public License, version 2 with the GNU
>> Classpath Exception
>> >
>> > I propose that we swap all our containers to the eclipse-temurin
>> containers[3].
>> >
>> > Open to other ideas and also would be great to hear about your
>> experience in any other projects that you have had to make a similar
>> decision.
>> >
>> > 1: https://github.com/docker-library/openjdk/issues/505
>> > 2: https://github.com/apache/beam/issues/25371
>> > 3: https://hub.docker.com/_/eclipse-temurin
>>
>


OpenJDK8 / OpenJDK11 container deprecation

2023-02-07 Thread Luke Cwik via user
As per [1], the JDK8 and JDK11 containers that Apache Beam uses have
stopped being built and supported since July 2022. I have filed [2] to
track the resolution of this issue.

Based upon the issues linked from the deprecation notice[1], almost everyone
is swapping to the eclipse-temurin container[3] as their base. The
eclipse-temurin container is released under
these licenses:
Apache License, Version 2.0
Eclipse Distribution License 1.0 (BSD)
Eclipse Public License 2.0
- (Secondary) GNU General Public License, version 2 with OpenJDK Assembly
Exception
- (Secondary) GNU General Public License, version 2 with the GNU Classpath
Exception

I propose that we swap all our containers to the eclipse-temurin
containers[3].

Open to other ideas and also would be great to hear about your experience
in any other projects that you have had to make a similar decision.

1: https://github.com/docker-library/openjdk/issues/505
2: https://github.com/apache/beam/issues/25371
3: https://hub.docker.com/_/eclipse-temurin


Re: Dataflow and mounting large data sets

2023-01-31 Thread Luke Cwik via user
I would also suggest looking at NFS client implementations in Java that
would allow you to talk to the NFS server without needing to mount it
within the OS. A quick search yielded https://github.com/raisercostin/yanfs
or https://github.com/EMCECS/nfs-client-java

On Tue, Jan 31, 2023 at 3:31 PM Chad Dombrova  wrote:

> Thanks for the info.  We are going to test this further and we'll let you
> know how it goes.
>
> -chad
>
>
> On Mon, Jan 30, 2023 at 2:14 PM Valentyn Tymofieiev 
> wrote:
>
>> It applies to custom containers as well. You can find the container
>> manifest in the GCE VM metadata, and it should have an entry for privileged
>> mode. The reason for this was to enable GPU accelerator support, but agree
>> with Robert that it is not part of any contracts, so in theory this could
>> change or perhaps be more strictly limited to accelerator support. In fact,
>> originally, this was only enabled for pipelines using accelerators but for
>> purely internal implementation details I believe it is currently enabled
>> for all pipelines.
>>
>> So for prototyping purposes I think you could try it, but I can't make
>> any guarantees in this thread that privileged mode will continue to work.
>>
>> cc: @Aaron Li  FYI
>>
>>
>> On Mon, Jan 30, 2023 at 12:16 PM Robert Bradshaw 
>> wrote:
>>
>>> I'm also not sure it's part of the contract that the containerization
>>> technology we use will always have these capabilities.
>>>
>>> On Mon, Jan 30, 2023 at 10:53 AM Chad Dombrova 
>>> wrote:
>>> >
>>> > Hi Valentyn,
>>> >
>>> >>
>>> >> Beam SDK docker containers on Dataflow VMs are currently launched in
>>> privileged mode.
>>> >
>>> >
>>> > Does this only apply to stock sdk containers?  I'm asking because we
>>> use a custom sdk container that we build.  We've tried various ways of
>>> running mount from within our custom beam container in Dataflow and we
>>> could not get it to work, while the same thing succeeds in local tests and
>>> in our CI (gitlab).  The assessment at the time (this was maybe a year ago)
>>> was that the container was not running in privileged mode, but if you think
>>> that's incorrect we can revisit this and report back with some error logs.
>>> >
>>> > -chad
>>> >
>>>
>>


Re: KafkaIo Metrics

2023-01-20 Thread Luke Cwik via user
KafkaIO#commitOffsetsInFinalize[1] is likely what you want if you want to
see Kafka's view of how the pipeline is consuming from it, since the
pipeline will only commit offsets once it has guaranteed that the data has
been ingested.

I would suggest using pipeline-level concepts and metrics, so have you
considered looking at pipeline-level metrics like:
* PCollection elements processed/size instead of bytes-consumed-rate
* watermark lag / processing time lag instead of records-lag

Obviously, if you're trying to dig down into an existing problem then it
sure does make sense to look at Kafka-level metrics if pipeline-level
metrics are telling you that there is a problem in the part of the pipeline
containing Kafka.

1:
https://beam.apache.org/releases/javadoc/2.44.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
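
For anyone who wants a concrete starting point, here is a rough Java sketch
of enabling offset commits on finalize. The broker address, topic, and
consumer group name below are placeholders, not values from this thread:

import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class KafkaCommitOnFinalize {
  public static void main(String[] args) {
    Pipeline pipeline =
        Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline.apply(
        "ReadFromKafka",
        KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers("broker-1:9092") // placeholder broker
            .withTopic("events") // placeholder topic
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            // A group.id is needed so there is a consumer group to commit
            // offsets against.
            .withConsumerConfigUpdates(
                Collections.<String, Object>singletonMap(
                    ConsumerConfig.GROUP_ID_CONFIG, "my-beam-consumer"))
            // Commit offsets back to Kafka once the pipeline has finalized
            // ingesting them, so Kafka-side lag metrics reflect the
            // pipeline's actual progress.
            .commitOffsetsInFinalize()
            .withoutMetadata());

    pipeline.run();
  }
}

With offsets committed this way, Kafka-side tooling that reports lag for the
consumer group becomes meaningful alongside the pipeline-level watermark and
element-count metrics.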



On Fri, Jan 20, 2023 at 8:28 AM Alexey Romanenko 
wrote:

> IIRC, we don’t expose any Kafka Consumer metrics, so I’m afraid that
> there is no easy way to get them in a Beam pipeline.
>
> —
> Alexey
>
> On 18 Jan 2023, at 21:43, Lydian  wrote:
>
> Hi,
> I know that Beam KafkaIO doesn't use the native kafka offset, and
> therefore I cannot use kafka metrics directly.
>
> Wondering what would be the right way to expose those metrics of my
> KafkaIO pipeline?
> Things I am interested in include:
>
>- bytes-consumed-rate
>- fetch-latency-avg
>- records-lag
>- commit-rate
>
>- consumer lag
>
> Wondering how people get these metrics, or, instead of doing this, should
> we just enable `commit_offset_in_finalize` and then use the Kafka metrics
> directly?
>
> Also wondering if there's anything to watch out for when enabling
> `commit_offset_in_finalize`? Thanks!
>
> Sincerely,
> Lydian Lee
>
>
>


Re: [JAVA] Handling repeated elements when merging two pcollections

2022-08-10 Thread Luke Cwik via user
Sorry, I should have said that you should Flatten and do a GroupByKey, not
a CoGroupByKey, making the pipeline look like:
PCollectionA -> Flatten -> GroupByKey -> ParDo(EmitOnlyFirstElementPerKey)
PCollectionB -/

The CoGroupByKey will have one iterable per PCollection containing zero or
more elements depending on how many elements each PCollection had for that
key. So yes you could solve it with CoGroupByKey but Flatten+GroupByKey is
much simpler.
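
For illustration, a minimal Java sketch of that Flatten + GroupByKey shape,
assuming KV<String, byte[]> elements as a stand-in for your actual key and
value types, and that collectionA / collectionB are the two inputs. Note
that if you must guarantee the CollectionA element wins, tag each value with
its source before flattening, since GroupByKey makes no ordering promise:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

PCollection<KV<String, byte[]>> merged =
    PCollectionList.of(collectionA)
        .and(collectionB)
        .apply(Flatten.pCollections())
        .apply(GroupByKey.create())
        .apply(
            "EmitOnlyFirstElementPerKey",
            ParDo.of(
                new DoFn<KV<String, Iterable<byte[]>>, KV<String, byte[]>>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    // Keys duplicated across the two inputs arrive as an
                    // iterable with more than one value; emit just one per key.
                    c.output(
                        KV.of(
                            c.element().getKey(),
                            c.element().getValue().iterator().next()));
                  }
                }));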

On Wed, Aug 10, 2022 at 1:31 PM Shivam Singhal 
wrote:

> Think this should solve my problem.
>
> Thanks Evan and Luke!
>
> On Thu, 11 Aug 2022 at 1:49 AM, Luke Cwik via user 
> wrote:
>
>> Use CoGroupByKey to join the two PCollections and emit only the first
>> value of each iterable with the key.
>>
>> Duplicates will appear as iterables with more than one value while keys
>> without duplicates will have iterables containing exactly one value.
>>
>> On Wed, Aug 10, 2022 at 12:25 PM Shivam Singhal <
>> shivamsinghal5...@gmail.com> wrote:
>>
>>> I have two PCollections, CollectionA & CollectionB of type KV<…, Byte[]>.
>>>
>>>
>>> I would like to merge them into one PCollection but CollectionA &
>>> CollectionB might have some elements with the same key. In those repeated
>>> cases, I would like to keep the element from CollectionA & drop the
>>> repeated element from CollectionB.
>>>
>>> Does anyone know a simple method to do this?
>>>
>>> Thanks,
>>> Shivam Singhal
>>>
>>


Re: [JAVA] Handling repeated elements when merging two pcollections

2022-08-10 Thread Luke Cwik via user
Use CoGroupByKey to join the two PCollections and emit only the first value
of each iterable with the key.

Duplicates will appear as iterables with more than one value while keys
without duplicates will have iterables containing exactly one value.
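
A minimal Java sketch of that CoGroupByKey approach, again assuming
KV<String, byte[]> elements as a stand-in for your actual key and value
types, and preferring CollectionA's value whenever both inputs contain the
key:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

final TupleTag<byte[]> tagA = new TupleTag<byte[]>() {};
final TupleTag<byte[]> tagB = new TupleTag<byte[]>() {};

PCollection<KV<String, byte[]>> merged =
    KeyedPCollectionTuple.of(tagA, collectionA)
        .and(tagB, collectionB)
        .apply(CoGroupByKey.create())
        .apply(
            ParDo.of(
                new DoFn<KV<String, CoGbkResult>, KV<String, byte[]>>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    CoGbkResult joined = c.element().getValue();
                    Iterable<byte[]> fromA = joined.getAll(tagA);
                    // Keep CollectionA's element when the key exists in both
                    // inputs, otherwise fall back to CollectionB's element.
                    byte[] value =
                        fromA.iterator().hasNext()
                            ? fromA.iterator().next()
                            : joined.getAll(tagB).iterator().next();
                    c.output(KV.of(c.element().getKey(), value));
                  }
                }));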

On Wed, Aug 10, 2022 at 12:25 PM Shivam Singhal 
wrote:

> I have two PCollections, CollectionA & CollectionB of type KV<…, Byte[]>.
>
>
> I would like to merge them into one PCollection but CollectionA &
> CollectionB might have some elements with the same key. In those repeated
> cases, I would like to keep the element from CollectionA & drop the
> repeated element from CollectionB.
>
> Does anyone know a simple method to do this?
>
> Thanks,
> Shivam Singhal
>


Re: SDK Worker availability metrics

2022-08-10 Thread Luke Cwik via user
Flink has a set of workers; each worker has a number of task slots. A
pipeline will use the number of slots based upon what it was configured to
run with.

Are you trying to get the total number of workers, total number of task
slots, number of task slots your pipeline is using, or number of workers
your pipeline is executing on?

I was under the impression that the first two were properties of the Flink
cluster and don't change while the third property is configured at job
submission time and also doesn't change.

I may not be understanding what you're trying to measure and why at
pipeline runtime for Flink since many of these values don't change through
the lifetime of the cluster and/or job.

On Mon, Aug 8, 2022 at 4:59 PM aryan m  wrote:

> Hi Luke!
> Thanks !! We use the Flink Runner and run SDK workers as processes [1]
> within a k8s pod. Can you please share broad steps on how one can do this
> in the runner?
>
>
> [1]
> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java
>
>
> On Mon, Aug 8, 2022 at 8:51 AM Luke Cwik via user 
> wrote:
>
>> That code only executes within a runner and is only used by certain
>> runners and wouldn't work in general from user code that is monitoring the
>> job or user code executing within one of the workers.
>>
>> You would need to author code that is likely runner specific to look up
>> the number of workers associated with a job as I don't believe there is a
>> general way to do this for an arbitrary Apache Beam runner.
>>
>>  Which runner would you most likely want to use?
>>
>> On Sun, Aug 7, 2022 at 1:02 PM aryan m  wrote:
>>
>>> Hi Users!
>>> Is there a recommended approach to publish metrics on the number of
>>> SDK workers available/running as a gauge?
>>>
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java#L267
>>> [2]
>>> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java#L148
>>>
>>>
>>> -- Aryan
>>>
>>


Re: SDK Worker availability metrics

2022-08-08 Thread Luke Cwik via user
That code only executes within a runner and is only used by certain runners
and wouldn't work in general from user code that is monitoring the job or
user code executing within one of the workers.

You would need to author code that is likely runner specific to look up the
number of workers associated with a job as I don't believe there is a
general way to do this for an arbitrary Apache Beam runner.

 Which runner would you most likely want to use?
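
For what it's worth, the reporting half is straightforward with Beam's
metrics API; the hard, runner-specific part is obtaining the worker count
itself. A rough Java sketch, where lookupWorkerCount() is a hypothetical
method you would have to implement against your runner (for example via the
Flink REST API):

import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

public class ReportWorkerCountFn extends DoFn<Long, Void> {
  private final Gauge sdkWorkers = Metrics.gauge("infra", "sdk_workers_running");

  @ProcessElement
  public void process(@Element Long tick) {
    // lookupWorkerCount() is a placeholder for runner-specific logic.
    sdkWorkers.set(lookupWorkerCount());
  }

  private long lookupWorkerCount() {
    return 0; // TODO: replace with a real, runner-specific lookup.
  }
}

You would apply this to something that fires periodically, for example
GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1)), so the
gauge keeps getting refreshed while the pipeline runs.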

On Sun, Aug 7, 2022 at 1:02 PM aryan m  wrote:

> Hi Users!
> Is there a recommended approach to publish metrics on the number of
> SDK workers available/running as a gauge?
>
>
> [1]
> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java#L267
> [2]
> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java#L148
>
>
> -- Aryan
>


Re: [Dataflow][Java][stateful] Workflow Failed when trying to introduce stateful RateLimit

2022-08-05 Thread Luke Cwik via user
You could try Dataflow Runner v2. The difference in the implementation may
allow you to work around what is impacting the pipelines.

On Fri, Aug 5, 2022 at 9:40 AM Evan Galpin  wrote:

> Thanks Luke, I've opened a support case as well but thought it would be
> prudent to ask here in case there was something obvious with the code.  Is
> there any additional/optional validation that I can opt to use when
> building and deploying the pipeline that might give hints? Otherwise I'll
> just wait on the support case.
>
> Thanks,
> Evan
>
> On Fri, Aug 5, 2022 at 11:22 AM Luke Cwik via user 
> wrote:
>
>> I took a look at the code and nothing obvious stood out to me, as this is
>> a ParDo with OnWindowExpiration. Just to make sure, the rate
>> limit is per key and would only be a global rate limit if there was a
>> single key.
>>
>> Are the workers trying to start?
>> * If no, then you would need to open a support case and share some
>> job ids so that someone could debug internal service logs.
>> * If yes, then did the workers start successfully?
>> ** If no, logs should have some details as to why the worker couldn't
>> start.
>> ** If yes, are the workers getting work items?
>> *** If no, then you would need to open a support case and share some
>> job ids so that someone could debug internal service logs.
>> *** If yes then the logs should have some details as to why the work
>> items are failing.
>>
>>
>> On Fri, Aug 5, 2022 at 7:36 AM Evan Galpin  wrote:
>>
>>> Hi all,
>>>
>>> I'm trying to create a RateLimit[1] transform that's based fairly
>>> heavily on GroupIntoBatches[2]. I've been able to run unit tests using
>>> TestPipeline to verify desired behaviour and have also run successfully
>>> using DirectRunner.  However, when I submit the same job to Dataflow it
>>> completely fails to start and only gives the error message "Workflow
>>> Failed." The job builds/uploads/submits without error, but never starts and
>>> gives no detail as to why.
>>>
>>> Is there anything I can do to gain more insight about what is going
>>> wrong?  I've included a gist of the RateLimit[1] code in case there is
>>> anything obvious wrong there.
>>>
>>> Thanks in advance,
>>> Evan
>>>
>>> [1] https://gist.github.com/egalpin/162a04b896dc7be1d0899acf17e676b3
>>> [2]
>>> https://github.com/apache/beam/blob/c8d92b03b6b2029978dbc2bf824240232c5e61ac/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
>>>
>>


Re: [Dataflow][Java][stateful] Workflow Failed when trying to introduce stateful RateLimit

2022-08-05 Thread Luke Cwik via user
I took a look at the code and nothing obvious stood out to me, as this is a
ParDo with OnWindowExpiration. Just to make sure, the rate
limit is per key and would only be a global rate limit if there was a
single key.

Are the workers trying to start?
* If no, then you would need to open a support case and share some job ids
so that someone could debug internal service logs.
* If yes, then did the workers start successfully?
** If no, logs should have some details as to why the worker couldn't start.
** If yes, are the workers getting work items?
*** If no, then you would need to open a support case and share some
job ids so that someone could debug internal service logs.
*** If yes then the logs should have some details as to why the work items
are failing.


On Fri, Aug 5, 2022 at 7:36 AM Evan Galpin  wrote:

> Hi all,
>
> I'm trying to create a RateLimit[1] transform that's based fairly heavily
> on GroupIntoBatches[2]. I've been able to run unit tests using TestPipeline
> to verify desired behaviour and have also run successfully using
> DirectRunner.  However, when I submit the same job to Dataflow it
> completely fails to start and only gives the error message "Workflow
> Failed." The job builds/uploads/submits without error, but never starts and
> gives no detail as to why.
>
> Is there anything I can do to gain more insight about what is going
> wrong?  I've included a gist of the RateLimit[1] code in case there is
> anything obvious wrong there.
>
> Thanks in advance,
> Evan
>
> [1] https://gist.github.com/egalpin/162a04b896dc7be1d0899acf17e676b3
> [2]
> https://github.com/apache/beam/blob/c8d92b03b6b2029978dbc2bf824240232c5e61ac/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
>


Re: [Dataflow][Python] Guidance on HTTP ingestion on Dataflow

2022-07-19 Thread Luke Cwik via user
Even if you don't have the resource ids ahead of time, you can have a
pipeline like:
Impulse -> ParDo(GenerateResourceIds) -> Reshuffle ->
ParDo(ReadResourceIds) -> ...

You could also compose these as splittable DoFns [1, 2, 3]:
ParDo(SplittableGenerateResourceIds) -> ParDo(SplittableReadResourceIds)

The first approach is the simplest as the reshuffle will rebalance the
reading of each resource id across worker nodes but is limited in
generating resource ids on one worker. Making the generation a splittable
DoFn will mean that you can increase the parallelism of generation which is
important if there are so many that it could crash a worker or fail to have
the output committed (these kinds of failures are runner dependent on how
well they handle single bundles with large outputs). Making the reading
splittable allows you to handle a large resource (imagine a large file) so
that it can be read and processed in parallel (and will have similar
failures if the runner can't handle single bundles with large outputs).

You can always start with the first solution and swap either piece to be a
splittable DoFn depending on your performance requirements and how well the
simple solution works.

1: https://beam.apache.org/blog/splittable-do-fn/
2: https://beam.apache.org/blog/splittable-do-fn-is-available/
3: https://beam.apache.org/documentation/programming-guide/#splittable-dofns
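
To make the first shape concrete, here is a rough Java sketch (the question
was about Python, but the same shape applies there); listResourceIds() and
fetchResource() are placeholders for however you enumerate and fetch your
resources:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

PCollection<String> responses =
    pipeline
        .apply(Impulse.create())
        .apply(
            "GenerateResourceIds",
            ParDo.of(
                new DoFn<byte[], String>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    // Placeholder: enumerate the resource ids to fetch,
                    // e.g. by paging a listing endpoint.
                    for (String id : listResourceIds()) {
                      c.output(id);
                    }
                  }
                }))
        // Rebalance the ids across workers so the fetches run in parallel.
        .apply(Reshuffle.viaRandomKey())
        .apply(
            "ReadResourceIds",
            ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    // Placeholder: synchronous HTTP fetch of one resource.
                    c.output(fetchResource(c.element()));
                  }
                }));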


On Tue, Jul 19, 2022 at 10:05 AM Damian Akpan 
wrote:

> Provided you have all the resource ids ahead of fetching, Beam will
> spread the fetches to its workers. It will still fetch synchronously but
> within that worker.
>
> On Tue, Jul 19, 2022 at 5:40 PM Shree Tanna  wrote:
>
>> Hi all,
>>
>> I'm planning to use Apache Beam for the extract and load parts of the ETL
>> pipeline and run the jobs on Dataflow. I will have to do the REST API
>> ingestion on our platform. I can opt to make sync API calls from a DoFn. With
>> that, pipelines will stall while REST requests are made over the network.
>>
>> Is it best practice to run the REST ingestion job on Dataflow? Is there
>> any best practice I can follow to accomplish this? Just as a reference I'm
>> adding this StackOverflow thread here too. Also, I notice that the Rest
>> I/O transform built-in connector is in progress for Java.
>>
>> Let me know if this is the right group to ask this question. I can also
>> ask d...@beam.apache.org if needed.
>> --
>> Thanks,
>> Shree
>>
>


Re: [Dataflow][Java] Guidance on Transform Mapping Streaming Update

2022-07-08 Thread Luke Cwik via user
I was suggesting GCP support mainly because I don't think you want to share
the 2.36 and 2.40 version of your job file publicly as someone familiar
with the layout and format may spot a meaningful difference.

Also, if it turns out that there is no meaningful difference between the
two, then the internal mechanics of how the graph is modified by Dataflow are
not surfaced back to you in enough depth to debug further.



On Fri, Jul 8, 2022 at 6:12 AM Evan Galpin  wrote:

> Thanks for your response Luke :-)
>
> Updating in 2.36.0 works as expected, but as you alluded to I'm attempting
> to update to the latest SDK; in this case there are no code changes in the
> user code, only the SDK version.  Is GCP support the only tool when it
> comes to deciphering the steps added by Dataflow?  I would love to be able
> to inspect the complete graph with those extra steps like
> "Unzipped-2/FlattenReplace" that aren't in the job file.
>
> Thanks,
> Evan
>
> On Wed, Jul 6, 2022 at 4:21 PM Luke Cwik via user 
> wrote:
>
>> Does doing a pipeline update in 2.36 work or do you want to do an update
>> to get the latest version?
>>
>> Feel free to share the job files with GCP support. It could be something
>> internal but the coders for ephemeral steps that Dataflow adds are based
>> upon existing coders within the graph.
>>
>> On Tue, Jul 5, 2022 at 8:03 AM Evan Galpin  wrote:
>>
>>> +dev@
>>>
>>> Reviving this thread as it has hit me again on Dataflow.  I am trying to
>>> upgrade an active streaming pipeline from 2.36.0 to 2.40.0.  Originally, I
>>> received an error that the step "Flatten.pCollections" was missing from the
>>> new job graph.  I knew from the code that that wasn't true, so I dumped the
>>> job file via "--dataflowJobFile" for both the running pipeline and for the
>>> new version I'm attempting to update to.  Both job files showed identical
>>> data for the Flatten.pCollections step, which raises the question of why
>>> that would have been reported as missing.
>>>
>>> Out of curiosity I then tried mapping the step to the same name, which
>>> changed the error to:  "The Coder or type for step
>>> Flatten.pCollections/Unzipped-2/FlattenReplace has changed."  Again, the
>>> job files show identical coders for the Flatten step (though
>>> "Unzipped-2/FlattenReplace" is not present in the job file, maybe an
>>> internal Dataflow thing?), so I'm confident that the coder hasn't actually
>>> changed.
>>>
>>> I'm not sure how to proceed in updating the running pipeline, and I'd
>>> really prefer not to drain.  Any ideas?
>>>
>>> Thanks,
>>> Evan
>>>
>>>
>>> On Fri, Oct 22, 2021 at 3:36 PM Evan Galpin 
>>> wrote:
>>>
>>>> Thanks for the ideas Luke. I checked out the json graphs as per your
>>>> recommendation (thanks for that, was previously unaware), and the
>>>> "output_info" was identical for both the running pipeline and the pipeline
>>>> I was hoping to update it with.  I ended up opting to just drain and submit
>>>> the updated pipeline as a new job.  Thanks for the tips!
>>>>
>>>> Thanks,
>>>> Evan
>>>>
>>>> On Thu, Oct 21, 2021 at 7:02 PM Luke Cwik  wrote:
>>>>
>>>>> I would suggest dumping the JSON representation (with the
>>>>> --dataflowJobFile=/path/to/output.json) of the pipeline before and after
>>>>> and looking to see what is being submitted to Dataflow. Dataflow's JSON
>>>>> graph representation is a bipartite graph where there are transform nodes
>>>>> with inputs and outputs and PCollection nodes with no inputs or outputs.
>>>>> The PCollection nodes typically end with the suffix ".out". This could
>>>>> help find steps that have been added/removed/renamed.
>>>>>
>>>>> The PipelineDotRenderer[1] might be of use as well.
>>>>>
>>>>> 1:
>>>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRenderer.java
>>>>>
>>>>> On Thu, Oct 21, 2021 at 11:54 AM Evan Galpin 
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm looking for any help regarding updating streaming jobs which are
>>>>>> already running on Dataflow.  Specifically I'm seeking guidance for
>>>>>> situations where Fusion is involved, and trying to decipher which old
>>>>>> steps should be mapped to which new steps.
>>>>>>
>>>>>> I have a case where I updated the steps which come after the step in
>>>>>> question, but when I attempt to update there is an error that "
>>>>>> no longer produces data to the steps ". I believe that
>>>>>>  is only changed as a result of fusion, and in reality it does in
>>>>>> fact produce data to  (confirmed when deployed as a new
>>>>>> job for testing purposes).
>>>>>>
>>>>>> Is there a guide for how to deal with updates and fusion?
>>>>>>
>>>>>> Thanks,
>>>>>> Evan
>>>>>>
>>>>>


Re: [Dataflow][Java] Guidance on Transform Mapping Streaming Update

2022-07-06 Thread Luke Cwik via user
Does doing a pipeline update in 2.36 work or do you want to do an update to
get the latest version?

Feel free to share the job files with GCP support. It could be something
internal but the coders for ephemeral steps that Dataflow adds are based
upon existing coders within the graph.

On Tue, Jul 5, 2022 at 8:03 AM Evan Galpin  wrote:

> +dev@
>
> Reviving this thread as it has hit me again on Dataflow.  I am trying to
> upgrade an active streaming pipeline from 2.36.0 to 2.40.0.  Originally, I
> received an error that the step "Flatten.pCollections" was missing from the
> new job graph.  I knew from the code that that wasn't true, so I dumped the
> job file via "--dataflowJobFile" for both the running pipeline and for the
> new version I'm attempting to update to.  Both job files showed identical
> data for the Flatten.pCollections step, which raises the question of why
> that would have been reported as missing.
>
> Out of curiosity I then tried mapping the step to the same name, which
> changed the error to:  "The Coder or type for step
> Flatten.pCollections/Unzipped-2/FlattenReplace has changed."  Again, the
> job files show identical coders for the Flatten step (though
> "Unzipped-2/FlattenReplace" is not present in the job file, maybe an
> internal Dataflow thing?), so I'm confident that the coder hasn't actually
> changed.
>
> I'm not sure how to proceed in updating the running pipeline, and I'd
> really prefer not to drain.  Any ideas?
>
> Thanks,
> Evan
>
>
> On Fri, Oct 22, 2021 at 3:36 PM Evan Galpin  wrote:
>
>> Thanks for the ideas Luke. I checked out the json graphs as per your
>> recommendation (thanks for that, was previously unaware), and the
>> "output_info" was identical for both the running pipeline and the pipeline
>> I was hoping to update it with.  I ended up opting to just drain and submit
>> the updated pipeline as a new job.  Thanks for the tips!
>>
>> Thanks,
>> Evan
>>
>> On Thu, Oct 21, 2021 at 7:02 PM Luke Cwik  wrote:
>>
>>> I would suggest dumping the JSON representation (with the
>>> --dataflowJobFile=/path/to/output.json) of the pipeline before and after
>>> and looking to see what is being submitted to Dataflow. Dataflow's JSON
>>> graph representation is a bipartite graph where there are transform nodes
>>> with inputs and outputs and PCollection nodes with no inputs or outputs.
>>> The PCollection nodes typically end with the suffix ".out". This could help
>>> find steps that have been added/removed/renamed.
>>>
>>> The PipelineDotRenderer[1] might be of use as well.
>>>
>>> 1:
>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/renderer/PipelineDotRenderer.java
>>>
>>> On Thu, Oct 21, 2021 at 11:54 AM Evan Galpin 
>>> wrote:
>>>
 Hi all,

 I'm looking for any help regarding updating streaming jobs which are
 already running on Dataflow.  Specifically I'm seeking guidance for
 situations where Fusion is involved, and trying to decipher which old steps
 should be mapped to which new steps.

 I have a case where I updated the steps which come after the step in
 question, but when I attempt to update there is an error that "
 no longer produces data to the steps ". I believe that
  is only changed as a result of fusion, and in reality it does in
 fact produce data to  (confirmed when deployed as a new
 job for testing purposes).

 Is there a guide for how to deal with updates and fusion?

 Thanks,
 Evan

>>>