Repository: incubator-beam Updated Branches: refs/heads/master 3bcc5058a -> ec0e9fd7b
[BEAM-255] Write: add limited logging This will help, for all sinks, users and developers gain insight into where time is spent. (Enabling DEBUG level will provide more insight.) Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ad406696 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ad406696 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ad406696 Branch: refs/heads/master Commit: ad406696793132b82450cad9fabf7fcea379158d Parents: 3bcc505 Author: Dan Halperin <dhalp...@google.com> Authored: Tue May 3 13:07:04 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Tue May 3 13:42:44 2016 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/beam/sdk/io/Write.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad406696/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java index b6743fa..9cb026a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java @@ -36,6 +36,9 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.UUID; /** @@ -52,6 +55,8 @@ import java.util.UUID; */ @Experimental(Experimental.Kind.SOURCE_SINK) public class Write { + private static final Logger LOG = LoggerFactory.getLogger(Write.class); + /** * Creates a Write transform that writes to the given Sink. */ @@ -144,7 +149,9 @@ public class Write { @Override public void processElement(ProcessContext c) throws Exception { WriteOperation<T, WriteT> writeOperation = c.element(); + LOG.info("Initializing write operation {}", writeOperation); writeOperation.initialize(c.getPipelineOptions()); + LOG.debug("Done initializing write operation {}", writeOperation); // The WriteOperation is also the output of this ParDo, so it can have mutable // state. c.output(writeOperation); @@ -172,8 +179,10 @@ public class Write { // Lazily initialize the Writer if (writer == null) { WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView); + LOG.info("Opening writer for write operation {}", writeOperation); writer = writeOperation.createWriter(c.getPipelineOptions()); writer.open(UUID.randomUUID().toString()); + LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView); } try { writer.write(c.element()); @@ -211,9 +220,12 @@ public class Write { .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() { @Override public void processElement(ProcessContext c) throws Exception { - Iterable<WriteT> results = c.sideInput(resultsView); WriteOperation<T, WriteT> writeOperation = c.element(); + LOG.info("Finalizing write operation {}", writeOperation); + Iterable<WriteT> results = c.sideInput(resultsView); + LOG.debug("Side input initialized to finalize write operation {}", writeOperation); writeOperation.finalize(results, c.getPipelineOptions()); + LOG.debug("Done finalizing write operation {}", writeOperation); } }).withSideInputs(resultsView)); return PDone.in(input.getPipeline());