[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2109


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r76199872
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 ---
@@ -314,43 +299,95 @@ public void testIgnoredUnderscoreFiles() {
}
 
@Test
+   public void testExcludeFiles() {
+   try {
+   final String contents = "CONTENTS";
+
+   // create some accepted, some ignored files
+
+   File child1 = temporaryFolder.newFile("dataFile1.txt");
+   File child2 = temporaryFolder.newFile("another_file.bin");
+
+   File[] files = { child1, child2 };
+
+   createTempFiles(contents.getBytes(), files);
+
+   // test that only the valid files are accepted
+
+   Configuration configuration = new Configuration();
+
+   final DummyFileInputFormat format = new DummyFileInputFormat();
+   format.setFilePath(temporaryFolder.getRoot().toURI().toString());
+   format.configure(configuration);
+   format.setFilesFilter(new GlobFilePathFilter(
+   Collections.singletonList("**"),
--- End diff --

Thanks!


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-25 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r76198999
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java 
---
@@ -34,14 +34,18 @@
public static Collection data() {
return Arrays.asList(new Object[][] {
{"file.txt",false},
+
{".file.txt",   true},
-   {"_file.txt",   true},
-   {"_COPYING_",   true},
{"dir/.file.txt",   true},
-   {"dir/_file.txt",   true},
-   {"dir/_COPYING_",   true},
{".dir/file.txt",   false},
+
+   {"_file.txt",   true},
+   {"dir/_file.txt",   true},
{"_dir/file.txt",   false},
+
+   // Check filtering Hadoop's unfinished files
+   {"_COPYING_",   true},
--- End diff --

This should contain the newly introduced constant.


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-24 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r76148927
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class GlobFilePathFilterTest {
+   @Test
+   public void defaultConstructorCreateMatchAllFilter() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter();
+   assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+   }
+
+   @Test
+   public void matchAllFilesByDefault() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter(
+   Collections.emptyList(),
+   Collections.emptyList());
+
+   assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+   }
+
+   @Test
+   public void excludeFilesNotInIncludePatterns() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter(
+   Collections.singletonList("dir/*"),
+   Collections.emptyList());
+
+   assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+   assertTrue(matcher.filterPath(new Path("dir1/file.txt")));
+   }
+
+   @Test
+   public void excludeFilesIfMatchesExclude() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter(
+   Collections.singletonList("dir/*"),
--- End diff --

Yes, this is supported. Added a test for this.
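For reference, the JDK glob syntax treats a backslash as an escape character, so `\*` matches a literal star. The check below goes straight to `java.nio.file.PathMatcher` (the `glob:` matcher the `FilesFilter` diff in this thread builds on), not to the PR's `GlobFilePathFilter` itself. `GlobEscapeDemo` and its helper are illustrative names, and a Unix-like default file system is assumed because `*` is not a legal file-name character on Windows:

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobEscapeDemo {

    // True if the relative, '/'-separated path matches the glob pattern.
    static boolean matches(String glob, String path) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return matcher.matches(Paths.get(path));
    }

    public static void main(String[] args) {
        // The Java literal "dir/\\*" is the glob dir/\* : the star is escaped.
        String literalStar = "dir/\\*";
        System.out.println(matches(literalStar, "dir/*"));        // true
        System.out.println(matches(literalStar, "dir/file.txt")); // false
        // Without the backslash the star is a wildcard again.
        System.out.println(matches("dir/*", "dir/file.txt"));     // true
    }
}
```

Note the double backslash: one level of escaping is consumed by the Java string literal, the other by the glob parser.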


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-24 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r76148802
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 ---
@@ -314,43 +299,95 @@ public void testIgnoredUnderscoreFiles() {
}
 
@Test
+   public void testExcludeFiles() {
+   try {
+   final String contents = "CONTENTS";
+
+   // create some accepted, some ignored files
+
+   File child1 = temporaryFolder.newFile("dataFile1.txt");
+   File child2 = temporaryFolder.newFile("another_file.bin");
+
+   File[] files = { child1, child2 };
+
+   createTempFiles(contents.getBytes(), files);
+
+   // test that only the valid files are accepted
+
+   Configuration configuration = new Configuration();
+
+   final DummyFileInputFormat format = new DummyFileInputFormat();
+   format.setFilePath(temporaryFolder.getRoot().toURI().toString());
+   format.configure(configuration);
+   format.setFilesFilter(new GlobFilePathFilter(
+   Collections.singletonList("**"),
--- End diff --

`*` matches any file name within a single directory level.
`**` matches any path, including files in any subdirectory.

Added a comment and test for that.
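A minimal sketch of the difference, assuming the filter relies on the JDK's `glob:` `PathMatcher` (as the `FilesFilter` diff quoted in this thread does) and a Unix-like default file system. `GlobStarDemo` and `matches` are illustrative names only:

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobStarDemo {

    // True if the relative, '/'-separated path matches the glob pattern.
    static boolean matches(String glob, String path) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return matcher.matches(Paths.get(path));
    }

    public static void main(String[] args) {
        // "*" never crosses a '/', so it only covers one path component.
        System.out.println(matches("*", "file.txt"));                               // true
        System.out.println(matches("*", "dir/file.txt"));                           // false
        // "**" crosses directory boundaries, so it also covers nested files.
        System.out.println(matches("**", "dir/sub/file.txt"));                      // true
        System.out.println(matches("**/another_file.bin", "dir/another_file.bin")); // true
    }
}
```

Because `*` stops at `/`, an include pattern of `**` is what accepts files in nested directories.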


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-24 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r76148725
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java 
---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.io;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class DefaultFilterTest {
+   @Parameters
+   public static Collection data() {
+   return Arrays.asList(new Object[][] {
+   {"file.txt",false},
+   {".file.txt",   true},
+   {"_file.txt",   true},
+   {"_COPYING_",   true},
+   {"dir/.file.txt",   true},
+   {"dir/_file.txt",   true},
+   {"dir/_COPYING_",   true},
--- End diff --

No problem. I've extracted a constant.


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r76056775
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class GlobFilePathFilterTest {
+   @Test
+   public void defaultConstructorCreateMatchAllFilter() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter();
+   assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+   }
+
+   @Test
+   public void matchAllFilesByDefault() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter(
+   Collections.emptyList(),
+   Collections.emptyList());
+
+   assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+   }
+
+   @Test
+   public void excludeFilesNotInIncludePatterns() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter(
+   Collections.singletonList("dir/*"),
+   Collections.emptyList());
+
+   assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+   assertTrue(matcher.filterPath(new Path("dir1/file.txt")));
+   }
+
+   @Test
+   public void excludeFilesIfMatchesExclude() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter(
+   Collections.singletonList("dir/*"),
--- End diff --

Is it possible to match file names that contain a star, e.g. by escaping it
as `\*` so that it literally matches `*`?


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r76056623
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 ---
@@ -314,43 +299,95 @@ public void testIgnoredUnderscoreFiles() {
}
 
@Test
+   public void testExcludeFiles() {
+   try {
+   final String contents = "CONTENTS";
+
+   // create some accepted, some ignored files
+
+   File child1 = temporaryFolder.newFile("dataFile1.txt");
+   File child2 = temporaryFolder.newFile("another_file.bin");
+
+   File[] files = { child1, child2 };
+
+   createTempFiles(contents.getBytes(), files);
+
+   // test that only the valid files are accepted
+
+   Configuration configuration = new Configuration();
+
+   final DummyFileInputFormat format = new DummyFileInputFormat();
+   format.setFilePath(temporaryFolder.getRoot().toURI().toString());
+   format.configure(configuration);
+   format.setFilesFilter(new GlobFilePathFilter(
+   Collections.singletonList("**"),
--- End diff --

`**` would be the same as `*`?


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r76055091
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java 
---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.io;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class DefaultFilterTest {
+   @Parameters
+   public static Collection data() {
+   return Arrays.asList(new Object[][] {
+   {"file.txt",false},
+   {".file.txt",   true},
+   {"_file.txt",   true},
+   {"_COPYING_",   true},
+   {"dir/.file.txt",   true},
+   {"dir/_file.txt",   true},
+   {"dir/_COPYING_",   true},
--- End diff --

It seems quite arbitrary that we exclude this file. I know that you didn't 
introduce that. Still, could we move it to a constant and document it? This 
seems to be Hadoop's hack to indicate an unfinished file.
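A sketch of what the extracted, documented constant could look like, assuming (as the comment suggests) that `_COPYING_` marks a file Hadoop has not finished writing. The class name, the constant name `HADOOP_COPYING` and the string-based `filterPath` are hypothetical stand-ins; the rules are read off the `DefaultFilterTest` table above, where only the last path component is checked:

```java
// Hypothetical sketch, not Flink's actual filter class: the default rules from
// DefaultFilterTest, with Hadoop's marker pulled into a documented constant.
public class DefaultFilterSketch {

    /**
     * Assumed marker Hadoop puts into the names of files that are still being
     * copied into the file system. Such files are incomplete and must not be read.
     */
    public static final String HADOOP_COPYING = "_COPYING_";

    /** Returns true if the file at the given '/'-separated path should be skipped. */
    public static boolean filterPath(String path) {
        // Only the last path component counts, so "_dir/file.txt" is still read.
        String name = path.substring(path.lastIndexOf('/') + 1);
        return name.startsWith(".")      // hidden files
                || name.startsWith("_")  // e.g. _SUCCESS, _luigi
                || name.contains(HADOOP_COPYING);
    }

    public static void main(String[] args) {
        String[] paths = {"file.txt", "dir/_COPYING_", ".dir/file.txt", "data.txt._COPYING_"};
        for (String p : paths) {
            System.out.println(p + " -> skipped=" + filterPath(p));
        }
    }
}
```

Using `contains` rather than `startsWith` for the marker also catches names that merely end with it, such as the hypothetical `data.txt._COPYING_`.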


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73213456
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class GlobFilePathFilterTest {
--- End diff --

"Glob" is the simplified wildcard syntax used in Linux shells, .gitignore
files and many other places.
https://en.wikipedia.org/wiki/Glob_(programming)

I chose it over regexes because it should cover most practical use cases
and is much simpler to use.
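To illustrate, here is the same intent ("any `.txt` file directly inside `dir/`") written once as a glob and once as a regex, using the two syntaxes that `java.nio.file.FileSystem.getPathMatcher` accepts. This is a sketch assuming a Unix-like default file system (`/` as separator); the class name is illustrative:

```java
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobVsRegexDemo {
    private static final FileSystem FS = FileSystems.getDefault();

    // Glob: "*" already means "any characters except '/'" and "." is literal.
    static final PathMatcher GLOB = FS.getPathMatcher("glob:dir/*.txt");

    // Regex: the same intent needs an explicit character class and an escaped dot.
    static final PathMatcher REGEX = FS.getPathMatcher("regex:dir/[^/]*\\.txt");

    public static void main(String[] args) {
        for (String p : new String[] {"dir/file.txt", "dir/sub/file.txt", "dir/file.bin"}) {
            System.out.println(p + ": glob=" + GLOB.matches(Paths.get(p))
                    + ", regex=" + REGEX.matches(Paths.get(p)));
        }
    }
}
```

Both matchers accept only `dir/file.txt`; the glob simply states it with less escaping.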


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73213488
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 ---
@@ -257,41 +257,22 @@ public void testFileInputSplit() {
public void testIgnoredUnderscoreFiles() {
try {
final String contents = "CONTENTS";
-   
+
// create some accepted, some ignored files
-   
-   File tempDir = new File(System.getProperty("java.io.tmpdir"));
-   File f;
-   do {
-   f = new File(tempDir, TestFileUtils.randomFileName(""));
-   }
-   while (f.exists());
 
-   assertTrue(f.mkdirs());
-   f.deleteOnExit();
-   
-   File child1 = new File(f, "dataFile1.txt");
-   File child2 = new File(f, "another_file.bin");
-   File luigiFile = new File(f, "_luigi");
-   File success = new File(f, "_SUCCESS");
-   
-   File[] files = { child1, child2, luigiFile, success };
-   
-   for (File child : files) {
-   child.deleteOnExit();
-   
-   BufferedWriter out = new BufferedWriter(new FileWriter(child));
-   try { 
-   out.write(contents);
-   } finally {
-   out.close();
-   }
-   }
+   File tempDirectory = createTempDirectory();

+   File child1 = new File(tempDirectory, "dataFile1.txt");
+   File child2 = new File(tempDirectory, "another_file.bin");
+   File luigiFile = new File(tempDirectory, "_luigi");
+   File success = new File(tempDirectory, "_SUCCESS");
+
+   createTempFiles(contents.getBytes(), child1, child2, luigiFile, success);
--- End diff --

Sure, I'll update this.


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73212835
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -995,8 +993,7 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
public  DataStreamSource readFile(FileInputFormat inputFormat,
        String filePath,
        FileProcessingMode watchType,
-       long interval,
-       FilePathFilter filter) {
+       long interval) {
--- End diff --

Good catch. I'll revert this.


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73162978
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 ---
@@ -257,41 +257,22 @@ public void testFileInputSplit() {
public void testIgnoredUnderscoreFiles() {
try {
final String contents = "CONTENTS";
-   
+
// create some accepted, some ignored files
-   
-   File tempDir = new File(System.getProperty("java.io.tmpdir"));
-   File f;
-   do {
-   f = new File(tempDir, TestFileUtils.randomFileName(""));
-   }
-   while (f.exists());
 
-   assertTrue(f.mkdirs());
-   f.deleteOnExit();
-   
-   File child1 = new File(f, "dataFile1.txt");
-   File child2 = new File(f, "another_file.bin");
-   File luigiFile = new File(f, "_luigi");
-   File success = new File(f, "_SUCCESS");
-   
-   File[] files = { child1, child2, luigiFile, success };
-   
-   for (File child : files) {
-   child.deleteOnExit();
-   
-   BufferedWriter out = new BufferedWriter(new FileWriter(child));
-   try { 
-   out.write(contents);
-   } finally {
-   out.close();
-   }
-   }
+   File tempDirectory = createTempDirectory();

+   File child1 = new File(tempDirectory, "dataFile1.txt");
+   File child2 = new File(tempDirectory, "another_file.bin");
+   File luigiFile = new File(tempDirectory, "_luigi");
+   File success = new File(tempDirectory, "_SUCCESS");
+
+   createTempFiles(contents.getBytes(), child1, child2, luigiFile, success);
--- End diff --

You could actually use `TemporaryFolder` here to create the root. It would
be cleaned up automatically by JUnit.


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73162125
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 ---
@@ -274,7 +272,7 @@ public int compare(Tuple2 o1, Tuple2

[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73162167
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class GlobFilePathFilterTest {
--- End diff --

What is "Glob"?


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73162004
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 ---
@@ -274,7 +272,7 @@ public int compare(Tuple2 o1, Tuple2

[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-02 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r73161791
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -995,8 +993,7 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
public  DataStreamSource readFile(FileInputFormat inputFormat,
        String filePath,
        FileProcessingMode watchType,
-       long interval,
-       FilePathFilter filter) {
+       long interval) {
--- End diff --

This is a breaking API change. `StreamExecutionEnvironment` is annotated
`@Public`.
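One common way to avoid such a break is to keep the old signature as a deprecated overload that moves the filter onto the input format and delegates to the new method. The sketch below is heavily simplified: every type in it is a hypothetical stand-in, not Flink's API, and the `FileProcessingMode` parameter is omitted:

```java
// Hypothetical stand-ins only: none of these types are Flink's real API.
public class CompatibleOverloadSketch {

    /** Stand-in for a path filter: returns true for paths that should be skipped. */
    interface PathFilter {
        boolean filterPath(String path);
    }

    /** Stand-in for an input format that carries its own filter. */
    static class InputFormatStub {
        PathFilter filter = path -> false; // by default nothing is skipped

        void setFilesFilter(PathFilter filter) {
            this.filter = filter;
        }
    }

    /** Stand-in for a class whose public signatures must stay source compatible. */
    static class EnvSketch {

        /** New method: the filter is configured on the format beforehand. */
        InputFormatStub readFile(InputFormatStub format, String filePath, long interval) {
            // A real implementation would build and return a source here.
            return format;
        }

        /**
         * Old signature kept so existing callers still compile: it moves the
         * filter onto the format and delegates to the new method.
         *
         * @deprecated set the filter via {@link InputFormatStub#setFilesFilter} instead.
         */
        @Deprecated
        InputFormatStub readFile(InputFormatStub format, String filePath, long interval,
                                 PathFilter filter) {
            format.setFilesFilter(filter);
            return readFile(format, filePath, interval);
        }
    }

    public static void main(String[] args) {
        EnvSketch env = new EnvSketch();
        InputFormatStub format = env.readFile(
                new InputFormatStub(), "/tmp/in", 100L, path -> path.endsWith(".bin"));
        System.out.println(format.filter.filterPath("another_file.bin")); // true
    }
}
```

Callers of the longer form keep compiling (with a deprecation warning), while new code configures the filter on the format itself.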


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-06-27 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r68578183
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 ---
@@ -314,41 +297,101 @@ public void testIgnoredUnderscoreFiles() {
}
 
@Test
+   public void testExcludeFiles() {
+   try {
+   final String contents = "CONTENTS";
+
+   // create some accepted, some ignored files
+
+   File f = createTempDirectory();
+
+   File child1 = new File(f, "dataFile1.txt");
+   File child2 = new File(f, "another_file.bin");
+
+   File[] files = { child1, child2 };
+
+   createTempFiles(contents.getBytes(), files);
+
+   // test that only the valid files are accepted
+
+   Configuration configuration = new Configuration();
+   configuration.setString("fs.input.include", "**");
+   configuration.setString("fs.input.exclude", "**/another_file.bin");
--- End diff --

these keys no longer have to be set


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-06-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r67468955
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/FilesFilter.java ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+
+import java.io.Serializable;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class for determining if a particular file should be included or excluded.
+ *
+ * <p>If a file does not match an include pattern, it is excluded. If it matches
+ * an include pattern but also matches an exclude pattern, it is excluded.
+ *
+ * <p>If no patterns are provided, all files are included.
+ */
+@Internal
+public class FilesFilter implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private final List<PathMatcher> includeMatchers;
+   private final List<PathMatcher> excludeMatchers;
+
+   /**
+* Constructor for FilesFilter
+*
+* @param includePatterns glob patterns for files to include
+* @param excludePatterns glob patterns for files to exclude
+*/
+   public FilesFilter(String[] includePatterns, String[] excludePatterns) {
+   includeMatchers = buildPatterns(includePatterns);
+   excludeMatchers = buildPatterns(excludePatterns);
+   }
+
+   private List<PathMatcher> buildPatterns(String[] patterns) {
+   FileSystem fileSystem = FileSystems.getDefault();
+   List<PathMatcher> matchers = new ArrayList<>();
+
+   for (String patternStr : patterns) {
+   matchers.add(fileSystem.getPathMatcher("glob:" + patternStr));
--- End diff --

will these matchers also work with files that reside in HDFS?
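One way to keep the matching independent of the underlying file system is to match only the path string of the file's URI: the default provider's `PathMatcher` compares against the string form of a `java.nio.file.Path` and does no I/O, so an `hdfs://` URI can be handled like a local one. A minimal sketch under that assumption, applying the include/exclude rules from the Javadoc above; the class name, the URI-based `filterPath` and the host `namenode:8020` are hypothetical, and a Unix-like default file system is assumed:

```java
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not the PR's FilesFilter: include/exclude globs matched
// against the path component of a file URI, whatever its scheme.
public class IncludeExcludeSketch {

    private final List<PathMatcher> includeMatchers;
    private final List<PathMatcher> excludeMatchers;

    IncludeExcludeSketch(List<String> includePatterns, List<String> excludePatterns) {
        includeMatchers = buildMatchers(includePatterns);
        excludeMatchers = buildMatchers(excludePatterns);
    }

    private static List<PathMatcher> buildMatchers(List<String> patterns) {
        FileSystem fileSystem = FileSystems.getDefault();
        List<PathMatcher> matchers = new ArrayList<>();
        for (String pattern : patterns) {
            matchers.add(fileSystem.getPathMatcher("glob:" + pattern));
        }
        return matchers;
    }

    /** Returns true if the file should be skipped. */
    boolean filterPath(URI fileUri) {
        // Only the path string is used, e.g. "/data/dir/file.txt" for
        // "hdfs://namenode:8020/data/dir/file.txt"; nothing is read from disk.
        Path path = Paths.get(fileUri.getPath());

        // No include patterns means "include everything".
        boolean included = includeMatchers.isEmpty();
        for (PathMatcher m : includeMatchers) {
            included |= m.matches(path);
        }
        if (!included) {
            return true;
        }
        // An included file is still skipped if any exclude pattern matches.
        for (PathMatcher m : excludeMatchers) {
            if (m.matches(path)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        IncludeExcludeSketch filter = new IncludeExcludeSketch(
                Collections.singletonList("/data/**"),
                Arrays.asList("**/another_file.bin", "**/_*"));
        for (String uri : new String[] {
                "hdfs://namenode:8020/data/dir/dataFile1.txt",
                "hdfs://namenode:8020/data/dir/another_file.bin",
                "file:///tmp/other.txt"}) {
            System.out.println(uri + " -> skipped=" + filter.filterPath(URI.create(uri)));
        }
    }
}
```

Matching the absolute URI path means patterns need a leading `/` or `**`; matching paths relative to the input directory instead would let patterns such as `dir/*` work as written.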


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-06-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r67319452
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/FilesFilter.java ---
@@ -0,0 +1,79 @@
+package org.apache.flink.api.common.io;
--- End diff --

Missing license header.
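For reference, Apache projects put the standard ASF license header at the top of every source file, above the `package` line. For FilesFilter.java it would look like this (the same header applies to FilesFilterTest.java):

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.common.io;
```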




[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-06-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r67319428
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/FilesFilterTest.java ---
@@ -0,0 +1,33 @@
+package org.apache.flink.api.common.io;
--- End diff --

Missing license header.




[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-06-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r67319317
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---
@@ -70,7 +70,17 @@
 * The fraction that the last split may be larger than the others.
 */
private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
-   
+
+   /**
+* Patterns for file names to include
+*/
+   private static final String INCLUDE_PATTERNS = "fileInputFormat.include";
--- End diff --

These should be prefixed with KEY_ (e.g. KEY_INCLUDE_PATTERNS).




[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-06-16 Thread mushketyk
GitHub user mushketyk opened a pull request:

https://github.com/apache/flink/pull/2109

[FLINK-3677] FileInputFormat: Allow to specify include/exclude file name patterns

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mushketyk/flink file-include

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2109.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2109


commit f45dadf90322cc180b1afe1dc91d83a3aced7e22
Author: Ivan Mushketyk 
Date:   2016-06-14T21:44:19Z

[FLINK-3677] Add FileMatcher class

commit 8d4a2a72020ee19fe70d06ded953af67b6ed487d
Author: Ivan Mushketyk 
Date:   2016-06-15T20:47:43Z

[FLINK-3677] FileInputFormat: Allow to specify include/exclude file name patterns

commit 350527bb6c1bf7a6e9d252bb921125249103635c
Author: Ivan Mushketyk 
Date:   2016-06-15T20:48:13Z

[FLINK-3677] Rename test

commit 6045fe79e192aca6e97f568351927a17e0c64d09
Author: Ivan Mushketyk 
Date:   2016-06-15T21:07:55Z

[FLINK-3677] Tests refactoring



