I kept working with an ExternalTransformRegistrar solution (although if
there's an easier way, I'm all ears), and I have Java code that builds, and
a Python connector that tries to use it.

My current issue is that the expansion service that's started up doesn't
find my transform using the URN provided:
RuntimeError: java.lang.UnsupportedOperationException: Unknown urn:
beam:external:CAMUS:bigtable_read:v1

And I can see that my transform wasn't registered:

INFO:apache_beam.utils.subprocess_server:b'INFO: Registering external
transforms: [beam:transform:org.apache.beam:pubsub_read:v1,
beam:transform:org.apache.beam:pubsub_write:v1,
beam:transform:org.apache.beam:pubsublite_write:v1,
beam:transform:org.apache.beam:pubsublite_read:v1,
beam:transform:org.apache.beam:spanner_insert:v1,
beam:transform:org.apache.beam:spanner_update:v1,
beam:transform:org.apache.beam:spanner_replace:v1,
beam:transform:org.apache.beam:spanner_insert_or_update:v1,
beam:transform:org.apache.beam:spanner_delete:v1,
beam:transform:org.apache.beam:spanner_read:v1,
beam:transform:org.apache.beam:schemaio_bigquery_read:v1,
beam:transform:org.apache.beam:schemaio_bigquery_write:v1,
beam:transform:org.apache.beam:schemaio_datastoreV1_read:v1,
beam:transform:org.apache.beam:schemaio_datastoreV1_write:v1,
beam:transform:org.apache.beam:schemaio_pubsub_read:v1,
beam:transform:org.apache.beam:schemaio_pubsub_write:v1,
beam:transform:org.apache.beam:schemaio_jdbc_read:v1,
beam:transform:org.apache.beam:schemaio_jdbc_write:v1,
beam:transform:org.apache.beam:schemaio_avro_read:v1,
beam:transform:org.apache.beam:schemaio_avro_write:v1,
beam:external:java:generate_sequence:v1]'
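For context on what that log means: the expansion service builds one URN-to-builder map out of every ExternalTransformRegistrar it discovers on the classpath via ServiceLoader, and the list above is that map being printed; a URN that never made it into the map is what surfaces as the "Unknown urn" error. A toy sketch of that lookup (plain Python, not Beam's actual code — names are illustrative):

```python
# Toy model of the expansion service's registry: each discovered registrar
# contributes a map of URN -> builder, and a lookup miss produces the
# "Unknown urn" failure seen above.
def build_registry(registrars):
    registry = {}
    for known_builder_instances in registrars:
        registry.update(known_builder_instances())
    return registry

def lookup(registry, urn):
    if urn not in registry:
        raise KeyError(f"Unknown urn: {urn}")
    return registry[urn]

# Two hypothetical registrars: one built into the expansion-service jar,
# one that should come from the extra classpath jar.
builtin_registrar = lambda: {"beam:external:java:generate_sequence:v1": object()}
my_registrar = lambda: {"beam:external:CAMUS:bigtable_read:v1": object()}
```

So the question is why `my_registrar` (the @AutoService-annotated class) never gets discovered in the first place.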

I'm creating the expansion service in code like this:

        expansion_service = BeamJarExpansionService(
            'sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
            extra_args=["{{PORT}}", '--javaClassLookupAllowlistFile=*'],
            classpath=["/home/builder/xlang/bando/bazel-bin/bigtable/libjava_hbase.jar"])

where libjava_hbase.jar was built by Bazel and contains my code:

$ jar tf libjava_hbase.jar
META-INF/
META-INF/MANIFEST.MF
energy/
energy/camus/
energy/camus/beam/
energy/camus/beam/BigtableRegistrar$BigtableReadBuilder$Configuration.class
energy/camus/beam/BigtableRegistrar$BigtableReadBuilder.class
energy/camus/beam/BigtableRegistrar$CrossLanguageConfiguration.class
energy/camus/beam/BigtableRegistrar.class
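One thing worth checking in that listing: @AutoService is supposed to make the build generate a `META-INF/services/...ExternalTransformRegistrar` file, which is what ServiceLoader keys off, and the listing above shows no `META-INF/services/` path at all. A small helper to check a jar for it (my own script, not a Beam tool; I'm assuming the registrar interface's fully qualified name below):

```python
import zipfile

# Assumed FQN of the service interface named by @AutoService above.
SERVICE = "org.apache.beam.sdk.expansion.ExternalTransformRegistrar"

def has_registrar_entry(jar_path, service=SERVICE):
    """Return True if the jar carries the ServiceLoader registration file
    that the @AutoService annotation processor should have generated."""
    with zipfile.ZipFile(jar_path) as jar:
        return f"META-INF/services/{service}" in jar.namelist()
```

If this returns False for libjava_hbase.jar, a likely culprit is that the Bazel java_library compiled the annotation away without running the AutoService annotation processor (Bazel only runs processors registered through a java_plugin, not ones merely present in deps).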

The relevant part of my code that does the registration looks like this:

@AutoService(ExternalTransformRegistrar.class)
public class BigtableRegistrar implements ExternalTransformRegistrar {

    static final String READ_URN = "beam:external:CAMUS:bigtable_read:v1";

    @Override
    public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
        return ImmutableMap.of(READ_URN, new BigtableReadBuilder());
    }
}

What am I missing that prevents my transform from being registered?

Thanks,
-Lina

On Tue, Dec 27, 2022 at 5:11 PM Lina Mårtensson <lina@camus.energy> wrote:

> I finally was able to get back to this and try to make an x-language
> transform for Bigtable to be used in Python, but I could use some help.
>
> I started out with the Bigtable
> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java>
> library, and it seemed like I should be able to go with option 1 here
> <https://beam.apache.org/documentation/programming-guide/#1311-creating-cross-language-java-transforms>,
> i.e. not write any Java code.
>
> As a non-Java user, I found it wasn't obvious how to get this working, but
> I eventually got it:
>
>         java_transform = JavaExternalTransform(
>             'org.apache.beam.sdk.io.gcp.bigtable.BigtableIO',
>             BeamJarExpansionService(
>                 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
>                 extra_args=["{{PORT}}", '--javaClassLookupAllowlistFile=*'])
>         ).read().withProjectId(projectId="myProjectId")
>
>         data = p | 'Read from Bigtable' >> java_transform
>
> It wasn't clear to me how to find the right jar to use, or that I needed
> to add the extra_args when specifying my own JAR.
>
> However, I get the following error:
>
> RuntimeError: java.lang.RuntimeException: Expected to find exactly one
> matching method in transform Read{config=BigtableConfig{projectId=null,
> instanceId=null, tableId=, bigtableOptionsConfigurator=null, options=null},
> readOptions=BigtableReadOptions{rowFilter=null,
> keyRanges=[ByteKeyRange{startKey=[], endKey=[]}]}} for BuilderMethod name:
> "withProjectId"
>
> schema {
>   fields {
>     name: "projectId"
>     type {
>       atomic_type: STRING
>     }
>   }
>   id: "8b43f1f0-313f-4b46-9559-d9d11fd7ecf2"
> }
>
> payload: "\001\000\vmyProjectId"
>
>  but found 2
>
> at org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.getMethod(JavaClassLookupTransformProvider.java:236)
> at org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.applyBuilderMethods(JavaClassLookupTransformProvider.java:145)
> at org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.getTransform(JavaClassLookupTransformProvider.java:129)
> at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:396)
> at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:515)
> at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:595)
> at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:220)
> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:340)
> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
>
> I believe this is pointing out that there are two withProjectId methods -
> one that takes a String
> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L285>,
> and one that takes a ValueProvider
> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L273>.
>
> I take it this means that the no-Java-code option won't work here? The
> HBase
> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java>
> implementation looks like it would have the same issue.
>
> Before I try and write Java code and convince my team that we're OK to
> have some Java code, I wanted to check if there's anything I'm missing, or
> if I'll need to go with the process described in 13.1.1.2 and implement an
> ExternalTransformBuilder and an ExternalTransformRegistrar.
>
> Thanks!
> -Lina
>
>
> On Thu, Jul 28, 2022 at 5:37 PM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>>
>>
>> On Thu, Jul 28, 2022 at 4:51 PM Lina Mårtensson <lina@camus.energy>
>> wrote:
>>
>>> Thanks for the detailed answers!
>>>
>>> I totally get the points about development & maintenance cost, and,
>>> from a user perspective, about getting the performance right.
>>>
>>> I decided to try out the Spanner connector to get a sense of how well
>>> the x-language approach works in our world, since that's an existing
>>> x-language connector.
>>> Overall, it works with minimal intervention, as you say - it is
>>> very slow, though.
>>> I'm a little confused about "portable runners" - if I understand this
>>> correctly, this means we couldn't run with the DirectRunner anymore if
>>> using an x-language connector? (At least it didn't work when I tried
>>> it.)
>>>
>>
>> You'll have to use the portable DirectRunner -
>> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/portability
>>
>> The job service for this can be started using the following command:
>> python apache_beam/runners/portability/local_job_service_main.py -p <port>
>>
>> Instructions for using this should be similar to here (under "Portable
>> (Java/Python/Go)"): https://beam.apache.org/documentation/runners/flink/
>>
>>
>>>
>>> My test of running a trivial GCS-to-Spanner job with 18 KB of input on
>>> Dataflow takes about 15 minutes end-to-end. 5+ minutes of that is
>>> uploading the expansion service to GCS, and the startup time on
>>> Dataflow takes several minutes as well:
>>> "INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS
>>> upload to
>>> gs://dataflow-staging-us-central1-92d40d9a13427cbb4dfa41465ce57494/beamapp-lina-0728173601-761137-4rfo0mb9.1659029761.762052/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar
>>> in 337 seconds."
>>> Is that expected, or are we doing something strange here? My internet
>>> isn't very fast here, so these up/downloads can really slow things
>>> down.
>>> I tried adding --prebuild_sdk_container_engine=cloud_build but that
>>> doesn't affect the .jar file.
>>>
>>
>> There are several things contributing to the end-to-end execution time.
>>
>> * Time to stage dependencies including the shaded jar file (that is used
>> both by the expansion service and at runtime).
>>
>> This is cross-language only. But you control the jar file. You are trying
>> to use the
>> existing beam-sdks-java-io-google-cloud-platform-expansion-service jar
>> which is a 114 MB file.
>>
>> https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-google-cloud-platform-expansion-service/2.39.0
>>
>> Not exactly sure why it took 337 seconds. But could possibly be a network
>> issue. You could also define a new smaller expansion service jar just for
>> Spanner if needed.
>>
>> * Time to start the job
>> This is mostly common for both cross-language and non-cross-language
>> jobs. Starting up the Dataflow worker pool could take some time.
>> Cross-language could take slightly longer since we need to start both Java
>> and Python containers but this is a fixed cost (not dependent on the
>> job/input size).
>>
>> * Time to execute the job.
>> This is what I'd compare if you want to decide on a pure-Python vs a Java
>> cross-language implementation just based on performance. The cross-language
>> version would have an added cost to serialize data and send it across SDK
>> harness containers (within the same VM for Dataflow).
>> On the other hand, the cross-language version would be reading using a
>> Java implementation, which I expect to be more performant than a pure
>> Python read implementation.
>>
>> Hope this helps.
>>
>> Thanks,
>> Cham
>>
>>
>>
>>
>>>
>>> If we can get this to a workable time, and/or iterate locally, then I
>>> think an x-language connector for Bigtable could work out well.
>>> Otherwise we might have to look at a native Python version after all.
>>>
>>> Thanks!
>>> -Lina
>>>
>>> On Wed, Jul 27, 2022 at 1:39 PM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>> >
>>> >
>>> >
>>> > On Wed, Jul 27, 2022 at 11:10 AM Lina Mårtensson <lina@camus.energy>
>>> wrote:
>>> >>
>>> >> Thanks Cham!
>>> >>
>>> >> Could you provide some more detail on your preference for developing a
>>> >> Python wrapper rather than implementing a source purely in Python?
>>> >
>>> >
>>> > I've mentioned the main advantages of developing a cross-language
>>> transform over natively implementing this in Python below.
>>> >
>>> > * Reduced cost of development
>>> >
>>> It's much easier to develop a cross-language wrapper of the Java
>>> source than re-implementing the source in Python. Sources are some of the
>>> most complex
>>> > code we have in Beam and sources control the parallelization of the
>>> pipeline (for example, splitting and dynamic work rebalancing for supported
>>> runners). So getting this code wrong can result in hard to track data
>>> loss/duplication related issues.
>>> > Additionally, based on my experience, it's very hard to get a source
>>> implementation correct and performant on the first try. It could take
>>> additional benchmarks/user feedback over time to get the source production
>>> ready.
>>> > The Java BT source is already well battle-tested (actually we have two
>>> Java implementations [1][2] currently). So I would rather use a Java BT
>>> connector as a cross-language transform than re-implementing sources for
>>> other SDKs.
>>> >
>>> > * Minimal maintenance cost
>>> >
>>> > Developing a source/sink is just a part of the story. We (as a
>>> community) have to maintain it over time and make sure that ongoing
>>> issues/feature requests are adequately handled. In the past, we have had
>>> cases where sources/sinks are available for multiple SDKs but one
>>> > is significantly better than others when it comes to the feature set
>>> (for example, BigQuery). Cross-language will make this easier and will
>>> allow us to maintain key logic in a single place.
>>> >
>>> >>
>>> >>
>>> >> If I look at the instructions for using the x-language Spanner
>>> >> connector, then using this - from the user's perspective - would
>>> >> involve installing a Java runtime.
>>> >> That's not terrible, but I fear that getting this to work with bazel
>>> >> might end up being more trouble than expected. (That has often
>>> >> happened here, and we have enough trouble with getting Python 3.9 and
>>> >> 3.10 to co-exist.)
>>> >
>>> >
>>> > From an end user perspective, all they should have to do is make sure
>>> that Java is available in the machine where the job is submitted from. Beam
>>> has features to allow starting up cross-language expansion services (that
>>> is needed during job submission) automatically so users should not have to
>>> do anything other than that.
>>> >
>>> > At job execution, Beam (portable) uses Docker-based SDK harness
>>> containers and we already release appropriate containers for each SDK. The
>>> runners should seamlessly download containers needed to execute the job.
>>> >
>>> > That said, the main downside of cross-language today is runner
>>> support. Cross-language transform support is only available for portable
>>> Beam runners (for example, Dataflow Runner v2) but this is the direction
>>> Beam runners are going anyway.
>>> >
>>> >>
>>> >>
>>> >> There are a few of us at our small start-up that have written
>>> >> MapReduces and similar in the past and are completely convinced by the
>>> >> Beam/Dataflow model. But many others have no previous experience and
>>> >> are skeptical, and see this new tool we're introducing as something
>>> >> that's more trouble than it's worth, and something they'd rather avoid
>>> >> - even when we see how lots of their use cases could be made much
>>> >> easier using Beam. I'm worried that every extra hoop to jump through
>>> >> will make it less likely to be widely used for us. Because of that, my
>>> >> bias would be towards having a Python connector rather than
>>> >> x-language, and I would find it really helpful to learn about why you
>>> >> both favor the x-language option.
>>> >
>>> >
>>> > I understand your concerns. It's certainly possible to develop the
>>> same connector in multiple SDKs (and we provide SDF source framework
>>> support in all SDK languages). But hopefully my comments above will give
>>> you an idea of the downsides of this approach :).
>>> >
>>> > Thanks,
>>> > Cham
>>> >
>>> > [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
>>> > [2] https://cloud.google.com/bigtable/docs/hbase-dataflow-java
>>> >
>>> >>
>>> >>
>>> >> Thanks!
>>> >> -Lina
>>> >>
>>> >> On Tue, Jul 26, 2022 at 6:11 PM Chamikara Jayalath <
>>> chamik...@google.com> wrote:
>>> >> >
>>> >> >
>>> >> >
>>> >> > On Mon, Jul 25, 2022 at 12:53 PM Lina Mårtensson via dev <
>>> dev@beam.apache.org> wrote:
>>> >> >>
>>> >> >> Hi dev,
>>> >> >>
>>> >> >> We're starting to incorporate BigTable in our stack and I've
>>> delighted
>>> >> >> my co-workers with how easy it was to create some BigTables with
>>> >> >> Beam... but there doesn't appear to be a reader for BigTable in
>>> >> >> Python.
>>> >> >>
>>> >> >> First off, is there a good reason why not/any reason why it would
>>> be difficult?
>>> >> >
>>> >> >
>>> >> > There was a previous effort to implement a Python BT source but
>>> that was not completed:
>>> https://github.com/apache/beam/pull/11295#issuecomment-646378304
>>> >> >
>>> >> >>
>>> >> >>
>>> >> >> I could write one, but before I start, I'd love some input to make
>>> it easier.
>>> >> >>
>>> >> >> It appears that there would be two options: either write one in
>>> >> >> Python, or try to set one up with x-language from Java which I see
>>> is
>>> >> >> done e.g. with the Spanner IO Connector.
>>> >> >> Any recommendation on which one to pick or potential pitfalls in
>>> either choice?
>>> >> >>
>>> >> >> If I write one in Python, what should I think about?
>>> >> >> It is not obvious to me how to achieve parallelization, so any tips
>>> >> >> here would be welcome.
>>> >> >
>>> >> >
>>> >> > I would strongly prefer developing a Python wrapper for the
>>> existing Java BT source using Beam's Multi-language Pipelines framework
>>> over developing a new Python source.
>>> >> >
>>> https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines
>>> >> >
>>> >> > Thanks,
>>> >> > Cham
>>> >> >
>>> >> >
>>> >> >>
>>> >> >>
>>> >> >> Thanks!
>>> >> >> -Lina
>>>
>>
