> Not sure why we need the hacks with NoOpRunner

As noted earlier (and that was why I started this thread in the first place :)), adding :runners:direct-java as a runtime dependency of the expansion service causes something like 200 tests in pre-commit to fail. It looks like there is some kind of conflict between the Flink and Direct runners. I didn't dig too deep into that, though.

> You could use the Python utilities in your script to start/stop it manually.

Yes, that is possible. I'll probably follow that path.

> This is where the runner's ability to customize environments would come in handy--e.g. a Java runner could decide to swap out the Java docker environment for EMBEDDED or LOOPBACK (and a Python-based runner could do the same for the Python docker env).

That would be just perfect, as that would make it possible to finally unify 'classical' and 'portable' runners. But that is a whole different story. :)

 Jan

On 6/30/21 5:35 PM, Robert Bradshaw wrote:
On Wed, Jun 30, 2021 at 7:41 AM Jan Lukavský <je...@seznam.cz> wrote:
java -jar beam-sdks-java-io-expansion-service-2.30.0.jar <port>
This does not accept any parameters other than the port. That is the first part of 
this thread - the intent was to enable this to accept additional arguments, but 
there are (still unresolved) issues. There currently even 
seems to be no other way to adapt ExpansionService than to copy&paste the code 
and modify it, because it simply is not extensible. What would be enough is 
wrapping the Pipeline.create() [1] call in a protected method, or adding a (protected) 
constructor that would accept PipelineOptions (probably better in this regard). 
That would make it easier for users to create a customized ExpansionService and it 
would (sort of) help solve the described issues.
Yes, let's make it easy to extend/customize/start up a custom
ExpansionService, including adding optional command line arguments to
the "default" one. Not sure why we need the hacks with NoOpRunner
(IMHO, the direct runner should just be part of the SDK, but that's
not where we live now).

But even if we do that, we still need to deal with the expansion service in two 
places:

  a) run it (and stop it)

  b) specify it in the

Using the default expansion service is much, much easier, it is started and 
stopped automatically for the user. Moreover, the JavaJarExpansionService 
actually even presumes that there can be additional arguments passed to the 
service ([2]), only the ExpansionService does not accept them (and Kafka IO 
does not expose that - that could be worked around by users by manually 
creating the JavaJarExpansionService from their own jar, yes). I would find it 
natural to add the command-line parsing (somehow!) to the ExpansionService 
itself, so that it doesn't need end-user modifications, and then to figure out 
how to most easily expose these command-line arguments to end-users.
You could use the Python utilities in your script to start/stop it manually.

Yes, I verified that Flink can use Python Kafka IO over PROCESS environment 
with some hacking of the ExpansionService as shown in one of the linked PRs 
(though there are probably still some bugs regarding SDF - [3]). Adding 
--experiments seems to have the same issues, we need to expose that to the CLI of 
ExpansionService. And I'm not sure whether this [4] is in conflict with 
--experiments=use_deprecated_read. That is something I still need to 
investigate.

LOOPBACK is currently not supported by Flink. That is a nice-to-have feature.
Local Flink does support LOOPBACK mode. If you just want to run
locally, just specifying "FlinkRunner" is enough. It's distributed
Flink that does not. It seems a lot of complexities are due to trying
to use minikube, which acts like it's distributed, but trying to
make it as easy as if it were all local (and the docker deficiencies
as well, which would make it just work...) Which is a worthy goal.

This is where the runner's ability to customize environments would
come in handy--e.g. a Java runner could decide to swap out the Java
docker environment for EMBEDDED or LOOPBACK (and a Python-based runner
could do the same for the Python docker env).

[1] 
https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L394

[2] 
https://github.com/apache/beam/blob/f9a4bfcb027f2e3a8e32578adf49981aeef3586a/sdks/python/apache_beam/transforms/external.py#L481

[3] https://issues.apache.org/jira/browse/BEAM-11998

[4] 
https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L398

On 6/30/21 3:57 PM, Chamikara Jayalath wrote:



On Wed, Jun 30, 2021 at 6:54 AM Chamikara Jayalath <chamik...@google.com> wrote:


On Wed, Jun 30, 2021 at 1:20 AM Jan Lukavský <je...@seznam.cz> wrote:
On 6/30/21 1:16 AM, Robert Bradshaw wrote:
<rant>Why doesn't docker in docker just work, rather than having to do
ugly hacks when composing two technologies that both rely on
docker...</rant>

Presumably you're setting up a node for Kafka and Flink; why not set
one up for the expansion service as well? The UX of

      ReadFromKafka(default_expansion_service_args={
          "defaultEnvironmentType": "PROCESS",
          "defaultEnvironmentConfig": "{\"os\": \"linux\", \"arch\": \"amd64\", \"command\": \"/path/to/launcher/boot cp=/some/other/long/path\" ...}"})

isn't that great either. Rather than pass arbitrary arguments to a
default expansion service, I still think once you get to this level
it's better to just start your own expansion service.
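
To illustrate the quoting problem in the call above, here is a minimal sketch that builds the nested config string with json.dumps instead of escaping it by hand. It assumes the PROCESS environment config is a JSON object with the os, arch and command keys from the example; default_expansion_service_args is the hypothetical parameter under discussion, not an existing ReadFromKafka argument, and the command path is a placeholder.

```python
import json

# Keys mirror the example above; the command path is a placeholder.
process_config = {
    "os": "linux",
    "arch": "amd64",
    "command": "/path/to/launcher/boot",
}

# Hypothetical argument map for the proposed default_expansion_service_args.
default_expansion_service_args = {
    "defaultEnvironmentType": "PROCESS",
    # The config travels as a JSON string nested inside the outer mapping.
    "defaultEnvironmentConfig": json.dumps(process_config),
}

print(json.dumps(default_expansion_service_args, indent=2))
```

Even with the escaping generated, the user still has to know the internals of the Java environment, which is the UX concern raised here.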
Sure, that is possible (seems to me, that it would still require some
changes to ExpansionService to be extendable, but yes, kind of tiny
changes). The problem is not with Flink or Kafka - those are
technologies you are actually expecting to set up, because you want to
use them. The problem is everything else you must set up to make
something that seems as easy as "read a few messages from kafka in beam
python" work. You must have:

   a) Python SDK harness (OK, that is something that should probably be
expected) - there are a few problems with it, namely it is somewhat
hardcoded that it must run in the same pod as Flink's taskmanager to be
able to use EXTERNAL environment, but ok, let's go on

   b) Java SDK harness, at least installed in docker image of taskmanager
(to be usable via PROCESS environment) - OK, that starts to be weird,
taskmanager is java, right? Something like LOOPBACK would be cool there,
but never mind. You create custom docker image for your Flink JM and TM
and continue.

   c) Implement (extend) and deploy own expansion service - ouch, that
starts to hurt, that is even going to be a pod that is running even
though there is nothing using it (yes, can be scaled down).

The complexity of a simple task starts to be somewhat extraordinary. And
most of the users will not be willing to follow this path, I'm afraid.
People generally don't like to set up a complex environment for something
that looks like it should "just work".  There is non-trivial work necessary
to make all of this work, especially when you are starting to evaluate
Beam and don't have much experience with it.

I don't think we should expect end-users to implement or extend the expansion 
service. Everything should be already implemented and maybe we can even provide 
a script to easily startup a local Java expansion service with additional 
parameters.

Today, to start a Java expansion service for Kafka, users have to do the 
following.

* Download expansion service jar released with Beam for Kafka. For example [1]

* Run following command:
java -jar beam-sdks-java-io-expansion-service-2.30.0.jar <port>

* To use this they just have to provide "localhost:<port>" to [2].

This is a few extra steps but mostly a one time setup for the user and nothing 
to do with portability or other complexities of Beam.
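
The manual steps above could be wrapped in a small script. This is only a sketch using the Python standard library, not Beam's own utilities: expansion_service_command and running_process are hypothetical helper names, and port 8097 is a placeholder.

```python
import subprocess
import sys
from contextlib import contextmanager


def expansion_service_command(jar_path, port):
    """Build the command line from the steps above: java -jar <jar> <port>."""
    return ["java", "-jar", jar_path, str(port)]


@contextmanager
def running_process(command):
    """Start a long-running command and make sure it is stopped on exit."""
    process = subprocess.Popen(command)
    try:
        yield process
    finally:
        process.terminate()
        try:
            process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            # The process ignored the termination request; force it.
            process.kill()
            process.wait()


if __name__ == "__main__":
    # Placeholder port; the jar name is the one from the steps above.
    cmd = expansion_service_command(
        "beam-sdks-java-io-expansion-service-2.30.0.jar", 8097)
    print(" ".join(cmd))
```

Inside a `with running_process(cmd):` block the pipeline would then pass "localhost:8097" as the expansion service address to the Kafka transform, as described in the last step.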

I'm all for simplifying the user-experience, but adding changes to the 
transform API that might have to be deprecated later sounds like a bad idea. 
I'd much rather provide additional scripts/documentation/examples to simplify 
such use-cases. I think that will be adequate for most users.

BTW, slightly orthogonal, I don't think multi-language would work in LOOPBACK 
mode today without additional changes to portable runners (at least I've never 
tested this). Did you confirm that this works ?

Or PROCESS mode.


Thanks,
Cham

[1] 
https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-io-expansion-service/2.30.0/beam-sdks-java-io-expansion-service-2.30.0.jar
[2]  
https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133



We can get rid of b) (implement LOOPBACK in Flink) and c) (enable Python
SDK Kafka IO to spawn expansion service with the LOOPBACK environment
when submitting to Flink). That is why I still think that this
simplification matters a lot.

On Tue, Jun 29, 2021 at 3:33 PM Jan Lukavský <je...@seznam.cz> wrote:
I believe we could change that more or less the same as we can deprecate / stop 
supporting any other parameter of any method. If python starts to support 
natively Kafka IO, then we can simply log warning / raise exception (one after 
the other). That seems like natural development.

Maybe I should have described the case - I'm trying to set up a "simple" use-case for 
users that want to try the Python SDK to read from Kafka using Flink on Minikube (both Kafka and 
Flink are running inside Minikube). There are tons of problems with using docker from within Minikube 
and I would not say that is the "simple" way we would like to present to users. Setting 
up your own expansion service is a possibility - but that also lacks the UX approach. I pretty much think 
that understanding portability on its own is already a burden we put on users (yes, we do that for 
a reason, but everything else should be as simple as possible).

On 6/30/21 12:16 AM, Chamikara Jayalath wrote:

So I think one downside to this PR is that we assume that the default expansion 
service used by the transform (Kafka in this case) will not change. Currently 
it's fully opaque. In the default case we just promise that the transform will 
work (if conditions I mentioned above are met). Nothing else.
If we add a "param default_expansion_service_args", we leak the nature of the 
default expansion service to the API and it will be hard to change it in the future.

Thanks,
Cham

On Tue, Jun 29, 2021 at 3:07 PM Jan Lukavský <je...@seznam.cz> wrote:
I would absolutely understand this, if it would be mostly impossible or at 
least really hard to get the user friendly behavior. But we are mostly there in 
this case. When we can actually quite simply pass the supported environment via 
parameter, I think we should go for it.

I have created a sketch (I verified that when the ExpansionService is patched 
'enough' this works) in [1]. This is only a sketch, because we first must know 
how to support the default execution environment in ExpansionService.

[1] https://github.com/apache/beam/pull/15099/files

On 6/29/21 11:51 PM, Chamikara Jayalath wrote:



On Tue, Jun 29, 2021 at 2:39 PM Jan Lukavský <je...@seznam.cz> wrote:
On 6/29/21 11:04 PM, Robert Bradshaw wrote:
You can configure the environment in the current state, you just have
to run your own expansion service that has a different environment
baked into it (or makes this configurable).
Yes, that is true. On the other hand that lacks some user-friendliness,
because ideally, you don't want to worry about expansion services,
especially when it comes to some fairly standard IO. The ideal case is that
you either basically do not know that you use an external transform (which
is probably the case when you can use docker), or you are able to
overcome the problem within the SDK (Python) by passing some argument to
the input transform.
Arguments passed to the pipeline level apply to the whole pipeline (not just 
one transform). So if you pass in a default environment (and configs) at 
pipeline level, that would mean the default environment and configs used by the 
pipeline (so Python SDK in this case) not a specific transform.
I believe we have made usage of external transforms user-friendly for the 
general case. But we had to make some assumptions. For example, we assumed:
* user will be using the default environment of the expansion service (Docker 
in this case)
* User will be using the pre-specified dependency only 
(sdks:java:io:expansion-service:shadowJar for Kafka)
* User will be in an environment where the jar can be downloaded.

I would consider any use-case where these basic assumptions cannot be met as an 
advanced use-case. The solution in such a case would be to start a custom 
expansion service and pass the address of it as a parameter to the transform 
[1]. I'm fine with extending the capabilities of Java expansion service by 
adding more parameters (for example, for overriding the environment, for 
specifying dependencies, for providing pipeline options).

Thanks,
Cham

[1] 
https://github.com/apache/beam/blob/b86fcf94af26a240777f30f8193a314cb7ffc87e/sdks/python/apache_beam/io/kafka.py#L133


Is option (1) updating the default expansion service such that one can
override default environment properties on the command line? (You
would still have to start it up manually to use it.)
Yes and no. :) Updating ExpansionService so that you can specify the default
environment on the command line makes this accessible to
JavaJarExpansionService, and that makes it possible to add an (optional)
argument to Python Kafka IO that would delegate this to the
(automatically) started expansion service. It is important to note that
both ReadFromKafka and WriteToKafka have an expansion that involves only
a single external (Java) SDK. That simplifies things.
Maybe it would help to make things more concrete. Suppose I have a Go
pipeline that uses a library which invokes a Python external transform
to do ML (say, via TFX), and two Java IOs (which happen to have
mutually exclusive dependencies). The ML transform itself uses Java to
invoke some SQL.

The way things work currently is each external transform will have an
associated fully specified environment and a runner can use docker to
start up the required workers at the expected time.

Now, suppose one doesn't have docker on the workers. One wants to run this with

       ./my_pipeline --someFlag=someValue --someOtherFlag=someOtherValue ...

such that docker is no longer needed. What someFlags would we need,
and what would their values be? (And how to make this feasible to
implement.)

Are there meaningful intermediate points that extend to a general
solution (or at least aren't hostile to it)?
I believe that in the option 2) the best way would be to use each SDK's URN.
Then the arguments could be something like
"--expansionEnvironments={"apache:beam:go:2.33.0:latest"={"env"="DOCKER",
config="<image>"}, "apache:beam:python:2.33.0:latest"={env="PROCESS",
config={...}}". Yes, it would require a lot of "syntactic sugar" to
configure that. :) (sorry if I don't have URNs for SDKs 100% correct)
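
One way to tame the syntax of such a flag would be to make its value plain JSON. A minimal sketch, assuming a hypothetical --expansionEnvironments flag as proposed above; the keys reuse the identifiers from the message (which may not be the exact URNs), and the elided PROCESS config is left empty rather than guessed.

```python
import json

# Hypothetical per-SDK mapping; keys are the identifiers used in the message.
expansion_environments = {
    "apache:beam:go:2.33.0:latest": {"env": "DOCKER", "config": "<image>"},
    # The PROCESS config is elided in the message, so it stays empty here.
    "apache:beam:python:2.33.0:latest": {"env": "PROCESS", "config": {}},
}


def to_flag(mapping):
    """Serialize the mapping as a single JSON-valued command-line flag."""
    return "--expansionEnvironments=" + json.dumps(mapping, sort_keys=True)


def from_flag(flag):
    """Parse the flag back into a mapping of SDK identifier to environment."""
    name, _, value = flag.partition("=")
    if name != "--expansionEnvironments":
        raise ValueError("unexpected flag: " + name)
    return json.loads(value)


flag = to_flag(expansion_environments)
print(flag)
```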
I still think in the long run having runners understand environments,
and saying "oh, whenever I see 'apache:beam:java:2.33.0:latest' I'll
swap that out for 'path/to/my/java -cp ...' is the right way to go
long-term. (I would put this in runners, not SDKs, though a common
runners library could be used.)
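
The swap described above could look roughly like this on the runner side. It is a sketch over plain dictionaries, not Beam's actual environment protos: swap_environments and the "type"/"image"/"command" keys are hypothetical, and the command is the placeholder path from the message without its elided arguments.

```python
def swap_environments(environments, overrides):
    """Return a copy of environments with matching docker entries replaced.

    environments: dict of environment id -> spec (a plain dict here).
    overrides: dict of container image name -> replacement spec.
    """
    swapped = {}
    for env_id, spec in environments.items():
        # Only docker environments are candidates for a swap.
        image = spec.get("image") if spec.get("type") == "DOCKER" else None
        swapped[env_id] = dict(overrides.get(image, spec))
    return swapped


pipeline_envs = {
    "java_env": {"type": "DOCKER", "image": "apache:beam:java:2.33.0:latest"},
    "python_env": {"type": "DOCKER", "image": "apache:beam:python:2.33.0:latest"},
}
runner_overrides = {
    "apache:beam:java:2.33.0:latest": {
        "type": "PROCESS", "command": "path/to/my/java"},
}

result = swap_environments(pipeline_envs, runner_overrides)
print(result)
```

Environments the runner does not recognize pass through unchanged, so the pipeline stays runnable wherever docker is available.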
Yes, I also agree, that expansion service should be runner-dependent (or
at least runner-aware), as that brings optimizations. Runner could
ignore settings from previous point when it can be *sure* it can do so.
On Tue, Jun 29, 2021 at 1:29 PM Jan Lukavský <je...@seznam.cz> wrote:
Thanks for pointing to that thread.

1) I'm - as well as Kyle - fine with the approach that we use a
"preferred environment" for the expansion service. We only need to pass
it via command line. Yes, the command line might be generally
SDK-dependent, and that makes it expansion dependent, because whether or
not particular transform is "external" or not is implementation detail.
That is the nasty part. The rest of my original question is about, how
exactly to do that, because it seems to be tricky, due to the fact, that
it is not possible to include runtime dependency on DirectRunner (fails
many, many tests) and it is not possible to extract PipelineOptions as a
Map either.

2) Regarding SDK injecting environment, I still think that is the
correct way. The SDK (the driver code) owns the execution environment. It
should be able to define (or at least prioritize) runtime environments
of all transforms. If we cannot know in advance, which transform is
going to expand to how many nested (and possibly external) transforms, I
think that the SDK could be fine with providing a Map(SDK ->
environment). That is: "Run Java using PROCESS", "Run Python using
DOCKER", and so on. A default mapping might exist on the expansion
service as well (which might be passed through command line and that is
the point 1)). Yes, the Map approach is definitely not universal,
because one can imagine that the SDK itself is not enough for specifying
the environment, but seems that vast majority of cases would fit into that.

3) The best might be for the SDK to provide a list of supported
environments with additional metrics which the expansion service might
choose from.

These three approaches are all extensions to the current state. Current
state has predefined environment without possibility to change it.
Option 1) changes it to single configurable environment, option 2) to N
environments based on SDK and option 3) to M environments based on
SDK-dependent metrics (and/or capabilities of particular environment).
Seems like gradual extensions of the current state, so maybe we can
focus on the first one, and maybe add other, when there is a need?
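
To show how the first two options layer on top of the current state, here is a minimal sketch of the lookup order. All names are hypothetical, and environments are reduced to plain dictionaries.

```python
# Current state: a predefined environment that cannot be changed.
BUILT_IN_DEFAULT = {"type": "DOCKER"}


def resolve_environment(sdk, per_sdk=None, service_default=None):
    """Pick the environment for one SDK, most specific setting first."""
    # Option 2: the driver SDK supplies a map of SDK -> environment.
    if per_sdk and sdk in per_sdk:
        return per_sdk[sdk]
    # Option 1: a single default passed on the service's command line.
    if service_default is not None:
        return service_default
    # Neither is set, so behave as today.
    return BUILT_IN_DEFAULT


print(resolve_environment("java"))
print(resolve_environment("java", service_default={"type": "PROCESS"}))
print(resolve_environment(
    "python",
    per_sdk={"java": {"type": "PROCESS"}},
    service_default={"type": "LOOPBACK"}))
```

Because each option only adds an earlier lookup step, implementing option 1 first does not block adding the others later.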

If this could be the first conclusion, then the next one would be, what
should be the preferred way to implement it.

WDYT?

On 6/29/21 9:15 PM, Robert Bradshaw wrote:
+1, thanks for digging up that thread.

I am still of the same opinion that I wrote there. To touch on some
things brought up here, copying something like
defaultEnvironmentConfig doesn't make sense from language to language
(e.g. the docker image name or CLI arguments for subprocess mode just
isn't going to work for all of Python, Java, and Go, and clearly
embedded type is only going to work for one.)

In the short term, to change environment (or anything else) about the
"default" expansions service, the thing to do is build and start your
own expansion service that sets up the environment for its transforms
in a custom way.

FYI, in Python, one can use --beam_services to use a custom expansion
service. E.g.

--beam_services='{":sdks:java:extensions:sql:expansion-service:shadowJar":
"localhost:port"}'

would override the default one when using SqlTransform.
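
The flag value is a JSON object, so it can be generated rather than typed by hand. A small sketch, with beam_services_flag as a hypothetical helper and 8097 as a placeholder for the port:

```python
import json


def beam_services_flag(overrides):
    """Format a mapping of service target -> address as a --beam_services flag."""
    return "--beam_services=" + json.dumps(overrides)


flag = beam_services_flag({
    # Target name taken from the example above; the port is a placeholder.
    ":sdks:java:extensions:sql:expansion-service:shadowJar": "localhost:8097",
})
print(flag)
```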

On Tue, Jun 29, 2021 at 11:47 AM Kyle Weaver <kcwea...@google.com> wrote:
For context, there was a previous thread which touched on many of the same 
points: 
https://lists.apache.org/thread.html/r6f6fc207ed62e1bf2a1d41deeeab554e35cd2af389ce38289a303cea%40%3Cdev.beam.apache.org%3E

On Tue, Jun 29, 2021 at 11:16 AM Jan Lukavský <je...@seznam.cz> wrote:
I would slightly disagree that this breaks the black box nature of the expansion. The "how the 
transform expands" remains unknown to the SDK requesting the expansion; the "how the 
transform executes" - on the other hand - is something that the SDK must cooperate on - it 
knows (or could or should know) what the environment that the pipeline is going to be executed 
on looks like. That is why the expansion service on its own cannot correctly define the execution 
environment. It could, if it were bound to a runner (and its environment) - for instance 
FlinkRunnerExpansionService could probably expand KafkaIO to something more 'native'. But that 
requires knowledge of the target runner. If the expansion service is not dedicated to a runner, the 
only place where it can be defined is the SDK - and therefore the expansion request.

Power users can always modify the output produced by the expansion service as 
well.
I'm not sure if I follow this, do you mean that power users who run the 
expansion service can modify the output? Or is the output (protobuf) of the 
expansion service easily transferable between different execution 
environments? I had the impression that execution environments do not 
necessarily have to have the same payloads associated with them, and therefore 
it is impossible to 'postprocess' the output of the expansion. Is that a wrong 
assumption?

On 6/29/21 7:55 PM, Luke Cwik wrote:

This would "break" the black box where the expansion service is supposed to 
hide the implementation internals from the caller and pushes compatibility of these kinds 
of environment overrides on to the expansion service and its implementer.

Power users can always modify the output produced by the expansion service as 
well.

On Tue, Jun 29, 2021 at 10:08 AM Jan Lukavský <je...@seznam.cz> wrote:
The argument for being able to accept a (possibly ordered) list of execution 
environments is that this could make a single instance of the expansion service 
reusable by various clients with different requirements. Moreover, the two 
approaches are probably orthogonal - users could specify 
'defaultExecutionEnvironment' for the service which could be used in case when 
there is no preference given by the client.
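
A minimal sketch of how the two approaches could combine, assuming environments are identified by simple type names; choose_environment is a hypothetical function, not part of the expansion service.

```python
def choose_environment(client_preferences, supported, service_default):
    """Return the first client preference the service supports.

    Falls back to the service-wide default when the client gives no
    preference or none of its preferences is supported.
    """
    for env in client_preferences or []:
        if env in supported:
            return env
    return service_default


supported = {"DOCKER", "PROCESS"}
print(choose_environment(["LOOPBACK", "PROCESS"], supported, "DOCKER"))
print(choose_environment(None, supported, "DOCKER"))
```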

On 6/29/21 7:03 PM, Luke Cwik wrote:

I would be much more inclined for the user being able to configure the 
expansion service for their needs instead of changing the expansion service API.

On Tue, Jun 29, 2021 at 9:42 AM Jan Lukavský <je...@seznam.cz> wrote:
If I understand it correctly, there is currently no place to set the
defaultEnvironmentType - python's KafkaIO uses either
'expansion_service' given by the user (which might be a host:port, or an
object that has appropriate method), or calls
'default_io_expansion_service' - which in turn runs ExpansionService
using gradle. Either way, it ends up in ExpansionService#main [1]. It
could be possible to adapt ExpansionService and call it locally -
provided ExpansionService offered a way to extend it (a
protected method createPipeline() seems to be enough) - but that is not
very user-friendly. If we could specify the defaultEnvironmentConfig
when starting the ExpansionService, it would be possible to add these
parameters in the python SDK's KafkaIO, which would mean users do not
have to worry about the expansion service at all (leaving aside that
using too many ReadFromKafka or WriteToKafka transforms would somewhat
hurt performance during pipeline build, but that applies to the pipeline
build time only). I have created [2] to track that.

Does that make sense, or is my analysis incorrect?

      Jan

[1]
https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511

[2] https://issues.apache.org/jira/browse/BEAM-12539


On 6/29/21 6:24 PM, Alexey Romanenko wrote:
I’m sorry if I missed something but do you mean that 
PortablePipelineOptions.setDefaultEnvironmentType(String) doesn’t work for you? 
Or it’s only a specific case while using portable KafkaIO?

On 29 Jun 2021, at 09:51, Jan Lukavský <x666je...@gmail.com> wrote:

Hi,

I have come across an issue with cross-language transforms. In my setup, the 
PROCESS environment type works and I cannot use DOCKER. When I use Python's 
KafkaIO, it unfortunately - by default - expands to the docker environment, which 
then fails due to the missing 'docker' command. I haven't found a solution without 
tackling the expansion service yet.

I see several possible solutions to that:

      a) I would say that the cleanest solution would be to add the preferred 
environment type to the expansion request to the expansion service (probably 
along with additional flags, probably --experiments?). This requires deeper 
changes to the expansion RPC definition, probably serializing the 
PipelineOptions from the client environment into the ExpansionRequest.

      b) Another option would be to allow specifying some of the command-line 
arguments when starting the expansion service, which currently accepts only the port on the 
command line, see [1]. The straightforward 'fix' (see [2]) unfortunately does not 
work, because it requires DirectRunner to be on the classpath, which then breaks 
other runners (see [3]). It seems possible to copy hand-selected options from the command 
line to the Pipeline, but that feels hackish. It would require either being able to 
construct the Pipeline without a runner specified (which seems possible when calling 
Pipeline.create(), but not when using PipelineOptions created by parsing command-line 
arguments) or being able to create a Map<String, String> from PipelineOptions 
and then the ability to copy all options into the Pipeline's options.

My proposal would be to create a hackish shortcut and just copy the 
--defaultEnvironmentType, --defaultEnvironmentConfig and --experiments into 
Pipeline's options for now, and create an issue for a proper solution (possibly 
a)?).
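
To make the shortcut concrete, here is a sketch of the parsing logic. It is written in Python with argparse purely for illustration (the real ExpansionService is Java), and parse_service_args is a hypothetical name. Only the three options named above are copied; everything else is returned separately so it can be ignored or rejected.

```python
import argparse


def parse_service_args(argv):
    """Split argv into the port, the hand-selected options, and the rest."""
    parser = argparse.ArgumentParser(description="expansion service sketch")
    parser.add_argument("port", type=int)
    parser.add_argument("--defaultEnvironmentType")
    parser.add_argument("--defaultEnvironmentConfig")
    parser.add_argument("--experiments", action="append", default=[])
    args, unknown = parser.parse_known_args(argv)
    # Keep only the options that were actually set on the command line.
    copied = {k: v for k, v in vars(args).items() if k != "port" and v}
    return args.port, copied, unknown


port, copied, unknown = parse_service_args([
    "8097",  # placeholder port
    "--defaultEnvironmentType=PROCESS",
    "--experiments=use_deprecated_read",
    "--runner=FlinkRunner",
])
print(port, copied, unknown)
```

The unrecognized --runner flag ends up in the leftover list instead of forcing a runner onto the classpath, which is the point of copying a fixed set of options.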

WDYT? Or did I miss a way to override the default expansion?

Thanks for comments,

      Jan

[1] 
https://github.com/apache/beam/blob/22205ee1a84581e9206c5c61bad88a799779b4bc/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L511

[2] https://github.com/apache/beam/pull/15082

[3] https://ci-beam.apache.org/job/beam_PreCommit_Java_Commit/18169/
