Thanks! Moving my DoFn into a new module worked, and that solved the slowness as well. I also tried importing it in setup(), but that didn't work.
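The thread doesn't show what the setup() attempt looked like, so this is only a guess at one common pitfall. An `import` statement inside `setup()` binds the module name as a local variable of `setup()`, so `process()` still can't see it. Keeping a reference on the instance avoids that. The sketch below is hypothetical and stdlib-only: plain classes stand in for `beam.DoFn` (Beam calls `setup()` once per DoFn instance before processing elements), and `json` stands in for the real protobuf module, such as the `data_pb2` import used in the thread.

```python
class ParseInSetupBroken:
    """Stand-in for a beam.DoFn that imports its dependency in setup()."""

    def setup(self):
        # This binds `_proto_module` only as a local name inside setup().
        import json as _proto_module  # stand-in for e.g. a generated *_pb2 module

    def process(self, element_bytes):
        # Raises NameError: `_proto_module` is not a global of this module.
        yield _proto_module.loads(element_bytes)


class ParseInSetup:
    """Same idea, but the imported module is kept on the instance."""

    def setup(self):
        import json  # stand-in for e.g. a generated *_pb2 module
        self._proto_module = json

    def process(self, element_bytes):
        # Uses the reference stored by setup(); no per-element import.
        yield self._proto_module.loads(element_bytes)
```

In a real pipeline the working variant would subclass `beam.DoFn` and parse with the protobuf class instead of `json.loads`. Moving the DoFn into its own module, as done here, presumably avoids the problem differently: the worker imports that module, which runs its top-level imports.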
On Fri, Jan 6, 2023 at 2:25 PM Luke Cwik <lc...@google.com> wrote:
> The proto (java) -> bytes -> proto (python) sounds good.
>
> Have you tried moving your DoFn outside of your main module into a new module as per [1]? Other suggestions are to do the import in the function. Can you do the import once in the setup()[2] function? Have you considered using the cloud profiler[3] to see what is actually slow?
>
> 1: https://stackoverflow.com/questions/69436706/nameerror-name-beam-is-not-defined-in-lambda
> 2: https://github.com/apache/beam/blob/f9d5de34ae1dad251f5580073c0245a206224a69/sdks/python/apache_beam/transforms/core.py#L670
> 3: https://cloud.google.com/dataflow/docs/guides/profiling-a-pipeline#python
>
> On Fri, Jan 6, 2023 at 11:19 AM Lina Mårtensson <lina@camus.energy> wrote:
>> I am *so close* it seems. ;)
>>
>> I followed Luke's advice: I'm reading the proto com.google.bigtable.v2.Row, then using a transform to convert it to bytes so that I can send it across to Python. (I assume that's what I should be doing with the proto?)
>> Once on the Python side, when running on Dataflow, I'm running into the dreaded NameError.
>> save_main_session is True.
>>
>> Either
>>
>> from google.cloud.bigtable_v2.types import Row
>> ...
>> class ParsePB(beam.DoFn):
>>     def process(self, pb_bytes):
>>         row = Row()
>>         row.ParseFromString(pb_bytes)
>>
>> or
>>
>> from google.cloud.bigtable_v2.proto import data_pb2 as data_v2_pb2
>> ...
>> class ParsePB(beam.DoFn):
>>     def process(self, pb_bytes):
>>         row = data_v2_pb2.Row()
>>         row.ParseFromString(pb_bytes)
>>
>> works in the DirectRunner (if I skip the Java connection and fake the input data), but not on Dataflow.
>> It works if I put the import in the process() function, although then the code runs super slowly. (I'm not sure why, but running an import on every element definitely sounds like it could cause that!)
>>
>> (I still have issues with the DirectRunner, as per my previous email.)
>>
>> Is there a good way to get around this?
>>
>> Thanks!
>> -Lina
>>
>> On Thu, Jan 5, 2023 at 4:49 PM Lina Mårtensson <lina@camus.energy> wrote:
>>> Great, thanks! That was a huge improvement.
>>>
>>> On Thu, Jan 5, 2023 at 12:52 PM Luke Cwik <lc...@google.com> wrote:
>>>> By default Beam Java only uploads artifacts that have changed, but it looks like this is not the case for Beam Python: you need to explicitly opt in with the --enable_artifact_caching flag[1].
>>>>
>>>> It looks like this feature was added 1 year ago[2]; should we turn it on by default?
>>>>
>>>> 1: https://github.com/apache/beam/blob/3070160203c6734da0eb04b440e08b43f9fd33f3/sdks/python/apache_beam/options/pipeline_options.py#L794
>>>> 2: https://github.com/apache/beam/pull/16229
>>>>
>>>> On Thu, Jan 5, 2023 at 11:43 AM Lina Mårtensson <lina@camus.energy> wrote:
>>>>> Thanks! I have now successfully written a beautiful string of protobuf bytes into a file via Python. 🎉
>>>>>
>>>>> Two issues though:
>>>>> 1. Robert said the Python direct runner would just work with this, but it's not working. After about half an hour of these messages repeating over and over, I interrupted the job:
>>>>>
>>>>> E0105 07:25:48.170601677 58210 fork_posix.cc:76] Other threads are currently calling into gRPC, skipping fork() handlers
>>>>>
>>>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'2023/01/05 06:57:10 Failed to obtain provisioning information: failed to dial server at localhost:41087\n\tcaused by:\ncontext deadline exceeded\n'
>>>>>
>>>>> 2. I (unsurprisingly) run into the issue I had when I tested the Spanner x-lang transform on Dataflow: starting a job is unbearably slow, with most of the time spent transferring the expansion service jar (115 MB) plus my jar (105 MB) with my new code and its dependencies:
>>>>>
>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar...
>>>>>
>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar in 321 seconds.
>>>>>
>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar...
>>>>>
>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar in 295 seconds.
>>>>>
>>>>> I have a total of 13 minutes until any workers have started on Dataflow, then another 4.5 minutes until the job actually does anything (which eventually is to read a whopping 3 cells from Bigtable ;).
>>>>>
>>>>> How could this be improved?
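Luke's --enable_artifact_caching suggestion rests on the idea that an artifact whose content hasn't changed doesn't need to be uploaded again. The log numbers show why that matters here: 115 MB in 321 s and 105 MB in 295 s both work out to roughly 0.36 MB/s, so the two uploads account for about 616 s, just over 10 of the 13 minutes before workers start. The 43-character suffixes on the staged jar names look like unpadded URL-safe base64 SHA-256 digests. Assuming that naming scheme, the hypothetical sketch below shows how content-addressed names let a launcher skip an upload whose content is already staged. It is not Beam's staging code; `stage_if_changed` and its `upload` callback are illustrative names.

```python
import base64
import hashlib
from pathlib import Path
from typing import Callable, Set


def content_digest(path: Path) -> str:
    """Unpadded URL-safe base64 of the file's SHA-256 digest (43 characters)."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        # Hash in 1 MiB chunks so a 100+ MB jar is never fully loaded into memory.
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return base64.urlsafe_b64encode(h.digest()).decode("ascii").rstrip("=")


def staged_name(path: Path) -> str:
    """my_deploy.jar -> my_deploy-<digest>.jar, so new content gets a new name."""
    return f"{path.stem}-{content_digest(path)}{path.suffix}"


def stage_if_changed(path: Path, already_staged: Set[str],
                     upload: Callable[[Path, str], None]) -> str:
    """Upload only when no artifact with identical content has been staged yet.

    `already_staged` and `upload` are hypothetical stand-ins for listing and
    writing objects in a staging bucket.
    """
    name = staged_name(path)
    if name not in already_staged:
        upload(path, name)
        already_staged.add(name)
    return name
```

In practice the set of staged names would come from listing the staging location. For Beam Python, opting in with --enable_artifact_caching is the supported route, and its exact mechanics may differ from this sketch.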
>>>>>
>>>>> For one, it seems to me that uploading sdks:java:io:google-cloud-platform:expansion-service:shadowJar from my computer shouldn't be necessary. Shouldn't Dataflow already have it, or couldn't Dataflow fetch it instead of my uploading it over a slow internet connection?
>>>>> And what about my own jar? It isn't going to change very often, so would it be possible to upload it somewhere once and then fetch it from there?
>>>>>
>>>>> Thanks!
>>>>> -Lina
>>>>>
>>>>> On Tue, Jan 3, 2023 at 1:23 PM Luke Cwik <lc...@google.com> wrote:
>>>>>> I would suggest using BigtableIO, which also returns a protobuf com.google.bigtable.v2.Row. This should allow you to replicate what SpannerIO is doing.
>>>>>>
>>>>>> Alternatively, you could provide a way to convert the HBase result into a Beam row by specifying a converter and a schema for it, and then you could use the already well-known Beam Schema type:
>>>>>> https://github.com/apache/beam/blob/0b8f0b4db7a0de4977e30bcfeb50b5c14c7c1572/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1068
>>>>>>
>>>>>> Otherwise, you'll have to register the HBase result coder with a well-known name so that the runner API coder URN is something that you know, and then on the Python side you would need a coder for that URN as well, so that you can understand the bytes being sent across from the Java portion of the pipeline.
>>>>>>
>>>>>> On Fri, Dec 30, 2022 at 12:59 AM Lina Mårtensson <lina@camus.energy> wrote:
>>>>>>> And next issue...
>>>>>>> I'm getting KeyError: 'beam:coders:javasdk:0.1', which, as I learned <https://cwiki.apache.org/confluence/display/BEAM/Multi-language+Pipelines+Tips>, is because the transform is trying to return something for which there isn't a standard Beam coder <https://github.com/apache/beam/blob/05428866cdbf1ea8e4c1789dd40327673fd39451/model/pipeline/src/main/proto/beam_runner_api.proto#L784>.
>>>>>>> Makes sense, but... how do I fix this? The documentation explains how to do this for the input, but not for the output.
>>>>>>>
>>>>>>> Comparing with Spanner, it looks like Spanner returns a protobuf, which I'm guessing somehow gets converted to bytes... But CloudBigtableIO <https://github.com/googleapis/java-bigtable-hbase/blob/main/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java> returns org.apache.hadoop.hbase.client.Result.
>>>>>>>
>>>>>>> My buildExternal method looks as follows:
>>>>>>>
>>>>>>> @Override
>>>>>>> public PTransform<PBegin, PCollection<Result>> buildExternal(
>>>>>>>     BigtableReadBuilder.Configuration configuration) {
>>>>>>>   return Read.from(CloudBigtableIO.read(
>>>>>>>       new CloudBigtableScanConfiguration.Builder()
>>>>>>>           .withProjectId(configuration.projectId)
>>>>>>>           .withInstanceId(configuration.instanceId)
>>>>>>>           .withTableId(configuration.tableId)
>>>>>>>           .build()));
>>>>>>> }
>>>>>>>
>>>>>>> I also got a warning, which I *believe* is unrelated (but also an issue):
>>>>>>>
>>>>>>> INFO:apache_beam.utils.subprocess_server:b"WARNING: Configuration class 'energy.camus.beam.BigtableRegistrar$BigtableReadBuilder$Configuration' has no schema registered. Attempting to construct with setter approach."
>>>>>>>
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'Dec 30, 2022 7:46:14 AM org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader payloadToConfig'
>>>>>>>
>>>>>>> What is this schema and what should it look like?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> -Lina
>>>>>>>
>>>>>>> On Fri, Dec 30, 2022 at 12:28 AM Lina Mårtensson <lina@camus.energy> wrote:
>>>>>>>> Thanks! This was really helpful. It took a while to figure out the details; a section in the docs on what's required of these jars for non-Java users would be a great addition.
>>>>>>>>
>>>>>>>> But once I did, the Bazel config was actually quite straightforward and makes sense.
>>>>>>>> I pasted the first section from here <https://github.com/bazelbuild/rules_jvm_external/blob/master/README.md#usage> into my WORKSPACE file and changed the artifacts to the ones I needed. (How to find the right ones remains confusing.)
>>>>>>>>
>>>>>>>> After that I updated my BUILD rules, and Bazel had easy and straightforward configs for it. All I needed was this:
>>>>>>>>
>>>>>>>> # From https://github.com/google/bazel-common/blob/master/third_party/java/auto/BUILD
>>>>>>>> # The auto service is what registers our Registrar class, and it needs to be a plugin, which
>>>>>>>> # makes it run at compile time.
>>>>>>>>
>>>>>>>> java_plugin(
>>>>>>>>     name = "auto_service_processor",
>>>>>>>>     processor_class = "com.google.auto.service.processor.AutoServiceProcessor",
>>>>>>>>     deps = [
>>>>>>>>         "@maven//:com_google_auto_service_auto_service",
>>>>>>>>         "@maven//:com_google_auto_service_auto_service_annotations",
>>>>>>>>         "@maven//:org_apache_beam_beam_vendor_guava_26_0_jre",
>>>>>>>>     ],
>>>>>>>> )
>>>>>>>>
>>>>>>>> java_binary(
>>>>>>>>     name = "java_hbase",
>>>>>>>>     main_class = "energy.camus.beam.BigtableRegistrar",
>>>>>>>>     plugins = [":auto_service_processor"],
>>>>>>>>     srcs = ["src/main/java/energy/camus/beam/BigtableRegistrar.java"],
>>>>>>>>     deps = [
>>>>>>>>         "@maven//:com_google_auto_service_auto_service",
>>>>>>>>         "@maven//:com_google_auto_service_auto_service_annotations",
>>>>>>>>         "@maven//:com_google_cloud_bigtable_bigtable_hbase_beam",
>>>>>>>>         "@maven//:org_apache_beam_beam_sdks_java_core",
>>>>>>>>         "@maven//:org_apache_beam_beam_vendor_guava_26_0_jre",
>>>>>>>>         "@maven//:org_apache_hbase_hbase_shaded_client",
>>>>>>>>     ],
>>>>>>>> )
>>>>>>>>
>>>>>>>> On Thu, Dec 29, 2022 at 2:43 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>> AutoService relies on Java's compiler annotation processor. https://github.com/google/auto/tree/main/service#getting-started shows that you need to configure Java's compiler to use the annotation processors within AutoService.
>>>>>>>>>
>>>>>>>>> I saw this public gist that seemed to enable using the AutoService annotation processor with Bazel: https://gist.github.com/jart/5333824b94cd706499a7bfa1e086ee00
>>>>>>>>>
>>>>>>>>> On Thu, Dec 29, 2022 at 2:27 PM Lina Mårtensson via dev <dev@beam.apache.org> wrote:
>>>>>>>>>> That's good news about the direct runner, thanks!
>>>>>>>>>>
>>>>>>>>>> On Thu, Dec 29, 2022 at 2:02 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>>>> On Thu, Jul 28, 2022 at 5:37 PM Chamikara Jayalath via dev <dev@beam.apache.org> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > On Thu, Jul 28, 2022 at 4:51 PM Lina Mårtensson <lina@camus.energy> wrote:
>>>>>>>>>>> >>
>>>>>>>>>>> >> Thanks for the detailed answers!
>>>>>>>>>>> >>
>>>>>>>>>>> >> I totally get the points about development & maintenance cost, and, from a user perspective, about getting the performance right.
>>>>>>>>>>> >>
>>>>>>>>>>> >> I decided to try out the Spanner connector to get a sense of how well the x-language approach works in our world, since that's an existing x-language connector.
>>>>>>>>>>> >> Overall, it works with minimal intervention, as you say; it is very slow, though.
>>>>>>>>>>> >> I'm a little confused about "portable runners". If I understand this correctly, does this mean we couldn't run with the DirectRunner anymore if we use an x-language connector? (At least it didn't work when I tried it.)
>>>>>>>>>>> >
>>>>>>>>>>> > You'll have to use the portable DirectRunner: https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/portability
>>>>>>>>>>> >
>>>>>>>>>>> > The job service for it can be started using the following command:
>>>>>>>>>>> > python apache_beam/runners/portability/local_job_service_main.py -p <port>
>>>>>>>>>>>
>>>>>>>>>>> Note that the Python direct runner is already a portable runner, so you shouldn't have to do anything special (like start up a separate job service and pass extra options) to run locally. Just use the cross-language transforms as you would any normal Python transform.
>>>>>>>>>>>
>>>>>>>>>>> The goal is to make this as smooth and transparent as possible; please keep coming back to us if you find rough edges.
>>>>>>>>>>
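For reference, Chamikara's route has two pieces: the command that starts the local job service, and pipeline arguments that point a pipeline at it. The sketch below writes both out, assuming the job service listens on localhost. `--runner=PortableRunner`, `--job_endpoint` and `--environment_type=LOOPBACK` are existing Beam Python pipeline options; the helper function names and port 8099 are illustrative only. Per Robert's note, the plain Python direct runner should make this unnecessary for local runs.

```python
import sys


def job_service_command(port: int) -> list:
    """Command from the thread; run it from sdks/python in a Beam checkout."""
    return [
        sys.executable,
        "apache_beam/runners/portability/local_job_service_main.py",
        "-p",
        str(port),
    ]


def portable_pipeline_args(port: int) -> list:
    """Pipeline arguments that target the local job service started above."""
    return [
        "--runner=PortableRunner",
        f"--job_endpoint=localhost:{port}",
        # LOOPBACK runs the Python SDK worker in the process that launches the pipeline.
        "--environment_type=LOOPBACK",
    ]
```

The argument list would then be passed to `PipelineOptions(portable_pipeline_args(8099))` when constructing the pipeline.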