[ 
https://issues.apache.org/jira/browse/FLINK-1081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14228342#comment-14228342
 ] 

ASF GitHub Bot commented on FLINK-1081:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/226#discussion_r21037226
  
    --- Diff: 
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileStreamFunction.java
 ---
    @@ -17,33 +17,125 @@
     
     package org.apache.flink.streaming.api.function.source;
     
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FileStatus;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.util.Collector;
    +
     import java.io.BufferedReader;
    -import java.io.FileReader;
     import java.io.IOException;
    -
    -import org.apache.flink.util.Collector;
    +import java.io.InputStreamReader;
    +import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
     
     public class FileStreamFunction implements SourceFunction<String> {
        private static final long serialVersionUID = 1L;
     
    -   private final String path;
    +   public enum WatchType {
    +           ONLY_NEW_FILES, // Only new files will be processed.
    +           REPROCESS_WITH_APPENDED, // When some files are appended, all 
contents of the files will be processed.
    +           PROCESS_ONLY_APPENDED // When some files are appended, only 
appended contents will be processed.
    +   }
     
    -   public FileStreamFunction(String path) {
    +   private String path;
    +   private long interval;
    +   private WatchType watchType;
    +
    +   private FileSystem fileSystem;
    +   private long lastModificationTime;
    +   private Map<String, Long> offsetOfFiles;
    +
    +   public FileStreamFunction(String path, long interval, WatchType 
watchType) {
                this.path = path;
    +           this.interval = interval;
    +           this.watchType = watchType;
    +
    +           this.lastModificationTime = System.currentTimeMillis();
    +           this.offsetOfFiles = new HashMap<String, Long>();
        }
     
        @Override
    -   public void invoke(Collector<String> collector) throws IOException {
    +   public void invoke(Collector<String> collector) throws Exception {
    +           fileSystem = FileSystem.get(new URI(path));
    +
                while (true) {
    -                   BufferedReader br = new BufferedReader(new 
FileReader(path));
    -                   String line = br.readLine();
    -                   while (line != null) {
    -                           if (!line.equals("")) {
    -                                   collector.collect(line);
    +                   List<String> files = listNewFiles();
    +                   for (String filePath : files) {
    +                           if (watchType == WatchType.ONLY_NEW_FILES || 
watchType == WatchType.REPROCESS_WITH_APPENDED) {
    +                                   processEntire(collector, filePath);
    +                           } else {
    +                                   processOnlyAppended(collector, 
filePath);
                                }
    -                           line = br.readLine();
                        }
    -                   br.close();
    +
    +                   Thread.sleep(interval);
                }
        }
    +
    +   private void processEntire(Collector<String> collector, String path) 
throws IOException {
    +           BufferedReader reader = new BufferedReader(new 
InputStreamReader(fileSystem.open(new Path(path))));
    +           String line;
    +
    +           try {
    +                   while ((line = reader.readLine()) != null) {
    +                           collector.collect(line);
    +                   }
    +
    +                   offsetOfFiles.put(path, 0L);
    +           } finally {
    +                   reader.close();
    +           }
    +   }
    +
    +   private void processOnlyAppended(Collector<String> collector, String 
path) throws IOException {
    +           FSDataInputStream stream = fileSystem.open(new Path(path));
    +           if (offsetOfFiles.containsKey(path)) {
    +                   stream.seek(offsetOfFiles.get(path));
    +           }
    +
    +           BufferedReader reader = new BufferedReader(new 
InputStreamReader(stream));
    +           String line;
    +
    +           try {
    +                   while ((line = reader.readLine()) != null) {
    +                           collector.collect(line);
    +                   }
    +
    +                   offsetOfFiles.put(path, stream.getPos());
    +                   System.out.println(path + ": " + 
String.valueOf(stream.getPos()));
    --- End diff --
    
    This seems to be debugging code ;) 


> Add HDFS file-stream source for streaming
> -----------------------------------------
>
>                 Key: FLINK-1081
>                 URL: https://issues.apache.org/jira/browse/FLINK-1081
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 0.7.0-incubating
>            Reporter: Gyula Fora
>            Assignee: Chiwan Park
>              Labels: starter
>
> Add a data stream source that will monitor a selected directory on HDFS (or 
> other filesystems as well) and will process all new files created.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to