Repository: flink Updated Branches: refs/heads/master a0147c493 -> df7c61e2e
[FLINK-1486] add additional print method for prefixing a user-defined string - extend API to include a print(String sinkIdentifier) method - change PrintingOutputformat to include the sink identifier - if appropriate, print sink identifier and task id - update documentation This closes #372 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df7c61e2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df7c61e2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df7c61e2 Branch: refs/heads/master Commit: df7c61e2e5e4a901470abc7b7ac72e2167045a95 Parents: a0147c4 Author: Maximilian Michels <m...@apache.org> Authored: Mon Apr 20 18:29:43 2015 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Wed Apr 22 13:38:32 2015 +0200 ---------------------------------------------------------------------- docs/programming_guide.md | 6 ++- .../java/org/apache/flink/api/java/DataSet.java | 22 ++++++++ .../flink/api/java/io/PrintingOutputFormat.java | 53 ++++++++++++++------ .../org/apache/flink/api/scala/DataSet.scala | 21 +++++++- 4 files changed, 84 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/df7c61e2/docs/programming_guide.md ---------------------------------------------------------------------- diff --git a/docs/programming_guide.md b/docs/programming_guide.md index 8638af2..d9b1753 100644 --- a/docs/programming_guide.md +++ b/docs/programming_guide.md @@ -1823,8 +1823,10 @@ DataSet: are obtained by calling a user-defined *format()* method for each element. - `writeAsCsv(...)` / `CsvOutputFormat` - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the *toString()* method of the objects. -- `print()` / `printToErr()` - Prints the *toString()* value of each element on the - standard out / strandard error stream. 
+- `print()` / `printToErr()` / `print(String msg)` / `printToErr(String msg)` - Prints the *toString()* value
+of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is
+prepended to the output. This can help to distinguish between different calls to *print*. If the parallelism is
+greater than 1, the output will also be prepended with the identifier of the task which produced the output. - `write()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports custom object-to-bytes conversion. - `output()`/ `OutputFormat` - Most generic output method, for data sinks that are not file based http://git-wip-us.apache.org/repos/asf/flink/blob/df7c61e2/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java index a6a0af8..5d1ca4c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java @@ -1349,6 +1349,17 @@ public abstract class DataSet<T> { public DataSink<T> print() { return output(new PrintingOutputFormat<T>(false)); } + + /** + * Writes a DataSet to the standard output stream (stdout).<br/> + * For each element of the DataSet the result of {@link Object#toString()} is written. + * + * @param sinkIdentifier The string to prefix the output with. + * @return The DataSink that writes the DataSet.
+ */ + public DataSink<T> print(String sinkIdentifier) { + return output(new PrintingOutputFormat<T>(sinkIdentifier, false)); + } /** * Writes a DataSet to the standard error stream (stderr).<br/> @@ -1359,6 +1370,17 @@ public abstract class DataSet<T> { public DataSink<T> printToErr() { return output(new PrintingOutputFormat<T>(true)); } + + /** + * Writes a DataSet to the standard error stream (stderr).<br/> + * For each element of the DataSet the result of {@link Object#toString()} is written. + * + * @param sinkIdentifier The string to prefix the output with. + * @return The DataSink that writes the DataSet. + */ + public DataSink<T> printToErr(String sinkIdentifier) { + return output(new PrintingOutputFormat<T>(sinkIdentifier, true)); + } /** * Writes a DataSet using a {@link FileOutputFormat} to a specified location. http://git-wip-us.apache.org/repos/asf/flink/blob/df7c61e2/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java index 8e028eb..d24a369 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java @@ -30,10 +30,11 @@ public class PrintingOutputFormat<T> implements OutputFormat<T> { private static final boolean STD_OUT = false; private static final boolean STD_ERR = true; - - - private boolean target; - + + private String sinkIdentifier; + + private boolean target; + private transient PrintStream stream; private transient String prefix; @@ -53,7 +54,16 @@ public class PrintingOutputFormat<T> implements OutputFormat<T> { public PrintingOutputFormat(boolean stdErr) { this.target = stdErr; } - + + /** + * Instantiates a printing output format that prints to standard 
out with a prefixed message. + * @param sinkIdentifier Message that is prefixed to the output of the value. + * @param stdErr True, if the format should print to standard error instead of standard out. + */ + public PrintingOutputFormat(String sinkIdentifier, boolean stdErr) { + this(stdErr); + this.sinkIdentifier = sinkIdentifier; + } public void setTargetToStandardOut() { this.target = STD_OUT; @@ -61,7 +71,7 @@ public class PrintingOutputFormat<T> implements OutputFormat<T> { public void setTargetToStandardErr() { this.target = STD_ERR; - } + } @Override @@ -72,25 +82,38 @@ public class PrintingOutputFormat<T> implements OutputFormat<T> { public void open(int taskNumber, int numTasks) { // get the target stream this.stream = this.target == STD_OUT ? System.out : System.err; - - // set the prefix if we have a >1 parallelism - this.prefix = (numTasks > 1) ? ((taskNumber+1) + "> ") : null; + + /** + * Four possible format options: + * sinkId:taskId> output <- sink id provided, parallelism > 1 + * sinkId> output <- sink id provided, parallelism == 1 + * taskId> output <- no sink id provided, parallelism > 1 + * output <- no sink id provided, parallelism == 1 + */ + if (this.sinkIdentifier != null) { + this.prefix = this.sinkIdentifier; + if (numTasks > 1) { + this.prefix += ":" + (taskNumber + 1); + } + this.prefix += "> "; + } else if (numTasks > 1) { + this.prefix = (taskNumber + 1) + "> "; + } else { + this.prefix = ""; + } + } @Override public void writeRecord(T record) { - if (this.prefix != null) { - this.stream.println(this.prefix + record.toString()); - } - else { - this.stream.println(record.toString()); - } + this.stream.println(this.prefix + record.toString()); } @Override public void close() { this.stream = null; this.prefix = null; + this.sinkIdentifier = null; } // -------------------------------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/df7c61e2/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index 5676229..0dea42d 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -1332,11 +1332,30 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { } /** - * Writes a DataSet to the standard error stream (stderr).This uses [[AnyRef.toString]] on + * * + * Writes a DataSet to the standard output stream (stdout) with a sink identifier prefixed. + * This uses [[AnyRef.toString]] on each element. + * @param sinkIdentifier The string to prefix the output with. + */ + def print(sinkIdentifier: String): DataSink[T] = { + output(new PrintingOutputFormat[T](sinkIdentifier, false)) + } + + /** + * Writes a DataSet to the standard error stream (stderr). This uses [[AnyRef.toString]] on * each element. */ def printToErr(): DataSink[T] = { output(new PrintingOutputFormat[T](true)) } + + /** + * Writes a DataSet to the standard error stream (stderr) with a sink identifier prefixed. + * This uses [[AnyRef.toString]] on each element. + * @param sinkIdentifier The string to prefix the output with. + */ + def printToErr(sinkIdentifier: String): DataSink[T] = { + output(new PrintingOutputFormat[T](sinkIdentifier, true)) + } }