The argument for accepting a (possibly ordered) list of execution
environments is that it would make a single instance of the expansion
service reusable by various clients with different requirements.
Moreover, the two approaches are probably orthogonal: users could
specify a 'defaultExecutionEnvironment' for the service, which would be
used when the client gives no preference.
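
To make the idea concrete, here is a minimal sketch of the selection
logic in Python. All names in it (choose_environment,
client_preferences, supported, service_default) are hypothetical and
not part of any Beam API; it only assumes the service knows which
environment types it can produce.

```python
from typing import Collection, Optional, Sequence


def choose_environment(
    client_preferences: Sequence[str],
    supported: Collection[str],
    service_default: Optional[str] = None,
) -> str:
    """Pick an environment type for one expansion request.

    Hypothetical helper, not Beam API. The client's list is ordered, so
    the first entry the service supports wins. The service-wide default
    is used only when the client states no preference at all.
    """
    for env in client_preferences:
        if env in supported:
            return env
    if not client_preferences and service_default is not None:
        return service_default
    raise ValueError(
        "no acceptable environment: client asked for "
        f"{list(client_preferences)}, service supports {sorted(supported)}"
    )
```

Failing when the client states preferences that cannot be met (rather
than silently falling back to the default) is a design choice of this
sketch; a fallback would be equally easy to write.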
On 6/29/21 7:03 PM, Luke Cwik wrote:
I would be much more inclined to let the user configure the
expansion service for their needs instead of changing the expansion
service API.
On Tue, Jun 29, 2021 at 9:42 AM Jan Lukavský <[email protected]> wrote:
If I understand it correctly, there is currently no place to set the
defaultEnvironmentType. Python's KafkaIO either uses the
'expansion_service' given by the user (which might be a host:port or an
object that has the appropriate method), or calls
'default_io_expansion_service', which in turn runs ExpansionService
using gradle. Either way, it ends up in ExpansionService#main [1]. It
would be possible to adapt ExpansionService and call it locally,
provided ExpansionService offered a way to extend it (a protected
createPipeline() method seems to be enough), but that is not very
user-friendly. If we could specify the defaultEnvironmentConfig when
starting the ExpansionService, it would be possible to add these
parameters in the Python SDK's KafkaIO, which would mean users do not
have to worry about the expansion service at all (leaving aside that
using too many ReadFromKafka or WriteToKafka transforms would somewhat
hurt performance, though only at pipeline build time). I have created
[2] to track that.
Does that make sense, or is my analysis incorrect?
Jan
[1] https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
[2] https://issues.apache.org/jira/browse/BEAM-12539
On 6/29/21 6:24 PM, Alexey Romanenko wrote:
> I’m sorry if I missed something, but do you mean that
> PortablePipelineOptions.setDefaultEnvironmentType(String) doesn’t
> work for you? Or is it only a specific case while using portable
> KafkaIO?
>
>> On 29 Jun 2021, at 09:51, Jan Lukavský <[email protected]> wrote:
>>
>> Hi,
>>
>> I have come across an issue with cross-language transforms. In my
>> setup the working environment type is PROCESS and I cannot use
>> DOCKER. When I use Python's KafkaIO, it unfortunately expands to the
>> docker environment by default, which then fails due to the missing
>> 'docker' command. I have not yet found a solution that avoids
>> touching the expansion service.
>>
>> I see several possible solutions to that:
>>
>> a) I would say that the cleanest solution would be to add the
>> preferred environment type to the expansion request sent to the
>> expansion service (probably along with additional flags, perhaps
>> --experiments?). This requires deeper changes to the expansion RPC
>> definition, probably serializing the PipelineOptions from the
>> client environment into the ExpansionRequest.
>>
>> b) Another option would be to allow specifying some of the
>> command-line arguments when starting the expansion service, which
>> currently accepts only the port on the command line, see [1]. The
>> straightforward 'fix' (see [2]) unfortunately does not work,
>> because it requires DirectRunner to be on the classpath, which
>> then breaks other runners (see [3]). It seems possible to copy
>> hand-selected options from the command line to the Pipeline, but
>> that feels hackish. It would require either being able to construct
>> the Pipeline without a runner specified (which seems possible when
>> calling Pipeline.create(), but not when using PipelineOptions
>> created by parsing command-line arguments), or being able to create
>> a Map<String, String> from PipelineOptions and then copy all
>> options into the Pipeline's options.
>>
>> My proposal would be to create a hackish shortcut and just copy
>> --defaultEnvironmentType, --defaultEnvironmentConfig and
>> --experiments into the Pipeline's options for now, and create an
>> issue for a proper solution (possibly a)?).
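
The proposed shortcut can be sketched as follows. This is a Python
illustration only, not the Java ExpansionService code: the three flag
names come from the message above, while the helper names
(extract_forwarded_options, apply_to_pipeline_options) and the use of a
plain dict to stand in for the Pipeline's options are assumptions made
for the example.

```python
import argparse
from typing import Dict, Sequence


def extract_forwarded_options(argv: Sequence[str]) -> Dict[str, object]:
    """Pull only the hand-selected flags out of the command line.

    Everything else (the port, --runner, ...) is ignored, so no runner
    has to be resolvable just to start the service.
    """
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    parser.add_argument("--defaultEnvironmentType")
    parser.add_argument("--defaultEnvironmentConfig")
    # --experiments may be given several times; collect every value.
    parser.add_argument("--experiments", action="append", default=None)
    known, _ignored = parser.parse_known_args(list(argv))
    # Keep only the flags that were actually supplied.
    return {k: v for k, v in vars(known).items() if v is not None}


def apply_to_pipeline_options(
    pipeline_options: Dict[str, object], argv: Sequence[str]
) -> Dict[str, object]:
    """Return a copy of the options with the forwarded flags applied."""
    merged = dict(pipeline_options)
    merged.update(extract_forwarded_options(argv))
    return merged
```

For example, starting the service with the arguments
["8097", "--defaultEnvironmentType=PROCESS"] would override a DOCKER
default in the options while leaving every other option untouched.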
>>
>> WDYT? Or did I miss a way to override the default expansion?
>>
>> Thanks for comments,
>>
>> Jan
>>
>> [1] https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
>>
>> [2] https://github.com/apache/beam/pull/15082
>>
>> [3] https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/
>>