Repository: flink
Updated Branches:
  refs/heads/master a0147c493 -> df7c61e2e


[FLINK-1486] add additional print method for prefixing a user-defined string

- extend API to include print(String sinkIdentifier) and printToErr(String sinkIdentifier) methods
- change PrintingOutputFormat to include the sink identifier
- prefix each output line with the sink identifier (if given) and the task id (if parallelism > 1)
- update documentation

This closes #372


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df7c61e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df7c61e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df7c61e2

Branch: refs/heads/master
Commit: df7c61e2e5e4a901470abc7b7ac72e2167045a95
Parents: a0147c4
Author: Maximilian Michels <m...@apache.org>
Authored: Mon Apr 20 18:29:43 2015 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Wed Apr 22 13:38:32 2015 +0200

----------------------------------------------------------------------
 docs/programming_guide.md                       |  6 ++-
 .../java/org/apache/flink/api/java/DataSet.java | 22 ++++++++
 .../flink/api/java/io/PrintingOutputFormat.java | 53 ++++++++++++++------
 .../org/apache/flink/api/scala/DataSet.scala    | 21 +++++++-
 4 files changed, 84 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/df7c61e2/docs/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/programming_guide.md b/docs/programming_guide.md
index 8638af2..d9b1753 100644
--- a/docs/programming_guide.md
+++ b/docs/programming_guide.md
@@ -1823,8 +1823,10 @@ DataSet:
   are obtained by calling a user-defined *format()* method for each element.
 - `writeAsCsv(...)` / `CsvOutputFormat` - Writes tuples as comma-separated 
value files. Row and field
   delimiters are configurable. The value for each field comes from the 
*toString()* method of the objects.
-- `print()` / `printToErr()` - Prints the *toString()* value of each element 
on the
-  standard out / strandard error stream.
+- `print()` / `printToErr()` / `print(String msg)` / `printToErr(String msg)` 
- Prints the *toString()* value
+of each element on the standard out / standard error stream. Optionally, a 
prefix (msg) can be provided which is
+prepended to the output. This can help to distinguish between different calls 
to *print*. If the parallelism is
+greater than 1, the output will also be prepended with the identifier of the 
task which produced the output.
 - `write()` / `FileOutputFormat` - Method and base class for custom file 
outputs. Supports
   custom object-to-bytes conversion.
 - `output()`/ `OutputFormat` - Most generic output method, for data sinks that 
are not file based

http://git-wip-us.apache.org/repos/asf/flink/blob/df7c61e2/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index a6a0af8..5d1ca4c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -1349,6 +1349,17 @@ public abstract class DataSet<T> {
        public DataSink<T> print() {
                return output(new PrintingOutputFormat<T>(false));
        }
+
+       /**
+        * Writes a DataSet to the standard output stream (stdout), prefixing each line with the given sink identifier.<br/>
+        * For each element of the DataSet the result of {@link 
Object#toString()} is written.
+        *
+        * @param sinkIdentifier The string to prefix the output with.
+        * @return The DataSink that writes the DataSet.
+        */
+       public DataSink<T> print(String sinkIdentifier) {
+               return output(new PrintingOutputFormat<T>(sinkIdentifier, 
false));
+       }
        
        /**
         * Writes a DataSet to the standard error stream (stderr).<br/>
@@ -1359,6 +1370,17 @@ public abstract class DataSet<T> {
        public DataSink<T> printToErr() {
                return output(new PrintingOutputFormat<T>(true));
        }
+
+       /**
+        * Writes a DataSet to the standard error stream (stderr), prefixing each line with the given sink identifier.<br/>
+        * For each element of the DataSet the result of {@link 
Object#toString()} is written.
+        *
+        * @param sinkIdentifier The string to prefix the output with.
+        * @return The DataSink that writes the DataSet.
+        */
+       public DataSink<T> printToErr(String sinkIdentifier) {
+               return output(new PrintingOutputFormat<T>(sinkIdentifier, 
true));
+       }
        
        /**
         * Writes a DataSet using a {@link FileOutputFormat} to a specified 
location.

http://git-wip-us.apache.org/repos/asf/flink/blob/df7c61e2/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
index 8e028eb..d24a369 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/io/PrintingOutputFormat.java
@@ -30,10 +30,11 @@ public class PrintingOutputFormat<T> implements 
OutputFormat<T> {
 
        private static final boolean STD_OUT = false;
        private static final boolean STD_ERR = true;
-       
-       
-       private boolean target; 
-       
+
+       private String sinkIdentifier;
+
+       private boolean target;
+
        private transient PrintStream stream;
        
        private transient String prefix;
@@ -53,7 +54,16 @@ public class PrintingOutputFormat<T> implements 
OutputFormat<T> {
        public PrintingOutputFormat(boolean stdErr) {
                this.target = stdErr;
        }
-       
+
+       /**
+        * Instantiates a printing output format that prints to standard out or standard error
with a prefixed message.
+        * @param sinkIdentifier Message that is prefixed to the output of the 
value.
+        * @param stdErr True, if the format should print to standard error 
instead of standard out.
+        */
+       public PrintingOutputFormat(String sinkIdentifier, boolean stdErr) {
+               this(stdErr);
+               this.sinkIdentifier = sinkIdentifier;
+       }
        
        public void setTargetToStandardOut() {
                this.target = STD_OUT;
@@ -61,7 +71,7 @@ public class PrintingOutputFormat<T> implements 
OutputFormat<T> {
        
        public void setTargetToStandardErr() {
                this.target = STD_ERR;
-       }       
+       }
        
        
        @Override
@@ -72,25 +82,38 @@ public class PrintingOutputFormat<T> implements 
OutputFormat<T> {
        public void open(int taskNumber, int numTasks) {
                // get the target stream
                this.stream = this.target == STD_OUT ? System.out : System.err;
-               
-               // set the prefix if we have a >1 parallelism
-               this.prefix = (numTasks > 1) ? ((taskNumber+1) + "> ") : null;
+
+               /**
+                * Four possible format options:
+                *      sinkId:taskId> output  <- sink id provided, parallelism 
> 1
+                *      sinkId> output         <- sink id provided, parallelism 
== 1
+                *      taskId> output         <- no sink id provided, 
parallelism > 1
+                *      output                 <- no sink id provided, 
parallelism == 1
+                */
+               if (this.sinkIdentifier != null) {
+                       this.prefix = this.sinkIdentifier;
+                       if (numTasks > 1) {
+                               this.prefix += ":" + (taskNumber + 1);
+                       }
+                       this.prefix += "> ";
+               } else if (numTasks > 1) {
+                       this.prefix = (taskNumber + 1) + "> ";
+               } else {
+                       this.prefix = "";
+               }
+
        }
 
        @Override
        public void writeRecord(T record) {
-               if (this.prefix != null) {
-                       this.stream.println(this.prefix + record.toString());
-               }
-               else {
-                       this.stream.println(record.toString());
-               }
+               this.stream.println(this.prefix + record.toString());
        }
 
        @Override
        public void close() {
                this.stream = null;
                this.prefix = null;
+               this.sinkIdentifier = null;
        }
        
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/df7c61e2/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 5676229..0dea42d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -1332,11 +1332,30 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
   }
 
   /**
-   * Writes a DataSet to the standard error stream (stderr).This uses 
[[AnyRef.toString]] on
+   * Writes a DataSet to the standard output stream (stdout) with a sink identifier prefixed.
+   * This uses [[AnyRef.toString]] on each element.
+   *
+   * @param sinkIdentifier The string to prefix the output with.
+   */
+  def print(sinkIdentifier: String): DataSink[T] = {
+    output(new PrintingOutputFormat[T](sinkIdentifier, false))
+  }
+
+  /**
+   * Writes a DataSet to the standard error stream (stderr). This uses 
[[AnyRef.toString]] on
    * each element.
    */
   def printToErr(): DataSink[T] = {
     output(new PrintingOutputFormat[T](true))
   }
+
+  /**
+   * Writes a DataSet to the standard error stream (stderr) with a sink 
identifier prefixed.
+   * This uses [[AnyRef.toString]] on each element.
+   * @param sinkIdentifier The string to prefix the output with.
+   */
+  def printToErr(sinkIdentifier: String): DataSink[T] = {
+      output(new PrintingOutputFormat[T](sinkIdentifier, true))
+  }
 }
 

Reply via email to