[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15038752#comment-15038752 ] Roshan Naik commented on FLINK-2583: I would like to point out that this truncate mechanism will only work with a very limited set of file formats, such as text files. It won't work with most other file formats, such as compressed, Avro, or columnar formats.

> Add Stream Sink For Rolling HDFS Files
> Key: FLINK-2583
> URL: https://issues.apache.org/jira/browse/FLINK-2583
> Project: Flink
> Issue Type: New Feature
> Components: Streaming
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Fix For: 0.10.0
>
> In addition to having configurable file-rolling behavior the Sink should also integrate with checkpointing to make it possible to have exactly-once semantics throughout the topology.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14875653#comment-14875653 ] ASF GitHub Bot commented on FLINK-2583: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1084
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14805348#comment-14805348 ] ASF GitHub Bot commented on FLINK-2583: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-141411612 Looks good, will merge this!
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14789551#comment-14789551 ] ASF GitHub Bot commented on FLINK-2583: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1084#discussion_r39640862 --- Diff: .travis.yml --- @@ -19,9 +19,9 @@ matrix: - jdk: "oraclejdk7" # this will also deploy a uberjar to s3 at some point env: PROFILE="-Dhadoop.profile=1" - jdk: "openjdk7" - env: PROFILE="-P!include-yarn -Dhadoop.version=2.0.0-alpha" -- jdk: "oraclejdk7" - env: PROFILE="-Dhadoop.version=2.2.0" + env: PROFILE="-Dhadoop.version=2.4.0" --- End diff -- I am okay, but this is a pretty important thing. Maybe we should wait for at least one more committer agreeing with us.
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14781425#comment-14781425 ] ASF GitHub Bot commented on FLINK-2583: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/1084#discussion_r39640472 --- Diff: .travis.yml --- @@ -19,9 +19,9 @@ matrix: - jdk: "oraclejdk7" # this will also deploy a uberjar to s3 at some point env: PROFILE="-Dhadoop.profile=1" - jdk: "openjdk7" - env: PROFILE="-P!include-yarn -Dhadoop.version=2.0.0-alpha" -- jdk: "oraclejdk7" - env: PROFILE="-Dhadoop.version=2.2.0" + env: PROFILE="-Dhadoop.version=2.4.0" --- End diff -- So you are OK with merging it as is and then changing the build matrix later?
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14768885#comment-14768885 ] ASF GitHub Bot commented on FLINK-2583: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-140732373 The pull request has grown quite a lot. I think we should merge it now and then improve it from there.
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14768881#comment-14768881 ] ASF GitHub Bot commented on FLINK-2583: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1084#discussion_r39624668 --- Diff: .travis.yml --- @@ -19,9 +19,9 @@ matrix: - jdk: "oraclejdk7" # this will also deploy a uberjar to s3 at some point env: PROFILE="-Dhadoop.profile=1" - jdk: "openjdk7" - env: PROFILE="-P!include-yarn -Dhadoop.version=2.0.0-alpha" -- jdk: "oraclejdk7" - env: PROFILE="-Dhadoop.version=2.2.0" + env: PROFILE="-Dhadoop.version=2.4.0" --- End diff -- As per the mailing list discussion, we agreed to support Hadoop 2.3.0. But it seems that the YARN tests are not working with 2.3.0, so if you want we can set the version to 2.3.0 once the tests are fixed.
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14746065#comment-14746065 ] ASF GitHub Bot commented on FLINK-2583: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-140523411 I fixed the bug and now I'm confident that it should work. Also, I updated the Travis build matrix because the sink does not work with Hadoop 2.0.0-alpha. We had discussions about changing it, so I hope that is OK.
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745062#comment-14745062 ] ASF GitHub Bot commented on FLINK-2583: --- Github user aminouvic commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-140325179 Yeah, you're right, it's better to have an operational version of the sink first. Follow-up JIRA created: https://issues.apache.org/jira/browse/FLINK-2672
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14743411#comment-14743411 ] ASF GitHub Bot commented on FLINK-2583: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-140048228 Yes, let's merge it before extending it. @aminouvic If you want, can you create a JIRA with a description of that behavior, as a followup task? As a workaround, you should be able to do this right now with the help of splitting data streams and having multiple HDFS sinks.
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14743091#comment-14743091 ] ASF GitHub Bot commented on FLINK-2583: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-139995006 Yes, I would be very much in favor of adding it in its current incarnation before adding more stuff. It still has a bug right now with exactly-once; somehow some elements seem to get lost. I'm still debugging.
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14743089#comment-14743089 ] ASF GitHub Bot commented on FLINK-2583: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-139994388 Yes, we just had a discussion on the user mailing list about a partitioned output format for batch processing. Definitely a good addition for both DataSet and DataStream API. How about we merge this PR (once it is ready) and add the partitioning functionality later?
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14742615#comment-14742615 ] ASF GitHub Bot commented on FLINK-2583: --- Github user aminouvic commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-139911603 Since the RollingSink was a little inspired by Flume's HDFS Sink, it would be nice to include other really valuable features that could make it more complete. One of the most common use cases of the HDFS Sink is to dispatch data into multiple directories depending on attributes present in the source events. For example, let's say we have some data with timestamp and status fields; one can specify in the Flume conf file to write data into different directories like this: hdfs.path=/somepath/%{timestamp}/%{status} The result is that data is written into multiple folders such as /somepath/some_timestamp/wellformed, /somepath/some_timestamp/malformed, /somepath/some_timestamp/incomplete, etc. To achieve this, Flume maintains an LRU hashmap to hold not one but a set of writers, and computes the destination path for each event. It also uses some params like maxOpenFiles and idleTimeout (optional, used to close a file after x seconds of inactivity) to ensure it does not keep too many open files. But to include this, the bucketing, cleanOnStartup and checkpointing logics would need to be changed.
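The Flume-style dispatch described in this comment can be sketched with a plain-JDK LRU cache. All names below (`WriterCache`, `bucketPath`) are illustrative only; this is neither Flume nor Flink code, and a `StringBuilder` stands in for a real file writer to keep the sketch self-contained:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch: compute a bucket path per event and keep an
// LRU-bounded set of open "writers" (maxOpenFiles), evicting the
// least-recently-used one when the limit is exceeded.
public class WriterCache {

    private final int maxOpenFiles;
    private final Map<String, StringBuilder> writers;

    public WriterCache(final int maxOpenFiles) {
        this.maxOpenFiles = maxOpenFiles;
        // accessOrder = true turns LinkedHashMap into an LRU structure.
        this.writers = new LinkedHashMap<String, StringBuilder>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, StringBuilder> eldest) {
                // A real sink would flush/close the evicted writer here.
                return size() > WriterCache.this.maxOpenFiles;
            }
        };
    }

    // Destination path per event, mirroring /somepath/%{timestamp}/%{status}.
    public String bucketPath(String basePath, String timestamp, String status) {
        return basePath + "/" + timestamp + "/" + status;
    }

    public void write(String path, String record) {
        StringBuilder writer = writers.get(path);
        if (writer == null) {
            writer = new StringBuilder();
            writers.put(path, writer); // put() may evict the eldest writer
        }
        writer.append(record).append('\n');
    }

    public int openWriters() {
        return writers.size();
    }
}
```

`removeEldestEntry` is the hook where a real implementation would close the evicted file handle before dropping it.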
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14733537#comment-14733537 ] ASF GitHub Bot commented on FLINK-2583: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-138268930 Let's add this after we merge this PR, but it sounds quite valuable to me...
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14733470#comment-14733470 ] ASF GitHub Bot commented on FLINK-2583: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-138252252 It could be done by circumventing FileSystem and just using the regular Java file I/O API to perform the truncate if we detect that the FileSystem works on local "file://" paths. Should we do this? It seems like a bit of a hack to me.
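As a minimal sketch of that workaround (an assumption for illustration, not actual Flink code), the local-path case could bypass the Hadoop FileSystem entirely and truncate with plain Java I/O, which works regardless of Hadoop version:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URI;

// Hypothetical helper: if the path is a local "file://" URI, truncate the
// file to the valid length with plain Java I/O instead of Hadoop's API.
public final class LocalTruncate {

    private LocalTruncate() {}

    // Returns true if the file was truncated locally; false means the path
    // is not local and the caller must fall back to the metadata-file approach.
    public static boolean truncateIfLocal(URI path, long validLength) throws IOException {
        if (!"file".equals(path.getScheme())) {
            return false;
        }
        try (RandomAccessFile raf = new RandomAccessFile(path.getPath(), "rw")) {
            raf.setLength(validLength); // drop bytes written after the last checkpoint
        }
        return true;
    }
}
```

The "hack" aspect discussed here is exactly this scheme check: the code special-cases one FileSystem implementation instead of going through the common abstraction.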
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14733408#comment-14733408 ] ASF GitHub Bot commented on FLINK-2583: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-138237480 I mean can we get the "truncate()" behavior for local file systems even if you build for Hadoop versions earlier than 2.7?
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14732005#comment-14732005 ] ASF GitHub Bot commented on FLINK-2583: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-137962730 What do you mean? This is using the Hadoop FileSystem for everything. Is your suggestion to abstract away the filesystems behind our own FileSystem class again? By the way, truncate works with "file://", for example. I didn't check other filesystems.
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14731024#comment-14731024 ] ASF GitHub Bot commented on FLINK-2583: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-137782086 How hard is it to support the truncate code path also for regular Unix file systems (rather than only HDFS 2.7+)? The reason is that this way we would support mounted file systems (NAS, EBS) and the MapR file system (which also exposes a regular Unix file system interface) in a seamless exactly-once way (no need to have readers that are aware of the metadata files).
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14730947#comment-14730947 ] ASF GitHub Bot commented on FLINK-2583: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-137764994 I (almost) completely reworked the sink. It is now called `RollingSink` and the module is called `flink-connector-filesystem` to show that it works with any Hadoop FileSystem. It is integrated with the checkpointing mechanism to provide exactly-once semantics. When supported, it will use `truncate` for this. Otherwise, it will write a special `.valid-length` file that specifies how many bytes in a file are valid. I added an ITCase that verifies the exactly-once behavior. I also added a lot of description about how the sink works in the Javadoc of `RollingSink`, so if you want to check it out I suggest you start there.
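To make the `.valid-length` fallback concrete, here is a hedged consumer-side sketch. The file-naming convention (`<part-file>.valid-length` next to the part file) is an assumption for illustration; the `RollingSink` Javadoc mentioned above is the authoritative description:

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;

// Sketch of a reader that honors a ".valid-length" metadata file: if
// "<part-file>.valid-length" exists, only that many leading bytes of the
// part file are valid; everything after was written past the last
// successful checkpoint and must be ignored.
public final class ValidLengthReader {

    private ValidLengthReader() {}

    public static byte[] readValidBytes(File partFile) throws IOException {
        byte[] all = Files.readAllBytes(partFile.toPath());
        File meta = new File(partFile.getParentFile(), partFile.getName() + ".valid-length");
        if (!meta.exists()) {
            return all; // no marker: the whole file is valid
        }
        long validLength = Long.parseLong(
                new String(Files.readAllBytes(meta.toPath()), StandardCharsets.UTF_8).trim());
        return Arrays.copyOf(all, (int) Math.min(validLength, all.length));
    }
}
```

This is the reader-side awareness that the earlier comments point out `truncate` would make unnecessary.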
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14728787#comment-14728787 ] ASF GitHub Bot commented on FLINK-2583: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-137396268 Question is, should we do exactly once now or put it in as it is (more or less)?
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14728770#comment-14728770 ] ASF GitHub Bot commented on FLINK-2583: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-137393554 I think using truncate for exactly-once is the way to go. To support users with older HDFS versions, how about this:
1. We consider only valid what was written successfully at a checkpoint (hflush/hsync). When we roll over to a new file on restart, we write a `.length` file for that other file that indicates how many bytes are valid in it, basically simulating truncate by adding a metadata file.
2. Optionally, the user can activate a merge-on-roll-over that takes all the files from the attempts and all the metadata files, and merges them into one file. This rollover can be written such that it works incrementally, retries on failures, etc.
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14728745#comment-14728745 ] ASF GitHub Bot commented on FLINK-2583: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1084#discussion_r38627730

--- Diff: docs/apis/streaming_guide.md ---

### HDFS

This connector provides a Sink that writes rolling files to HDFS. To use this connector, add the following dependency to your project:

{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hdfs</artifactId>
  <version>{{site.version}}</version>
</dependency>
{% endhighlight %}

Note that the streaming connectors are currently not part of the binary distribution. See [here](cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution) for information about how to package the program with the libraries for cluster execution.

#### HDFS Rolling File Sink

The rolling behaviour as well as the writing can be configured, but we will get to that later. This is how you can create a default rolling sink:

{% highlight java %}
DataStream<String> input = ...;

input.addSink(new RollingHDFSSink("/base/path"));
{% endhighlight %}

{% highlight scala %}
val input: DataStream[String] = ...

input.addSink(new RollingHDFSSink("/base/path"))
{% endhighlight %}

The only required parameter is the base path in HDFS where the rolling files (buckets) will be stored. The sink can be configured by specifying a custom bucketer, HDFS writer and batch size.

By default the rolling sink will use the pattern `"yyyy-MM-dd--HH"` to name the rolling buckets.

--- End diff --

Can you make it a bit more explicit that a new directory is created when the pattern changes?
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14728744#comment-14728744 ] ASF GitHub Bot commented on FLINK-2583: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-137390414 I'm currently trying out the module. Some comments:
- Why do we name the module `flink-connector-hdfs`? I think a name such as `flink-connector-filesystems` or `flink-connector-hfs` would be more appropriate, because it is implemented against Hadoop's `FileSystem` classes, not HDFS classes. So users should be able to use the connector with other file systems such as Tachyon, NFS, S3, etc.
- Is there a way of re-using existing InputFormats with the rolling file sink? I guess users will start asking about CSV, Avro, Parquet ...
- I think there is already some code to monitor a file system directory to ingest it into a data stream. Maybe it would make sense to move that code out of the core into this module?
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727565#comment-14727565 ] ASF GitHub Bot commented on FLINK-2583: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-137148970 What about renaming the files upon checkpointing? I mean not triggering the rolling mechanism, but writing the current temp file as the latest part and starting a new part after the checkpoint. Would this help? Of course, this would entail that the different file parts have different sizes.
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727543#comment-14727543 ] ASF GitHub Bot commented on FLINK-2583: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-137144072 Should be, but then we can also just keep them in memory and write them when checkpointing. But this has more potential of blowing up because of OOMs.
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727540#comment-14727540 ] ASF GitHub Bot commented on FLINK-2583: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-137143368 Concerning the second short-term option: Is the problem there that the checkpointing is not aligned with the rolling? Thus, you can take a checkpoint but you still have a temporary file which contains all the records up to the checkpoint. If the sink now fails before renaming the temporary file, then the records which have been written before the checkpoint was taken are lost. Did I understand it correctly? If this is the case, then the records in the temporary file should actually be part of the state of the sink, right?
[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727484#comment-14727484 ] ASF GitHub Bot commented on FLINK-2583: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-137125224 If it fails in the middle of writing, or before sync/flush is called on the writer, the data can be left in an inconsistent state. I see three ways of dealing with this, one of which is more long-term. The long-term solution is to make the sink exactly-once aware, either using truncate() support in Hadoop 2.7 or a custom thread that merges part files and throws away data that was erroneously written. The two short-term options are:
- Keep it as it is; consumers need to be able to deal with corrupt records and ignore them. This would give you at-least-once semantics.
- Write to a temporary file. When rolling, close the current bucket and rename the file to its final filename. This would ensure that the output doesn't contain corrupt records, but you would have neither at-least-once nor exactly-once semantics, because some written records would be lost if a checkpoint restore goes back to a state from after the writing of the current bucket file started.
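The second short-term option, write to a temporary file and rename on roll, can be sketched as follows. This uses `java.nio` on a local file system as a stand-in for the HDFS FileSystem API; all names (`RollingWriter`, the `.in-progress` suffix) are illustrative, not the actual sink implementation:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Sketch of the temp-file-then-rename pattern. Illustrative names,
// not the actual Flink sink implementation.
class RollingWriter {
    private final Path dir;
    private Path inProgress;      // e.g. part-0.in-progress
    private int partCounter = 0;

    RollingWriter(Path dir) {
        this.dir = dir;
    }

    void write(String record) throws IOException {
        if (inProgress == null) {
            inProgress = dir.resolve("part-" + partCounter + ".in-progress");
        }
        Files.write(inProgress,
                (record + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // Rolling: the rename is atomic, so downstream readers only ever see
    // either the complete final file or no file at all -- never a
    // half-written one.
    void roll() throws IOException {
        Path finalFile = dir.resolve("part-" + partCounter);
        Files.move(inProgress, finalFile, StandardCopyOption.ATOMIC_MOVE);
        inProgress = null;
        partCounter++;
    }
}
```

As the comment above points out, this alone is not enough for at-least-once: records sitting in an in-progress file at restore time are silently dropped unless that file is also tracked in the sink's checkpoint state.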
[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727391#comment-14727391 ] ASF GitHub Bot commented on FLINK-2583: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1084#issuecomment-137094936 Looks very good from a first glance! Can you explain how the writing behaves in case of failures? Will it start a new file? Will it try to append to the previous one? What about incomplete records?
[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725605#comment-14725605 ] ASF GitHub Bot commented on FLINK-2583: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1084#discussion_r38435351 --- Diff: flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-hdfs/pom.xml --- @@ -0,0 +1,107 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-hdfs</artifactId>
+	<name>flink-connector-hdfs</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime</artifactId>
+			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
--- End diff -- why are you not using our shaded hadoop dependency?
[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files
[ https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725568#comment-14725568 ] ASF GitHub Bot commented on FLINK-2583: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/1084 [FLINK-2583] Add Stream Sink For Rolling HDFS Files Note: The rolling sink is not yet integrated with checkpointing/fault-tolerance. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink hdfs-sink Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1084.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1084 commit 8c852c2abf3a1e597dd1d139197d3420861a516c Author: Robert Metzger Date: 2015-08-27T16:13:08Z [FLINK-2584] Check for unshaded classes in fat jar and shade curator This closes #1076 commit f4a48c23a30c170a5a2c08c27e1f01f7827eefd2 Author: chengxiang li Date: 2015-08-31T06:02:27Z [FLINK-2596] Remove non-transitive comparator in random sampler test. This closes #1080 commit ab14f90142fd69426bb695cbdb641f0a5a0c46f7 Author: Martin Junghanns Date: 2015-08-29T20:51:19Z [FLINK-2590] fixing DataSetUtils.zipWithUniqueId() and DataSetUtils.zipWithIndex() * modified algorithm as explained in the issue * updated method documentation [FLINK-2590] reducing required bit shift size * maximum bit size is changed to getNumberOfParallelSubTasks() - 1 This closes #1075. commit 6a58aadec15657a7da60c58ef6d5dbbf7e5ca14b Author: Till Rohrmann Date: 2015-09-01T10:04:23Z [FLINK-2590] Fixes Scala's DataSetUtilsITCase commit 81276ff88bb7185d93bbf92392b82b25ece7aff1 Author: Aljoscha Krettek Date: 2015-08-31T08:01:38Z [FLINK-2583] Add Stream Sink For Rolling HDFS Files The rolling sink is not yet integrated with checkpointing/fault-tolerance. 