Hi,

I have come across an issue with cross-language transforms. In my setup I use the PROCESS environment type and I cannot use DOCKER. When I use Python's KafkaIO, it unfortunately expands by default to a DOCKER environment, which then fails because the 'docker' command is missing. So far I haven't found a solution that doesn't involve changes to the expansion service.

I see several possible solutions to that:

 a) I would say that the cleanest solution would be to add the preferred environment type to the expansion request sent to the expansion service (probably along with additional flags, e.g. --experiments?). This requires deeper changes to the expansion gRPC definition, probably serializing the PipelineOptions from the client environment into the ExpansionRequest.

 b) Another option would be to allow specifying some of the command-line arguments when starting the expansion service, which currently accepts only the port on the command line, see [1]. The straightforward 'fix' (see [2]) unfortunately does not work, because it requires DirectRunner to be on the classpath, which then breaks other runners (see [3]). It seems possible to copy hand-selected options from the command line to the Pipeline, but that feels hackish. It would require either being able to construct the Pipeline without a runner specified (which seems possible when calling Pipeline.create(), but not when using PipelineOptions created by parsing command-line arguments), or being able to create a Map<String, String> from PipelineOptions and then copy all of those options into the Pipeline's options.

My proposal would be to take a hackish shortcut for now and just copy --defaultEnvironmentType, --defaultEnvironmentConfig and --experiments into the Pipeline's options, and to create an issue for a proper solution (possibly a)?).
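To make the shortcut concrete, here is a minimal sketch using plain Java collections instead of Beam's PipelineOptions API. It assumes options can be represented as a Map<String, String>. The class ExpansionOptionsShortcut and its methods forwardedOptions and overlay are hypothetical names, not existing Beam code. The sketch picks only the three whitelisted "--name=value" arguments out of the expansion service's command line, ignores everything else (such as the port), and overlays them onto the options the expanded Pipeline already has. Wiring this into ExpansionService would still need the Beam-specific plumbing discussed in b).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class ExpansionOptionsShortcut {

  // Hypothetical whitelist: only these options would be forwarded from the
  // expansion service command line into the expanded Pipeline's options.
  static final Set<String> FORWARDED =
      Set.of("defaultEnvironmentType", "defaultEnvironmentConfig", "experiments");

  /**
   * Collects whitelisted "--name=value" arguments into a map. Repeated
   * --experiments values are joined with commas. Anything else (e.g. the
   * positional port argument or --runner) is ignored.
   */
  static Map<String, String> forwardedOptions(String[] args) {
    Map<String, String> result = new LinkedHashMap<>();
    for (String arg : args) {
      if (!arg.startsWith("--")) {
        continue; // positional argument such as the port
      }
      int eq = arg.indexOf('=');
      if (eq < 0) {
        continue; // bare flags are not handled in this sketch
      }
      String name = arg.substring(2, eq);
      String value = arg.substring(eq + 1);
      if (FORWARDED.contains(name)) {
        result.merge(name, value, (a, b) -> a + "," + b);
      }
    }
    return result;
  }

  /** Returns a copy of pipelineOptions where forwarded values replace existing ones. */
  static Map<String, String> overlay(
      Map<String, String> pipelineOptions, Map<String, String> forwarded) {
    Map<String, String> merged = new LinkedHashMap<>(pipelineOptions);
    merged.putAll(forwarded);
    return merged;
  }

  public static void main(String[] args) {
    // Illustrative command line; the PROCESS config value is only an example.
    String[] cli = {
      "8097",
      "--defaultEnvironmentType=PROCESS",
      "--defaultEnvironmentConfig={\"command\":\"/opt/apache/beam/boot\"}",
      "--experiments=a",
      "--experiments=b",
      "--runner=FlinkRunner"
    };
    Map<String, String> existing = Map.of("defaultEnvironmentType", "DOCKER");
    System.out.println(overlay(existing, forwardedOptions(cli)));
  }
}
```

In this sketch, overlay replaces an existing --experiments value instead of taking the union with it. A real implementation would probably want to merge the two experiment lists.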

WDYT? Or did I miss a way to override the default expansion?

Thanks for any comments,

 Jan

[1] https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511

[2] https://github.com/apache/beam/pull/15082

[3] https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/
