Fix query10 log messages issue #5 and issue #51
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee500b28 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee500b28 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee500b28 Branch: refs/heads/master Commit: ee500b28086f1261101395dc0b7b23f197ba19d9 Parents: 3d5c3d0 Author: Etienne Chauchot <echauc...@gmail.com> Authored: Tue May 30 18:00:00 2017 +0100 Committer: Ismaël Mejía <ieme...@gmail.com> Committed: Wed Aug 23 19:07:28 2017 +0200 ---------------------------------------------------------------------- integration/java/nexmark/pom.xml | 6 +++ .../integration/nexmark/queries/Query10.java | 39 ++++++++------------ 2 files changed, 22 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ee500b28/integration/java/nexmark/pom.xml ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/pom.xml b/integration/java/nexmark/pom.xml index 86b88bd..664a410 100644 --- a/integration/java/nexmark/pom.xml +++ b/integration/java/nexmark/pom.xml @@ -255,6 +255,12 @@ </dependency> <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-jdk14</artifactId> + <scope>runtime</scope> + </dependency> + + <dependency> <groupId>com.google.code.findbugs</groupId> <artifactId>jsr305</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/ee500b28/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java index c868666..378d01e 100644 --- 
a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query10.java @@ -17,9 +17,6 @@ */ package org.apache.beam.integration.nexmark.queries; -import static com.google.common.base.Preconditions.checkState; - -import com.google.cloud.hadoop.gcsio.GoogleCloudStorageWriteChannel; import java.io.IOException; import java.io.OutputStream; import java.io.Serializable; @@ -33,9 +30,9 @@ import org.apache.beam.integration.nexmark.model.Done; import org.apache.beam.integration.nexmark.model.Event; import org.apache.beam.integration.nexmark.model.KnownSize; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; -import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.ParDo; @@ -57,7 +54,6 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Query "10", 'Log to sharded files' (Not in original suite.) * @@ -132,12 +128,9 @@ public class Query10 extends NexmarkQuery { */ private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename) throws IOException { - //TODO Decide what to do about this one -// WritableByteChannel channel = -// GcsIOChannelFactory.fromOptions(options).create(filename, "text/plain"); -// checkState(channel instanceof GoogleCloudStorageWriteChannel); -// ((GoogleCloudStorageWriteChannel) channel).setUploadBufferSize(CHANNEL_BUFFER); -// return channel; + //TODO + // Fix after PR: right now this is a specific Google added use case + // Discuss it on ML: shall we keep GCS or use HDFS or use a generic beam filesystem way. 
throw new UnsupportedOperationException("Disabled after removal of GcsIOChannelFactory"); } @@ -192,7 +185,7 @@ public class Query10 extends NexmarkQuery { public void processElement(ProcessContext c) { if (c.element().hasAnnotation("LATE")) { lateCounter.inc(); - LOG.error("Observed late: %s", c.element()); + LOG.info("Observed late: %s", c.element()); } else { onTimeCounter.inc(); } @@ -240,11 +233,11 @@ public class Query10 extends NexmarkQuery { } } String shard = c.element().getKey(); - LOG.error( + LOG.info(String.format( "%s with timestamp %s has %d actually late and %d on-time " + "elements in pane %s for window %s", shard, c.timestamp(), numLate, numOnTime, c.pane(), - window.maxTimestamp()); + window.maxTimestamp())); if (c.pane().getTiming() == PaneInfo.Timing.LATE) { if (numLate == 0) { LOG.error( @@ -283,11 +276,11 @@ public class Query10 extends NexmarkQuery { String shard = c.element().getKey(); GcsOptions options = c.getPipelineOptions().as(GcsOptions.class); OutputFile outputFile = outputFileFor(window, shard, c.pane()); - LOG.error( + LOG.info(String.format( "Writing %s with record timestamp %s, window timestamp %s, pane %s", - shard, c.timestamp(), window.maxTimestamp(), c.pane()); + shard, c.timestamp(), window.maxTimestamp(), c.pane())); if (outputFile.filename != null) { - LOG.error("Beginning write to '%s'", outputFile.filename); + LOG.info("Beginning write to '%s'", outputFile.filename); int n = 0; try (OutputStream output = Channels.newOutputStream(openWritableGcsFile(options, outputFile @@ -296,12 +289,12 @@ public class Query10 extends NexmarkQuery { Event.CODER.encode(event, output, Coder.Context.OUTER); writtenRecordsCounter.inc(); if (++n % 10000 == 0) { - LOG.error("So far written %d records to '%s'", n, + LOG.info("So far written %d records to '%s'", n, outputFile.filename); } } } - LOG.error("Written all %d records to '%s'", n, outputFile.filename); + LOG.info("Written all %d records to '%s'", n, outputFile.filename); } 
savedFileCounter.inc(); c.output(KV.<Void, OutputFile>of(null, outputFile)); @@ -341,23 +334,23 @@ public class Query10 extends NexmarkQuery { LOG.error("ERROR! Unexpected ON_TIME pane index: %s", c.pane()); } else { GcsOptions options = c.getPipelineOptions().as(GcsOptions.class); - LOG.error( + LOG.info( "Index with record timestamp %s, window timestamp %s, pane %s", c.timestamp(), window.maxTimestamp(), c.pane()); @Nullable String filename = indexPathFor(window); if (filename != null) { - LOG.error("Beginning write to '%s'", filename); + LOG.info("Beginning write to '%s'", filename); int n = 0; try (OutputStream output = Channels.newOutputStream( openWritableGcsFile(options, filename))) { for (OutputFile outputFile : c.element().getValue()) { - output.write(outputFile.toString().getBytes()); + output.write(outputFile.toString().getBytes("UTF-8")); n++; } } - LOG.error("Written all %d lines to '%s'", n, filename); + LOG.info("Written all %d lines to '%s'", n, filename); } c.output( new Done("written for timestamp " + window.maxTimestamp()));