Thanks for pointing to that thread.

1) I'm fine - as is Kyle - with the approach of using a "preferred environment" for the expansion service; we only need to pass it via the command line. Yes, the command line is generally SDK-dependent, and that makes it expansion-dependent, because whether a particular transform is "external" is an implementation detail. That is the nasty part. The rest of my original question is about how exactly to do that, because it seems tricky: it is not possible to include a runtime dependency on DirectRunner (it fails many, many tests), and it is not possible to extract the PipelineOptions as a Map either.

2) Regarding the SDK injecting the environment, I still think that is the correct way. The SDK (the driver code) owns the execution environment, so it should be able to define (or at least prioritize) the runtime environments of all transforms. Since we cannot know in advance which transform will expand into how many nested (and possibly external) transforms, the SDK could be fine with providing a Map(SDK -> environment). That is: "run Java using PROCESS", "run Python using DOCKER", and so on. A default mapping might exist on the expansion service as well (which might be passed via the command line - and that is point 1)). Yes, the Map approach is definitely not universal, because one can imagine cases where the SDK alone is not enough to specify the environment, but it seems the vast majority of cases would fit into it.
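
The per-SDK mapping could be as simple as a lookup where the client's preference wins over the service default. A minimal, hypothetical sketch - none of these names are Beam API, they just illustrate the resolution order:

```python
# Hypothetical sketch of option 2): the client SDK supplies a per-language
# environment mapping, and the expansion service falls back to its own
# default mapping when the client expresses no preference. Illustrative only.
SERVICE_DEFAULTS = {
    "java": "DOCKER",
    "python": "DOCKER",
    "go": "DOCKER",
}

def resolve_environment(transform_sdk, client_mapping=None):
    """Pick the environment for a nested transform written in `transform_sdk`."""
    client_mapping = client_mapping or {}
    if transform_sdk in client_mapping:
        return client_mapping[transform_sdk]  # client preference wins
    return SERVICE_DEFAULTS.get(transform_sdk)
```

With client_mapping={"java": "PROCESS"}, a nested Java transform would run as PROCESS while a nested Python transform keeps the service default.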

3) The best option might be for the SDK to provide a list of supported environments, with additional metrics, from which the expansion service could choose.

These three approaches are all extensions of the current state, which has a predefined environment with no possibility to change it. Option 1) changes that to a single configurable environment, option 2) to N environments based on the SDK, and option 3) to M environments based on SDK-dependent metrics (and/or capabilities of a particular environment). These look like gradual extensions of the current state, so maybe we can focus on the first one and add the others when there is a need?

If this could be the first conclusion, then the next one would be what the preferred way to implement it should be.

WDYT?

On 6/29/21 9:15 PM, Robert Bradshaw wrote:
+1, thanks for digging up that thread.

I am still of the same opinion I expressed there. To touch on some
things brought up here: copying something like
defaultEnvironmentConfig doesn't make sense from language to language
(e.g. the docker image name or CLI arguments for subprocess mode just
aren't going to work for all of Python, Java, and Go, and clearly the
embedded type is only going to work for one).

In the short term, to change the environment (or anything else) about the
"default" expansion service, the thing to do is build and start your
own expansion service that sets up the environment for its transforms
in a custom way.

FYI, in Python, one can use --beam_services to use a custom expansion
service. E.g.

--beam_services='{":sdks:java:extensions:sql:expansion-service:shadowJar":
"localhost:port"}'

would override the default one when using SqlTransform.
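
The value of --beam_services is a JSON map from a key identifying the expansion service (here the gradle target used by SqlTransform) to an address. A stdlib-only way to check the shape of such a flag value (the port 8097 is a placeholder):

```python
import json

# The --beam_services value maps an expansion-service key to a host:port
# address; here the key is the gradle target SqlTransform uses by default.
flag_value = ('{":sdks:java:extensions:sql:expansion-service:shadowJar":'
              ' "localhost:8097"}')
overrides = json.loads(flag_value)
address = overrides[":sdks:java:extensions:sql:expansion-service:shadowJar"]
```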

On Tue, Jun 29, 2021 at 11:47 AM Kyle Weaver <kcwea...@google.com> wrote:
For context, there was a previous thread which touched on many of the same 
points: 
https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E

On Tue, Jun 29, 2021 at 11:16 AM Jan Lukavský <je...@seznam.cz> wrote:
I would slightly disagree that this breaks the black-box nature of the expansion. How the 
transform expands remains unknown to the SDK requesting the expansion; how the 
transform executes, on the other hand, is something the SDK must cooperate on - it 
knows (or could or should know) what the environment the pipeline is going to be executed 
on looks like. That is why the expansion service on its own cannot correctly define the execution 
environment. It could, if it were bound to a runner (and its environment) - for instance, a 
FlinkRunnerExpansionService could probably expand KafkaIO to something more 'native'. But that 
requires knowledge of the target runner. If the expansion service is not dedicated to a runner, the 
only place where the environment can be defined is the SDK - and therefore the expansion request.

Power users can always modify the output produced by the expansion service as 
well.
I'm not sure I follow this - do you mean that power users, who run the 
expansion service, can modify the output? Or is the output (protobuf) of the 
expansion service easily transferable between different execution 
environments? I had the impression that execution environments do not 
necessarily have the same payloads associated with them, and that it is 
therefore impossible to 'post-process' the output of the expansion. Is that a wrong 
assumption?

On 6/29/21 7:55 PM, Luke Cwik wrote:

This would "break" the black box, where the expansion service is supposed to 
hide the implementation internals from the caller, and pushes compatibility of these kinds 
of environment overrides onto the expansion service and its implementer.

Power users can always modify the output produced by the expansion service as 
well.

On Tue, Jun 29, 2021 at 10:08 AM Jan Lukavský <je...@seznam.cz> wrote:
The argument for being able to accept a (possibly ordered) list of execution 
environments is that this could make a single instance of the expansion service 
reusable by various clients with different requirements. Moreover, the two 
approaches are probably orthogonal - users could specify a 
'defaultExecutionEnvironment' for the service, which would be used when 
the client gives no preference.

On 6/29/21 7:03 PM, Luke Cwik wrote:

I would be much more inclined toward the user being able to configure the 
expansion service for their needs instead of changing the expansion service API.

On Tue, Jun 29, 2021 at 9:42 AM Jan Lukavský <je...@seznam.cz> wrote:
If I understand it correctly, there is currently no place to set the
defaultEnvironmentType - Python's KafkaIO uses either the
'expansion_service' given by the user (which might be a host:port, or an
object that has the appropriate method), or calls
'default_io_expansion_service' - which in turn runs the ExpansionService
using gradle. Either way, it ends up in ExpansionService#main [1]. It
would be possible to adapt the ExpansionService and call it locally -
a way to extend it (a protected createPipeline() method) seems to be
enough - but that is not very user-friendly. If we could specify the
defaultEnvironmentConfig when starting the ExpansionService, it would be
possible to add these parameters in the Python SDK's KafkaIO, which would
mean users do not have to worry about the expansion service at all
(leaving aside that using too many ReadFromKafka or WriteToKafka
transforms would somewhat hurt performance during pipeline build, but
that applies to pipeline build time only). I have created [2] to track that.

Does that make sense, or is my analysis incorrect?

   Jan

[1]
https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511

[2] https://issues.apache.org/jira/browse/BEAM-12539


On 6/29/21 6:24 PM, Alexey Romanenko wrote:
I’m sorry if I missed something, but do you mean that 
PortablePipelineOptions.setDefaultEnvironmentType(String) doesn’t work for you? 
Or is it only a specific case when using portable KafkaIO?

On 29 Jun 2021, at 09:51, Jan Lukavský <x666je...@gmail.com> wrote:

Hi,

I have come across an issue with cross-language transforms. My setup is that the 
PROCESS environment type works for me and I cannot use DOCKER. When I use Python's 
KafkaIO, it unfortunately - by default - expands to the docker environment, which 
then fails due to the missing 'docker' command. I haven't yet found a solution 
that doesn't involve tackling the expansion service.

I see several possible solutions to that:

   a) I would say that the cleanest solution would be to add a preferred 
environment type to the expansion request sent to the expansion service (probably 
along with additional flags, probably --experiments?). This requires deeper 
changes to the expansion RPC definition, probably serializing the 
PipelineOptions from the client environment into the ExpansionRequest.

   b) Another option would be to allow specifying some of the command-line arguments 
when starting the expansion service, which currently accepts only a port on the command 
line, see [1]. The straightforward 'fix' (see [2]) unfortunately does not work, 
because it requires DirectRunner to be on the classpath, which then breaks other 
runners (see [3]). It seems possible to copy hand-selected options from the command line 
to the Pipeline, but that feels hackish. It would require either being able to 
construct the Pipeline without a runner specified (which seems possible when calling 
Pipeline.create(), but not when using PipelineOptions created by parsing command-line 
arguments), or being able to create a Map<String, String> from the PipelineOptions 
and then copy all options into the Pipeline's options.

My proposal would be to create a hackish shortcut and just copy 
--defaultEnvironmentType, --defaultEnvironmentConfig, and --experiments into the 
Pipeline's options for now, and create an issue for a proper solution (possibly 
a)?).
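
The shortcut boils down to whitelisting a few flags from the service's command line. A language-agnostic sketch in Python (the flag names come from this thread; the parsing is deliberately simplified compared to real PipelineOptions parsing):

```python
# Sketch of the proposed shortcut: keep only a whitelist of pipeline options
# from the expansion service's command line and hand them to the expansion
# pipeline's options. Simplified "--name=value" parsing for illustration.
ALLOWED = {"defaultEnvironmentType", "defaultEnvironmentConfig", "experiments"}

def pick_allowed_options(argv):
    picked = {}
    for arg in argv:
        if arg.startswith("--") and "=" in arg:
            name, _, value = arg[2:].partition("=")
            if name in ALLOWED:
                picked[name] = value
    return picked
```

Anything outside the whitelist (e.g. the service port) is left untouched, which avoids dragging runner-specific options into the expansion pipeline.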

WDYT? Or did I miss a way to override the default expansion?

Thanks for comments,

   Jan

[1] 
https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511

[2] https://github.com/apache/beam/pull/15082

[3] https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/
