I looked at your example and the custom logic for the singleton is
basically:
static transient T value;
public static synchronized T getOrCreate(...) {
  if (value == null) {
     ... instantiate value ...
  }
  return value;
}

That is only a few lines. You could use one of Java's dependency injection
frameworks, such as Spring or Guice, since they commonly provide a singleton
scope, but the added complexity often isn't worth it.
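
For reference, here is a self-contained sketch of the synchronized
getOrCreate pattern above. ExpensiveResource and ResourceHolder are
hypothetical names, and the creation counter exists only to make the
"created exactly once" behavior observable; your real resource and its
construction logic would take their place.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for an expensive, thread-safe resource such as a database reader. */
final class ExpensiveResource {
  // Counts constructor calls so the "created exactly once" behavior can be checked.
  static final AtomicInteger CREATED = new AtomicInteger();

  ExpensiveResource() {
    CREATED.incrementAndGet();
  }
}

/** Hypothetical holder: one shared instance per JVM (more precisely, per classloader). */
final class ResourceHolder {
  private static ExpensiveResource value;

  private ResourceHolder() {}

  // synchronized makes the null check and the assignment one atomic step,
  // so only the first caller instantiates the resource.
  static synchronized ExpensiveResource getOrCreate() {
    if (value == null) {
      value = new ExpensiveResource();
    }
    return value;
  }
}
```

Every thread that calls ResourceHolder.getOrCreate() gets a reference to the
same instance. The cost is that every call takes the lock, which is usually
negligible if each thread fetches the reference once and keeps it.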

If all your logic can be provided by PipelineOptions, you could combine a
DefaultValueFactory with a PipelineOption that returns T. The first time an
unset value is accessed, the factory creates it, and the synchronization is
provided for you by PipelineOptions. It would look like:

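import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.PipelineOptions;

public interface MyPipelineOptions extends PipelineOptions {
  // @JsonIgnore prevents the value from being saved and makes it "local"
  // to the process.
  @Default.InstanceFactory(TFactory.class)
  @JsonIgnore
  T getT();
  void setT(T t);

  public static class TFactory implements DefaultValueFactory<T> {
    @Override
    public T create(PipelineOptions options) {
      ... instantiate value ...
    }
  }
}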
and then in your DoFn you access your PipelineOption as normal. This is
also convenient because you can set the value during testing of your DoFn.

Another alternative would be to look at something like Guava's memoize[1]
functions and use them, as they are very lightweight.

1:
https://guava.dev/releases/19.0/api/docs/com/google/common/base/Suppliers.html#memoize(com.google.common.base.Supplier)
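
To illustrate what memoizing buys you, here is a minimal JDK-only sketch of
a memoizing supplier. It is a hand-rolled stand-in for illustration, not
Guava's implementation; MemoizingSupplier and ReaderCache are hypothetical
names, and the StringBuilder stands in for whatever resource is expensive to
build. With Guava on the classpath, Suppliers.memoize(delegate) would replace
the hand-rolled class.

```java
import java.util.function.Supplier;

/** Hand-rolled memoizing supplier; illustrative only, not Guava's implementation. */
final class MemoizingSupplier<T> implements Supplier<T> {
  private final Supplier<T> delegate;
  // volatile so a value published by one thread is fully visible to the others.
  private volatile T value;

  MemoizingSupplier(Supplier<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public T get() {
    T result = value; // fast path: no locking once the value is set
    if (result == null) {
      synchronized (this) {
        result = value; // re-check under the lock
        if (result == null) {
          // Assumes the delegate never returns null; a null result would
          // not be cached and the delegate would run again on the next call.
          result = delegate.get();
          value = result;
        }
      }
    }
    return result;
  }
}

/** Hypothetical holder showing how a shared resource would be exposed. */
final class ReaderCache {
  static final Supplier<StringBuilder> READER =
      new MemoizingSupplier<>(() -> new StringBuilder("loaded"));

  private ReaderCache() {}
}
```

Callers use ReaderCache.READER.get() from any thread; the delegate runs on
the first call and later calls return the cached instance without locking.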

On Thu, Apr 30, 2020 at 12:45 PM Jeff Klukas <[email protected]> wrote:

> Beam Java users,
>
> I've run into a few cases where I want to present a single thread-safe
> data structure to all threads on a worker, and I end up writing a good bit
> of custom code each time involving a synchronized method that handles
> creating the resource exactly once, and then each thread has its own
> reference to the singleton. I don't have extensive experience with thread
> safety in Java, so it seems likely I'm going to get this wrong.
>
> Are there any best practices for state that is shared across threads? Any
> prior art I can read up on?
>
> The most concrete case I have in mind is loading a GeoIP database for
> doing city lookups from IP addresses. We're using MaxMind's API which
> allows mapping a portion of memory to a file sitting on disk. We have a
> synchronized method that checks if the reader has been initialized [0] ; if
> not, we copy the database file from GCS to local disk, build the
> DatabaseReader instance, and return it. Other threads will see the
> already-initialized and just get a reference to it instead.
>
> This all appears to work, and it saves memory compared to each thread
> maintaining their own DatabaseReader. But is there a safer or more built-in
> way to do this? Am I missing relevant hooks in the Beam API that would make
> this cleaner?
>
> [0]
> https://github.com/mozilla/gcp-ingestion/blob/master/ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/GeoCityLookup.java#L95
>
