Thanks! Moving my DoFn into a new module worked, and that solved the
slowness as well.
I tried importing it in setup() as well, but that didn't work.
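
For anyone who lands on this thread with the same NameError, here is a
minimal standard-library sketch of why the move plausibly helps. It rests on
an assumption that is not confirmed anywhere in this thread: that the
relevant difference is how picklers treat a class from an importable module.
Such a class is stored as a module path plus a name, so unpickling imports
the module and runs its top-level imports. json.JSONDecoder is only a
stand-in for a DoFn that lives in its own module; nothing here is Beam API.

```python
import pickle
from json import JSONDecoder  # stand-in for a DoFn class defined in its own module

# A class from an importable module is pickled by reference:
# the payload names the module and the class, but carries no code.
payload = pickle.dumps(JSONDecoder)
has_module_path = JSONDecoder.__module__.encode() in payload
has_class_name = b"JSONDecoder" in payload

# Unpickling imports that module (which runs its top-level imports)
# and looks the class up by name, so the very same object comes back.
restored = pickle.loads(payload)
same_object = restored is JSONDecoder
```

In the pipeline itself, the matching layout is the one that worked above: a
separate module holding ParsePB together with its Row import, and the main
file importing ParsePB from that module.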

On Fri, Jan 6, 2023 at 2:25 PM Luke Cwik <lc...@google.com> wrote:

> The proto (java) -> bytes -> proto (python) sounds good.
>
> Have you tried moving your DoFn outside of your main module into a new
> module as per [1]? Other suggestions are to do the import in the function.
> Can you do the import once in the setup()[2] function? Have you considered
> using the cloud profiler[3] to see what is actually slow?
>
> 1:
> https://stackoverflow.com/questions/69436706/nameerror-name-beam-is-not-defined-in-lambda
> 2:
> https://github.com/apache/beam/blob/f9d5de34ae1dad251f5580073c0245a206224a69/sdks/python/apache_beam/transforms/core.py#L670
> 3:
> https://cloud.google.com/dataflow/docs/guides/profiling-a-pipeline#python
>
>
> On Fri, Jan 6, 2023 at 11:19 AM Lina Mårtensson <lina@camus.energy> wrote:
>
>> I am *so close* it seems. ;)
>>
>> I followed Luke's advice and am reading the proto
>> com.google.bigtable.v2.Row, then use a transform to convert that to
>> bytes in order to be able to send it across to Python. (I assume that's
>> what I should be doing with the proto?)
>> Once on the Python side, when running on Dataflow, I'm running into the
>> dreaded NameError.
>> save_main_session is True.
>>
>> Either
>> from google.cloud.bigtable_v2.types import Row
>> ...
>> class ParsePB(beam.DoFn):
>>     def process(self, pb_bytes):
>>         row = Row()
>>         row.ParseFromString(pb_bytes)
>>
>> or
>>
>> from google.cloud.bigtable_v2.proto import data_pb2 as data_v2_pb2
>> ...
>> class ParsePB(beam.DoFn):
>>     def process(self, pb_bytes):
>>         row = data_v2_pb2.Row()
>>         row.ParseFromString(pb_bytes)
>>
>> works in the DirectRunner (if I skip the Java connection and fake input
>> data), but not on Dataflow.
>> It works if I put the import in the process() function, although then
>> running the code is super slow. (I'm not sure why, but running an import on
>> every entry definitely sounds like it could cause that!)
>>
>> (I still have issues with the DirectRunner, as per my previous email.)
>>
>> Is there a good way to get around this?
>>
>> Thanks!
>> -Lina
>>
>> On Thu, Jan 5, 2023 at 4:49 PM Lina Mårtensson <lina@camus.energy> wrote:
>>
>>> Great, thanks! That was a huge improvement.
>>>
>>>
>>> On Thu, Jan 5, 2023 at 12:52 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> By default Beam Java only uploads artifacts that have changed but it
>>>> looks like this is not the case for Beam Python and you need to explicitly
>>>> opt in with the --enable_artifact_caching flag[1].
>>>>
>>>> It looks like this feature was added a year ago[2]; should we turn it
>>>> on by default?
>>>>
>>>> 1:
>>>> https://github.com/apache/beam/blob/3070160203c6734da0eb04b440e08b43f9fd33f3/sdks/python/apache_beam/options/pipeline_options.py#L794
>>>> 2: https://github.com/apache/beam/pull/16229
>>>>
>>>>
>>>>
>>>> On Thu, Jan 5, 2023 at 11:43 AM Lina Mårtensson <lina@camus.energy>
>>>> wrote:
>>>>
>>>>> Thanks! I have now successfully written a beautiful string of protobuf
>>>>> bytes into a file via Python. 🎉
>>>>>
>>>>> Two issues though:
>>>>> 1. Robert said the Python direct runner would just work with this -
>>>>> but it's not working. After about half an hour of these messages repeated
>>>>> over and over again I interrupted the job:
>>>>>
>>>>> E0105 07:25:48.170601677   58210 fork_posix.cc:76]           Other
>>>>> threads are currently calling into gRPC, skipping fork() handlers
>>>>>
>>>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'2023/01/05
>>>>> 06:57:10 Failed to obtain provisioning information: failed to dial server
>>>>> at localhost:41087\n\tcaused by:\ncontext deadline exceeded\n'
>>>>>
>>>>> 2. I (unsurprisingly) am back to the issue I had when I tested out
>>>>> the Spanner x-lang transform on Dataflow - starting a job is
>>>>> unbearably slow, with most of the time spent transferring the expansion
>>>>> service jar (115 MB) + my jar (105 MB) with my new code and its
>>>>> dependencies:
>>>>>
>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS
>>>>> upload to
>>>>> gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar...
>>>>>
>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS
>>>>> upload to
>>>>> gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar
>>>>> in 321 seconds.
>>>>>
>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS
>>>>> upload to
>>>>> gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar...
>>>>>
>>>>> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS
>>>>> upload to
>>>>> gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar
>>>>> in 295 seconds.
>>>>>
>>>>> I have a total of 13 minutes until any workers have started on
>>>>> Dataflow, then another 4.5 minutes once the job actually does anything
>>>>> (which eventually is to read a whopping 3 cells from Bigtable ;).
>>>>>
>>>>> How could this be improved?
>>>>> For one, it seems to me like the upload of
>>>>> sdks:java:io:google-cloud-platform:expansion-service:shadowJar from
>>>>> my computer shouldn't be necessary - shouldn't Dataflow have that
>>>>> already/could it be fetched by Dataflow rather than having to upload it
>>>>> over slow internet?
>>>>> And what about my own jar - it's not bound to change very often, so
>>>>> would it be possible to upload somewhere and then fetch it from there?
>>>>>
>>>>> Thanks!
>>>>> -Lina
>>>>>
>>>>> On Tue, Jan 3, 2023 at 1:23 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> I would suggest using BigtableIO which also returns a
>>>>>> protobuf com.google.bigtable.v2.Row. This should allow you to replicate
>>>>>> what SpannerIO is doing.
>>>>>>
>>>>>> Alternatively you could provide a way to convert the HBase result
>>>>>> into a Beam row by specifying a converter and a schema for it, and then
>>>>>> you could use the already well-known Beam Schema type:
>>>>>>
>>>>>> https://github.com/apache/beam/blob/0b8f0b4db7a0de4977e30bcfeb50b5c14c7c1572/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1068
>>>>>>
>>>>>> Otherwise you'll have to register the HBase result coder with a
>>>>>> well-known name so that the runner API coder URN is something that you
>>>>>> know, and then on the Python side you would need a coder for that URN as
>>>>>> well, to allow you to understand the bytes being sent across from the
>>>>>> Java portion of the pipeline.
>>>>>>
>>>>>> On Fri, Dec 30, 2022 at 12:59 AM Lina Mårtensson <lina@camus.energy>
>>>>>> wrote:
>>>>>>
>>>>>>> And next issue... I'm getting KeyError: 'beam:coders:javasdk:0.1', which
>>>>>>> I learned
>>>>>>> <https://cwiki.apache.org/confluence/display/BEAM/Multi-language+Pipelines+Tips>
>>>>>>> is because the transform is trying to return something that there isn't
>>>>>>> a standard Beam coder for
>>>>>>> <https://github.com/apache/beam/blob/05428866cdbf1ea8e4c1789dd40327673fd39451/model/pipeline/src/main/proto/beam_runner_api.proto#L784>.
>>>>>>> Makes sense, but... how do I fix this? The documentation talks
>>>>>>> about how to do this for the input, but not for the output.
>>>>>>>
>>>>>>> Comparing to Spanner, it looks like Spanner returns a protobuf,
>>>>>>> which I'm guessing somehow gets converted to bytes... But
>>>>>>> CloudBigtableIO
>>>>>>> <https://github.com/googleapis/java-bigtable-hbase/blob/main/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java>
>>>>>>> returns org.apache.hadoop.hbase.client.Result.
>>>>>>>
>>>>>>> My buildExternal method looks as follows:
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public PTransform<PBegin, PCollection<Result>> buildExternal(
>>>>>>>                 BigtableReadBuilder.Configuration configuration) {
>>>>>>>             return Read.from(CloudBigtableIO.read(
>>>>>>>                 new CloudBigtableScanConfiguration.Builder()
>>>>>>>                     .withProjectId(configuration.projectId)
>>>>>>>                     .withInstanceId(configuration.instanceId)
>>>>>>>                     .withTableId(configuration.tableId)
>>>>>>>                     .build()
>>>>>>>             ));
>>>>>>>         }
>>>>>>>
>>>>>>>
>>>>>>> I also got a warning, which I *believe* is unrelated (but also an
>>>>>>> issue):
>>>>>>>
>>>>>>> INFO:apache_beam.utils.subprocess_server:b"WARNING: Configuration class
>>>>>>> 'energy.camus.beam.BigtableRegistrar$BigtableReadBuilder$Configuration'
>>>>>>> has no schema registered. Attempting to construct with setter approach."
>>>>>>>
>>>>>>> INFO:apache_beam.utils.subprocess_server:b'Dec 30, 2022 7:46:14 AM
>>>>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
>>>>>>> payloadToConfig'
>>>>>>>
>>>>>>> What is this schema and what should it look like?
>>>>>>>
>>>>>>> Thanks!
>>>>>>> -Lina
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Dec 30, 2022 at 12:28 AM Lina Mårtensson <lina@camus.energy>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks! This was really helpful. It took a while to figure out the
>>>>>>>> details - a section in the docs on what's required of these jars for
>>>>>>>> non-Java users would be a great addition.
>>>>>>>>
>>>>>>>> But once I did, the Bazel config was actually quite straightforward
>>>>>>>> and makes sense.
>>>>>>>> I pasted the first section from here
>>>>>>>> <https://github.com/bazelbuild/rules_jvm_external/blob/master/README.md#usage>
>>>>>>>> into my WORKSPACE file and changed the artifacts to the ones I needed.
>>>>>>>> (How to find the right ones remains confusing.)
>>>>>>>>
>>>>>>>> After that I updated my BUILD rules, and Bazel had easy and
>>>>>>>> straightforward configs for it. All I needed was this:
>>>>>>>>
>>>>>>>> # From
>>>>>>>> # https://github.com/google/bazel-common/blob/master/third_party/java/auto/BUILD
>>>>>>>> # The auto service is what registers our Registrar class, and it needs
>>>>>>>> # to be a plugin, which makes it run at compile-time.
>>>>>>>> java_plugin(
>>>>>>>>     name = "auto_service_processor",
>>>>>>>>     processor_class = "com.google.auto.service.processor.AutoServiceProcessor",
>>>>>>>>     deps = [
>>>>>>>>         "@maven//:com_google_auto_service_auto_service",
>>>>>>>>         "@maven//:com_google_auto_service_auto_service_annotations",
>>>>>>>>         "@maven//:org_apache_beam_beam_vendor_guava_26_0_jre",
>>>>>>>>     ],
>>>>>>>> )
>>>>>>>>
>>>>>>>> java_binary(
>>>>>>>>     name = "java_hbase",
>>>>>>>>     main_class = "energy.camus.beam.BigtableRegistrar",
>>>>>>>>     plugins = [":auto_service_processor"],
>>>>>>>>     srcs = ["src/main/java/energy/camus/beam/BigtableRegistrar.java"],
>>>>>>>>     deps = [
>>>>>>>>         "@maven//:com_google_auto_service_auto_service",
>>>>>>>>         "@maven//:com_google_auto_service_auto_service_annotations",
>>>>>>>>         "@maven//:com_google_cloud_bigtable_bigtable_hbase_beam",
>>>>>>>>         "@maven//:org_apache_beam_beam_sdks_java_core",
>>>>>>>>         "@maven//:org_apache_beam_beam_vendor_guava_26_0_jre",
>>>>>>>>         "@maven//:org_apache_hbase_hbase_shaded_client",
>>>>>>>>     ],
>>>>>>>> )
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Dec 29, 2022 at 2:43 PM Luke Cwik <lc...@google.com> wrote:
>>>>>>>>
>>>>>>>>> AutoService relies on Java's compiler annotation processor.
>>>>>>>>> https://github.com/google/auto/tree/main/service#getting-started
>>>>>>>>> shows that you need to configure Java's compiler to use the annotation
>>>>>>>>> processors within AutoService.
>>>>>>>>>
>>>>>>>>> I saw this public gist that seemed to enable using the AutoService
>>>>>>>>> annotation processor with Bazel
>>>>>>>>> https://gist.github.com/jart/5333824b94cd706499a7bfa1e086ee00
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Dec 29, 2022 at 2:27 PM Lina Mårtensson via dev <
>>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> That's good news about the direct runner, thanks!
>>>>>>>>>>
>>>>>>>>>> On Thu, Dec 29, 2022 at 2:02 PM Robert Bradshaw <
>>>>>>>>>> rober...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 28, 2022 at 5:37 PM Chamikara Jayalath via dev
>>>>>>>>>>> <dev@beam.apache.org> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > On Thu, Jul 28, 2022 at 4:51 PM Lina Mårtensson <lina@camus.energy> wrote:
>>>>>>>>>>> >>
>>>>>>>>>>> >> Thanks for the detailed answers!
>>>>>>>>>>> >>
>>>>>>>>>>> >> I totally get the points about development & maintenance cost,
>>>>>>>>>>> >> and, from a user perspective, about getting the performance right.
>>>>>>>>>>> >>
>>>>>>>>>>> >> I decided to try out the Spanner connector to get a sense of how
>>>>>>>>>>> >> well the x-language approach works in our world, since that's an
>>>>>>>>>>> >> existing x-language connector.
>>>>>>>>>>> >> Overall, it works and with minimal intervention as you say - it is
>>>>>>>>>>> >> very slow, though.
>>>>>>>>>>> >> I'm a little confused about "portable runners" - if I understand
>>>>>>>>>>> >> this correctly, this means we couldn't run with the DirectRunner
>>>>>>>>>>> >> anymore if using an x-language connector? (At least it didn't work
>>>>>>>>>>> >> when I tried it.)
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > You'll have to use the portable DirectRunner -
>>>>>>>>>>> > https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/portability
>>>>>>>>>>> >
>>>>>>>>>>> > Job service for this can be started using the following command:
>>>>>>>>>>> > python apache_beam/runners/portability/local_job_service_main.py -p <port>
>>>>>>>>>>>
>>>>>>>>>>> Note that the Python direct runner is already a portable runner, so
>>>>>>>>>>> you shouldn't have to do anything special (like start up a separate
>>>>>>>>>>> job service and pass extra options) to run locally. Just use the
>>>>>>>>>>> cross-language transforms as you would any normal Python transform.
>>>>>>>>>>>
>>>>>>>>>>> The goal is to make this as smooth and transparent as possible;
>>>>>>>>>>> please keep coming back to us if you find rough edges.
>>>>>>>>>>>
>>>>>>>>>>
