Re: Reading/ writing xml file hangs indefinitely

2017-04-07 Thread Richard Hanson

> On 06 April 2017 at 19:53 Dan Halperin wrote:
> 
> Hi Richard,
> 
> Can you share a little more info about your environment? Here's a 
> smattering of questions for which answers may be useful.
> 
> * What runner are you using? 
> 

I don't specify a runner, so I believe it defaults to the DirectRunner.

> * What version of the SDK?
> 

Apache Beam version is 0.6.0 (beam-sdks-java-core and beam-runners-direct-java)

> * Does this reproduce in the DirectRunner?
> 

I believe this problem happens when running with the DirectRunner.

> * Can you share a full reproduction? (e.g., in a github gist)?
> 

JDK: 1.8.0_121

Scala: 2.12.1

sbt: 0.13.13


Below is the sample XML file:

<customers>
  <customer>
    <age>33</age>
    <name>John Smith</name>
  </customer>
</customers>

The sample record object.

import javax.xml.bind.annotation.{XmlAttribute, XmlElement, XmlRootElement}

@XmlRootElement
class Customer {

  private var name: String = ""
  private var age: Int = 0
  private var id: Int = -1

  def getName(): String = name

  @XmlElement
  def setName(name: String): Unit = this.name = name

  def getAge(): Int = age

  @XmlElement
  def setAge(age: Int): Unit = this.age = age

  def getId(): Int = id

  @XmlAttribute
  def setId(id: Int): Unit = this.id = id

}

The pipeline code:


val options = PipelineOptionsFactory.create
val p = Pipeline.create(options)

val source = XmlSource.from[Customer](new File("customers.xml").toPath.toString)
  .withRootElement("customers")
  .withRecordElement("customer")
  .withRecordClass(classOf[Customer])

val sink = XmlSink.write()
  .toFilenamePrefix("xmlout")
  .ofRecordClass(classOf[Customer])
  .withRootElement("customers")

p.apply(Read.from(source)).apply(Write.to(sink))

p.run.waitUntilFinish
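
Since the source is configured with a root element of "customers" and a record element of "customer", one thing that can be checked outside Beam is whether the input actually has that shape. The following is only a rough sketch using the JDK's StAX reader; the object name XmlShapeCheck, the method inspect, and the inline sample document are made up for illustration and are not part of Beam.

```scala
import java.io.StringReader
import javax.xml.stream.{XMLInputFactory, XMLStreamConstants}

object XmlShapeCheck {

  // Returns (name of the document's root element, number of `recordElement` start tags).
  def inspect(xml: String, recordElement: String): (String, Int) = {
    val reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml))
    var root: Option[String] = None
    var records = 0
    try {
      while (reader.hasNext()) {
        if (reader.next() == XMLStreamConstants.START_ELEMENT) {
          val name = reader.getLocalName()
          if (root.isEmpty) root = Some(name)       // first start tag is the root
          else if (name == recordElement) records += 1
        }
      }
    } finally reader.close()
    (root.getOrElse(""), records)
  }

  def main(args: Array[String]): Unit = {
    // Assumed sample document; element names follow the pipeline configuration.
    val sample =
      """<customers>
        |  <customer><age>33</age><name>John Smith</name></customer>
        |</customers>""".stripMargin
    val (root, records) = inspect(sample, "customer")
    println(s"root=$root records=$records")
  }
}
```

If the root name or the record count printed here differs from what the pipeline expects, the element names passed to withRootElement and withRecordElement would be the first thing to look at.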


> * What is happening on the machine(s) executing the job? Is there high 
> CPU? Is the disk active? Etc.
> 

CPU usage is high and stays at 99.x% while the Java process is executing 
(checked with the top command). 

7624 user  20   0 2837.6m 582.5m  23.5m S 99.3 11.4   2:42.11 java

Monitoring with iotop shows that disk I/O is mostly performed by system 
processes, e.g. kworker. I saw the Java process (the only user process 
running on the machine) doing disk I/O only once or twice. 

Total DISK READ : 0.00 B/s | Total DISK WRITE : 0.00 B/s
Actual DISK READ: 0.00 B/s | Actual DISK WRITE: 0.00 B/s
TID PRIO USER DISK READ DISK WRITE SWAPIN IO> COMMAND
7720 be/4 root 0.00 B/s 0.00 B/s 0.00 % 0.01 % [kworker/0:2]

Total DISK READ : 0.00 B/s | Total DISK WRITE : 15.62 K/s
Actual DISK READ: 0.00 B/s | Actual DISK WRITE: 0.00 B/s
TID PRIO USER DISK READ DISK WRITE SWAPIN IO> COMMAND
7626 be/4 user 0.00 B/s 11.72 K/s 0.00 % 0.00 % java -Xms~h.jar test
7633 be/4 user 0.00 B/s 3.91 K/s 0.00 % 0.00 % java -Xms~h.jar test
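
High CPU with almost no disk I/O would be consistent with a thread spinning rather than waiting on I/O, and a thread dump should show where it is spinning. Running jstack against the Java pid from outside is one way to get one. Another is to dump from inside the JVM, roughly as sketched below. The object ThreadDump and its methods render and dumpAfter are hypothetical helpers for illustration, not part of Beam.

```scala
object ThreadDump {

  // Formats the stack of every live thread in this JVM,
  // similar in spirit to what jstack prints.
  def render(): String = {
    val sb = new StringBuilder
    val it = Thread.getAllStackTraces().entrySet().iterator()
    while (it.hasNext()) {
      val entry = it.next()
      val t = entry.getKey()
      sb.append(t.getName()).append(" state=").append(t.getState().toString).append('\n')
      entry.getValue().foreach(frame => sb.append("    at ").append(frame.toString).append('\n'))
    }
    sb.toString
  }

  // Starts a daemon thread that prints one dump to stderr after `delayMillis`.
  // Being a daemon, it does not keep the JVM alive on its own.
  def dumpAfter(delayMillis: Long): Thread = {
    val watchdog = new Thread(new Runnable {
      def run(): Unit = {
        try {
          Thread.sleep(delayMillis)
          System.err.println(render())
        } catch {
          case _: InterruptedException => () // cancelled before the delay elapsed
        }
      }
    }, "thread-dump-watchdog")
    watchdog.setDaemon(true)
    watchdog.start()
    watchdog
  }
}
```

Calling ThreadDump.dumpAfter(60000) just before p.run.waitUntilFinish would print the stacks one minute into the hang. The frames of whichever thread is in the RUNNABLE state are the ones to look at.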


> Thanks,
> Dan
> 
> On Tue, Apr 4, 2017 at 9:33 AM, Richard Hanson <rhan...@mailbox.org> wrote:
> 
> > 
> > I am testing Apache Beam for reading and writing XML files, but I have 
> > run into a problem: even when the code only reads a single XML file and 
> > writes it back out without any transformation, the process seems to hang 
> > indefinitely. The output looks like this:
> > 
> > [pool-2-thread-5-ScalaTest-running-XmlSpec] INFO 
> > org.apache.beam.sdk.io.FileBasedSource - Matched 1 files for pattern 
> > /tmp/input/my.xml
> > [pool-6-thread-1] INFO org.apache.beam.sdk.io.Write - Initializing write 
> > operation org.apache.beam.sdk.io.XmlSink$XmlWriteOperation@1c72df2c
> > 
> > 
> > The code basically does the following:
> > 
> > val options = PipelineOptionsFactory.create
> > val p = Pipeline.create(options)
> > 
> > 
> > val xml = XmlSource.from[Record](new File("/tmp/input/my.xml").toPath.toString)
> >   .withRootElement("RootE")
> >   .withRecordElement("Record")
> >   .withRecordClass(classOf[Record])
> > 
> > 
> > p.apply(Read.from(xml)).apply(Write.to(XmlSink.write().toFilenamePrefix("xml").ofRecordClass(classOf[Record]).withRootElement("RootE")))
> > 
> > p.run.waitUntilFinish
> > 
> > 
> > What part may be missing in my program?
> > 
> > Thanks
> > 
> > > 
> 


Reading/ writing xml file hangs indefinitely

2017-04-04 Thread Richard Hanson
I am testing Apache Beam for reading and writing XML files, but I have run 
into a problem: even when the code only reads a single XML file and writes it 
back out without any transformation, the process seems to hang indefinitely. 
The output looks like this:

[pool-2-thread-5-ScalaTest-running-XmlSpec] INFO 
org.apache.beam.sdk.io.FileBasedSource - Matched 1 files for pattern 
/tmp/input/my.xml
[pool-6-thread-1] INFO org.apache.beam.sdk.io.Write - Initializing write 
operation org.apache.beam.sdk.io.XmlSink$XmlWriteOperation@1c72df2c


The code basically does the following:

val options = PipelineOptionsFactory.create
val p = Pipeline.create(options)


val xml = XmlSource.from[Record](new File("/tmp/input/my.xml").toPath.toString)
  .withRootElement("RootE")
  .withRecordElement("Record")
  .withRecordClass(classOf[Record])

p.apply(Read.from(xml)).apply(Write.to(XmlSink.write().toFilenamePrefix("xml").ofRecordClass(classOf[Record]).withRootElement("RootE")))

p.run.waitUntilFinish


What part may be missing in my program?

Thanks