Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1098#discussion_r45749750
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---
    @@ -717,23 +717,188 @@ class DataStream[T](javaStream: JavaStream[T]) {
        * every element of the DataStream the result of .toString
        * is written.
        *
    +   * @param path
    +   * the path pointing to the location the text file is written to
    +   *
    +   * @param millis
    +   * the file update frequency
    +   *
    +   * @return the closed DataStream
        */
       def writeAsText(path: String, millis: Long = 0): DataStreamSink[T] =
         javaStream.writeAsText(path, millis)
     
       /**
    +   * Writes a DataStream to the file specified by path in text format. For
    +   * every element of the DataStream the result of .toString
    +   * is written.
    +   *
    +   * @param path
    +   * the path pointing to the location the text file is written to
    +   *
    +   * @param writeMode
    +   * Controls the behavior for existing files. Options are
    +   * NO_OVERWRITE and OVERWRITE.
    +   *
    +   * @return the closed DataStream
    +   *
    +   */
    +  def writeAsText(
    +       path: String,
    +       writeMode: FileSystem.WriteMode): DataStreamSink[T] = {
    +    if (writeMode != null) {
    +      javaStream.writeAsText(path, writeMode)
    +    } else {
    +      javaStream.writeAsText(path)
    +    }
    +  }
    +
    +  /**
        * Writes a DataStream to the file specified by path in text format. The
        * writing is performed periodically, in every millis milliseconds. For
        * every element of the DataStream the result of .toString
        * is written.
        *
    +   * @param path
    +   * the path pointing to the location the text file is written to
    +   *
    +   * @param writeMode
    +   * Controls the behavior for existing files. Options are
    +   * NO_OVERWRITE and OVERWRITE.
    +   *
    +   * @param millis
    +   * the file update frequency
    +   *
    +   * @return the closed DataStream
    +   *
    +   */
    +  def writeAsText(
    +       path: String,
    +       writeMode: FileSystem.WriteMode,
    +       millis: Long): DataStreamSink[T] = {
    +    if (writeMode != null) {
    +      javaStream.writeAsText(path, writeMode, millis)
    +    } else {
    +      javaStream.writeAsText(path, millis)
    +    }
    +  }
    +
    +  /**
    +   * Writes a DataStream to the file specified by path in csv format. The
    +   * writing is performed periodically, in every millis milliseconds. For
    +   * every element of the DataStream the result of .toString
    +   * is written.
    +   *
    +   * @param path
    +   * the path pointing to the location the text file is written to
    +   *
    +   * @param millis
    +   * the file update frequency
    +   *
    +   * @return the closed DataStream
    +   */
    +  def writeAsCsv(
    +      path: String,
    +      millis: Long = 0): DataStreamSink[T] = {
     +    require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
    +    val of = new ScalaCsvOutputFormat[Product](
    +      new Path(path),
    +      ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
    +      ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER)
    +    javaStream.write(of.asInstanceOf[OutputFormat[T]], millis)
    --- End diff --
    
    Could you call `writeAsCsv(String, WriteMode, Long, String, String)` here with the respective parameter values instead? That way there is only one place where changes have to be applied.
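    
    For illustration, the suggested delegation could look roughly like this (a sketch only; the parameter order of the five-argument overload is assumed from the signature named above, and `null` is assumed to select the default `WriteMode` behavior, mirroring the `writeAsText` overloads in the diff):
    
    ```scala
    def writeAsCsv(
        path: String,
        millis: Long = 0): DataStreamSink[T] = {
      // Delegate to the fully parameterized overload so the output format
      // is constructed in a single place and future changes apply everywhere.
      writeAsCsv(
        path,
        null, // no explicit WriteMode: keep the default behavior for existing files
        millis,
        ScalaCsvOutputFormat.DEFAULT_LINE_DELIMITER,
        ScalaCsvOutputFormat.DEFAULT_FIELD_DELIMITER)
    }
    ```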

