> On 06 April 2017 at 19:53 Dan Halperin wrote:
>
> Hi Richard,
>
> Can you share a little more info about your environment? Here's a
> smattering of questions for which answers may be useful.
>
> * What runner are you using?
>
I don't specify a runner, so I believe it uses the DirectRunner.
> * What version of the SDK?
>
The Apache Beam version is 0.6.0 (beam-sdks-java-core and beam-runners-direct-java).
> * Does this reproduce in the DirectRunner?
>
I believe the problem happens when running on the DirectRunner.
> * Can you share a full reproduction? (e.g., in a github gist)?
>
JDK: 1.8.0_121
Scala: 2.12.1
sbt: 0.13.13
Below is the sample XML file:
<customers>
  <customer>
    <age>33</age>
    <name>John Smith</name>
  </customer>
</customers>
The sample record class:
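import javax.xml.bind.annotation.{XmlAttribute, XmlElement, XmlRootElement}

// JAXB binds each <customer> record to this class; with no explicit name,
// @XmlRootElement derives the element name "customer" from the class name.
@XmlRootElement
class Customer {
  private var name: String = ""
  private var age: Int = 0
  private var id: Int = -1

  def getName(): String = name
  @XmlElement
  def setName(name: String): Unit = this.name = name

  def getAge(): Int = age
  @XmlElement
  def setAge(age: Int): Unit = this.age = age

  // id is read from an attribute of <customer>, not from a child element.
  def getId(): Int = id
  @XmlAttribute
  def setId(id: Int): Unit = this.id = id
}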
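To check the sample file independently of Beam and JAXB, here is a minimal sketch that streams the XML with the JDK's StAX API (javax.xml.stream) and collects each record's fields. It assumes the layout implied by the pipeline settings: a `customers` root, `customer` records, `name` and `age` child elements and an `id` attribute. `XmlRecordCheck`, `readRecords` and `CustomerFields` are hypothetical names for illustration, and this is not how Beam's XmlSource reads files internally.

```scala
import java.io.StringReader
import javax.xml.stream.{XMLInputFactory, XMLStreamConstants}
import scala.collection.mutable.ListBuffer

// Hypothetical plain-Scala mirror of the Customer fields (not the JAXB class).
final case class CustomerFields(id: Option[String], name: String, age: String)

object XmlRecordCheck {
  // Streams `xml` with StAX and collects one CustomerFields per <recordElement>
  // directly under the document root. Returns Left if the root element's
  // name differs from `rootElement`.
  def readRecords(xml: String, rootElement: String,
                  recordElement: String): Either[String, List[CustomerFields]] = {
    val reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(xml))
    val out = ListBuffer.empty[CustomerFields]
    val text = new StringBuilder
    var depth = 0                 // 1 = root, 2 = record, 3 = field of a record
    var field: String = null      // local name of the field currently being read
    var id: Option[String] = None
    var name = ""
    var age = ""
    try {
      while (reader.hasNext) {
        reader.next() match {
          case XMLStreamConstants.START_ELEMENT =>
            depth += 1
            val local = reader.getLocalName
            if (depth == 1 && local != rootElement)
              return Left(s"root element is <$local>, expected <$rootElement>")
            if (depth == 2 && local == recordElement) {
              // The id is an attribute, matching @XmlAttribute on setId.
              id = Option(reader.getAttributeValue(null, "id"))
              name = ""
              age = ""
            }
            if (depth == 3) { field = local; text.clear() }
          case XMLStreamConstants.CHARACTERS | XMLStreamConstants.CDATA if field != null =>
            text.append(reader.getText)
          case XMLStreamConstants.END_ELEMENT =>
            if (depth == 3) {
              field match {
                case "name" => name = text.toString.trim
                case "age"  => age = text.toString.trim
                case _      => // other fields are ignored in this sketch
              }
              field = null
            }
            if (depth == 2 && reader.getLocalName == recordElement)
              out += CustomerFields(id, name, age)
            depth -= 1
          case _ => // whitespace, comments, document start/end: nothing to do
        }
      }
    } finally reader.close()
    Right(out.toList)
  }
}
```

Pointed at the real file, e.g. `XmlRecordCheck.readRecords(scala.io.Source.fromFile("customers.xml").mkString, "customers", "customer")`, it should return a single record with name "John Smith" and age "33" if the file is as shown above; a `Left` or an empty list would point at a mismatch between the file and the source settings.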
The pipeline code:
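import java.io.File
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.{Read, Write, XmlSink, XmlSource}
import org.apache.beam.sdk.options.PipelineOptionsFactory

// No runner is set, so the default runner (the DirectRunner, when it is
// on the classpath) is used.
val options = PipelineOptionsFactory.create
val p = Pipeline.create(options)

// Read every <customer> element under the <customers> root as a Customer.
val source = XmlSource.from[Customer](
  new File("customers.xml").toPath.toString
).withRootElement("customers").withRecordElement("customer").
  withRecordClass(classOf[Customer])

// Write the records back out under a <customers> root, to files prefixed "xmlout".
val sink = XmlSink.write().toFilenamePrefix("xmlout").
  ofRecordClass(classOf[Customer]).
  withRootElement("customers")

p.apply(Read.from(source)).apply(Write.to(sink))
p.run.waitUntilFinish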
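Since `p.run.waitUntilFinish` blocks the calling thread, one way to keep a test from hanging forever is to wait on it with a deadline. The sketch below is generic Scala, not a Beam API: `Watchdog` and `runWithDeadline` are hypothetical names, and on timeout it only stops waiting; it does not cancel the pipeline.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeoutException}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object Watchdog {
  // Runs `body` on a dedicated daemon thread and waits at most `limit` for it.
  // Returns Some(result) if it finished in time and None on timeout. The worker
  // is not interrupted: this only stops the caller from blocking forever.
  def runWithDeadline[A](limit: FiniteDuration)(body: => A): Option[A] = {
    val pool = Executors.newSingleThreadExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "watchdog-worker")
        t.setDaemon(true) // this thread alone will not keep the JVM alive
        t
      }
    })
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      Try(Await.result(Future(body), limit)) match {
        case Success(a)                   => Some(a)
        case Failure(_: TimeoutException) => None
        case Failure(e)                   => throw e // body itself failed: propagate
      }
    } finally pool.shutdown()
  }
}
```

For example, `Watchdog.runWithDeadline(5.minutes)(p.run.waitUntilFinish)` returns `None` if the pipeline has not finished after five minutes, which is a convenient point to take a thread dump. The worker is a daemon thread so that the watchdog's own thread does not keep the JVM alive; the runner's threads may still do so.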
> * What is happening on the machine(s) executing the job? Is there high
> CPU? Is the disk active? Etc.
>
CPU usage stays at 99.x% while the Java process is running (checked with
the top command):
7624 user 20 0 2837.6m 582.5m 23.5m S 99.3 11.4 2:42.11 java
Monitoring with iotop shows that disk I/O is mostly performed by system
processes such as kworker. Only once or twice did I see the Java process (the
only user process running on the machine) doing disk I/O:
Total DISK READ : 0.00 B/s | Total DISK WRITE : 0.00 B/s
Actual DISK READ: 0.00 B/s | Actual DISK WRITE: 0.00 B/s
TID PRIO USER DISK READ DISK WRITE SWAPIN IO> COMMAND
7720 be/4 root 0.00 B/s 0.00 B/s 0.00 % 0.01 % [kworker/0:2]

Total DISK READ : 0.00 B/s | Total DISK WRITE : 15.62 K/s
Actual DISK READ: 0.00 B/s | Actual DISK WRITE: 0.00 B/s
TID PRIO USER DISK READ DISK WRITE SWAPIN IO> COMMAND
7626 be/4 user 0.00 B/s 11.72 K/s 0.00 % 0.00 % java -Xms~h.jar test
7633 be/4 user 0.00 B/s 3.91 K/s 0.00 % 0.00 % java -Xms~h.jar test
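Sustained ~99% CPU with almost no disk I/O suggests a thread spinning in a loop rather than waiting on I/O. A minimal sketch for finding that thread from inside the JVM uses the standard java.lang.management.ThreadMXBean: sample per-thread CPU time twice and report the threads that used the most CPU in between, together with the top of their stacks. `BusyThreads.sample` is a hypothetical helper, not part of Beam.

```scala
import java.lang.management.ManagementFactory

object BusyThreads {
  // Samples per-thread CPU time twice, `intervalMs` apart, and returns up to
  // `top` threads that consumed the most CPU in between, as
  // (thread name, CPU nanoseconds used, top `depth` stack frames).
  def sample(intervalMs: Long, top: Int = 3,
             depth: Int = 8): List[(String, Long, Seq[StackTraceElement])] = {
    val mx = ManagementFactory.getThreadMXBean
    if (mx.isThreadCpuTimeSupported && !mx.isThreadCpuTimeEnabled)
      mx.setThreadCpuTimeEnabled(true)
    // getThreadCpuTime returns -1 for dead threads or when measurement is off.
    def snapshot(): Map[Long, Long] =
      mx.getAllThreadIds.map(id => id -> mx.getThreadCpuTime(id)).filter(_._2 >= 0).toMap
    val before = snapshot()
    Thread.sleep(intervalMs)
    val after = snapshot()
    val used = after.toList.collect {
      case (id, cpu) if before.contains(id) => (id, cpu - before(id))
    }
    used.sortBy { case (_, nanos) => -nanos }.take(top).flatMap { case (id, nanos) =>
      // getThreadInfo returns null if the thread has exited in the meantime.
      Option(mx.getThreadInfo(id, depth)).map { info =>
        (info.getThreadName, nanos, info.getStackTrace.toSeq)
      }
    }
  }
}
```

Calling `BusyThreads.sample(1000)` from a background thread while the pipeline is stuck, and printing the frames it returns, should show which thread is consuming the CPU and where. Without changing any code, the JDK's `jstack` tool pointed at the PID reported by top (7624 above) gives a full thread dump of the same process.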
> Thanks,
> Dan
>
> On Tue, Apr 4, 2017 at 9:33 AM, Richard Hanson <rhan...@mailbox.org> wrote:
>
> > I am testing Apache Beam to read/write XML files, but I've run into a
> > problem: even though the code just reads a single XML file and writes it
> > out without any transformation, the process seems to hang
> > indefinitely. The output looks like this:
> >
> > [pool-2-thread-5-ScalaTest-running-XmlSpec] INFO
> > org.apache.beam.sdk.io.FileBasedSource - Matched 1 files for
> > pattern /tmp/input/my.xml
> > [pool-6-thread-1] INFO org.apache.beam.sdk.io.Write - Initializing
> > write operation org.apache.beam.sdk.io.XmlSink$XmlWriteOperation@1c72df2c
> >
> >
> > The code basically does the following:
> >
> > val options = PipelineOptionsFactory.create
> > val p = Pipeline.create(options)
> >
> > val xml = XmlSource.from[Record](
> >   new File("/tmp/input/my.xml").toPath.toString)
> >   .withRootElement("RootE")
> >   .withRecordElement("Record")
> >   .withRecordClass(classOf[Record])
> >
> > p.apply(Read.from(xml)).apply(Write.to(
> >   XmlSink.write().toFilenamePrefix("xml")
> >     .ofRecordClass(classOf[Record])
> >     .withRootElement("RootE")))
> >
> > p.run.waitUntilFinish
> >
> >
> > What might be missing from my program?
> >
> > Thanks
> >
>