I kept working with an ExternalTransformRegistrar solution (although if there's an easier way, I'm all ears), and I have Java code that builds, and a Python connector that tries to use it.

My current issue is that the expansion service that's started up doesn't find my transform using the URN provided:

    RuntimeError: java.lang.UnsupportedOperationException: Unknown urn: beam:external:CAMUS:bigtable_read:v1

And I can see that my transform wasn't registered:

    INFO:apache_beam.utils.subprocess_server:b'INFO: Registering external transforms: [
        beam:transform:org.apache.beam:pubsub_read:v1,
        beam:transform:org.apache.beam:pubsub_write:v1,
        beam:transform:org.apache.beam:pubsublite_write:v1,
        beam:transform:org.apache.beam:pubsublite_read:v1,
        beam:transform:org.apache.beam:spanner_insert:v1,
        beam:transform:org.apache.beam:spanner_update:v1,
        beam:transform:org.apache.beam:spanner_replace:v1,
        beam:transform:org.apache.beam:spanner_insert_or_update:v1,
        beam:transform:org.apache.beam:spanner_delete:v1,
        beam:transform:org.apache.beam:spanner_read:v1,
        beam:transform:org.apache.beam:schemaio_bigquery_read:v1,
        beam:transform:org.apache.beam:schemaio_bigquery_write:v1,
        beam:transform:org.apache.beam:schemaio_datastoreV1_read:v1,
        beam:transform:org.apache.beam:schemaio_datastoreV1_write:v1,
        beam:transform:org.apache.beam:schemaio_pubsub_read:v1,
        beam:transform:org.apache.beam:schemaio_pubsub_write:v1,
        beam:transform:org.apache.beam:schemaio_jdbc_read:v1,
        beam:transform:org.apache.beam:schemaio_jdbc_write:v1,
        beam:transform:org.apache.beam:schemaio_avro_read:v1,
        beam:transform:org.apache.beam:schemaio_avro_write:v1,
        beam:external:java:generate_sequence:v1]'

I'm creating the expansion service in code like this:

    from apache_beam.transforms.external import BeamJarExpansionService

    expansion_service = BeamJarExpansionService(
        'sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
        extra_args=["{{PORT}}", '--javaClassLookupAllowlistFile=*'],
        classpath=[
            "/home/builder/xlang/bando/bazel-bin/bigtable/libjava_hbase.jar"])

where libjava_hbase.jar was built by Bazel and contains my code:

    $ jar tf libjava_hbase.jar
    META-INF/
    META-INF/MANIFEST.MF
    energy/
    energy/camus/
    energy/camus/beam/
    energy/camus/beam/BigtableRegistrar$BigtableReadBuilder$Configuration.class
    energy/camus/beam/BigtableRegistrar$BigtableReadBuilder.class
    energy/camus/beam/BigtableRegistrar$CrossLanguageConfiguration.class
    energy/camus/beam/BigtableRegistrar.class

The relevant part of my code that does the registration looks like this:

    @AutoService(ExternalTransformRegistrar.class)
    public class BigtableRegistrar implements ExternalTransformRegistrar {

      static final String READ_URN = "beam:external:CAMUS:bigtable_read:v1";

      @Override
      public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
        return ImmutableMap.of(READ_URN, new BigtableReadBuilder());
      }

What am I missing that prevents my transform from being registered?

Thanks,
-Lina

On Tue, Dec 27, 2022 at 5:11 PM Lina Mårtensson <lina@camus.energy> wrote:

> I finally was able to get back to this and try to make an x-language
> transform for Bigtable to be used in Python, but I could use some help.
>
> I started out with the Bigtable
> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java>
> library, and it seemed like I should be able to go with option 1 here
> <https://beam.apache.org/documentation/programming-guide/#1311-creating-cross-language-java-transforms>,
> i.e. not write any Java code.
>
> As a non-Java user, it still wasn't obvious how to get this working, but I
> eventually got it:
>
>     java_transform = JavaExternalTransform(
>         'org.apache.beam.sdk.io.gcp.bigtable.BigtableIO',
>         BeamJarExpansionService(
>             'sdks:java:io:google-cloud-platform:expansion-service:shadowJar',
>             extra_args=["{{PORT}}", '--javaClassLookupAllowlistFile=*'])
>     ).read().withProjectId(projectId="myProjectId")
>
>     data = p | 'Read from Bigtable' >> java_transform
>
> It wasn't clear to me how to find the right jar to use, or that I needed
> to add the extra_args when specifying my own JAR.
>
> However, I get the following error:
>
> RuntimeError: java.lang.RuntimeException: Expected to find exactly one
> matching method in transform Read{config=BigtableConfig{projectId=null,
> instanceId=null, tableId=, bigtableOptionsConfigurator=null, options=null},
> readOptions=BigtableReadOptions{rowFilter=null,
> keyRanges=[ByteKeyRange{startKey=[], endKey=[]}]}} for BuilderMethodname:
> "withProjectId"
> schema {
>   fields {
>     name: "projectId"
>     type {
>       atomic_type: STRING
>     }
>   }
>   id: "8b43f1f0-313f-4b46-9559-d9d11fd7ecf2"
> }
> payload: "\001\000\vmyProjectId"
> but found 2
>     at org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.getMethod(JavaClassLookupTransformProvider.java:236)
>     at org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.applyBuilderMethods(JavaClassLookupTransformProvider.java:145)
>     at org.apache.beam.sdk.expansion.service.JavaClassLookupTransformProvider.getTransform(JavaClassLookupTransformProvider.java:129)
>     at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProvider.apply(ExpansionService.java:396)
>     at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:515)
>     at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:595)
>     at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:220)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:340)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
>
> I believe this is pointing out that there are two withProjectId methods -
> one that takes String
> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L285>,
> one that takes a ValueProvider
> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L273>.
>
> I take it this means that the write-no-Java option won't work here? The
> HBase
> <https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java>
> implementation looks like it would have the same issue.
>
> Before I try and write Java code and convince my team that we're OK to
> have some Java code, I wanted to check if there's anything I'm missing, or
> if I'll need to go with the process described in 13.1.1.2 and implement an
> ExternalTransformBuilder and an ExternalTransformRegistrar.
>
> Thanks!
> -Lina
>
> On Thu, Jul 28, 2022 at 5:37 PM Chamikara Jayalath <chamik...@google.com> wrote:
>
>> On Thu, Jul 28, 2022 at 4:51 PM Lina Mårtensson <lina@camus.energy> wrote:
>>
>>> Thanks for the detailed answers!
>>>
>>> I totally get the points about development & maintenance cost, and,
>>> from a user perspective, about getting the performance right.
>>>
>>> I decided to try out the Spanner connector to get a sense of how well
>>> the x-language approach works in our world, since that's an existing
>>> x-language connector.
>>> Overall, it works and with minimal intervention as you say - it is
>>> very slow, though.
>>> I'm a little confused about "portable runners" - if I understand this
>>> correctly, this means we couldn't run with the DirectRunner anymore if
>>> using an x-language connector? (At least it didn't work when I tried
>>> it.)
>>
>> You'll have to use the portable DirectRunner -
>> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/portability
>>
>> Job service for this can be started using the following command:
>>
>>     python apache_beam/runners/portability/local_job_service_main.py -p <port>
>>
>> Instructions for using this should be similar to here (under "Portable
>> (Java/Python/Go)"): https://beam.apache.org/documentation/runners/flink/
>>
>>> My test of running a trivial GCS-to-Spanner job with 18 KB of input on
>>> Dataflow takes about 15 minutes end-to-end. 5+ minutes of that is
>>> uploading the expansion service to GCS, and the startup time on
>>> Dataflow takes several minutes as well:
>>> "INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS
>>> upload to
>>> gs://dataflow-staging-us-central1-92d40d9a13427cbb4dfa41465ce57494/beamapp-lina-0728173601-761137-4rfo0mb9.1659029761.762052/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar
>>> in 337 seconds."
>>> Is that expected, or are we doing something strange here? My internet
>>> isn't very fast here, so these up/downloads can really slow things
>>> down.
>>> I tried adding --prebuild_sdk_container_engine=cloud_build but that
>>> doesn't affect the .jar file.
>>
>> There are several things contributing to the end-to-end execution time.
>>
>> * Time to stage dependencies including the shaded jar file (that is used
>> both by the expansion service and at runtime).
>>
>> This is cross-language only. But you control the jar file. You are trying
>> to use the existing
>> beam-sdks-java-io-google-cloud-platform-expansion-service jar
>> which is a 114 MB file.
>>
>> https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-google-cloud-platform-expansion-service/2.39.0
>>
>> Not exactly sure why it took 337 seconds. But could possibly be a network
>> issue. You could also define a new smaller expansion service jar just for
>> Spanner if needed.
>>
>> * Time to start the job
>>
>> This is mostly common for both cross-language and non-cross-language
>> jobs. Starting up the Dataflow worker pool could take some time.
>> Cross-language could take slightly longer since we need to start both Java
>> and Python containers but this is a fixed cost (not dependent on the
>> job/input size).
>>
>> * Time to execute the job.
>>
>> This is what I'd compare if you want to decide on a pure-Python vs a Java
>> cross-language implementation just based on performance. Cross-language
>> version would have an added cost to serialize data and send across SDK
>> harness containers (within the same VM for Dataflow).
>> On the other hand cross-language version would be reading using a
>> Java implementation which I expected to be more performant than a pure
>> Python read implementation.
>>
>> Hope this helps.
>>
>> Thanks,
>> Cham
>>
>>> If we can get this to a workable time, and/or iterate locally, then I
>>> think an x-language connector for Bigtable could work out well.
>>> Otherwise we might have to look at a native Python version after all.
>>>
>>> Thanks!
>>> -Lina
>>>
>>> On Wed, Jul 27, 2022 at 1:39 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>> >
>>> > On Wed, Jul 27, 2022 at 11:10 AM Lina Mårtensson <lina@camus.energy> wrote:
>>> >>
>>> >> Thanks Cham!
>>> >>
>>> >> Could you provide some more detail on your preference for developing a
>>> >> Python wrapper rather than implementing a source purely in Python?
>>> >
>>> > I've mentioned the main advantages of developing a cross-language
>>> > transform over natively implementing this in Python below.
>>> >
>>> > * Reduced cost of development
>>> >
>>> > It's much easier to develop a cross-language wrapper of the Java
>>> > source than re-implementing the source in Python. Sources are some of
>>> > the most complex code we have in Beam and sources control the
>>> > parallelization of the pipeline (for example, splitting and dynamic
>>> > work rebalancing for supported runners). So getting this code wrong can
>>> > result in hard to track data loss/duplication related issues.
>>> > Additionally, based on my experience, it's very hard to get a source
>>> > implementation correct and performant on the first try. It could take
>>> > additional benchmarks/user feedback over time to get the source
>>> > production ready.
>>> > Java BT source is already battle tested well (actually we have two
>>> > Java implementations [1][2] currently). So I would rather use a Java BT
>>> > connector as a cross-language transform than re-implementing sources
>>> > for other SDKs.
>>> >
>>> > * Minimal maintenance cost
>>> >
>>> > Developing a source/sink is just a part of the story. We (as a
>>> > community) have to maintain it over time and make sure that ongoing
>>> > issues/feature requests are adequately handled. In the past, we have
>>> > had cases where sources/sinks are available for multiple SDKs but one
>>> > is significantly better than others when it comes to the feature set
>>> > (for example, BigQuery). Cross-language will make this easier and will
>>> > allow us to maintain key logic in a single place.
>>> >
>>> >> If I look at the instructions for using the x-language Spanner
>>> >> connector, then using this - from the user's perspective - would
>>> >> involve installing a Java runtime.
>>> >> That's not terrible, but I fear that getting this to work with bazel
>>> >> might end up being more trouble than expected. (That has often
>>> >> happened here, and we have enough trouble with getting Python 3.9 and
>>> >> 3.10 to co-exist.)
>>> >
>>> > From an end user perspective, all they should have to do is make sure
>>> > that Java is available in the machine where the job is submitted from.
>>> > Beam has features to allow starting up cross-language expansion
>>> > services (that is needed during job submission) automatically so users
>>> > should not have to do anything other than that.
>>> >
>>> > At job execution, Beam (portable) uses Docker-based SDK harness
>>> > containers and we already release appropriate containers for each SDK.
>>> > The runners should seamlessly download containers needed to execute the
>>> > job.
>>> >
>>> > That said, the main downside of cross-language today is runner
>>> > support. Cross-language transform support is only available for
>>> > portable Beam runners (for example, Dataflow Runner v2) but this is the
>>> > direction Beam runners are going anyway.
>>> >
>>> >> There are a few of us at our small start-up that have written
>>> >> MapReduces and similar in the past and are completely convinced by the
>>> >> Beam/Dataflow model. But many others have no previous experience and
>>> >> are skeptical, and see this new tool we're introducing as something
>>> >> that's more trouble than it's worth, and something they'd rather avoid
>>> >> - even when we see how lots of their use cases could be made much
>>> >> easier using Beam. I'm worried that every extra hoop to jump through
>>> >> will make it less likely to be widely used for us. Because of that, my
>>> >> bias would be towards having a Python connector rather than
>>> >> x-language, and I would find it really helpful to learn about why you
>>> >> both favor the x-language option.
>>> >
>>> > I understand your concerns. It's certainly possible to develop the
>>> > same connector in multiple SDKs (and we provide SDF source framework
>>> > support in all SDK languages). But hopefully my comments above will
>>> > give you an idea of the downsides of this approach :).
>>> >
>>> > Thanks,
>>> > Cham
>>> >
>>> > [1] https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
>>> > [2] https://cloud.google.com/bigtable/docs/hbase-dataflow-java
>>> >
>>> >> Thanks!
>>> >> -Lina
>>> >>
>>> >> On Tue, Jul 26, 2022 at 6:11 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>> >> >
>>> >> > On Mon, Jul 25, 2022 at 12:53 PM Lina Mårtensson via dev <dev@beam.apache.org> wrote:
>>> >> >>
>>> >> >> Hi dev,
>>> >> >>
>>> >> >> We're starting to incorporate BigTable in our stack and I've delighted
>>> >> >> my co-workers with how easy it was to create some BigTables with
>>> >> >> Beam... but there doesn't appear to be a reader for BigTable in
>>> >> >> Python.
>>> >> >>
>>> >> >> First off, is there a good reason why not/any reason why it would be
>>> >> >> difficult?
>>> >> >
>>> >> > There was a previous effort to implement a Python BT source but
>>> >> > that was not completed:
>>> >> > https://github.com/apache/beam/pull/11295#issuecomment-646378304
>>> >> >
>>> >> >> I could write one, but before I start, I'd love some input to make it
>>> >> >> easier.
>>> >> >>
>>> >> >> It appears that there would be two options: either write one in
>>> >> >> Python, or try to set one up with x-language from Java which I see is
>>> >> >> done e.g. with the Spanner IO Connector.
>>> >> >> Any recommendation on which one to pick or potential pitfalls in
>>> >> >> either choice?
>>> >> >>
>>> >> >> If I write one in Python, what should I think about?
>>> >> >> It is not obvious to me how to achieve parallelization, so any tips
>>> >> >> here would be welcome.
>>> >> >
>>> >> > I would strongly prefer developing a Python wrapper for the
>>> >> > existing Java BT source using Beam's Multi-language Pipelines
>>> >> > framework over developing a new Python source.
>>> >> > https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines
>>> >> >
>>> >> > Thanks,
>>> >> > Cham
>>> >> >
>>> >> >> Thanks!
>>> >> >> -Lina
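
A note on the registration question in the newest message at the top of this thread: the `jar tf` listing there shows no META-INF/services/ entry. Java's ServiceLoader only discovers providers that are listed in a META-INF/services/<interface name> file, and @AutoService generates that file only when its annotation processor actually runs during compilation. So one possible explanation (an assumption, not confirmed anywhere in this thread) is that the Bazel build compiled the class without running the AutoService processor. The sketch below is just a quick way to check a jar for that entry. The helper name registered_providers is made up for illustration, and the interface name org.apache.beam.sdk.expansion.ExternalTransformRegistrar is my understanding of where Beam defines it, so please verify it against your Beam version.

```python
import zipfile

# Assumed fully qualified name of the registrar interface (verify for your Beam version).
REGISTRAR_SERVICE = "org.apache.beam.sdk.expansion.ExternalTransformRegistrar"


def registered_providers(jar, service=REGISTRAR_SERVICE):
    """List provider classes declared under META-INF/services/<service>.

    `jar` may be a filesystem path or a binary file-like object.
    Returns an empty list when the jar has no such entry.
    """
    entry = "META-INF/services/" + service
    with zipfile.ZipFile(jar) as zf:
        if entry not in zf.namelist():
            return []
        text = zf.read(entry).decode("utf-8")

    providers = []
    for line in text.splitlines():
        # ServiceLoader files allow '#' comments and blank lines.
        name = line.split("#", 1)[0].strip()
        if name:
            providers.append(name)
    return providers
```

Calling registered_providers("/home/builder/xlang/bando/bazel-bin/bigtable/libjava_hbase.jar") would return an empty list if the entry is missing, which would match the listing above. If that turns out to be the case, wiring the AutoService annotation processor into the Bazel target (or adding the services file by hand as a resource) may be worth trying.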