Cameron's point about protecting return values is quite reasonable. I agree it would be good to make sure we're returning an unmodifiable set in my getOrCreateSingletonAllowedCities case.
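A minimal sketch of what that could look like, assuming the cities are held in a Set<String>. The AllowedCities class and loadCities() helper are hypothetical stand-ins, not the actual pipeline code:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical holder class; the real code is organized differently.
final class AllowedCities {
  // Shared by all threads that use this class; built on first access.
  private static Set<String> singletonAllowedCities;

  private AllowedCities() {}

  // synchronized ensures the set is built exactly once and that every
  // thread sees the fully constructed value.
  public static synchronized Set<String> getOrCreateSingletonAllowedCities() {
    if (singletonAllowedCities == null) {
      // Defensive copy, so no mutable reference to the backing set escapes.
      Set<String> cities = new HashSet<>(loadCities());
      // Read-only view: add/remove on the returned set throw
      // UnsupportedOperationException.
      singletonAllowedCities = Collections.unmodifiableSet(cities);
    }
    return singletonAllowedCities;
  }

  // Hypothetical loader standing in for reading the real allowed-cities list.
  private static Set<String> loadCities() {
    return new HashSet<>(Arrays.asList("Berlin", "Toronto"));
  }
}
```

The defensive copy matters because Collections.unmodifiableSet only returns a read-only view; if another reference to the backing set leaked out, it could still be mutated underneath every caller.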
Luke's suggestion of Suppliers.memoize sounds like a more elegant way of doing essentially what we're currently implementing, so it's definitely worth looking into. The PipelineOptions-based solution is also intriguing. When we pass values into transforms from PipelineOptions, are all worker threads going to see the same instance? Or does PipelineOptions end up getting serialized and reconstructed by each thread? I see that your example uses JsonIgnore to prevent a computed value from being serialized, but it's unclear whether the default value will be generated once per thread or once per worker. I suppose the Beam model doesn't make any statement about how runners handle multiple threads per worker, so there's no guarantee about whether multiple threads are executing in the same JVM or not. Am I correct in assuming that, at least in the Dataflow case, there is one JVM per worker node?

On Thu, Apr 30, 2020 at 7:05 PM Luke Cwik wrote:

> I looked at your example and the custom logic for the singleton is
> basically:
> static transient T value;
> public static synchronized T getOrCreate(...) {
>   if (value == null) {
>     ... instantiate value ...
>   }
>   return value;
> }
>
> Which is only a few lines. You could use one of Java's injection
> frameworks like Spring or Guice or ... as they commonly provide you a
> singleton pattern but the complexity overhead a lot of the time isn't worth
> it.
>
> If all your logic can be provided by PipelineOptions you could combine a
> DefaultValueFactory and a PipelineOption that returns T. First time an
> unset value is accessed the PipelineOption will be created and the
> synchronization is provided for you by PipelineOptions.
> It would look like:
>
> public interface MyPipelineOptions extends PipelineOptions {
>   @Default.InstanceFactory(TFactory.class)
>   @JsonIgnore // prevents the value from being saved and makes it "local" to the process
>   T getT();
>   void setT(T t);
>
>   public static class TFactory implements DefaultValueFactory<T> {
>     public T create(PipelineOptions options) {
>       ... instantiate value ...
>     }
>   }
> }
> and then in your DoFn you access your PipelineOption as normal. This is
> also convenient because you can set the value during testing of your DoFn.
>
> Another alternative would be to look for something like Guava's memoize[1]
> functions and use them as they are very lightweight.
>
> 1:
> https://guava.dev/releases/19.0/api/docs/com/google/common/base/Suppliers.html#memoize(com.google.common.base.Supplier)
>
> On Thu, Apr 30, 2020 at 12:45 PM Jeff Klukas wrote:
>
>> Beam Java users,
>>
>> I've run into a few cases where I want to present a single thread-safe
>> data structure to all threads on a worker, and I end up writing a good bit
>> of custom code each time involving a synchronized method that handles
>> creating the resource exactly once, and then each thread has its own
>> reference to the singleton. I don't have extensive experience with thread
>> safety in Java, so it seems likely I'm going to get this wrong.
>>
>> Are there any best practices for state that is shared across threads? Any
>> prior art I can read up on?
>>
>> The most concrete case I have in mind is loading a GeoIP database for
>> doing city lookups from IP addresses. We're using MaxMind's API, which
>> allows mapping a portion of memory to a file sitting on disk. We have a
>> synchronized method that checks if the reader has been initialized [0]; if
>> not, we copy the database file from GCS to local disk, build the
>> DatabaseReader instance, and return it. Other threads will see the
>> already-initialized reader and just get a reference to it instead.
>>
>> This all appears to work, and it saves memory compared to each thread
>> maintaining their own DatabaseReader. But is there a safer or more built-in
>> way to do this? Am I missing relevant hooks in the Beam API that would make
>> this cleaner?
>>
>> [0]
>> https://github.com/mozilla/gcp-ingestion/blob/master/ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/GeoCityLookup.java#L95
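For comparison with the hand-written synchronized getOrCreate approach in the thread, here is a JDK-only sketch of the memoization idea. Memoized is a hypothetical class, not Guava's Suppliers.memoize (which is the better-tested option if Guava is already on the classpath). It uses double-checked locking on a volatile field, so the delegate runs once even when many threads call get() concurrently; if the delegate throws, nothing is cached and the next call retries.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical, JDK-only analogue of Guava's Suppliers.memoize (not Guava's code).
final class Memoized<T> implements Supplier<T> {
  private final Supplier<T> delegate;
  // volatile so a fully constructed value is safely published to all threads.
  private volatile T value;

  Memoized(Supplier<T> delegate) {
    this.delegate = Objects.requireNonNull(delegate);
  }

  @Override
  public T get() {
    T result = value; // fast path: no locking once initialized
    if (result == null) {
      synchronized (this) {
        result = value; // re-check: another thread may have won the race
        if (result == null) {
          result = Objects.requireNonNull(delegate.get(), "delegate returned null");
          value = result;
        }
      }
    }
    return result;
  }
}
```

In a DoFn this could be held in a static final field, e.g. `private static final Supplier<DatabaseReader> READER = new Memoized<>(() -> buildReader());`, where buildReader() is a hypothetical helper that copies the database from GCS and constructs the reader. Because the field is static, the reader is shared across threads in the same JVM (strictly, per class loader) rather than created per thread or per DoFn instance.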
