Great, thanks! That was a huge improvement.

On Thu, Jan 5, 2023 at 12:52 PM Luke Cwik <lc...@google.com> wrote:

> By default, Beam Java only uploads artifacts that have changed, but it looks
> like this is not the case for Beam Python, and you need to explicitly opt in
> with the --enable_artifact_caching flag[1].
>
> It looks like this feature was added a year ago[2]; should we turn it on
> by default?
>
> 1:
> https://github.com/apache/beam/blob/3070160203c6734da0eb04b440e08b43f9fd33f3/sdks/python/apache_beam/options/pipeline_options.py#L794
> 2: https://github.com/apache/beam/pull/16229
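>
> As a quick illustration, opting in from Python would look roughly like the
> sketch below (the flag name comes from [1]; the rest of the pipeline is a
> hypothetical placeholder):
>
>     import apache_beam as beam
>     from apache_beam.options.pipeline_options import PipelineOptions
>
>     # Opt in so unchanged artifacts are reused instead of re-uploaded.
>     options = PipelineOptions([
>         "--runner=DataflowRunner",
>         "--enable_artifact_caching",
>         # ... project, region, temp_location, etc.
>     ])
>     with beam.Pipeline(options=options) as p:
>         _ = p | beam.Create([1, 2, 3])  # placeholder pipeline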
>
>
>
> On Thu, Jan 5, 2023 at 11:43 AM Lina Mårtensson <lina@camus.energy> wrote:
>
>> Thanks! I have now successfully written a beautiful string of protobuf
>> bytes into a file via Python. 🎉
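>>
>> (For the record, reading those bytes back in Python looks roughly like the
>> sketch below. It assumes the bytes are one serialized
>> com.google.bigtable.v2.Row and a recent google-cloud-bigtable client whose
>> proto-plus types expose deserialize; the file name is a placeholder.)
>>
>>     # Sketch: parse a serialized Bigtable v2 Row message back into an object.
>>     from google.cloud.bigtable_v2.types import Row
>>
>>     with open("row.pb", "rb") as f:  # placeholder file name
>>         row = Row.deserialize(f.read())
>>     print(row.key, [family.name for family in row.families])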
>>
>> Two issues though:
>> 1. Robert said the Python direct runner would just work with this - but
>> it's not working. After about half an hour of these messages repeating
>> over and over, I interrupted the job:
>>
>> E0105 07:25:48.170601677   58210 fork_posix.cc:76]           Other
>> threads are currently calling into gRPC, skipping fork() handlers
>>
>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'2023/01/05
>> 06:57:10 Failed to obtain provisioning information: failed to dial server
>> at localhost:41087\n\tcaused by:\ncontext deadline exceeded\n'
>> 2. I (unsurprisingly) get back to the issue I had when I tested out the
>> Spanner x-lang transform on Dataflow - starting a job is unbearably slow,
>> with the time mainly spent transferring the expansion service jar (115 MB)
>> plus my jar (105 MB) with my new code and its dependencies:
>>
>> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload
>> to
>> gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar...
>>
>> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload
>> to
>> gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/beam-sdks-java-io-google-cloud-platform-expansion-service-2.39.0-uBMB6BRMpxmYFg1PPu1yUxeoyeyX_lYX1NX0LVL7ZcM.jar
>> in 321 seconds.
>>
>> INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload
>> to
>> gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar...
>>
>> INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload
>> to
>> gs://hce-mimo-inbox/beam_temp/beamapp-builder-0105191153-992959-3fhktuyb.1672945913.993243/java_bigtable_deploy-Ed1r7YOeLKLTmg2RGNktkym9sVYciCiielpk61r6CJ4.jar
>> in 295 seconds.
>>
>> It takes a total of 13 minutes until any workers have started on Dataflow,
>> then another 4.5 minutes before the job actually does anything (which
>> eventually is to read a whopping 3 cells from Bigtable ;).
>>
>> How could this be improved?
>> For one, it seems to me like uploading
>> sdks:java:io:google-cloud-platform:expansion-service:shadowJar from my
>> computer shouldn't be necessary - shouldn't Dataflow have that already, or
>> couldn't Dataflow fetch it itself rather than having it uploaded over a
>> slow internet connection?
>> And what about my own jar - it's not bound to change very often, so would
>> it be possible to upload it somewhere once and then fetch it from there?
>>
>> Thanks!
>> -Lina
>>
>> On Tue, Jan 3, 2023 at 1:23 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> I would suggest using BigtableIO, which also returns a protobuf
>>> com.google.bigtable.v2.Row. This should allow you to replicate what
>>> SpannerIO is doing.
>>>
>>> Alternatively, you could provide a way to convert the HBase result into a
>>> Beam row by specifying a converter and a schema for it; then you could use
>>> the already well-known Beam Schema type:
>>>
>>> https://github.com/apache/beam/blob/0b8f0b4db7a0de4977e30bcfeb50b5c14c7c1572/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1068
>>>
>>> Otherwise you'll have to register the HBase result coder under a well-known
>>> name so that the runner API coder URN is something that you know, and then
>>> on the Python side you would need a coder registered for that same URN to
>>> let you understand the bytes being sent across from the Java portion of the
>>> pipeline.
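>>>
>>> (To make that concrete, a hypothetical sketch of the Python side follows.
>>> The URN is made up and must match whatever name the Java coder is
>>> registered under, the encode/decode bodies are placeholders that would
>>> have to mirror the Java coder's exact wire format, and it assumes Beam's
>>> Coder.register_structured_urn hook, which the SDK uses internally to map
>>> URNs to coder classes.)
>>>
>>>     from apache_beam.coders import Coder
>>>
>>>     # Hypothetical URN; must match the Java-side registration.
>>>     HBASE_RESULT_URN = "camus:coder:hbase_result:v1"
>>>
>>>     class HBaseResultCoder(Coder):
>>>         """Handles bytes produced by the Java HBase Result coder (sketch)."""
>>>         def encode(self, value):
>>>             return value  # placeholder: must mirror the Java wire format
>>>         def decode(self, encoded):
>>>             return encoded  # placeholder: e.g. parse into a Python object
>>>         def is_deterministic(self):
>>>             return True
>>>
>>>     Coder.register_structured_urn(HBASE_RESULT_URN, HBaseResultCoder)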
>>>
>>> On Fri, Dec 30, 2022 at 12:59 AM Lina Mårtensson <lina@camus.energy>
>>> wrote:
>>>
>>>> And next issue... I'm getting KeyError: 'beam:coders:javasdk:0.1' which
>>>> I learned
>>>> <https://cwiki.apache.org/confluence/display/BEAM/Multi-language+Pipelines+Tips>
>>>> is because the transform is trying to return something that there isn't a 
>>>> standard
>>>> Beam coder for
>>>> <https://github.com/apache/beam/blob/05428866cdbf1ea8e4c1789dd40327673fd39451/model/pipeline/src/main/proto/beam_runner_api.proto#L784>
>>>> .
>>>> Makes sense, but... how do I fix this? The documentation talks
>>>> about how to do this for the input, but not for the output.
>>>>
>>>> Comparing to Spanner, it looks like Spanner returns a protobuf, which
>>>> I'm guessing somehow gets converted to bytes... But CloudBigtableIO
>>>> <https://github.com/googleapis/java-bigtable-hbase/blob/main/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java>
>>>> returns org.apache.hadoop.hbase.client.Result.
>>>>
>>>> My buildExternal method looks as follows:
>>>>
>>>>     @Override
>>>>     public PTransform<PBegin, PCollection<Result>> buildExternal(
>>>>             BigtableReadBuilder.Configuration configuration) {
>>>>         return Read.from(CloudBigtableIO.read(
>>>>             new CloudBigtableScanConfiguration.Builder()
>>>>                 .withProjectId(configuration.projectId)
>>>>                 .withInstanceId(configuration.instanceId)
>>>>                 .withTableId(configuration.tableId)
>>>>                 .build()));
>>>>     }
>>>>
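>>>> (For reference, the Python side that invokes this registrar looks roughly
>>>> like the sketch below; the URN and configuration keys are hypothetical
>>>> stand-ins for whatever BigtableRegistrar actually declares, and the
>>>> address assumes an expansion service already running locally.)
>>>>
>>>>     import apache_beam as beam
>>>>     from apache_beam.transforms.external import (
>>>>         ExternalTransform, ImplicitSchemaPayloadBuilder)
>>>>
>>>>     with beam.Pipeline() as p:
>>>>         rows = p | ExternalTransform(
>>>>             "camus:external:bigtable_read:v1",  # hypothetical URN
>>>>             ImplicitSchemaPayloadBuilder({
>>>>                 "project_id": "my-project",  # hypothetical config values
>>>>                 "instance_id": "my-instance",
>>>>                 "table_id": "my-table",
>>>>             }),
>>>>             "localhost:8097")  # a running expansion service
>>>>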
>>>>
>>>> I also got a warning, which I *believe* is unrelated (but also an
>>>> issue):
>>>>
>>>> INFO:apache_beam.utils.subprocess_server:b"WARNING: Configuration class
>>>> 'energy.camus.beam.BigtableRegistrar$BigtableReadBuilder$Configuration' has
>>>> no schema registered. Attempting to construct with setter approach."
>>>>
>>>> INFO:apache_beam.utils.subprocess_server:b'Dec 30, 2022 7:46:14 AM
>>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader
>>>> payloadToConfig'
>>>> What is this schema and what should it look like?
>>>>
>>>> Thanks!
>>>> -Lina
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Dec 30, 2022 at 12:28 AM Lina Mårtensson <lina@camus.energy>
>>>> wrote:
>>>>
>>>>> Thanks! This was really helpful. It took a while to figure out the
>>>>> details - a section in the docs on what's required of these jars for
>>>>> non-Java users would be a great addition.
>>>>>
>>>>> But once I did, the Bazel config was actually quite straightforward
>>>>> and made sense.
>>>>> I pasted the first section from here
>>>>> <https://github.com/bazelbuild/rules_jvm_external/blob/master/README.md#usage>
>>>>>  into
>>>>> my WORKSPACE file and changed the artifacts to the ones I needed. (How to
>>>>> find the right ones remains confusing.)
>>>>>
>>>>> After that I updated my BUILD rules. Bazel had easy and
>>>>> straightforward configs for it; all I needed was this:
>>>>>
>>>>> # From
>>>>> # https://github.com/google/bazel-common/blob/master/third_party/java/auto/BUILD
>>>>> # The auto service is what registers our Registrar class, and it needs
>>>>> # to be a plugin, which makes it run at compile-time.
>>>>> java_plugin(
>>>>>     name = "auto_service_processor",
>>>>>     processor_class = "com.google.auto.service.processor.AutoServiceProcessor",
>>>>>     deps = [
>>>>>         "@maven//:com_google_auto_service_auto_service",
>>>>>         "@maven//:com_google_auto_service_auto_service_annotations",
>>>>>         "@maven//:org_apache_beam_beam_vendor_guava_26_0_jre",
>>>>>     ],
>>>>> )
>>>>>
>>>>> java_binary(
>>>>>     name = "java_hbase",
>>>>>     main_class = "energy.camus.beam.BigtableRegistrar",
>>>>>     plugins = [":auto_service_processor"],
>>>>>     srcs = ["src/main/java/energy/camus/beam/BigtableRegistrar.java"],
>>>>>     deps = [
>>>>>         "@maven//:com_google_auto_service_auto_service",
>>>>>         "@maven//:com_google_auto_service_auto_service_annotations",
>>>>>         "@maven//:com_google_cloud_bigtable_bigtable_hbase_beam",
>>>>>         "@maven//:org_apache_beam_beam_sdks_java_core",
>>>>>         "@maven//:org_apache_beam_beam_vendor_guava_26_0_jre",
>>>>>         "@maven//:org_apache_hbase_hbase_shaded_client",
>>>>>     ],
>>>>> )
>>>>>
>>>>>
>>>>> On Thu, Dec 29, 2022 at 2:43 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> AutoService relies on Java's compiler annotation processor.
>>>>>> https://github.com/google/auto/tree/main/service#getting-started
>>>>>> shows that you need to configure Java's compiler to use the annotation
>>>>>> processors within AutoService.
>>>>>>
>>>>>> I saw this public gist that seemed to enable using the AutoService
>>>>>> annotation processor with Bazel:
>>>>>> https://gist.github.com/jart/5333824b94cd706499a7bfa1e086ee00
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Dec 29, 2022 at 2:27 PM Lina Mårtensson via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> That's good news about the direct runner, thanks!
>>>>>>>
>>>>>>> On Thu, Dec 29, 2022 at 2:02 PM Robert Bradshaw <rober...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Thu, Jul 28, 2022 at 5:37 PM Chamikara Jayalath via dev
>>>>>>>> <dev@beam.apache.org> wrote:
>>>>>>>> >
>>>>>>>> > On Thu, Jul 28, 2022 at 4:51 PM Lina Mårtensson <lina@camus.energy>
>>>>>>>> wrote:
>>>>>>>> >>
>>>>>>>> >> Thanks for the detailed answers!
>>>>>>>> >>
>>>>>>>> >> I totally get the points about development & maintenance cost, and,
>>>>>>>> >> from a user perspective, about getting the performance right.
>>>>>>>> >>
>>>>>>>> >> I decided to try out the Spanner connector to get a sense of how well
>>>>>>>> >> the x-language approach works in our world, since that's an existing
>>>>>>>> >> x-language connector.
>>>>>>>> >> Overall, it works with minimal intervention, as you say - it is very
>>>>>>>> >> slow, though.
>>>>>>>> >> I'm a little confused about "portable runners" - if I understand this
>>>>>>>> >> correctly, it means we couldn't run with the DirectRunner anymore if
>>>>>>>> >> using an x-language connector? (At least it didn't work when I tried
>>>>>>>> >> it.)
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > You'll have to use the portable DirectRunner -
>>>>>>>> https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/portability
>>>>>>>> >
>>>>>>>> > The job service for this can be started using the following command:
>>>>>>>> > python apache_beam/runners/portability/local_job_service_main.py -p <port>
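>>>>>>>> >
>>>>>>>> > A pipeline would then point at that job service with options along
>>>>>>>> > these lines (a sketch; <port> as in the command above):
>>>>>>>> >
>>>>>>>> >     from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>> >
>>>>>>>> >     options = PipelineOptions([
>>>>>>>> >         "--runner=PortableRunner",
>>>>>>>> >         "--job_endpoint=localhost:<port>",  # substitute the real port
>>>>>>>> >     ])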
>>>>>>>>
>>>>>>>> Note that the Python direct runner is already a portable runner, so
>>>>>>>> you shouldn't have to do anything special (like start up a separate
>>>>>>>> job service and pass extra options) to run locally. Just use the
>>>>>>>> cross-language transforms as you would any normal Python transform.
>>>>>>>>
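>>>>>>>> For example, a minimal sketch with the existing Spanner cross-language
>>>>>>>> transform on the default direct runner (the project/instance/database
>>>>>>>> IDs, row type, and query are placeholders):
>>>>>>>>
>>>>>>>>     import typing
>>>>>>>>     import apache_beam as beam
>>>>>>>>     from apache_beam import coders
>>>>>>>>     from apache_beam.io.gcp.spanner import ReadFromSpanner
>>>>>>>>
>>>>>>>>     class User(typing.NamedTuple):
>>>>>>>>         name: str
>>>>>>>>
>>>>>>>>     coders.registry.register_coder(User, coders.RowCoder)
>>>>>>>>
>>>>>>>>     with beam.Pipeline() as p:  # default direct runner, no job service
>>>>>>>>         _ = (p
>>>>>>>>              | ReadFromSpanner(
>>>>>>>>                  project_id="my-project",
>>>>>>>>                  instance_id="my-instance",
>>>>>>>>                  database_id="my-db",
>>>>>>>>                  row_type=User,
>>>>>>>>                  sql="SELECT name FROM users")
>>>>>>>>              | beam.Map(print))
>>>>>>>>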
>>>>>>>> The goal is to make this as smooth and transparent as possible;
>>>>>>>> please
>>>>>>>> keep coming back to us if you find rough edges.
>>>>>>>>
>>>>>>>
