Thanks.

I am trying a fail-fast approach here, so I thought the best thing is to
validate the options before starting the pipeline.

I think validating the options in processElement on each element is
wasteful, since the validation code would run once per element instead of
just once. The options are not available in a DoFn's @Setup method, only in
@StartBundle. Is there a better place to validate the options than
@StartBundle?
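
For reference, this is roughly what I have in mind for the @StartBundle
route. It is only a minimal sketch, assuming the option is handed to the DoFn
as a ValueProvider[String]. The class name ValidateOptionsFn, the non-empty
rule and the once-per-instance flag are placeholders of mine, not anything
prescribed by Beam.

```scala
import org.apache.beam.sdk.options.ValueProvider
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, StartBundle}

// Hypothetical pass-through DoFn that checks the userId option at runtime.
class ValidateOptionsFn(userId: ValueProvider[String])
    extends DoFn[String, String] {

  // Transient, so every deserialized DoFn instance starts with false.
  @transient private var validated = false

  @StartBundle
  def startBundle(): Unit = {
    if (!validated) {
      // Calling .get() here happens during pipeline execution,
      // not during pipeline construction.
      val id = userId.get()
      // Placeholder rule; the real custom logic would go here.
      if (id == null || id.trim.isEmpty) {
        throw new IllegalArgumentException("userId must be non-empty")
      }
      validated = true
    }
  }

  @ProcessElement
  def processElement(c: DoFn[String, String]#ProcessContext): Unit =
    c.output(c.element())
}
```

I would apply it with something like
ParDo.of(new ValidateOptionsFn(options.getUserId())). As far as I can tell
the check would still run once per DoFn instance rather than once per job,
so it is cheaper than per-element validation but not truly fail-fast.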

Thanks

On Tue, Feb 1, 2022 at 4:10 PM Chris Soujon <ch...@doit-intl.com> wrote:

> Hi Ori,
>
> The error message hints that you are trying to use a ValueProvider during
> pipeline construction. You should only call the .get() method from code
> that runs during pipeline execution, e.g. in a lambda for MapElements or
> in a DoFn's @ProcessElement method.
>
> Check the Dataflow docs on this topic here [1]. It also spells out your
> error message.
>
> Best,
> Chris
>
>
> [1]
> https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#about-runtime-parameters-and-the-valueprovider-interface
>
> On Tue, Feb 1, 2022 at 2:54 PM Ori Popowski <ori....@gmail.com> wrote:
>
>> Hi all!
>>
>> I am extending PipelineOptions and I have getters/setters for several
>> properties which are passed to my job from Dataflow. The parameters are
>> indeed passed correctly and I can access them from my pipeline.
>>
>> However, I'd like to have some custom logic for my parameters which
>> cannot be expressed by a metadata file with regexes, or
>> @Validation.Required annotation.
>>
>> I tried to add the following code:
>>
>> val options = PipelineOptionsFactory.fromArgs(args: _*).withValidation().create().as(classOf[JobOptions])
>>
>> if (options.getUserId().get() == "…" && …) {
>>     throw new IllegalArgumentException("oops")
>> }
>>
>> Unfortunately, I am getting this exception when I am staging the pipeline
>> to Dataflow with mvn -Pdataflow-runner compile exec:java …:
>>
>> [info] running (fork) walkme.Main --runner=DataflowRunner --project=… --stagingLocation=gs://…/staging --templateLocation=gs://…/template --region=europe-west3
>> [error] Exception in thread "main" java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=userId, default=null}
>> [error]         at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:254)
>> [error]         at walkme.Main$.main(Main.scala:13)
>> [error]         at walkme.Main.main(Main.scala)
>>
>> What is the recommended way to achieve what I described?
>>
>> Thanks!
>>
>
>
