buildbot success in on flink-docs-release-1.1
The Buildbot has detected a restored build on builder flink-docs-release-1.1 while building . Full details are available at: https://ci.apache.org/builders/flink-docs-release-1.1/builds/133 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: bb_slave1_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-1.1' triggered this build Build Source Stamp: [branch release-1.1] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot
buildbot success in on flink-docs-master
The Buildbot has detected a restored build on builder flink-docs-master while building . Full details are available at: https://ci.apache.org/builders/flink-docs-master/builds/550 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: bb_slave2_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered this build Build Source Stamp: [branch master] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot
flink git commit: [FLINK-5039] Bump Avro version to 1.7.7.
Repository: flink Updated Branches: refs/heads/release-1.1 671b434cf -> 3ae6e9e09 [FLINK-5039] Bump Avro version to 1.7.7. This closes #2953. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3ae6e9e0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3ae6e9e0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3ae6e9e0 Branch: refs/heads/release-1.1 Commit: 3ae6e9e09ba74f88fe87d1ac130b3cc232a5e88c Parents: 671b434 Author: Robert Metzger Authored: Tue Dec 6 21:03:10 2016 +0100 Committer: Fabian Hueske Committed: Thu Dec 8 21:24:15 2016 +0100 -- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/3ae6e9e0/pom.xml -- diff --git a/pom.xml b/pom.xml index 4594a9d..bc7bcec 100644 --- a/pom.xml +++ b/pom.xml @@ -213,13 +213,13 @@ under the License. org.apache.avro avro - 1.7.6 + 1.7.7 org.apache.avro avro-ipc - 1.7.6 + 1.7.7
[4/4] flink git commit: [FLINK-3921] Add support to set encoding in CsvReader and StringParser.
[FLINK-3921] Add support to set encoding in CsvReader and StringParser. - extends first commit. This closes #2901. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/41d5875b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/41d5875b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/41d5875b Branch: refs/heads/master Commit: 41d5875bfc272f2cd5c7e8c8523036684865c1ce Parents: f2186af Author: Greg HoganAuthored: Mon Nov 28 12:43:47 2016 -0500 Committer: Fabian Hueske Committed: Thu Dec 8 21:21:48 2016 +0100 -- .../api/common/io/DelimitedInputFormat.java | 67 -- .../api/common/io/GenericCsvInputFormat.java| 79 +--- .../apache/flink/types/parser/FieldParser.java | 18 ++- .../api/common/io/DelimitedInputFormatTest.java | 94 ++ .../common/io/GenericCsvInputFormatTest.java| 125 ++- .../types/parser/VarLengthStringParserTest.java | 12 +- .../org/apache/flink/api/java/io/CsvReader.java | 24 ++-- .../apache/flink/api/java/io/CSVReaderTest.java | 23 ++-- .../flink/api/java/io/CsvInputFormatTest.java | 6 +- .../runtime/io/RowCsvInputFormatTest.scala | 6 +- .../flink/api/scala/io/CsvInputFormatTest.scala | 14 +-- 11 files changed, 273 insertions(+), 195 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/41d5875b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index fd02c82..5c8dfc1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -20,17 +20,18 @@ package org.apache.flink.api.common.io; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.configuration.GlobalConfiguration; 
-import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.Charset; @@ -56,8 +57,11 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple */ private static final Logger LOG = LoggerFactory.getLogger(DelimitedInputFormat.class); - /** The default charset to convert strings to bytes */ - private static final Charset UTF_8_CHARSET = Charset.forName("UTF-8"); + // The charset used to convert strings to bytes + private String charsetName = "UTF-8"; + + // Charset is not serializable + private transient Charset charset; /** * The default read buffer size = 1MB. @@ -157,9 +161,12 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple // // The configuration parameters. Configured on the instance and serialized to be shipped. // - + + // The delimiter may be set with a byte-sequence or a String. In the latter + // case the byte representation is updated consistent with current charset. private byte[] delimiter = new byte[] {'\n'}; - + private String delimiterString = null; + private int lineLengthLimit = Integer.MAX_VALUE; private int bufferSize = -1; @@ -182,8 +189,42 @@ public abstract class DelimitedInputFormat extends FileInputFormat imple } loadConfigParameters(configuration); } - - + + /** +* Get the character set used for the row delimiter. 
This is also used by +* subclasses to interpret field delimiters, comment strings, and for +* configuring {@link FieldParser}s. +* +* @return the charset +*/ + @PublicEvolving + public Charset
[2/4] flink git commit: [FLINK-5039] Bump Avro version to 1.7.7.
[FLINK-5039] Bump Avro version to 1.7.7. This closes #2953. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2d8f03e7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2d8f03e7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2d8f03e7 Branch: refs/heads/master Commit: 2d8f03e7ad12af3a0dcb7bec087c25f19a4fd03e Parents: 677d0d9 Author: Robert Metzger Authored: Tue Dec 6 21:03:10 2016 +0100 Committer: Fabian Hueske Committed: Thu Dec 8 18:45:55 2016 +0100 -- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2d8f03e7/pom.xml -- diff --git a/pom.xml b/pom.xml index d3ddf92..04ba726 100644 --- a/pom.xml +++ b/pom.xml @@ -220,13 +220,13 @@ under the License. org.apache.avro avro - 1.7.6 + 1.7.7 org.apache.avro avro-ipc - 1.7.6 + 1.7.7
flink git commit: [FLINK-5169] [network] Fix String formats in spillable partition
Repository: flink Updated Branches: refs/heads/release-1.1 b046038ae -> 671b434cf [FLINK-5169] [network] Fix String formats in spillable partition This closes #2967. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/671b434c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/671b434c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/671b434c Branch: refs/heads/release-1.1 Commit: 671b434cf3e09d6edb247f6f84f38b8088fc5c5d Parents: b046038 Author: Boris OsipovAuthored: Thu Dec 8 16:58:21 2016 +0300 Committer: Ufuk Celebi Committed: Thu Dec 8 15:22:08 2016 +0100 -- .../runtime/io/network/partition/SpillableSubpartitionView.java| 2 +- .../runtime/io/network/partition/SpilledSubpartitionView.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/671b434c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java index 533f95b..df8de54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -206,7 +206,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView { public String toString() { boolean hasSpilled = spilledView != null; - return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? {}) of ResultPartition %s", + return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? 
%b) of ResultPartition %s", parent.index, numBuffers, hasSpilled, http://git-wip-us.apache.org/repos/asf/flink/blob/671b434c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java index 7488132..fec0f2a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java @@ -166,7 +166,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis @Override public String toString() { - return String.format("SpilledSubpartitionView(index: %d, buffers: {}) of ResultPartition %s", + return String.format("SpilledSubpartitionView(index: %d, buffers: %d) of ResultPartition %s", parent.index, numberOfSpilledBuffers, parent.parent.getPartitionId());
flink git commit: [FLINK-5169] [network] Fix String formats in spillable partition
Repository: flink Updated Branches: refs/heads/master 4eb71927b -> 55d60615a [FLINK-5169] [network] Fix String formats in spillable partition This closes #2967. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55d60615 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55d60615 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55d60615 Branch: refs/heads/master Commit: 55d60615a5e3f72135aad7f0ec5a2c1b4485b908 Parents: 4eb7192 Author: Boris OsipovAuthored: Thu Dec 8 16:58:21 2016 +0300 Committer: Ufuk Celebi Committed: Thu Dec 8 15:21:24 2016 +0100 -- .../runtime/io/network/partition/SpillableSubpartitionView.java| 2 +- .../runtime/io/network/partition/SpilledSubpartitionView.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/55d60615/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java index 533f95b..df8de54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java @@ -206,7 +206,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView { public String toString() { boolean hasSpilled = spilledView != null; - return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? {}) of ResultPartition %s", + return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? 
%b) of ResultPartition %s", parent.index, numBuffers, hasSpilled, http://git-wip-us.apache.org/repos/asf/flink/blob/55d60615/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java index 7488132..fec0f2a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java @@ -166,7 +166,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis @Override public String toString() { - return String.format("SpilledSubpartitionView(index: %d, buffers: {}) of ResultPartition %s", + return String.format("SpilledSubpartitionView(index: %d, buffers: %d) of ResultPartition %s", parent.index, numberOfSpilledBuffers, parent.parent.getPartitionId());
[3/3] flink git commit: [FLINK-5020] Make the GenericWriteAheadSink rescalable.
[FLINK-5020] Make the GenericWriteAheadSink rescalable. Integrates the new state abstractions with the GenericWriteAheadSink so that the latter can change its parallelism when resuming execution from a savepoint, without jeopardizing the provided guarantees. This closes #2759 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4eb71927 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4eb71927 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4eb71927 Branch: refs/heads/master Commit: 4eb71927bc4f0832eb08a79394ad6864a3c2e142 Parents: 86f784a Author: kl0u Authored: Wed Oct 26 17:19:12 2016 +0200 Committer: zentol Committed: Thu Dec 8 12:27:14 2016 +0100 -- .../cassandra/CassandraConnectorITCase.java | 38 ++-- .../runtime/operators/CheckpointCommitter.java | 1 + .../operators/GenericWriteAheadSink.java| 105 ++- .../operators/GenericWriteAheadSinkTest.java| 50 +++--- .../operators/WriteAheadSinkTestBase.java | 172 +-- 5 files changed, 276 insertions(+), 90 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/4eb71927/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java -- diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index 2bb6fd1..f2e8f8b 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -47,7 +47,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.test.util.TestEnvironment; @@ -71,6 +70,7 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.Scanner; import java.util.UUID; @@ -262,9 +262,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase > harness, - CassandraTupleWriteAheadSink > sink) { + protected void verifyResultsIdealCircumstances(CassandraTupleWriteAheadSink > sink) { ResultSet result = session.execute(SELECT_DATA_QUERY); ArrayList list = new ArrayList<>(); @@ -279,9 +277,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase > harness, - CassandraTupleWriteAheadSink > sink) { + protected void verifyResultsDataPersistenceUponMissedNotify(CassandraTupleWriteAheadSink > sink) { ResultSet result = session.execute(SELECT_DATA_QUERY); ArrayList list = new ArrayList<>(); @@ -296,9 +292,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase > harness, - CassandraTupleWriteAheadSink > sink) { + protected void verifyResultsDataDiscardingUponRestore(CassandraTupleWriteAheadSink > sink) { ResultSet result = session.execute(SELECT_DATA_QUERY); ArrayList list = new ArrayList<>(); @@ -315,6 +309,30 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase sink, int startElementCounter, int endElementCounter) { + + // IMPORTANT NOTE: + // + // for cassandra we always have to start from 1 because + // all operators will share the same final db + + ArrayList expected = new ArrayList<>(); + for (int i = 1; i <= endElementCounter; i++) { + expected.add(i); + } + + ArrayList 
actual = new ArrayList<>(); + ResultSet result =
[2/3] flink git commit: [FLINK-5164] Disable some Hadoop-compat tests on Windows
[FLINK-5164] Disable some Hadoop-compat tests on Windows This closes #2889. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe843e13 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe843e13 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe843e13 Branch: refs/heads/master Commit: fe843e1377aa08a10394bbfa67dc9d3b2a23b805 Parents: 4414008 Author: zentolAuthored: Fri Nov 25 14:58:48 2016 +0100 Committer: zentol Committed: Thu Dec 8 12:04:48 2016 +0100 -- .../test/hadoopcompatibility/mapred/HadoopMapredITCase.java | 9 + .../mapreduce/HadoopInputOutputITCase.java | 8 .../flink/test/hadoop/mapred/HadoopIOFormatsITCase.java | 9 + .../flink/test/hadoop/mapred/WordCountMapredITCase.java | 9 + .../test/hadoop/mapreduce/WordCountMapreduceITCase.java | 9 + .../api/scala/hadoop/mapred/WordCountMapredITCase.scala | 8 .../scala/hadoop/mapreduce/WordCountMapreduceITCase.scala | 8 7 files changed, 60 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/fe843e13/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java -- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java index ccc0d82..0b5a366 100644 --- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/HadoopMapredITCase.java @@ -21,12 +21,21 @@ package org.apache.flink.test.hadoopcompatibility.mapred; import org.apache.flink.test.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount; import org.apache.flink.test.testdata.WordCountData; 
import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.util.OperatingSystem; +import org.junit.Assume; +import org.junit.Before; public class HadoopMapredITCase extends JavaProgramTestBase { protected String textPath; protected String resultPath; + @Before + public void checkOperatingSystem() { + // FLINK-5164 - see https://wiki.apache.org/hadoop/WindowsProblems + Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows()); + } + @Override protected void preSubmit() throws Exception { textPath = createTempFile("text.txt", WordCountData.TEXT); http://git-wip-us.apache.org/repos/asf/flink/blob/fe843e13/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java -- diff --git a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java index 698e356..48aa258 100644 --- a/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java +++ b/flink-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java @@ -21,12 +21,20 @@ package org.apache.flink.test.hadoopcompatibility.mapreduce; import org.apache.flink.test.hadoopcompatibility.mapreduce.example.WordCount; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.util.JavaProgramTestBase; +import org.apache.flink.util.OperatingSystem; +import org.junit.Assume; +import org.junit.Before; public class HadoopInputOutputITCase extends JavaProgramTestBase { protected String textPath; protected String resultPath; + @Before + public void checkOperatingSystem() { + // FLINK-5164 - see 
https://wiki.apache.org/hadoop/WindowsProblems + Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows()); + } @Override protected void preSubmit() throws Exception {
[1/3] flink git commit: [FLINK-4563] [metrics] scope caching not adjusted for multiple reporters
Repository: flink Updated Branches: refs/heads/master 441400855 -> 4eb71927b [FLINK-4563] [metrics] scope caching not adjusted for multiple reporters This closes #2650. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86f784a3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86f784a3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86f784a3 Branch: refs/heads/master Commit: 86f784a3613ccd5d78197d94198a64b6f1333578 Parents: fe843e1 Author: Anton MushinAuthored: Mon Oct 17 17:23:01 2016 +0400 Committer: zentol Committed: Thu Dec 8 12:04:48 2016 +0100 -- .../metrics/groups/AbstractMetricGroup.java | 53 .../metrics/groups/AbstractMetricGroupTest.java | 135 +++ 2 files changed, 165 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/86f784a3/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index 04b8158..6ff9776 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -85,9 +86,9 @@ public abstract class AbstractMetricGroup> impl * For example ["host-7", "taskmanager-2", "window_word_count", "my-mapper" ]. */ private final String[] scopeComponents; - /** The metrics scope represented by this group, as a concatenated string, lazily computed. 
+ /** Array containing the metrics scope represented by this group for each reporter, as a concatenated string, lazily computed. * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */ - private String scopeString; + private final String[] scopeStrings; /** The logical metrics scope represented by this group, as a concatenated string, lazily computed. * For example: "taskmanager.job.task" */ @@ -105,6 +106,7 @@ public abstract class AbstractMetricGroup> impl this.registry = checkNotNull(registry); this.scopeComponents = checkNotNull(scope); this.parent = parent; + this.scopeStrings = new String[registry.getReporters() == null ? 0 : registry.getReporters().size()]; } public Map getAllVariables() { @@ -210,19 +212,7 @@ public abstract class AbstractMetricGroup> impl * @return fully qualified metric name */ public String getMetricIdentifier(String metricName, CharacterFilter filter) { - if (scopeString == null) { - if (filter != null) { - scopeString = ScopeFormat.concat(filter, registry.getDelimiter(), scopeComponents); - } else { - scopeString = ScopeFormat.concat(registry.getDelimiter(), scopeComponents); - } - } - - if (filter != null) { - return scopeString + registry.getDelimiter() + filter.filterCharacters(metricName); - } else { - return scopeString + registry.getDelimiter() + metricName; - } + return getMetricIdentifier(metricName, filter, -1); } /** @@ -235,12 +225,29 @@ public abstract class AbstractMetricGroup> impl * @return fully qualified metric name */ public String getMetricIdentifier(String metricName, CharacterFilter filter, int reporterIndex) { - if (filter != null) { - scopeString = ScopeFormat.concat(filter, registry.getDelimiter(reporterIndex), scopeComponents); - return scopeString + registry.getDelimiter(reporterIndex) + filter.filterCharacters(metricName); + if (scopeStrings.length == 0 || (reporterIndex < 0 || reporterIndex >= scopeStrings.length)) { + char delimiter = registry.getDelimiter(); + String newScopeString; + if 
(filter != null) { + newScopeString = ScopeFormat.concat(filter, delimiter, scopeComponents); + metricName =