[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-12-03 Thread Roshan Naik (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15038752#comment-15038752
 ] 

Roshan Naik commented on FLINK-2583:


I would like to point out that this truncate mechanism will only work with a 
very limited set of file formats, such as text files. It won't work with most 
file formats, such as compressed, Avro, or columnar formats.
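
The concern can be illustrated with a minimal sketch. It is not taken from the pull request; the class and method names are made up for illustration, and gzip merely stands in for "a compressed format". A plain-text file cut at a record boundary is still valid text, whereas a gzip stream cut at a byte offset no longer decodes to the end.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class; nothing here is part of Flink or the pull request.
class TruncateFormatDemo {

    // Compresses the given bytes with gzip, entirely in memory.
    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
            out.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Returns true if the bytes decompress to the end without an I/O error.
    static boolean gunzipSucceeds(byte[] compressed) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buf = new byte[1024];
            while (in.read(buf) != -1) {
                // Drain the stream; we only care whether it ends cleanly.
            }
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] text = "record-1\nrecord-2\nrecord-3\n".getBytes(StandardCharsets.UTF_8);

        // Plain text cut after two 9-byte records is still two complete records.
        System.out.print(new String(Arrays.copyOf(text, 18), StandardCharsets.UTF_8));

        // Cutting the gzip bytes in half leaves a stream that cannot be read to the end.
        byte[] gz = gzip(text);
        System.out.println("full gzip readable: " + gunzipSucceeds(gz));
        System.out.println("cut gzip readable:  " + gunzipSucceeds(Arrays.copyOf(gz, gz.length / 2)));
    }
}
```

Formats with footers or block indexes (for example columnar files) have the same problem in a different form: the metadata needed to read the file is only written when the file is closed.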

> Add Stream Sink For Rolling HDFS Files
> --
>
> Key: FLINK-2583
> URL: https://issues.apache.org/jira/browse/FLINK-2583
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.10.0
>
>
> In addition to having configurable file-rolling behavior the Sink should also 
> integrate with checkpointing to make it possible to have exactly-once 
> semantics throughout the topology.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725568#comment-14725568
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/1084

[FLINK-2583] Add Stream Sink For Rolling HDFS Files

Note: The rolling sink is not yet integrated with 
checkpointing/fault-tolerance.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink hdfs-sink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1084.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1084


commit 8c852c2abf3a1e597dd1d139197d3420861a516c
Author: Robert Metzger 
Date:   2015-08-27T16:13:08Z

[FLINK-2584] Check for unshaded classes in fat jar and shade curator

This closes #1076

commit f4a48c23a30c170a5a2c08c27e1f01f7827eefd2
Author: chengxiang li 
Date:   2015-08-31T06:02:27Z

[FLINK-2596] Remove non-transitive comparator in random sampler test.

This closes #1080

commit ab14f90142fd69426bb695cbdb641f0a5a0c46f7
Author: Martin Junghanns 
Date:   2015-08-29T20:51:19Z

[FLINK-2590] fixing DataSetUtils.zipWithUniqueId() and 
DataSetUtils.zipWithIndex()

* modified algorithm as explained in the issue
* updated method documentation

[FLINK-2590] reducing required bit shift size

* maximum bit size is changed to getNumberOfParallelSubTasks() - 1

This closes #1075.

commit 6a58aadec15657a7da60c58ef6d5dbbf7e5ca14b
Author: Till Rohrmann 
Date:   2015-09-01T10:04:23Z

[FLINK-2590] Fixes Scala's DataSetUtilsITCase

commit 81276ff88bb7185d93bbf92392b82b25ece7aff1
Author: Aljoscha Krettek 
Date:   2015-08-31T08:01:38Z

[FLINK-2583] Add Stream Sink For Rolling HDFS Files

The rolling sink is not yet integrated with
checkpointing/fault-tolerance.






[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14725605#comment-14725605
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1084#discussion_r38435351
  
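--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-hdfs/pom.xml ---
@@ -0,0 +1,107 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-connectors-parent</artifactId>
+      <version>0.10-SNAPSHOT</version>
+      <relativePath>..</relativePath>
+   </parent>
+
+   <artifactId>flink-connector-hdfs</artifactId>
+   <name>flink-connector-hdfs</name>
+
+   <packaging>jar</packaging>
+
+
+
+
+
+
+   <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-core</artifactId>
+      <version>${project.version}</version>
+   </dependency>
+
+
+   <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-core</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+   </dependency>
+
+   <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-tests</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+   </dependency>
+
+   <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-test-utils</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+   </dependency>
+
+
+   <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+      <version>${project.version}</version>
+   </dependency>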
+
+   
--- End diff --

Why are you not using our shaded Hadoop dependency?




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727391#comment-14727391
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137094936
  
Looks very good from a first glance!

Can you explain how the writing behaves in cases of failures? Will it start 
a new file? Will it try to append to the previous one? What about incomplete 
records?




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727484#comment-14727484
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137125224
  
If it fails in the middle of writing or before sync/flush is called on the 
writer then the data can be in an inconsistent state. I see three ways of 
dealing with this, one is more long-term.

The long term solution is to make the sink exactly-once aware. Either using 
truncate() support in Hadoop 2.7 or a custom Thread that does merging of part 
files and throwing away of data that was erroneously written.

The two short term options are:
 - Keep it as it is, consumers need to be able to deal with corrupt records 
and ignore them. This would give you at-least-once semantics.
 - Write to a temporary file. When rolling, close the current bucket and 
rename the file to the final filename. This would ensure that the output 
doesn't contain corrupt records but you would have neither at-least-once nor 
exactly-once semantics because some written records would be lost if checkpoint 
restore restores to a state after the writing of the current bucket file 
started.
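
The second short-term option (write to a temporary file, rename on roll) can be sketched roughly as follows. This is only an illustration under stated assumptions: it uses `java.nio.file` on the local file system as a stand-in for the Hadoop `FileSystem` API, and the class name, the `_part-N.in-progress` naming and the `part-N` naming are all made up here rather than taken from the pull request.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical sketch of "write to a temp file, rename when rolling".
class TempFileRoller {
    private final Path dir;
    private int partCounter = 0;
    private Path inProgress;

    TempFileRoller(Path dir) {
        this.dir = dir;
        this.inProgress = dir.resolve("_part-0.in-progress");
    }

    // Appends one record to the current in-progress file.
    void write(String record) {
        try {
            Files.write(inProgress, (record + "\n").getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Publishes the in-progress file under its final name and starts a new one.
    // Returns null if nothing has been written since the last roll.
    Path roll() {
        if (!Files.exists(inProgress)) {
            return null;
        }
        try {
            Path finalPath = dir.resolve("part-" + partCounter);
            Files.move(inProgress, finalPath);
            partCounter++;
            inProgress = dir.resolve("_part-" + partCounter + ".in-progress");
            return finalPath;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes two records, rolls, writes a third, and returns the published lines.
    static List<String> demo() {
        try {
            Path dir = Files.createTempDirectory("roller-demo");
            TempFileRoller roller = new TempFileRoller(dir);
            roller.write("a");
            roller.write("b");
            Path published = roller.roll();
            roller.write("c"); // stays in the in-progress file until the next roll
            return Files.readAllLines(published, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Readers that only look at `part-*` files never see a half-written record, but anything sitting in the in-progress file at the time of a failure is lost unless it is also covered by a checkpoint, which is the weakness described above.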




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727540#comment-14727540
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137143368
  
Concerning the second short term option: Is the problem there that the 
checkpointing is not aligned with the rolling? Thus, you can take a checkpoint 
but you still have a temporary file which contains all the records up to the 
checkpoint. If the sink now fails before renaming the temporary file, then the 
records which have been written before the checkpoint was taken are lost. Did I 
understand it correctly? If this is the case, then the records in the temporary 
file should actually be part of the state of the sink, right? 




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727543#comment-14727543
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137144072
  
Should be, but then we could also just keep them in memory and write them when 
checkpointing. But this has more potential for blowing up with an OOM error.




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14727565#comment-14727565
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137148970
  
What about renaming the files upon checkpointing? I mean not triggering the 
rolling mechanism but to write the current temp file as the latest part and 
start a new part after the checkpoint. Would this help? Of course this would 
entail that the different file parts will have different sizes.




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14728744#comment-14728744
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137390414
  
I'm currently trying out the module. Some comments:
- Why do we name the module `flink-connector-hdfs`? I think a name such as 
`flink-connector-filesystems` or `flink-connector-hfs` would be more 
appropriate because it is implemented against Hadoop's `FileSystem` classes, 
and not HDFS classes.
So users should be able to use the connector with other file systems such as 
Tachyon, NFS, S3 etc.

- Is there a way of re-using existing InputFormats with the rolling file 
sink? I guess users will start asking about CSV, Avro, Parquet ...

- I think there is already some code to monitor a file system directory to 
ingest it into a data stream. Maybe it would make sense to move that code out 
of the core into this module?




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14728745#comment-14728745
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1084#discussion_r38627730
  
--- Diff: docs/apis/streaming_guide.md ---
@@ -1836,6 +1837,110 @@ More about information about Elasticsearch can be 
found [here](https://elastic.c
 
 [Back to top](#top)
 
+### HDFS
+
+This connector provides a Sink that writes rolling files to HDFS. To use this connector, add the
+following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-hdfs</artifactId>
+  <version>{{site.version}}</version>
+</dependency>
+{% endhighlight %}
+
+Note that the streaming connectors are currently not part of the binary
+distribution. See
+[here](cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution)
+for information about how to package the program with the libraries for
+cluster execution.
+
+#### HDFS Rolling File Sink
+
+The rolling behaviour as well as the writing can be configured but we will get to that later.
+This is how you can create a default rolling sink:
+
+
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+input.addSink(new RollingHDFSSink("/base/path"));
+
+{% endhighlight %}
+
+
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+input.addSink(new RollingHDFSSink("/base/path"))
+
+{% endhighlight %}
+
+
+
+The only required parameter is the base path in HDFS where the rolling files (buckets) will be
+stored. The sink can be configured by specifying a custom bucketer, HDFS writer and batch size.
+
+By default the rolling sink will use the pattern `"yyyy-MM-dd--HH"` to name the rolling buckets.
--- End diff --

Can you make it a bit more explicit that a new directory is created when 
the pattern changes?
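
The behaviour asked about here can be sketched as follows. This is an illustration, not the sink's bucketer: it assumes a date pattern such as `yyyy-MM-dd--HH`, fixes the time zone to UTC for reproducibility, and uses a made-up class and method name. The bucket directory is the base path plus the formatted time, so records land in a new directory as soon as the formatted string changes (here, once per hour).

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical sketch of deriving a bucket directory from the current time.
class DateBucketDemo {

    // Returns basePath + "/" + the time formatted with the given pattern (UTC).
    static String bucketPath(String basePath, String pattern, long epochMillis) {
        SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.ROOT);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return basePath + "/" + format.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        String pattern = "yyyy-MM-dd--HH"; // assumed default pattern
        long oneHour = 60L * 60L * 1000L;

        // Two records within the same hour share a directory ...
        System.out.println(bucketPath("/base/path", pattern, 0L));
        System.out.println(bucketPath("/base/path", pattern, oneHour - 1));
        // ... and the first record of the next hour starts a new one.
        System.out.println(bucketPath("/base/path", pattern, oneHour));
    }
}
```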




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14728770#comment-14728770
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137393554
  
I think using truncate for exactly once is the way to go. To support users 
with older HDFS versions, how about this:

1. We consider only valid what was written successfully at a checkpoint 
(hflush/hsync). When we roll over to a new file on restart, we write a 
`.length` file for that other file that indicates how many bytes are valid in 
that file. Basically simulating truncate by adding a metadata file.

2. Optionally, the user can activate a merge-on roll-over, that takes all 
the files from the attempts and all the metadata files, and merges them into 
one file. This rollover can be written such that it works incrementally and 
re-tries on failures, etc...
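
Point 1 (simulating truncate with a metadata file) might look roughly like this. The sketch makes several assumptions: it uses `java.nio.file` on the local file system instead of the Hadoop `FileSystem` API, stores the length as a decimal string, and invents the class and method names. A reader that knows about the `.length` file ignores every byte past the recorded offset.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

// Hypothetical sketch of a ".length" side file that marks the valid prefix of a data file.
class LengthFileDemo {

    static Path lengthFileFor(Path dataFile) {
        return dataFile.resolveSibling(dataFile.getFileName().toString() + ".length");
    }

    // Records how many leading bytes of dataFile are valid.
    static void markValid(Path dataFile, long validBytes) {
        try {
            Files.write(lengthFileFor(dataFile),
                    Long.toString(validBytes).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads dataFile, honoring the side file if one exists.
    static byte[] readValid(Path dataFile) {
        try {
            byte[] all = Files.readAllBytes(dataFile);
            Path lengthFile = lengthFileFor(dataFile);
            if (!Files.exists(lengthFile)) {
                return all;
            }
            String text = new String(Files.readAllBytes(lengthFile), StandardCharsets.UTF_8);
            long valid = Long.parseLong(text.trim());
            return Arrays.copyOf(all, (int) Math.min(valid, all.length));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Simulates: flush at a checkpoint, partial write afterwards, failure, recovery.
    static String demo() {
        try {
            Path part = Files.createTempDirectory("length-demo").resolve("part-0");
            Files.write(part, "a\nb\n".getBytes(StandardCharsets.UTF_8));
            long checkpointedLength = Files.size(part); // what the checkpoint remembers
            Files.write(part, "c-par".getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.APPEND);         // written after the checkpoint
            markValid(part, checkpointedLength);        // done when rolling over on restart
            return new String(readValid(part), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```

The cost of this scheme is that every consumer has to know about the side file, which is why a real truncate is preferable where it is available.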





[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14728787#comment-14728787
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137396268
  
Question is, should we do exactly once now or put it in as it is (more or 
less)?




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14730947#comment-14730947
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137764994
  
I (almost) completely reworked the sink. It is now called `RollingSink` and 
the module is called `flink-connector-filesystem` to show that it works with 
any Hadoop FileSystem.

It is integrated with the checkpointing mechanism to provide exactly-once 
semantics. When supported it will use `truncate` for this. Otherwise it will 
write a special `.valid-length` file that specifies how many bytes in a file 
are valid.

I added an ITCase that verifies the exactly-once behavior.

I added a lot of description about how the sink works in the Javadoc of 
`RollingSink`, so if you want to check it out I suggest you start there.




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14731024#comment-14731024
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137782086
  
How hard is it to support the truncate file code path also for regular Unix 
file systems (rather than only HDFS 2.7+)?

The reason is that this way we would support mounted file systems (NAS, 
EBS) and MapR file system (exposes also a regular Unix file system interface) 
in a seamless exactly once way (no need to have readers that are aware of the 
metadata files).




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14732005#comment-14732005
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-137962730
  
What do you mean? This is using the Hadoop FileSystem for everything. Is 
your suggestion to abstract away the filesystems behind our own FileSystem 
class again?

By the way, truncate works with "file://", for example. I didn't check 
other filesystems.




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14733408#comment-14733408
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-138237480
  
I mean can we get the "truncate()" behavior for local file systems even if 
you build for Hadoop versions earlier than  2.7?




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14733470#comment-14733470
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-138252252
  
It could be done by circumventing FileSystem and just using the regular 
Java File I/O API to perform the truncate if we detect that the FileSystem 
works on "file://" paths.

Should we do this? It seems like a bit of a hack to me.
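
A minimal sketch of that workaround, assuming the decision is made purely on the URI scheme and using `FileChannel.truncate` from the JDK. The class and method names are hypothetical, and how this would be wired into the sink is not shown.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: truncate local files directly with java.nio.
class LocalTruncateDemo {

    // True only for explicit "file" URIs; a missing scheme could mean the default (remote) FS.
    static boolean isLocalFile(URI uri) {
        return "file".equalsIgnoreCase(uri.getScheme());
    }

    // Cuts the file down to validLength bytes.
    static void truncateLocal(Path file, long validLength) {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            channel.truncate(validLength);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes 10 bytes, truncates to 4, and returns the resulting file size.
    static long demo() {
        try {
            Path file = Files.createTempFile("truncate-demo", ".txt");
            Files.write(file, "0123456789".getBytes(StandardCharsets.UTF_8));
            if (isLocalFile(file.toUri())) {
                truncateLocal(file, 4L);
            }
            return Files.size(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("size after truncate: " + demo());
    }
}
```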




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14733537#comment-14733537
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-138268930
  
Let's add this after we merge this, but it sounds quite valuable to me...




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14742615#comment-14742615
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user aminouvic commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-139911603
  
Since the RollingSink was inspired a little by Flume's HDFS Sink, it would 
be nice to include another really valuable feature that could make it more 
complete.

One of the most common use cases of the HDFS Sink is to dispatch data into 
multiple directories depending on attributes present in the source events.

For example, let's say we have some data that has timestamp and status 
fields. One can specify in the Flume conf file to write data into different 
directories like this:
hdfs.path=/somepath/%{timestamp}/%{status}
The result will be to write data into multiple folders like 
/somepath/some_timestamp/wellformed
/somepath/some_timestamp/malformed
/somepath/some_timestamp/incomplete 
... 
etc

To achieve this, Flume maintains an LRU HashMap to hold not one but a set of 
Writers, and computes the destination path for each event.
It also uses some params like maxOpenFiles and idleTimeout (optional, used to 
close a file after x seconds of inactivity) to ensure not having too many open 
files.

But to include this, the bucketing, cleanOnStartup and the checkpointing 
logics need to be changed.
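
The LRU-map idea can be sketched with a plain `LinkedHashMap` in access order. This is neither Flume's nor Flink's code: the `Writer` class is a stand-in that only remembers records in memory, the path layout is copied from the example above, and the idle timeout is not modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of keeping at most maxOpenFiles writers open, evicting the least recently used.
class LruWriterCache {

    // Stand-in for a real file writer.
    static final class Writer {
        final String path;
        final List<String> records = new ArrayList<>();
        boolean open = true;

        Writer(String path) {
            this.path = path;
        }

        void close() {
            open = false;
        }
    }

    final List<String> closedPaths = new ArrayList<>();
    private final LinkedHashMap<String, Writer> writers;

    LruWriterCache(final int maxOpenFiles) {
        // accessOrder = true makes iteration order "least recently used first".
        this.writers = new LinkedHashMap<String, Writer>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Writer> eldest) {
                if (size() > maxOpenFiles) {
                    eldest.getValue().close();
                    closedPaths.add(eldest.getKey());
                    return true;
                }
                return false;
            }
        };
    }

    // Computes the destination path per event and routes the record to its writer.
    void write(String timestamp, String status, String record) {
        String path = "/somepath/" + timestamp + "/" + status;
        Writer writer = writers.get(path); // also marks the entry as recently used
        if (writer == null) {
            writer = new Writer(path);
            writers.put(path, writer);     // may evict and close the eldest writer
        }
        writer.records.add(record);
    }

    int openCount() {
        return writers.size();
    }

    // With room for two writers, the third distinct path evicts the least recently used one.
    static List<String> demo() {
        LruWriterCache cache = new LruWriterCache(2);
        cache.write("t1", "wellformed", "r1");
        cache.write("t1", "malformed", "r2");
        cache.write("t1", "wellformed", "r3");
        cache.write("t1", "incomplete", "r4");
        return cache.closedPaths;
    }

    public static void main(String[] args) {
        System.out.println("closed: " + demo());
    }
}
```

For a checkpointed sink, the state to snapshot would then be a set of open files and their valid lengths rather than a single one, which is presumably why the bucketing and checkpointing logic would have to change.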




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14743089#comment-14743089
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-139994388
  
Yes, we just had a discussion on the user mailing list about a partitioned 
output format for batch processing. Definitely a good addition for both DataSet 
and DataStream API.

How about we merge this PR (once it is ready) and add the partitioning 
functionality later?




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14743091#comment-14743091
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-139995006
  
Yes, I would be very much in favor of adding it in the current incarnation 
before adding more stuff.

It still has a bug right now with exactly once, somehow some elements seem 
to get lost. I'm still debugging.




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14743411#comment-14743411
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-140048228
  
Yes, let's merge it before extending it.

@aminouvic If you want, can you create a JIRA with a description of that 
behavior, as a followup task?

As a workaround, you should be able to do this right now with the help of 
splitting data streams and having multiple HDFS sinks.
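
A minimal sketch of the workaround described above, written against plain
Python collections rather than the Flink API so that nothing here is mistaken
for a real Flink signature. The `split_to_sinks` helper, the route names, and
the sample records are all hypothetical: each predicate stands in for one
branch of a split stream, and each list stands in for one HDFS sink with its
own base path.

```python
from typing import Callable, Dict, Iterable, List


def split_to_sinks(
    records: Iterable[str],
    routes: Dict[str, Callable[[str], bool]],
) -> Dict[str, List[str]]:
    """Send each record to every sink whose predicate accepts it.

    The keys of `routes` are hypothetical sink names (one per HDFS
    target); the returned lists model what each sink would write.
    """
    # One output per route, created up front: the set of sinks is fixed.
    sinks: Dict[str, List[str]] = {name: [] for name in routes}
    for record in records:
        for name, matches in routes.items():
            if matches(record):
                sinks[name].append(record)
    return sinks


# Hypothetical input lines of the form "<category>,<payload>".
events = ["clicks,a", "views,b", "clicks,c", "other,d"]
routes = {
    "clicks": lambda r: r.startswith("clicks,"),
    "views": lambda r: r.startswith("views,"),
}
result = split_to_sinks(events, routes)
```

In this sketch the set of sinks is fixed when `routes` is built, and a record
that matches no predicate (here `"other,d"`) is not written anywhere.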




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14745062#comment-14745062
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user aminouvic commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-140325179
  
Yeah, you're right, better to have an operational version of the sink first. 
Follow-up JIRA created: https://issues.apache.org/jira/browse/FLINK-2672 




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14746065#comment-14746065
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-140523411
  
I fixed the bug and now I'm confident that it should work.

Also, I updated the Travis build matrix because the sink does not work with 
Hadoop 2.0.0-alpha. We had discussions about changing it, so I hope it is OK.




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14768881#comment-14768881
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1084#discussion_r39624668
  
--- Diff: .travis.yml ---
@@ -19,9 +19,9 @@ matrix:
 - jdk: "oraclejdk7" # this will also deploy a uberjar to s3 at some point
   env: PROFILE="-Dhadoop.profile=1"
 - jdk: "openjdk7"
-  env: PROFILE="-P!include-yarn -Dhadoop.version=2.0.0-alpha"
-- jdk: "oraclejdk7"
-  env: PROFILE="-Dhadoop.version=2.2.0"
+  env: PROFILE="-Dhadoop.version=2.4.0"
--- End diff --

As per the mailing list discussion, we agreed to support Hadoop 2.3.0.
But it seems that the YARN tests are not working with 2.3.0, so if you want 
we can set the version to 2.3.0 when the tests are fixed.




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14768885#comment-14768885
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-140732373
  
I think the pull request has grown quite a lot. I think we should merge it 
now and then improve it from there.




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14781425#comment-14781425
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1084#discussion_r39640472
  
--- Diff: .travis.yml ---
@@ -19,9 +19,9 @@ matrix:
 - jdk: "oraclejdk7" # this will also deploy a uberjar to s3 at some point
   env: PROFILE="-Dhadoop.profile=1"
 - jdk: "openjdk7"
-  env: PROFILE="-P!include-yarn -Dhadoop.version=2.0.0-alpha"
-- jdk: "oraclejdk7"
-  env: PROFILE="-Dhadoop.version=2.2.0"
+  env: PROFILE="-Dhadoop.version=2.4.0"
--- End diff --

So you are OK with merging it as is and then changing the build matrix 
later?




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14789551#comment-14789551
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/1084#discussion_r39640862
  
--- Diff: .travis.yml ---
@@ -19,9 +19,9 @@ matrix:
 - jdk: "oraclejdk7" # this will also deploy a uberjar to s3 at some point
   env: PROFILE="-Dhadoop.profile=1"
 - jdk: "openjdk7"
-  env: PROFILE="-P!include-yarn -Dhadoop.version=2.0.0-alpha"
-- jdk: "oraclejdk7"
-  env: PROFILE="-Dhadoop.version=2.2.0"
+  env: PROFILE="-Dhadoop.version=2.4.0"
--- End diff --

I am okay with that, but this is a pretty important change. Maybe we should 
wait for at least one more committer to agree with us.




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14805348#comment-14805348
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1084#issuecomment-141411612
  
Looks good, will merge this!




[jira] [Commented] (FLINK-2583) Add Stream Sink For Rolling HDFS Files

2015-09-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14875653#comment-14875653
 ] 

ASF GitHub Bot commented on FLINK-2583:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1084

