Re: Reading/ writing xml file hangs indefinitely

2017-04-06 Thread Dan Halperin
Hi Richard,

Can you share a little more info about your environment? Here's a
smattering of questions for which answers may be useful.

* What runner are you using?
* What version of the SDK?
* Does this reproduce in the DirectRunner?
* Can you share a full reproduction (e.g., in a GitHub gist)?
* What is happening on the machine(s) executing the job? Is there high CPU?
Is the disk active? Etc.

Thanks,
Dan

On Tue, Apr 4, 2017 at 9:33 AM, Richard Hanson wrote:

> I am testing Apache Beam to read/write XML files, but I have run into a
> problem: even when the code just reads a single XML file and writes it
> out without any transformation, the process seems to hang
> indefinitely. The output looks like this:
>
> [pool-2-thread-5-ScalaTest-running-XmlSpec] INFO org.apache.beam.sdk.io.FileBasedSource - Matched 1 files for pattern /tmp/input/my.xml
> [pool-6-thread-1] INFO org.apache.beam.sdk.io.Write - Initializing write operation org.apache.beam.sdk.io.XmlSink$XmlWriteOperation@1c72df2c
>
>
> The code basically does the following:
>
> val options = PipelineOptionsFactory.create
> val p = Pipeline.create(options)
>
>
> val xml = XmlSource.from[Record](new File("/tmp/input/my.xml").toPath.toString)
>   .withRootElement("RootE")
>   .withRecordElement("Record")
>   .withRecordClass(classOf[Record])
>
> p.apply(Read.from(xml))
>   .apply(Write.to(XmlSink.write()
>     .toFilenamePrefix("xml")
>     .ofRecordClass(classOf[Record])
>     .withRootElement("RootE")))
>
> p.run.waitUntilFinish
>
>
> What part may be missing in my program?
>
> Thanks
>


Re: Having a local cache (per JVM) to use in DoFns

2017-04-06 Thread Josh
Thanks Lukasz, that's very helpful!

On Thu, Apr 6, 2017 at 1:34 PM, Lukasz Cwik wrote:

> You should follow any valid singleton pattern and preferably initialize on
> class load or within a method annotated with @Setup [1]
>
> @Setup/@Teardown is called each time an instance of a DoFn is
> created/discarded, respectively. @Setup/@Teardown will generally be called
> fewer times than startBundle/finishBundle, but more than one instance of a
> DoFn may still be created within a single JVM, which is why you are still
> required to follow a valid singleton pattern.
>
> For example:
> class MyDoFn {
>   private static volatile CachedService cachedService;
>
>   @Setup
>   public void setup() {
>     // Initialize and store as static member if not already initialized
>     if (cachedService == null) {
>       synchronized (MyDoFn.class) {
>         if (cachedService == null) {
>           cachedService = ...
>         }
>       }
>     }
>   }
> }
>
> [1]: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L496
>
> On Thu, Apr 6, 2017 at 4:46 AM, Josh  wrote:
>
>> Hello!
>>
>> I'm just getting started with Beam (in java), and have a question about
>> the best way to initialise and keep a local cache.
>>
>> In my case my DoFn needs to occasionally look up some info in an external
>> service. I have a Service class which interacts with the external service
>> and I have a CachedService which wraps an instance of Service and caches
>> the responses.
>>
>> I want this CachedService to be initialised once per JVM. What's the best
>> way to do this in Beam? Should the cache just be a static field in the
>> DoFn? Or should I be using the DoFn.StartBundle method and initialising the
>> cache in there? What if I want my cache to be used in two separate DoFns
>> (which sometimes run in the same JVM) - how can I ensure one cache per JVM
>> rather than one cache per DoFn?
>>
>> Thanks for any advice,
>>
>> Josh
>>
>
>


Re: Having a local cache (per JVM) to use in DoFns

2017-04-06 Thread Lukasz Cwik
You should follow any valid singleton pattern and preferably initialize on
class load or within a method annotated with @Setup [1]

@Setup/@Teardown is called each time an instance of a DoFn is
created/discarded, respectively. @Setup/@Teardown will generally be called
fewer times than startBundle/finishBundle, but more than one instance of a
DoFn may still be created within a single JVM, which is why you are still
required to follow a valid singleton pattern.

For example:
class MyDoFn {
  private static volatile CachedService cachedService;

  @Setup
  public void setup() {
    // Initialize and store as static member if not already initialized
    if (cachedService == null) {
      synchronized (MyDoFn.class) {
        if (cachedService == null) {
          cachedService = ...
        }
      }
    }
  }
}

[1]:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L496
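
For the "initialize on class load" option, and for sharing one cache between
two different DoFns, a minimal sketch using the initialization-on-demand
holder idiom might look like the following. Service and CachedService here
are hypothetical stand-ins for the classes described in the question, and
SharedCache is a name invented for this example; none of them are part of
Beam. Each DoFn would call SharedCache.get() (for instance from its @Setup
method), so every DoFn instance loaded by the same class loader in a JVM
would see the same CachedService.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the class that talks to the external service.
class Service {
  // Counts remote calls so the demo can show that caching works.
  static final AtomicInteger CALLS = new AtomicInteger();

  String lookup(String key) {
    CALLS.incrementAndGet();
    return "value-for-" + key;
  }
}

// Hypothetical caching wrapper around Service.
class CachedService {
  private final Service service;
  private final Map<String, String> cache = new ConcurrentHashMap<>();

  CachedService(Service service) {
    this.service = service;
  }

  String lookup(String key) {
    // Calls the underlying service only when the key is not cached yet.
    return cache.computeIfAbsent(key, service::lookup);
  }
}

// Holder idiom: the JVM initializes Holder lazily and thread-safely the
// first time get() is called, so no explicit locking is needed.
final class SharedCache {
  private SharedCache() {}

  private static final class Holder {
    static final CachedService INSTANCE = new CachedService(new Service());
  }

  static CachedService get() {
    return Holder.INSTANCE;
  }
}

class SharedCacheDemo {
  public static void main(String[] args) {
    // Two callers, standing in for two different DoFns in the same JVM.
    CachedService a = SharedCache.get();
    CachedService b = SharedCache.get();
    System.out.println(a == b); // prints true

    a.lookup("k");
    b.lookup("k");
    System.out.println(Service.CALLS.get()); // prints 1
  }
}
```

Compared with the double-checked locking in the example above, this variant
moves the synchronization into JVM class initialization. The trade-off is
that the instance cannot depend on values only known at @Setup time, such as
pipeline options.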

On Thu, Apr 6, 2017 at 4:46 AM, Josh wrote:

> Hello!
>
> I'm just getting started with Beam (in java), and have a question about
> the best way to initialise and keep a local cache.
>
> In my case my DoFn needs to occasionally look up some info in an external
> service. I have a Service class which interacts with the external service
> and I have a CachedService which wraps an instance of Service and caches
> the responses.
>
> I want this CachedService to be initialised once per JVM. What's the best
> way to do this in Beam? Should the cache just be a static field in the
> DoFn? Or should I be using the DoFn.StartBundle method and initialising the
> cache in there? What if I want my cache to be used in two separate DoFns
> (which sometimes run in the same JVM) - how can I ensure one cache per JVM
> rather than one cache per DoFn?
>
> Thanks for any advice,
>
> Josh
>


Having a local cache (per JVM) to use in DoFns

2017-04-06 Thread Josh
Hello!

I'm just getting started with Beam (in java), and have a question about the
best way to initialise and keep a local cache.

In my case my DoFn needs to occasionally look up some info in an external
service. I have a Service class which interacts with the external service
and I have a CachedService which wraps an instance of Service and caches
the responses.

I want this CachedService to be initialised once per JVM. What's the best
way to do this in Beam? Should the cache just be a static field in the
DoFn? Or should I be using the DoFn.StartBundle method and initialising the
cache in there? What if I want my cache to be used in two separate DoFns
(which sometimes run in the same JVM) - how can I ensure one cache per JVM
rather than one cache per DoFn?

Thanks for any advice,

Josh