Re: Reading/ writing xml file hangs indefinitely
Hi Richard,

Can you share a little more info about your environment? Here's a
smattering of questions for which answers may be useful.

* What runner are you using?
* What version of the SDK?
* Does this reproduce in the DirectRunner?
* Can you share a full reproduction (e.g., in a GitHub gist)?
* What is happening on the machine(s) executing the job? Is there high
  CPU? Is the disk active? Etc.

Thanks,
Dan

On Tue, Apr 4, 2017 at 9:33 AM, Richard Hanson wrote:

> I am testing Apache Beam to read/write XML files. But I encounter a
> problem: even though the code just reads a single XML file and writes
> it out without doing any transformation, the process seems to hang
> indefinitely. The output looks like below:
>
> [pool-2-thread-5-ScalaTest-running-XmlSpec] INFO org.apache.beam.sdk.io.FileBasedSource - Matched 1 files for pattern /tmp/input/my.xml
> [pool-6-thread-1] INFO org.apache.beam.sdk.io.Write - Initializing write operation org.apache.beam.sdk.io.XmlSink$XmlWriteOperation@1c72df2c
>
> The code basically does the following:
>
> val options = PipelineOptionsFactory.create
> val p = Pipeline.create(options)
>
> val xml = XmlSource.from[Record](new File("/tmp/input/my.xml").toPath.toString)
>   .withRootElement("RootE")
>   .withRecordElement("Record")
>   .withRecordClass(classOf[Record])
>
> p.apply(Read.from(xml))
>   .apply(Write.to(XmlSink.write()
>     .toFilenamePrefix("xml")
>     .ofRecordClass(classOf[Record])
>     .withRootElement("RootE")))
>
> p.run.waitUntilFinish
>
> What part may be missing in my program?
>
> Thanks
Re: Having a local cache (per JVM) to use in DoFns
Thanks Lukasz, that's very helpful!

On Thu, Apr 6, 2017 at 1:34 PM, Lukasz Cwik wrote:

> You should follow any valid singleton pattern and preferably
> initialize on class load or within a method annotated with @Setup [1].
>
> @Setup/@Teardown is called each time an instance of a DoFn is
> created/discarded, respectively. @Setup/@Teardown will generally be
> called fewer times than startBundle/finishBundle, but more than one
> instance of a DoFn may still be created within a single JVM, which is
> why you are still required to follow a valid singleton pattern.
>
> For example:
>
> class MyDoFn {
>   private static volatile CachedService cachedService;
>
>   @Setup
>   public void setup() {
>     // Initialize and store as static member if not already initialized
>     if (cachedService == null) {
>       synchronized (MyDoFn.class) {
>         if (cachedService == null) {
>           cachedService = ...
>         }
>       }
>     }
>   }
> }
>
> [1]: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L496
>
> On Thu, Apr 6, 2017 at 4:46 AM, Josh wrote:
>
>> Hello!
>>
>> I'm just getting started with Beam (in Java), and have a question
>> about the best way to initialise and keep a local cache.
>>
>> In my case my DoFn needs to occasionally look up some info in an
>> external service. I have a Service class which interacts with the
>> external service and I have a CachedService which wraps an instance
>> of Service and caches the responses.
>>
>> I want this CachedService to be initialised once per JVM. What's the
>> best way to do this in Beam? Should the cache just be a static field
>> in the DoFn? Or should I be using the DoFn.StartBundle method and
>> initialising the cache in there? What if I want my cache to be used
>> in two separate DoFns (which sometimes run in the same JVM) - how can
>> I ensure one cache per JVM rather than one cache per DoFn?
>>
>> Thanks for any advice,
>>
>> Josh
Re: Having a local cache (per JVM) to use in DoFns
You should follow any valid singleton pattern and preferably initialize
on class load or within a method annotated with @Setup [1].

@Setup/@Teardown is called each time an instance of a DoFn is
created/discarded, respectively. @Setup/@Teardown will generally be
called fewer times than startBundle/finishBundle, but more than one
instance of a DoFn may still be created within a single JVM, which is
why you are still required to follow a valid singleton pattern.

For example:

class MyDoFn {
  private static volatile CachedService cachedService;

  @Setup
  public void setup() {
    // Initialize and store as static member if not already initialized
    if (cachedService == null) {
      synchronized (MyDoFn.class) {
        if (cachedService == null) {
          cachedService = ...
        }
      }
    }
  }
}

[1]: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L496

On Thu, Apr 6, 2017 at 4:46 AM, Josh wrote:

> Hello!
>
> I'm just getting started with Beam (in Java), and have a question
> about the best way to initialise and keep a local cache.
>
> In my case my DoFn needs to occasionally look up some info in an
> external service. I have a Service class which interacts with the
> external service and I have a CachedService which wraps an instance
> of Service and caches the responses.
>
> I want this CachedService to be initialised once per JVM. What's the
> best way to do this in Beam? Should the cache just be a static field
> in the DoFn? Or should I be using the DoFn.StartBundle method and
> initialising the cache in there? What if I want my cache to be used
> in two separate DoFns (which sometimes run in the same JVM) - how can
> I ensure one cache per JVM rather than one cache per DoFn?
>
> Thanks for any advice,
>
> Josh
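For the case where two separate DoFns should share one cache, the
"initialize on class load" option from this thread might look roughly
like the sketch below. It is not taken from the thread or from Beam:
it is plain Java with no Beam dependency, and Service, CachedService,
SharedCache, EnrichFn, ValidateFn and LocalCacheDemo are all
hypothetical stand-ins (the real Service and CachedService were never
posted). It uses the initialization-on-demand holder idiom, so the
instance is created once per class loader, which in practice usually
means once per JVM.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the client that talks to the external service.
interface Service {
    String lookup(String key);
}

// Hypothetical caching wrapper: remembers responses so repeated keys skip the remote call.
final class CachedService {
    private final Service delegate;
    private final Map<String, String> cache = new ConcurrentHashMap<>();

    CachedService(Service delegate) {
        this.delegate = delegate;
    }

    String lookup(String key) {
        // computeIfAbsent invokes the delegate at most once per key, even under contention.
        return cache.computeIfAbsent(key, delegate::lookup);
    }
}

// Owns the single shared instance. Lives outside any DoFn so several DoFns can use it.
final class SharedCache {
    // Counts calls that reach the fake service; only here so the demo can show caching.
    static final AtomicInteger REMOTE_CALLS = new AtomicInteger();

    private SharedCache() {}

    // Holder idiom: Holder is initialized on the first call to get(), and the JVM
    // guarantees that class initialization happens exactly once and is thread-safe.
    private static final class Holder {
        static final CachedService INSTANCE = new CachedService(key -> {
            REMOTE_CALLS.incrementAndGet();
            return "value-for-" + key; // placeholder for a real remote lookup
        });
    }

    static CachedService get() {
        return Holder.INSTANCE;
    }
}

// Stand-ins for two separate DoFns; in Beam these would extend DoFn and call
// SharedCache.get() from @Setup or @ProcessElement.
final class EnrichFn {
    String process(String key) {
        return SharedCache.get().lookup(key);
    }
}

final class ValidateFn {
    boolean process(String key) {
        return SharedCache.get().lookup(key) != null;
    }
}

class LocalCacheDemo {
    public static void main(String[] args) {
        String enriched = new EnrichFn().process("user-1");
        boolean valid = new ValidateFn().process("user-1");
        System.out.println(enriched + " valid=" + valid
            + " remoteCalls=" + SharedCache.REMOTE_CALLS.get());
    }
}
```

Because the holder is a separate class, neither DoFn owns the cache, so
it does not matter which one is instantiated first or how many DoFn
instances the runner creates. The double-checked locking version in
@Setup works equally well when the construction needs values that are
only known at setup time.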
Having a local cache (per JVM) to use in DoFns
Hello!

I'm just getting started with Beam (in Java), and have a question about
the best way to initialise and keep a local cache.

In my case my DoFn needs to occasionally look up some info in an
external service. I have a Service class which interacts with the
external service and I have a CachedService which wraps an instance of
Service and caches the responses.

I want this CachedService to be initialised once per JVM. What's the
best way to do this in Beam? Should the cache just be a static field in
the DoFn? Or should I be using the DoFn.StartBundle method and
initialising the cache in there? What if I want my cache to be used in
two separate DoFns (which sometimes run in the same JVM) - how can I
ensure one cache per JVM rather than one cache per DoFn?

Thanks for any advice,

Josh