I would slightly disagree that this breaks the black-box nature of the
expansion. The "how the transform expands" part remains unknown to the
SDK requesting the expansion. The "how the transform executes" part, on
the other hand, is something the SDK must cooperate on: it knows (or
could, or should, know) what the environment the pipeline is going to be
executed on looks like. That is why the expansion service on its own
cannot correctly define the execution environment. It could if it were
bound to a runner (and its environment); for instance, a
FlinkRunnerExpansionService could probably expand KafkaIO to something
more 'native'. But that requires knowledge of the target runner. If the
expansion service is not dedicated to a runner, the only place where the
environment can be defined is the SDK, and therefore the expansion
request.
> Power users can always modify the output produced by the expansion
> service as well.
I'm not sure I follow this. Do you mean that power users who run the
expansion service can modify the output? Or is the output (protobuf) of
the expansion service easily transferable between different execution
environments? I had the impression that execution environments do not
necessarily have the same payloads associated with them, and that it is
therefore impossible to 'postprocess' the output of the expansion. Is
that a wrong assumption?
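To make the question concrete, here is a minimal Python sketch of such postprocessing. It assumes a simplified model of the environments in an expansion response: `Environment` and `replace_docker_environments` are hypothetical names, only the two environment URNs come from Beam, and proto fields such as capabilities and dependencies are left out. It shows the concern raised above: a DOCKER payload only names a container image, so a PROCESS payload cannot be derived from it and has to be supplied by whoever postprocesses the output.

```python
from dataclasses import dataclass, field
from typing import Dict

# Standard Beam environment URNs; everything else here is a simplified model.
DOCKER_URN = "beam:env:docker:v1"
PROCESS_URN = "beam:env:process:v1"


@dataclass
class Environment:
    """Hypothetical stand-in for the Environment proto (urn + payload only)."""
    urn: str
    payload: Dict[str, object] = field(default_factory=dict)


def replace_docker_environments(
    environments: Dict[str, Environment],
    process_payload: Dict[str, object],
) -> Dict[str, Environment]:
    """Return a copy of the map with every DOCKER environment swapped for PROCESS.

    A DOCKER payload only names a container image, so nothing in it says which
    boot command to run on the worker: the caller has to supply that payload.
    """
    if "command" not in process_payload:
        raise ValueError("a PROCESS environment needs a 'command' to start the SDK harness")
    replaced: Dict[str, Environment] = {}
    for env_id, env in environments.items():
        if env.urn == DOCKER_URN:
            replaced[env_id] = Environment(PROCESS_URN, dict(process_payload))
        else:
            replaced[env_id] = env  # non-DOCKER environments are kept as-is
    return replaced
```

The input map is left untouched, so the original expansion output can still be inspected after the rewrite.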
On 6/29/21 7:55 PM, Luke Cwik wrote:
This would "break" the black box where the expansion service is
supposed to hide the implementation internals from the caller and
pushes compatibility of these kinds of environment overrides on to the
expansion service and its implementer.
Power users can always modify the output produced by the expansion
service as well.
On Tue, Jun 29, 2021 at 10:08 AM Jan Lukavský <[email protected]> wrote:
The argument for being able to accept a (possibly ordered) list of
execution environments is that this would make a single instance of the
expansion service reusable by various clients with different
requirements. Moreover, the two approaches are probably orthogonal:
users could specify a 'defaultExecutionEnvironment' for the service,
which would be used when the client gives no preference.
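A minimal Python sketch of the selection logic this would imply, assuming (hypothetically) that the request carries an ordered list of preferred environment types and that the service knows which types it supports. `choose_environment_type` is an illustrative name, not an existing Beam API.

```python
from typing import AbstractSet, Iterable


def choose_environment_type(
    client_preferences: Iterable[str],
    supported: AbstractSet[str],
    service_default: str,
) -> str:
    """Pick the first client-preferred environment type the service supports.

    Falls back to the service's own default when the client states no
    preference or none of its preferences is supported.
    """
    for env_type in client_preferences:
        if env_type in supported:
            return env_type
    return service_default
```

For example, a client sending `["DOCKER", "PROCESS"]` to a service that supports only PROCESS and EXTERNAL gets "PROCESS", while a client sending no preference gets the service default.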
On 6/29/21 7:03 PM, Luke Cwik wrote:
I would be much more inclined for the user being able to
configure the expansion service for their needs instead of
changing the expansion service API.
On Tue, Jun 29, 2021 at 9:42 AM Jan Lukavský <[email protected]> wrote:
If I understand it correctly, there is currently no place to set the
defaultEnvironmentType. Python's KafkaIO either uses the
'expansion_service' given by the user (which might be a host:port, or an
object that has the appropriate method), or calls
'default_io_expansion_service', which in turn runs ExpansionService
using gradle. Either way, it ends up in ExpansionService#main [1]. It
would be possible to adapt ExpansionService and call it locally,
provided ExpansionService offered a way to extend it (a protected
createPipeline() method seems to be enough), but that is not very
user-friendly. If we could specify the defaultEnvironmentConfig when
starting the ExpansionService, it would be possible to add these
parameters in the Python SDK's KafkaIO, which would mean users do not
have to worry about the expansion service at all (leaving aside that
using too many ReadFromKafka or WriteToKafka transforms would somewhat
hurt performance during pipeline construction, but that affects
pipeline build time only). I have created [2] to track that.
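As an illustration, assuming the ExpansionService main accepted pipeline options after the port (the change tracked in [2], not the current behaviour), the Python side could build the command that starts the service roughly like this. `expansion_service_command` is a hypothetical helper; the flag names mirror the Java PortablePipelineOptions names used in this thread.

```python
import json
from typing import Dict, List, Optional, Sequence


def expansion_service_command(
    jar_path: str,
    port: int,
    environment_type: Optional[str] = None,
    environment_config: Optional[Dict[str, object]] = None,
    experiments: Sequence[str] = (),
) -> List[str]:
    """Build the command line for a standalone expansion service.

    Hypothetical: today main() only reads the port; the extra flags below
    assume the change requested in BEAM-12539.
    """
    cmd = ["java", "-jar", jar_path, str(port)]
    if environment_type is not None:
        cmd.append(f"--defaultEnvironmentType={environment_type}")
    if environment_config is not None:
        # For PROCESS the config is a JSON object, e.g. {"command": "..."}.
        cmd.append(f"--defaultEnvironmentConfig={json.dumps(environment_config)}")
    for experiment in experiments:
        cmd.append(f"--experiments={experiment}")
    return cmd
```

The resulting list could be handed to `subprocess.Popen`; with no environment arguments it degenerates to the port-only invocation that works today.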
Does that make sense, or is my analysis incorrect?
Jan
[1] https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
[2] https://issues.apache.org/jira/browse/BEAM-12539
On 6/29/21 6:24 PM, Alexey Romanenko wrote:
> I’m sorry if I missed something, but do you mean that
> PortablePipelineOptions.setDefaultEnvironmentType(String) doesn’t work
> for you? Or is it only a specific case when using portable KafkaIO?
>
>> On 29 Jun 2021, at 09:51, Jan Lukavský <[email protected]> wrote:
>>
>> Hi,
>>
>> I have come across an issue with cross-language transforms. In my
>> setup I have a working environment of type PROCESS and I cannot use
>> DOCKER. When I use Python's KafkaIO, it unfortunately expands by
>> default to a docker environment, which then fails due to the missing
>> 'docker' command. So far I haven't found a solution that doesn't
>> involve changing the expansion service.
>>
>> I see several possible solutions to that:
>>
>> a) I would say that the cleanest solution would be to add the
>> preferred environment type to the expansion request sent to the
>> expansion service (probably along with additional flags, such as
>> --experiments?). This requires deeper changes to the expansion RPC
>> definition, probably serializing the PipelineOptions from the client
>> environment into the ExpansionRequest.
>>
>> b) Another option would be to allow specifying some of the
>> command-line arguments when starting the expansion service, which
>> currently accepts only the port on the command line, see [1]. The
>> straightforward 'fix' (see [2]) unfortunately does not work, because
>> it requires DirectRunner to be on the classpath, which then breaks
>> other runners (see [3]). It seems possible to copy hand-selected
>> options from the command line to the Pipeline, but that feels hackish.
>> It would require either being able to construct the Pipeline without a
>> runner specified (which seems possible when calling Pipeline.create(),
>> but not when using PipelineOptions created by parsing command-line
>> arguments), or being able to create a Map<String, String> from
>> PipelineOptions and then copy all options into the Pipeline's options.
>>
>> My proposal would be to create a hackish shortcut and just copy
>> --defaultEnvironmentType, --defaultEnvironmentConfig and --experiments
>> into the Pipeline's options for now, and create an issue for a proper
>> solution (possibly a)?).
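A rough sketch of that shortcut, written in Python for brevity although ExpansionService itself is Java: keep only the three named options from the command line and drop everything else (such as --runner). `select_forwarded_args` and `FORWARDED_OPTIONS` are hypothetical names; the helper accepts both `--name=value` and `--name value` and normalises the output to `--name=value`.

```python
from typing import Collection, List, Sequence

# The three options named in the proposal; everything else (e.g. --runner) is dropped.
FORWARDED_OPTIONS = ("defaultEnvironmentType", "defaultEnvironmentConfig", "experiments")


def select_forwarded_args(
    argv: Sequence[str], allowed: Collection[str] = FORWARDED_OPTIONS
) -> List[str]:
    """Keep only allowed options, normalised to the --name=value form."""
    selected: List[str] = []
    i = 0
    while i < len(argv):
        arg = argv[i]
        i += 1
        if not arg.startswith("--"):
            continue  # positional arguments such as the port are handled elsewhere
        name, sep, value = arg[2:].partition("=")
        if not sep and i < len(argv) and not argv[i].startswith("--"):
            # "--name value" form: consume the next token as the value
            value, sep = argv[i], "="
            i += 1
        if name in allowed:
            selected.append(f"--{name}{sep}{value}")
    return selected
```

Values of dropped options are consumed together with their flag, so they are never mistaken for positional arguments; repeated `--experiments` flags are all kept.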
>>
>> WDYT? Or did I miss a way to override the default expansion?
>>
>> Thanks for comments,
>>
>> Jan
>>
>> [1] https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511
>>
>> [2] https://github.com/apache/beam/pull/15082
>>
>> [3] https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/
>>