buildbot success on flink-docs-release-1.1

2016-12-08 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-release-1.1. Full details are available at:
https://ci.apache.org/builders/flink-docs-release-1.1/builds/133

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave1_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-1.1' triggered this build
Build Source Stamp: [branch release-1.1] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot

buildbot success on flink-docs-master

2016-12-08 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-master. Full details are available at:
https://ci.apache.org/builders/flink-docs-master/builds/550

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave2_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered this build
Build Source Stamp: [branch master] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot

flink git commit: [FLINK-5039] Bump Avro version to 1.7.7.

2016-12-08 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/release-1.1 671b434cf -> 3ae6e9e09


[FLINK-5039] Bump Avro version to 1.7.7.

This closes #2953.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ae6e9e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ae6e9e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ae6e9e0

Branch: refs/heads/release-1.1
Commit: 3ae6e9e09ba74f88fe87d1ac130b3cc232a5e88c
Parents: 671b434
Author: Robert Metzger 
Authored: Tue Dec 6 21:03:10 2016 +0100
Committer: Fabian Hueske 
Committed: Thu Dec 8 21:24:15 2016 +0100

--
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3ae6e9e0/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 4594a9d..bc7bcec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -213,13 +213,13 @@ under the License.
 
 			<dependency>
 				<groupId>org.apache.avro</groupId>
 				<artifactId>avro</artifactId>
-				<version>1.7.6</version>
+				<version>1.7.7</version>
 			</dependency>
 
 			<dependency>
 				<groupId>org.apache.avro</groupId>
 				<artifactId>avro-ipc</artifactId>
-				<version>1.7.6</version>
+				<version>1.7.7</version>
 			</dependency>
 

[4/4] flink git commit: [FLINK-3921] Add support to set encoding in CsvReader and StringParser.

2016-12-08 Thread fhueske
[FLINK-3921] Add support to set encoding in CsvReader and StringParser.

- extends first commit.

This closes #2901.
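
A minimal usage sketch of the new option. Hedged: the setCharset(String) setter on CsvReader is inferred from this change's file list; the path, charset, and field types below are illustrative only.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.CsvReader;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class CsvEncodingExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // readCsvFile returns a CsvReader builder for the given path.
            CsvReader reader = env.readCsvFile("file:///tmp/words.csv");

            // Interpret the file as Latin-1 instead of the UTF-8 default
            // (setter name assumed from this commit's CsvReader changes).
            reader.setCharset("ISO-8859-1");

            DataSet<Tuple2<String, Integer>> words = reader.types(String.class, Integer.class);
            words.print();
        }
    }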


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/41d5875b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/41d5875b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/41d5875b

Branch: refs/heads/master
Commit: 41d5875bfc272f2cd5c7e8c8523036684865c1ce
Parents: f2186af
Author: Greg Hogan 
Authored: Mon Nov 28 12:43:47 2016 -0500
Committer: Fabian Hueske 
Committed: Thu Dec 8 21:21:48 2016 +0100

--
 .../api/common/io/DelimitedInputFormat.java |  67 --
 .../api/common/io/GenericCsvInputFormat.java|  79 +---
 .../apache/flink/types/parser/FieldParser.java  |  18 ++-
 .../api/common/io/DelimitedInputFormatTest.java |  94 ++
 .../common/io/GenericCsvInputFormatTest.java| 125 ++-
 .../types/parser/VarLengthStringParserTest.java |  12 +-
 .../org/apache/flink/api/java/io/CsvReader.java |  24 ++--
 .../apache/flink/api/java/io/CSVReaderTest.java |  23 ++--
 .../flink/api/java/io/CsvInputFormatTest.java   |   6 +-
 .../runtime/io/RowCsvInputFormatTest.scala  |   6 +-
 .../flink/api/scala/io/CsvInputFormatTest.scala |  14 +--
 11 files changed, 273 insertions(+), 195 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index fd02c82..5c8dfc1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -20,17 +20,18 @@ package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
@@ -56,8 +57,11 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 	 */
 	private static final Logger LOG = LoggerFactory.getLogger(DelimitedInputFormat.class);
 
-	/** The default charset  to convert strings to bytes */
-	private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8");
+	// The charset used to convert strings to bytes
+	private String charsetName = "UTF-8";
+
+	// Charset is not serializable
+	private transient Charset charset;
 
 	/**
 	 * The default read buffer size = 1MB.
@@ -157,9 +161,12 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 	// ------------------------------------------------------------------------------------------
 	//  The configuration parameters. Configured on the instance and serialized to be shipped.
 	// ------------------------------------------------------------------------------------------
-	
+
+	// The delimiter may be set with a byte-sequence or a String. In the latter
+	// case the byte representation is updated consistent with current charset.
 	private byte[] delimiter = new byte[] {'\n'};
-	
+	private String delimiterString = null;
+
 	private int lineLengthLimit = Integer.MAX_VALUE;
 
 	private int bufferSize = -1;
@@ -182,8 +189,42 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 		}
 		loadConfigParameters(configuration);
 	}
-	
-	
+
+	/**
+	 * Get the character set used for the row delimiter. This is also used by
+	 * subclasses to interpret field delimiters, comment strings, and for
+	 * configuring {@link FieldParser}s.
+	 *
+	 * @return the charset
+	 */
+	@PublicEvolving
+	public Charset 

[2/4] flink git commit: [FLINK-5039] Bump Avro version to 1.7.7.

2016-12-08 Thread fhueske
[FLINK-5039] Bump Avro version to 1.7.7.

This closes #2953.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d8f03e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d8f03e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d8f03e7

Branch: refs/heads/master
Commit: 2d8f03e7ad12af3a0dcb7bec087c25f19a4fd03e
Parents: 677d0d9
Author: Robert Metzger 
Authored: Tue Dec 6 21:03:10 2016 +0100
Committer: Fabian Hueske 
Committed: Thu Dec 8 18:45:55 2016 +0100

--
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2d8f03e7/pom.xml
--
diff --git a/pom.xml b/pom.xml
index d3ddf92..04ba726 100644
--- a/pom.xml
+++ b/pom.xml
@@ -220,13 +220,13 @@ under the License.
 
 			<dependency>
 				<groupId>org.apache.avro</groupId>
 				<artifactId>avro</artifactId>
-				<version>1.7.6</version>
+				<version>1.7.7</version>
 			</dependency>
 
 			<dependency>
 				<groupId>org.apache.avro</groupId>
 				<artifactId>avro-ipc</artifactId>
-				<version>1.7.6</version>
+				<version>1.7.7</version>
 			</dependency>
 

flink git commit: [FLINK-5169] [network] Fix String formats in spillable partition

2016-12-08 Thread uce
Repository: flink
Updated Branches:
  refs/heads/release-1.1 b046038ae -> 671b434cf


[FLINK-5169] [network] Fix String formats in spillable partition

This closes #2967.
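
For context, the bug class being fixed: "{}" is an SLF4J logging placeholder, not a java.util.Formatter conversion, so String.format() emits it verbatim. A standalone illustration (not Flink code):

    public class FormatPlaceholderDemo {
        public static void main(String[] args) {
            boolean hasSpilled = true;

            // Broken: "{}" is not a format conversion, so it is printed literally.
            System.out.println(String.format("spilled? {} (index: %d)", 2));
            // -> spilled? {} (index: 2)

            // Fixed: %b formats the boolean, as this commit does.
            System.out.println(String.format("spilled? %b (index: %d)", hasSpilled, 2));
            // -> spilled? true (index: 2)
        }
    }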


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/671b434c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/671b434c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/671b434c

Branch: refs/heads/release-1.1
Commit: 671b434cf3e09d6edb247f6f84f38b8088fc5c5d
Parents: b046038
Author: Boris Osipov 
Authored: Thu Dec 8 16:58:21 2016 +0300
Committer: Ufuk Celebi 
Committed: Thu Dec 8 15:22:08 2016 +0100

--
 .../runtime/io/network/partition/SpillableSubpartitionView.java| 2 +-
 .../runtime/io/network/partition/SpilledSubpartitionView.java  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/671b434c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 533f95b..df8de54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -206,7 +206,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 	public String toString() {
 		boolean hasSpilled = spilledView != null;
 
-		return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? {}) of ResultPartition %s",
+		return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? %b) of ResultPartition %s",
 			parent.index,
 			numBuffers,
 			hasSpilled,

http://git-wip-us.apache.org/repos/asf/flink/blob/671b434c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index 7488132..fec0f2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -166,7 +166,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis
 
 	@Override
 	public String toString() {
-		return String.format("SpilledSubpartitionView(index: %d, buffers: {}) of ResultPartition %s",
+		return String.format("SpilledSubpartitionView(index: %d, buffers: %d) of ResultPartition %s",
 			parent.index,
 			numberOfSpilledBuffers,
 			parent.parent.getPartitionId());

flink git commit: [FLINK-5169] [network] Fix String formats in spillable partition

2016-12-08 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master 4eb71927b -> 55d60615a


[FLINK-5169] [network] Fix String formats in spillable partition

This closes #2967.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55d60615
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55d60615
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55d60615

Branch: refs/heads/master
Commit: 55d60615a5e3f72135aad7f0ec5a2c1b4485b908
Parents: 4eb7192
Author: Boris Osipov 
Authored: Thu Dec 8 16:58:21 2016 +0300
Committer: Ufuk Celebi 
Committed: Thu Dec 8 15:21:24 2016 +0100

--
 .../runtime/io/network/partition/SpillableSubpartitionView.java| 2 +-
 .../runtime/io/network/partition/SpilledSubpartitionView.java  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/55d60615/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 533f95b..df8de54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -206,7 +206,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 	public String toString() {
 		boolean hasSpilled = spilledView != null;
 
-		return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? {}) of ResultPartition %s",
+		return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? %b) of ResultPartition %s",
 			parent.index,
 			numBuffers,
 			hasSpilled,

http://git-wip-us.apache.org/repos/asf/flink/blob/55d60615/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index 7488132..fec0f2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -166,7 +166,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis
 
 	@Override
 	public String toString() {
-		return String.format("SpilledSubpartitionView(index: %d, buffers: {}) of ResultPartition %s",
+		return String.format("SpilledSubpartitionView(index: %d, buffers: %d) of ResultPartition %s",
 			parent.index,
 			numberOfSpilledBuffers,
 			parent.parent.getPartitionId());

[3/3] flink git commit: [FLINK-5020] Make the GenericWriteAheadSink rescalable.

2016-12-08 Thread chesnay
[FLINK-5020] Make the GenericWriteAheadSink rescalable.

Integrates the new state abstractions with the GenericWriteAheadSink
so that the latter can change its parallelism when resuming execution
from a savepoint, without jeopardizing the provided guarantees.

This closes #2759
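
In outline, the pattern applied: the sink's pending (written-ahead but uncommitted) checkpoint metadata moves into operator list state, whose entries Flink redistributes across subtasks when the parallelism changes on restore. A minimal sketch of that pattern, with API names as in later Flink releases; the class and state names are illustrative, not the actual GenericWriteAheadSink code:

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    public class RescalableSinkSketch implements CheckpointedFunction {

        // Ids of checkpoints whose data is written ahead but not yet committed.
        // As operator (non-keyed) list state, entries are redistributed on rescale.
        private transient ListState<Long> pendingCheckpointIds;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            pendingCheckpointIds = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("pending-checkpoints", Long.class));

            if (context.isRestored()) {
                for (Long checkpointId : pendingCheckpointIds.get()) {
                    // re-commit or re-send the data buffered for this checkpoint id
                }
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            pendingCheckpointIds.add(context.getCheckpointId());
        }
    }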


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4eb71927
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4eb71927
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4eb71927

Branch: refs/heads/master
Commit: 4eb71927bc4f0832eb08a79394ad6864a3c2e142
Parents: 86f784a
Author: kl0u 
Authored: Wed Oct 26 17:19:12 2016 +0200
Committer: zentol 
Committed: Thu Dec 8 12:27:14 2016 +0100

--
 .../cassandra/CassandraConnectorITCase.java |  38 ++--
 .../runtime/operators/CheckpointCommitter.java  |   1 +
 .../operators/GenericWriteAheadSink.java| 105 ++-
 .../operators/GenericWriteAheadSinkTest.java|  50 +++---
 .../operators/WriteAheadSinkTestBase.java   | 172 +--
 5 files changed, 276 insertions(+), 90 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4eb71927/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
--
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 2bb6fd1..f2e8f8b 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -47,7 +47,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.TestEnvironment;
 
@@ -71,6 +70,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Scanner;
 import java.util.UUID;
 
@@ -262,9 +262,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase
-	protected void verifyResultsIdealCircumstances(
-		OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+	protected void verifyResultsIdealCircumstances(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
 		ResultSet result = session.execute(SELECT_DATA_QUERY);
 		ArrayList list = new ArrayList<>();
@@ -279,9 +277,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase
-	protected void verifyResultsDataPersistenceUponMissedNotify(
-		OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+	protected void verifyResultsDataPersistenceUponMissedNotify(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
 		ResultSet result = session.execute(SELECT_DATA_QUERY);
 		ArrayList list = new ArrayList<>();
@@ -296,9 +292,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase
-	protected void verifyResultsDataDiscardingUponRestore(
-		OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness,
-		CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
+	protected void verifyResultsDataDiscardingUponRestore(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) {
 
 		ResultSet result = session.execute(SELECT_DATA_QUERY);
 		ArrayList list = new ArrayList<>();
@@ -315,6 +309,30 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase
+	protected void verifyResultsWhenReScaling(CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink, int startElementCounter, int endElementCounter) {
+
+		// IMPORTANT NOTE:
+		//
+		// for cassandra we always have to start from 1 because
+		// all operators will share the same final db
+
+		ArrayList<Integer> expected = new ArrayList<>();
+		for (int i = 1; i <= endElementCounter; i++) {
+			expected.add(i);
+		}
+
+		ArrayList<Integer> actual = new ArrayList<>();
+		ResultSet result = 
[2/3] flink git commit: [FLINK-5164] Disable some Hadoop-compat tests on Windows

2016-12-08 Thread chesnay
[FLINK-5164] Disable some Hadoop-compat tests on Windows

This closes #2889.
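
The mechanism used below is JUnit's Assume: a failed assumption marks a test as skipped rather than failed, so the suite stays green on Windows without losing coverage elsewhere. A generic standalone illustration (not the Flink code):

    import org.junit.Assume;
    import org.junit.Before;
    import org.junit.Test;

    public class WindowsSkipExample {

        @Before
        public void checkOperatingSystem() {
            // If the condition is false, JUnit reports every test in this class
            // as skipped (an assumption violation), not as a failure.
            Assume.assumeTrue("This test can't run successfully on Windows.",
                !System.getProperty("os.name").toLowerCase().startsWith("windows"));
        }

        @Test
        public void runsOnlyOffWindows() {
            // test body executes only when the assumption in @Before held
        }
    }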


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe843e13
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe843e13
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe843e13

Branch: refs/heads/master
Commit: fe843e1377aa08a10394bbfa67dc9d3b2a23b805
Parents: 4414008
Author: zentol 
Authored: Fri Nov 25 14:58:48 2016 +0100
Committer: zentol 
Committed: Thu Dec 8 12:04:48 2016 +0100

--
 .../test/hadoopcompatibility/mapred/HadoopMapredITCase.java | 9 +
 .../mapreduce/HadoopInputOutputITCase.java  | 8 
 .../flink/test/hadoop/mapred/HadoopIOFormatsITCase.java | 9 +
 .../flink/test/hadoop/mapred/WordCountMapredITCase.java | 9 +
 .../test/hadoop/mapreduce/WordCountMapreduceITCase.java | 9 +
 .../api/scala/hadoop/mapred/WordCountMapredITCase.scala | 8 
 .../scala/hadoop/mapreduce/WordCountMapreduceITCase.scala   | 8 
 7 files changed, 60 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/fe843e13/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
--
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
index ccc0d82..0b5a366 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java
@@ -21,12 +21,21 @@ package org.apache.flink.test.hadoopcompatibility.mapred;
 import org.apache.flink.test.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.OperatingSystem;
+import org.junit.Assume;
+import org.junit.Before;
 
 public class HadoopMapredITCase extends JavaProgramTestBase {
 
 	protected String textPath;
 	protected String resultPath;
 
+	@Before
+	public void checkOperatingSystem() {
+		// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
+	}
+
 	@Override
 	protected void preSubmit() throws Exception {
 		textPath = createTempFile("text.txt", WordCountData.TEXT);

http://git-wip-us.apache.org/repos/asf/flink/blob/fe843e13/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
--
diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
index 698e356..48aa258 100644
--- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
+++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java
@@ -21,12 +21,20 @@ package org.apache.flink.test.hadoopcompatibility.mapreduce;
 import org.apache.flink.test.hadoopcompatibility.mapreduce.example.WordCount;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.OperatingSystem;
+import org.junit.Assume;
+import org.junit.Before;
 
 public class HadoopInputOutputITCase extends JavaProgramTestBase {
 
 	protected String textPath;
 	protected String resultPath;
 
+	@Before
+	public void checkOperatingSystem() {
+		// FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems
+		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
+	}
 
 	@Override
 	protected void preSubmit() throws Exception {


[1/3] flink git commit: [FLINK-4563] [metrics] scope caching not adjusted for multiple reporters

2016-12-08 Thread chesnay
Repository: flink
Updated Branches:
  refs/heads/master 441400855 -> 4eb71927b


[FLINK-4563] [metrics] scope caching not adjusted for multiple reporters

This closes #2650.
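
The change, reduced to a standalone sketch (illustrative; the real code is in AbstractMetricGroup below): the formatted scope string is cached once per reporter rather than once per group, since each reporter may configure its own delimiter and a single shared cache would leak one reporter's delimiter into another's metric identifiers.

    public class ScopeCacheSketch {

        private final String[] scopeComponents = {"host-7", "taskmanager-2", "job", "task"};

        // One cached scope string per reporter; delimiters differ per reporter.
        private final String[] scopeStrings;

        public ScopeCacheSketch(int numReporters) {
            this.scopeStrings = new String[numReporters];
        }

        public String getScope(int reporterIndex, char delimiter) {
            if (scopeStrings[reporterIndex] == null) {
                StringBuilder sb = new StringBuilder(scopeComponents[0]);
                for (int i = 1; i < scopeComponents.length; i++) {
                    sb.append(delimiter).append(scopeComponents[i]);
                }
                scopeStrings[reporterIndex] = sb.toString();
            }
            return scopeStrings[reporterIndex];
        }

        public static void main(String[] args) {
            ScopeCacheSketch group = new ScopeCacheSketch(2);
            System.out.println(group.getScope(0, '.'));  // host-7.taskmanager-2.job.task
            System.out.println(group.getScope(1, '-'));  // host-7-taskmanager-2-job-task
        }
    }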


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86f784a3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86f784a3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86f784a3

Branch: refs/heads/master
Commit: 86f784a3613ccd5d78197d94198a64b6f1333578
Parents: fe843e1
Author: Anton Mushin 
Authored: Mon Oct 17 17:23:01 2016 +0400
Committer: zentol 
Committed: Thu Dec 8 12:04:48 2016 +0100

--
 .../metrics/groups/AbstractMetricGroup.java |  53 
 .../metrics/groups/AbstractMetricGroupTest.java | 135 +++
 2 files changed, 165 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/86f784a3/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 04b8158..6ff9776 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -85,9 +86,9 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 	 *  For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */
 	private final String[] scopeComponents;
 
-	/** The metrics scope represented by this group, as a concatenated string, lazily computed.
+	/** Array containing the metrics scope represented by this group for each reporter, as a concatenated string, lazily computed.
 	 * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
-	private String scopeString;
+	private final String[] scopeStrings;
 
 	/** The logical metrics scope represented by this group, as a concatenated string, lazily computed.
 	 * For example: "taskmanager.job.task" */
@@ -105,6 +106,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 		this.registry = checkNotNull(registry);
 		this.scopeComponents = checkNotNull(scope);
 		this.parent = parent;
+		this.scopeStrings = new String[registry.getReporters() == null ? 0 : registry.getReporters().size()];
 	}
 
 	public Map<String, String> getAllVariables() {
@@ -210,19 +212,7 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 	 * @return fully qualified metric name
 	 */
 	public String getMetricIdentifier(String metricName, CharacterFilter filter) {
-		if (scopeString == null) {
-			if (filter != null) {
-				scopeString = ScopeFormat.concat(filter, registry.getDelimiter(), scopeComponents);
-			} else {
-				scopeString = ScopeFormat.concat(registry.getDelimiter(), scopeComponents);
-			}
-		}
-
-		if (filter != null) {
-			return scopeString + registry.getDelimiter() + filter.filterCharacters(metricName);
-		} else {
-			return scopeString + registry.getDelimiter() + metricName;
-		}
+		return getMetricIdentifier(metricName, filter, -1);
 	}
 
 	/**
@@ -235,12 +225,29 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl
 	 * @return fully qualified metric name
 	 */
 	public String getMetricIdentifier(String metricName, CharacterFilter filter, int reporterIndex) {
-		if (filter != null) {
-			scopeString = ScopeFormat.concat(filter, registry.getDelimiter(reporterIndex), scopeComponents);
-			return scopeString + registry.getDelimiter(reporterIndex) + filter.filterCharacters(metricName);
+		if (scopeStrings.length == 0 || (reporterIndex < 0 || reporterIndex >= scopeStrings.length)) {
+			char delimiter = registry.getDelimiter();
+			String newScopeString;
+			if (filter != null) {
+				newScopeString = ScopeFormat.concat(filter, delimiter, scopeComponents);
+				metricName =