flink git commit: [hotfix] [FLINK-3679] Improve Javadocs of DeserializationSchemas

2017-03-08 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/master c39ad31f3 -> 96d24445e


[hotfix] [FLINK-3679] Improve Javadocs of DeserializationSchemas

Javadocs of the `deserialize(...)` method should state that returning
null from the method is allowed if the message cannot be deserialized.
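
For illustration, a schema honoring this contract might look like the following sketch (hypothetical class, not part of the commit):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class IntMessageSchema implements DeserializationSchema<Integer> {

	@Override
	public Integer deserialize(byte[] message) throws IOException {
		try {
			return Integer.valueOf(new String(message, StandardCharsets.UTF_8));
		} catch (NumberFormatException e) {
			// per the updated Javadocs: returning null is allowed
			// and makes the consumer skip this message
			return null;
		}
	}

	@Override
	public boolean isEndOfStream(Integer nextElement) {
		return false;
	}

	@Override
	public TypeInformation<Integer> getProducedType() {
		return BasicTypeInfo.INT_TYPE_INFO;
	}
}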


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/96d24445
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/96d24445
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/96d24445

Branch: refs/heads/master
Commit: 96d24445e3fb9540091a01e0fe34fcb51bc0dd58
Parents: c39ad31
Author: Tzu-Li (Gordon) Tai 
Authored: Thu Mar 9 14:34:53 2017 +0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Thu Mar 9 14:34:53 2017 +0800

--
 .../util/serialization/KeyedDeserializationSchema.java   | 11 +++
 .../util/serialization/DeserializationSchema.java|  3 ++-
 2 files changed, 9 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/96d24445/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
index 01e72ca..b5a33bc 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
@@ -34,10 +34,12 @@ public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
/**
 * Deserializes the byte message.
 *
-* @param messageKey the key as a byte array (null if no key has been set)
-* @param message The message, as a byte array. (null if the message was empty or deleted)
-* @param partition The partition the message has originated from
-* @param offset the offset of the message in the original source (for example the Kafka offset)  @return The deserialized message as an object.
+* @param messageKey the key as a byte array (null if no key has been set).
+* @param message The message, as a byte array (null if the message was empty or deleted).
+* @param partition The partition the message has originated from.
+* @param offset the offset of the message in the original source (for example the Kafka offset).
+*
+* @return The deserialized message as an object (null if the message cannot be deserialized).
 */
T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
 
@@ -46,6 +48,7 @@ public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 * true is returned the element won't be emitted.
 *
 * @param nextElement The element to test for the end-of-stream signal.
+*
 * @return True, if the element signals end of stream, false otherwise.
 */
boolean isEndOfStream(T nextElement);
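
As a side note (illustrative, not part of this diff), the end-of-stream hook can be used to stop consumption on an application-chosen sentinel record:

	@Override
	public boolean isEndOfStream(String nextElement) {
		// "END-OF-STREAM" is a hypothetical sentinel value
		return "END-OF-STREAM".equals(nextElement);
	}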

http://git-wip-us.apache.org/repos/asf/flink/blob/96d24445/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index 2e27ba6..03cab20 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -40,7 +40,8 @@ public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 * Deserializes the byte message.
 *
 * @param message The message, as a byte array.
-* @return The deserialized message as an object.
+*
+* @return The deserialized message as an object (null if the message cannot be deserialized).
 */
T deserialize(byte[] message) throws IOException;
 



[2/2] flink git commit: [FLINK-3679] [kafka] Improve null record handling for FlinkKafkaConsumer

2017-03-08 Thread tzulitai
[FLINK-3679] [kafka] Improve null record handling for FlinkKafkaConsumer

This commit generally improves null record handling by:
 - also updating the offset in state holders if the record is null
 - moving null-record-related tests to AbstractFetcherTest, so that
   the behaviour is tested for all fetcher implementations
 - making the docs more informative about the consumer's behaviour
   when corrupted messages are encountered.

This closes #3314.
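
The offset-updating part boils down to a pattern like this simplified sketch (names abbreviated from AbstractFetcher, where sourceContext and checkpointLock are fields of the fetcher; not the exact committed code):

	protected void emitRecord(T record, KafkaTopicPartitionState<KPH> partitionState, long offset) throws Exception {
		if (record != null) {
			synchronized (checkpointLock) {
				sourceContext.collect(record);
				partitionState.setOffset(offset);
			}
		} else {
			// deserialization returned null: emit nothing, but still advance
			// the offset so checkpoints and restarts move past the record
			synchronized (checkpointLock) {
				partitionState.setOffset(offset);
			}
		}
	}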


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c39ad31f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c39ad31f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c39ad31f

Branch: refs/heads/master
Commit: c39ad31f3c321a1803abc97b2ba3074561d9af6e
Parents: afb4c5e
Author: Tzu-Li (Gordon) Tai 
Authored: Thu Mar 9 01:28:08 2017 +0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Thu Mar 9 14:05:38 2017 +0800

--
 docs/dev/connectors/kafka.md|  14 +-
 .../connectors/kafka/Kafka09FetcherTest.java|  84 
 .../kafka/internals/AbstractFetcher.java|  57 ++-
 .../kafka/internals/AbstractFetcherTest.java| 461 +++
 .../AbstractFetcherTimestampsTest.java  | 335 --
 5 files changed, 503 insertions(+), 448 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c39ad31f/docs/dev/connectors/kafka.md
--
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 331c9c7..06e40b2 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -146,10 +146,6 @@ The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into
 `DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
 method gets called for each Kafka message, passing the value from Kafka.
 
-There are two possible design choices when the `DeserializationSchema` encounters a corrupted message. It can
-either throw an `IOException` which causes the pipeline to be restarted, or it can return `null` where the Flink
-Kafka consumer will silently skip the corrupted message.
-
 It is usually helpful to start from the `AbstractDeserializationSchema`, which takes care of describing the
 produced Java/Scala type to Flink's type system. Users that implement a vanilla `DeserializationSchema` need
 to implement the `getProducedType(...)` method themselves.
@@ -167,6 +163,16 @@ For convenience, Flink provides the following schemas:
into an ObjectNode object, from which fields can be accessed using objectNode.get("field").as(Int/String/...)().
The KeyValue objectNode contains a "key" and "value" field which contain all fields, as well as
an optional "metadata" field that exposes the offset/partition/topic for this message.
+
+When encountering a corrupted message that cannot be deserialized for any reason, there
+are two options - either throwing an exception from the `deserialize(...)` method
+which will cause the job to fail and be restarted, or returning `null` to allow
+the Flink Kafka consumer to silently skip the corrupted message. Note that
+due to the consumer's fault tolerance (see below sections for more details),
+failing the job on the corrupted message will let the consumer attempt
+to deserialize the message again. Therefore, if deserialization still fails, the
+consumer will fall into a non-stop restart and fail loop on that corrupted
+message.
 
 ### Kafka Consumers Start Position Configuration
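
A minimal sketch of the second option above - returning `null` on corrupted input - built on `AbstractDeserializationSchema` (hypothetical class, assuming UTF-8 encoded longs):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;

public class LenientLongSchema extends AbstractDeserializationSchema<Long> {

	@Override
	public Long deserialize(byte[] message) throws IOException {
		try {
			return Long.valueOf(new String(message, StandardCharsets.UTF_8));
		} catch (NumberFormatException e) {
			// skip the corrupted message instead of failing the job
			return null;
		}
	}
}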
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c39ad31f/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 61a8855..49144e6 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -31,7 +29,6 @@ 

[1/2] flink git commit: [FLINK-3679] [kafka] Allow Kafka consumer to skip corrupted messages

2017-03-08 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/master adbf846f2 -> c39ad31f3


[FLINK-3679] [kafka] Allow Kafka consumer to skip corrupted messages


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afb4c5e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afb4c5e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afb4c5e0

Branch: refs/heads/master
Commit: afb4c5e02c513a82d2ad7f7816065fdd93665e0e
Parents: adbf846
Author: Haohui Mai 
Authored: Thu Mar 2 13:33:13 2017 -0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Thu Mar 9 14:05:37 2017 +0800

--
 docs/dev/connectors/kafka.md|  4 +
 .../connectors/kafka/Kafka09FetcherTest.java| 84 
 .../kafka/internals/AbstractFetcher.java|  4 +
 3 files changed, 92 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/afb4c5e0/docs/dev/connectors/kafka.md
--
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 0f700ab..331c9c7 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -146,6 +146,10 @@ The Flink Kafka Consumer needs to know how to turn the binary data in Kafka into
 `DeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] message)`
 method gets called for each Kafka message, passing the value from Kafka.
 
+There are two possible design choices when the `DeserializationSchema` encounters a corrupted message. It can
+either throw an `IOException` which causes the pipeline to be restarted, or it can return `null` where the Flink
+Kafka consumer will silently skip the corrupted message.
+
 It is usually helpful to start from the `AbstractDeserializationSchema`, which takes care of describing the
 produced Java/Scala type to Flink's type system. Users that implement a vanilla `DeserializationSchema` need
 to implement the `getProducedType(...)` method themselves.

http://git-wip-us.apache.org/repos/asf/flink/blob/afb4c5e0/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index 49144e6..61a8855 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -29,6 +31,7 @@ import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.CollectingSourceContext;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -49,6 +52,8 @@ import org.mockito.stubbing.Answer;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -419,6 +424,85 @@ public class Kafka09FetcherTest {
assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
}
 
+   @Test
+   public void testSkipCorruptedMessage() throws Exception {
+
+   // - some test data -
+
+   final String topic = "test-topic";
+   final int partition = 3;
+   final byte[] payload = new byte[] {1, 2, 3, 4};
+
+   final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+   new

Build failed in Jenkins: flink-snapshot-deployment #398

2017-03-08 Thread Apache Jenkins Server
See 


--
[...truncated 170.16 KB...]
[ERROR] public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
[ERROR] ^
[ERROR] :649: warning: no description for @throws
[ERROR] * @throws IOException
[ERROR] ^
[ERROR] :52: warning: no @param for inputSplits
[ERROR] InputSplitAssigner getInputSplitAssigner(T[] inputSplits);
[ERROR] ^
[ERROR] :33: warning: no @return
[ERROR] int getVersion();
[ERROR] ^
[ERROR] :55: warning: no description for @throws
[ERROR] * @throws IOException
[ERROR] ^
[ERROR] :65: warning: no description for @throws
[ERROR] * @throws IOException
[ERROR] ^
[ERROR] :338: error: no tag name after @
[ERROR] * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows
[ERROR] ^
[ERROR] :984: error: malformed HTML
[ERROR] * This key was used in Flink versions <= 1.1.X with the savepoint backend
[ERROR] ^
[ERROR] :420: error: no tag name after @
[ERROR] * Similar to the {@see YARN_APPLICATION_MASTER_ENV_PREFIX}, this configuration prefix allows
[ERROR] ^
[ERROR] :95: warning: no @throws for java.lang.ClassNotFoundException
[ERROR] public <T> Class<T> getClass(String key, Class<? extends T> defaultValue, ClassLoader classLoader) throws ClassNotFoundException {
[ERROR] ^
[ERROR] :160: warning: no @param for overrideDefault
[ERROR] public String getString(ConfigOption<String> configOption, String overrideDefault) {
[ERROR] ^
[ERROR] :537: warning: no @param for props
[ERROR] public void addAllToProperties(Properties props) {
[ERROR] ^
[ERROR] :54: warning: no @param for dynamicProperties
[ERROR] public static void setDynamicProperties(Configuration dynamicProperties) {
[ERROR] ^
[ERROR] :61: warning: no @return
[ERROR] public static Configuration getDynamicProperties() {
[ERROR] ^
[ERROR] :90: warning: no @return
[ERROR] public static Configuration loadConfiguration(final String configDir) {
[ERROR] ^
[ERROR] :59: error: bad use of '>'
[ERROR] /** Number of network (event loop) threads for the KvState client (0 => Use number of available cores). */
[ERROR] ^
[ERROR] :50: error: bad use of '>'
[ERROR] /** Number of async query threads for the KvStateServerHandler (0 => #slots). */
[ERROR] ^
[ERROR] :45: error: bad use of '>'
[ERROR] /** Number of network (event loop) threads for the KvState server (0 => #slots). */
[ERROR] ^
[ERROR] :40: error: bad use of '>'
[ERROR] /** Port to bind KvState server to (0 
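
For reference, the two recurring failures above are strict-javadoc complaints: `{@see ...}` is not a valid inline tag (the inline equivalent of the `@see` block tag is `{@link ...}`), and a bare '>' in a comment must be escaped. Compliant forms of the flagged comments would look like this (illustrative rewrites, not the actual fix applied):

/** Number of network (event loop) threads for the KvState client (0 =&gt; use number of available cores). */

/**
 * Similar to the {@link #CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows ...
 */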

buildbot failure in on flink-docs-release-1.2

2017-03-08 Thread buildbot
The Buildbot has detected a new failure on builder flink-docs-release-1.2 while building. Full details are available at:
https://ci.apache.org/builders/flink-docs-release-1.2/builds/52

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave1_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-1.2' triggered this build
Build Source Stamp: [branch release-1.2] HEAD
Blamelist: 

BUILD FAILED: failed

Sincerely,
 -The Buildbot





flink git commit: [FLINK-5983] [table] Convert FOR into WHILE loops for aggregation functions.

2017-03-08 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master 31a57c5a8 -> adbf846f2


[FLINK-5983] [table] Convert FOR into WHILE loops for aggregation functions.

This closes #3489.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/adbf846f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/adbf846f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/adbf846f

Branch: refs/heads/master
Commit: adbf846f23881b98ab9dc5886a0b066b8aa1ded6
Parents: 31a57c5
Author: Fabian Hueske 
Authored: Tue Mar 7 22:17:54 2017 +0100
Committer: twalthr 
Committed: Wed Mar 8 17:59:31 2017 +0100

--
 .../aggregate/AggregateAggFunction.scala| 29 ++-
 .../aggregate/AggregateMapFunction.scala| 17 ---
 .../AggregateReduceCombineFunction.scala| 16 --
 .../AggregateReduceGroupFunction.scala  | 52 
 ...SetSessionWindowAggReduceGroupFunction.scala | 36 +-
 ...aSetSessionWindowAggregatePreProcessor.scala | 27 +++---
 ...umbleCountWindowAggReduceGroupFunction.scala | 32 +++-
 ...mbleTimeWindowAggReduceCombineFunction.scala | 16 --
 ...TumbleTimeWindowAggReduceGroupFunction.scala | 33 -
 .../aggregate/DataSetWindowAggMapFunction.scala | 23 +
 .../IncrementalAggregateAllWindowFunction.scala |  2 +-
 .../IncrementalAggregateWindowFunction.scala|  8 ++-
 ...UnboundedProcessingOverProcessFunction.scala |  6 ++-
 13 files changed, 193 insertions(+), 104 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
index 4d1579b..11d55e5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
@@ -36,43 +36,50 @@ class AggregateAggFunction(
 private val aggFields: Array[Int])
   extends DataStreamAggFunc[Row, Row, Row] {
 
-  val aggsWithIdx: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
-
   override def createAccumulator(): Row = {
 val accumulatorRow: Row = new Row(aggregates.length)
-aggsWithIdx.foreach { case (agg, i) =>
-  accumulatorRow.setField(i, agg.createAccumulator())
+var i = 0
+while (i < aggregates.length) {
+  accumulatorRow.setField(i, aggregates(i).createAccumulator())
+  i += 1
 }
 accumulatorRow
   }
 
-  override def add(value: Row, accumulatorRow: Row) = {
+  override def add(value: Row, accumulatorRow: Row): Unit = {
 
-aggsWithIdx.foreach { case (agg, i) =>
+var i = 0
+while (i < aggregates.length) {
   val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator]
   val v = value.getField(aggFields(i))
-  agg.accumulate(acc, v)
+  aggregates(i).accumulate(acc, v)
+  i += 1
 }
   }
 
   override def getResult(accumulatorRow: Row): Row = {
 val output = new Row(aggFields.length)
 
-aggsWithIdx.foreach { case (agg, i) =>
-  output.setField(i, agg.getValue(accumulatorRow.getField(i).asInstanceOf[Accumulator]))
+var i = 0
+while (i < aggregates.length) {
+  val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator]
+  output.setField(i, aggregates(i).getValue(acc))
+  i += 1
 }
 output
   }
 
   override def merge(aAccumulatorRow: Row, bAccumulatorRow: Row): Row = {
 
-aggsWithIdx.foreach { case (agg, i) =>
+var i = 0
+while (i < aggregates.length) {
   val aAcc = aAccumulatorRow.getField(i).asInstanceOf[Accumulator]
   val bAcc = bAccumulatorRow.getField(i).asInstanceOf[Accumulator]
   val accumulators: JList[Accumulator] = new JArrayList[Accumulator]()
   accumulators.add(aAcc)
   accumulators.add(bAcc)
-  aAccumulatorRow.setField(i, agg.merge(accumulators))
+  aAccumulatorRow.setField(i, aggregates(i).merge(accumulators))
+  i += 1
 }
 aAccumulatorRow
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/adbf846f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateMapFunction.scala
 

[2/2] flink git commit: [FLINK-5047] [table] Add sliding group-windows for batch tables

2017-03-08 Thread twalthr
[FLINK-5047] [table] Add sliding group-windows for batch tables

This closes #3364.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31a57c5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31a57c5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31a57c5a

Branch: refs/heads/master
Commit: 31a57c5a89d6d22ccb629c2adfe4ffb87441e6dd
Parents: bec818d
Author: twalthr 
Authored: Wed Jan 18 16:56:02 2017 +0100
Committer: twalthr 
Committed: Wed Mar 8 17:01:27 2017 +0100

--
 .../table/functions/AggregateFunction.scala |   8 +-
 .../nodes/dataset/DataSetWindowAggregate.scala  | 117 -
 .../table/runtime/aggregate/AggregateUtil.scala | 236 ---
 ...SetSessionWindowAggReduceGroupFunction.scala | 201 
 ...sionWindowAggregateReduceGroupFunction.scala | 201 
 ...taSetSlideTimeWindowAggFlatMapFunction.scala |  63 +
 ...tSlideTimeWindowAggReduceGroupFunction.scala | 202 
 ...SetSlideWindowAggReduceCombineFunction.scala | 117 +
 ...taSetSlideWindowAggReduceGroupFunction.scala | 141 +++
 ...umbleCountWindowAggReduceGroupFunction.scala |   3 -
 ...TumbleTimeWindowAggReduceGroupFunction.scala |   3 +-
 .../aggregate/DataSetWindowAggMapFunction.scala | 112 +
 .../DataSetWindowAggregateMapFunction.scala | 111 -
 .../IncrementalAggregateAllWindowFunction.scala |   7 +-
 .../scala/stream/table/AggregationsITCase.scala |  43 +---
 .../dataset/DataSetWindowAggregateITCase.scala  | 163 -
 .../datastream/DataStreamAggregateITCase.scala  | 235 ++
 17 files changed, 1566 insertions(+), 397 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
index 967d2ea..773c71f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala
@@ -61,7 +61,7 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
   def getValue(accumulator: Accumulator): T
 
   /**
-* Process the input values and update the provided accumulator instance.
+* Processes the input values and update the provided accumulator instance.
 *
 * @param accumulator the accumulator which contains the current
 *aggregated results
@@ -70,9 +70,9 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
   def accumulate(accumulator: Accumulator, input: Any): Unit
 
   /**
-* Merge a list of accumulator instances into one accumulator instance.
+* Merges a list of accumulator instances into one accumulator instance.
 *
-* IMPORTANT: You may only return a new accumulator instance or the the first accumulator of the
+* IMPORTANT: You may only return a new accumulator instance or the first accumulator of the
 * input list. If you return another instance, the result of the aggregation function might be
 * incorrect.
 *
@@ -88,7 +88,7 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
 *
 * @return The type information for the accumulator.
 */
-  def getAccumulatorType(): TypeInformation[_] = null
+  def getAccumulatorType: TypeInformation[_] = null
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
index fb5ff3b..a94deb1 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
@@ -111,17 +111,25 @@ class DataSetWindowAggregate(
 
 // whether identifiers are matched case-sensitively
    val caseSensitive = tableEnv.getFrameworkConfig.getParserConfig.caseSensitive()
+
 window match {
   case EventTimeTumblingGroupWindow(_, _, size) =>
 

[1/2] flink git commit: [FLINK-5047] [table] Add sliding group-windows for batch tables

2017-03-08 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master bec818d84 -> 31a57c5a8


http://git-wip-us.apache.org/repos/asf/flink/blob/31a57c5a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
--
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
index d57f4f7..77ea66e 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
@@ -21,17 +21,16 @@ package org.apache.flink.table.runtime.dataset
 import java.math.BigDecimal
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
 import org.apache.flink.table.api.scala._
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 
 import scala.collection.JavaConverters._
 
@@ -197,4 +196,162 @@ class DataSetWindowAggregateITCase(
   .toDataSet[Row]
   }
 
+  // 
--
+  // Sliding windows
+  // 
--
+
+  @Test(expected = classOf[UnsupportedOperationException])
+  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+val table = env
+  .fromCollection(data)
+  .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+// Count sliding group window on event-time are currently not supported
+table
+  .window(Slide over 2.rows every 2.rows on 'long as 'w)
+  .groupBy('w)
+  .select('int.count)
+  .toDataSet[Row]
+  }
+
+  @Test
+  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
+// please keep this test in sync with the DataStream variant
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val table = env
+  .fromCollection(data)
+  .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+val windowedTable = table
+  .window(Slide over 5.milli every 2.milli on 'long as 'w)
+  .groupBy('w)
+  .select('int.count, 'w.start, 'w.end)
+
+val expected =
+  "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013\n" +
+  "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017\n" +
+  "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019\n" +
+  "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021\n" +
+  "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003\n" +
+  "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011\n" +
+  "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007\n" +
+  "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009\n" +
+  "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"
+
+val results = windowedTable.toDataSet[Row].collect()
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = {
+// please keep this test in sync with the DataStream variant
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val table = env
+  .fromCollection(data)
+  .toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+
+val windowedTable = table
+  .window(Slide over 10.milli every 5.milli on 'long as 'w)
+  .groupBy('string, 'w)
+  .select('string, 'int.count, 'w.start, 'w.end)
+
+val expected =
+  "Hallo,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005\n" +
+  "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
+  "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01\n" +
+  "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015\n" +
+  "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02\n" +
+  "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025\n" +
+  "Hello,1,1970-01-01 

flink git commit: [FLINK-5414] [table] Bump up Calcite version to 1.11. (Jark Wu and Haohui Mai)

2017-03-08 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master 121b12b7c -> bec818d84


[FLINK-5414] [table] Bump up Calcite version to 1.11. (Jark Wu and Haohui Mai)

This closes #3338.
This closes #3426.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bec818d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bec818d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bec818d8

Branch: refs/heads/master
Commit: bec818d84a65a812290d49bca9cfd62de7379b1e
Parents: 121b12b
Author: Haohui Mai 
Authored: Mon Feb 27 14:24:08 2017 -0800
Committer: twalthr 
Committed: Wed Mar 8 16:27:55 2017 +0100

--
 flink-libraries/flink-table/pom.xml |  2 +-
 .../flink/table/calcite/FlinkTypeFactory.scala  | 21 +-
 .../flink/table/codegen/ExpressionReducer.scala | 10 -
 .../functions/utils/ScalarSqlFunction.scala |  3 +-
 .../flink/table/plan/ProjectionTranslator.scala | 40 ++--
 .../flink/table/plan/nodes/FlinkRel.scala   |  6 ++-
 .../scala/batch/table/FieldProjectionTest.scala |  8 ++--
 .../scala/stream/sql/WindowAggregateTest.scala  |  8 ++--
 .../scala/stream/table/GroupWindowTest.scala|  8 ++--
 .../table/expressions/ScalarFunctionsTest.scala | 10 ++---
 10 files changed, 75 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/pom.xml
--
diff --git a/flink-libraries/flink-table/pom.xml 
b/flink-libraries/flink-table/pom.xml
index 428b947..b26fe54 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -51,7 +51,7 @@ under the License.
 		<dependency>
 			<groupId>org.apache.calcite</groupId>
 			<artifactId>calcite-core</artifactId>
-			<version>1.9.0</version>
+			<version>1.11.0</version>
 			<exclusions>
 				<exclusion>
 					<groupId>org.apache.calcite.avatica</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 251be14..22a5c9f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -131,15 +131,18 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl {
   }
 
   override def createTypeWithNullability(
-relDataType: RelDataType,
-nullable: Boolean)
-  : RelDataType = relDataType match {
-case composite: CompositeRelDataType =>
-  // at the moment we do not care about nullability
-  composite
-case _ =>
-  super.createTypeWithNullability(relDataType, nullable)
-  }
+  relDataType: RelDataType,
+  nullable: Boolean)
+: RelDataType = relDataType match {
+  case composite: CompositeRelDataType =>
+// at the moment we do not care about nullability
+canonize(composite)
+  case array: ArrayRelDataType =>
+val elementType = createTypeWithNullability(array.getComponentType, nullable)
+canonize(new ArrayRelDataType(array.typeInfo, elementType, nullable))
+  case _ =>
+super.createTypeWithNullability(relDataType, nullable)
+}
 }
 
 object FlinkTypeFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index 0f1de21..3fcbdc1 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -106,8 +106,16 @@ class ExpressionReducer(config: TableConfig)
 case SqlTypeName.ANY | SqlTypeName.ROW | SqlTypeName.ARRAY =>
   reducedValues.add(unreduced)
 case _ =>
+  val reducedValue = reduced.getField(reducedIdx)
+  // RexBuilder handle double literal incorrectly, convert it into BigDecimal manually
+  val value = if (unreduced.getType.getSqlTypeName == 

[3/5] flink git commit: [FLINK-3427] [webui] Rebuild web UI

2017-03-08 Thread uce
[FLINK-3427] [webui] Rebuild web UI

This closes #3366.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/121b12b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/121b12b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/121b12b7

Branch: refs/heads/master
Commit: 121b12b7c7156a5b45f2cb9e069b4ec685d33364
Parents: 4ef18f6
Author: Ufuk Celebi 
Authored: Wed Mar 8 11:36:35 2017 +0100
Committer: Ufuk Celebi 
Committed: Wed Mar 8 15:28:41 2017 +0100

--
 flink-runtime-web/web-dashboard/web/js/index.js |  4 +-
 .../web-dashboard/web/js/vendor.js  | 10 ++---
 .../web/partials/jobs/job.plan.html |  3 +-
 .../jobs/job.plan.node-list.watermarks.html | 45 
 .../partials/jobs/job.plan.node.watermarks.html | 33 ++
 5 files changed, 87 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/121b12b7/flink-runtime-web/web-dashboard/web/js/index.js
--
diff --git a/flink-runtime-web/web-dashboard/web/js/index.js 
b/flink-runtime-web/web-dashboard/web/js/index.js
index e4b76aa..cb9fd6c 100644
--- a/flink-runtime-web/web-dashboard/web/js/index.js
+++ b/flink-runtime-web/web-dashboard/web/js/index.js
@@ -1,2 +1,2 @@
-angular.module("flinkApp",["ui.router","angularMoment","dndLists"]).run(["$rootScope",function(e){return
 e.sidebarVisible=!1,e.showSidebar=function(){return 
e.sidebarVisible=!e.sidebarVisible,e.sidebarClass="force-show"}}]).value("flinkConfig",{jobServer:"","refresh-interval":1e4}).run(["JobsService","MainService","flinkConfig","$interval",function(e,t,r,n){return
 t.loadConfig().then(function(t){return 
angular.extend(r,t),e.listJobs(),n(function(){return 
e.listJobs()},r["refresh-interval"])})}]).config(["$uiViewScrollProvider",function(e){return
 e.useAnchorScroll()}]).run(["$rootScope","$state",function(e,t){return 
e.$on("$stateChangeStart",function(e,r,n,o){if(r.redirectTo)return 
e.preventDefault(),t.go(r.redirectTo,n)})}]).config(["$stateProvider","$urlRouterProvider",function(e,t){return
 
e.state("overview",{url:"/overview",views:{main:{templateUrl:"partials/overview.html",controller:"OverviewController"}}}).state("running-jobs",{url:"/running-jobs",views:{main:{templateUrl:"parti
 
als/jobs/running-jobs.html",controller:"RunningJobsController"}}}).state("completed-jobs",{url:"/completed-jobs",views:{main:{templateUrl:"partials/jobs/completed-jobs.html",controller:"CompletedJobsController"}}}).state("single-job",{url:"/jobs/{jobid}","abstract":!0,views:{main:{templateUrl:"partials/jobs/job.html",controller:"SingleJobController"}}}).state("single-job.plan",{url:"",redirectTo:"single-job.plan.subtasks",views:{details:{templateUrl:"partials/jobs/job.plan.html",controller:"JobPlanController"}}}).state("single-job.plan.subtasks",{url:"",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.subtasks.html",controller:"JobPlanSubtasksController"}}}).state("single-job.plan.metrics",{url:"/metrics",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.metrics.html",controller:"JobPlanMetricsController"}}}).state("single-job.plan.taskmanagers",{url:"/taskmanagers",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.taskmanage
 
rs.html",controller:"JobPlanTaskManagersController"}}}).state("single-job.plan.accumulators",{url:"/accumulators",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.accumulators.html",controller:"JobPlanAccumulatorsController"}}}).state("single-job.plan.checkpoints",{url:"/checkpoints",redirectTo:"single-job.plan.checkpoints.overview",views:{"node-details":{templateUrl:"partials/jobs/job.plan.node-list.checkpoints.html",controller:"JobPlanCheckpointsController"}}}).state("single-job.plan.checkpoints.overview",{url:"/overview",views:{"checkpoints-view":{templateUrl:"partials/jobs/job.plan.node.checkpoints.overview.html",controller:"JobPlanCheckpointsController"}}}).state("single-job.plan.checkpoints.summary",{url:"/summary",views:{"checkpoints-view":{templateUrl:"partials/jobs/job.plan.node.checkpoints.summary.html",controller:"JobPlanCheckpointsController"}}}).state("single-job.plan.checkpoints.history",{url:"/history",views:{"checkpoints-view":{templateUrl:"partia
 

[1/5] flink git commit: [FLINK-3427] [webui] Rebuild web UI

2017-03-08 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master 7a629fc59 -> 121b12b7c


http://git-wip-us.apache.org/repos/asf/flink/blob/121b12b7/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html
--
diff --git a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html 
b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html
index 6d88b23..c52d67f 100644
--- a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html
+++ b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.html
@@ -20,7 +20,7 @@ limitations under the License.
 
   
 
-  
+  
 
   
   
@@ -30,6 +30,7 @@ limitations under the License.
   Subtasks
   TaskManagers
   Metrics
+  Watermarks
   Accumulators
   Checkpoints
   Back Pressure

http://git-wip-us.apache.org/repos/asf/flink/blob/121b12b7/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.watermarks.html
--
diff --git 
a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.watermarks.html
 
b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.watermarks.html
new file mode 100644
index 000..9a8095a
--- /dev/null
+++ 
b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node-list.watermarks.html
@@ -0,0 +1,45 @@
+
+
+
+  
+
+  Name
+  Low Watermark
+  Parallelism
+  Status
+
+  
+  
+
+  {{ v.name | humanizeText }}
+  {{ watermarks[v.id]["lowWatermark"] | humanizeWatermark }}
+  {{ v.parallelism }}
+  
+{{v.status}}
+  
+
+
+  
+
+No Watermarks
+  
+
+  
+
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/121b12b7/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.watermarks.html
--
diff --git 
a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.watermarks.html
 
b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.watermarks.html
new file mode 100644
index 000..f1530a5
--- /dev/null
+++ 
b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.watermarks.html
@@ -0,0 +1,33 @@
+
+
+
+  
+
+  Subtask
+  Watermark
+
+  
+  
+
+  {{ subtaskIndex | increment }}
+  {{ watermark | humanizeWatermark }}
+
+  
+
\ No newline at end of file



[5/5] flink git commit: [FLINK-3427] [webui] Add watermark tracking

2017-03-08 Thread uce
[FLINK-3427] [webui] Add watermark tracking


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d84b65ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d84b65ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d84b65ff

Branch: refs/heads/master
Commit: d84b65ff15876feb3e26dd20beb2e743968502bc
Parents: 7a629fc
Author: Ufuk Celebi 
Authored: Wed Mar 8 11:34:29 2017 +0100
Committer: Ufuk Celebi 
Committed: Wed Mar 8 15:28:41 2017 +0100

--
 .../app/partials/jobs/job.plan.jade |  5 +-
 .../jobs/job.plan.node-list.watermarks.jade | 36 
 .../partials/jobs/job.plan.node.watermarks.jade | 27 ++
 .../app/scripts/common/filters.coffee   | 17 
 .../web-dashboard/app/scripts/index.coffee  | 15 +++-
 .../app/scripts/modules/jobs/jobs.ctrl.coffee   | 68 +--
 .../app/scripts/modules/jobs/jobs.dir.coffee| 90 ++--
 7 files changed, 221 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
--
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
index e84dd04..c33b9a3 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
@@ -18,7 +18,7 @@
 split
   .split#canvas
 .canvas-wrapper
-  div.main-canvas(job-plan, plan="plan", jobid="{{jobid}}", set-node="changeNode(nodeid)")
+  div.main-canvas(job-plan, plan="plan", low-watermarks="lowWatermarks" jobid="{{jobid}}", set-node="changeNode(nodeid)")
 
   .split#job-panel
 .panel.panel-default.panel-multi(ng-if="plan")
@@ -34,6 +34,9 @@ split
 a(ui-sref=".metrics({nodeid: nodeid})") Metrics
 
   li(ui-sref-active='active')
+a(ui-sref=".watermarks({nodeid: nodeid})") Watermarks
+
+  li(ui-sref-active='active')
 a(ui-sref=".accumulators({nodeid: nodeid})") Accumulators
 
   li(ui-sref-active='active')

http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
--
diff --git 
a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
new file mode 100644
index 000..6b4c6a2
--- /dev/null
+++ 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
@@ -0,0 +1,36 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+table.table.table-body-hover.table-clickable.table-activable
+  thead
+tr
+  th Name
+  th Low Watermark
+  th Parallelism
+  th Status
+
  tbody(ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid && hasWatermarks(nodeid) }" ng-click="changeNode(v.id)")
+tr(ng-if="v.type == 'regular'")
+
+  td.td-long {{ v.name | humanizeText }}
+  td {{ watermarks | lowWatermark:v.id }}
+  td {{ v.parallelism }}
+  td 
+bs-label(status="{{v.status}}") {{v.status}}
+tr(ng-if="nodeid && v.id == nodeid && hasWatermarks(nodeid)")
+  td(colspan="11")
+div(ng-include=" 'partials/jobs/job.plan.node.watermarks.html' ")

http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade
--
diff --git 
a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade
 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade
new file mode 100644
index 000..b406a1c
--- /dev/null
+++ 

[4/5] flink git commit: [FLINK-3427] [webui] Refactorings to watermark tracking

2017-03-08 Thread uce
[FLINK-3427] [webui] Refactorings to watermark tracking


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ef18f65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ef18f65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ef18f65

Branch: refs/heads/master
Commit: 4ef18f6597bfde99733f8d4f4a54b90fc943c663
Parents: d84b65f
Author: Ufuk Celebi 
Authored: Tue Mar 7 11:36:50 2017 +0100
Committer: Ufuk Celebi 
Committed: Wed Mar 8 15:28:41 2017 +0100

--
 .../app/partials/jobs/job.plan.jade |  2 +-
 .../jobs/job.plan.node-list.watermarks.jade | 14 +--
 .../partials/jobs/job.plan.node.watermarks.jade | 10 +-
 .../app/scripts/common/filters.coffee   | 16 +---
 .../web-dashboard/app/scripts/index.coffee  |  9 +-
 .../app/scripts/modules/jobs/jobs.ctrl.coffee   | 98 
 .../app/scripts/modules/jobs/jobs.dir.coffee| 17 ++--
 7 files changed, 93 insertions(+), 73 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
--
diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
index c33b9a3..6c4cf0b 100644
--- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade
@@ -18,7 +18,7 @@
 split
   .split#canvas
 .canvas-wrapper
-  div.main-canvas(job-plan, plan="plan", low-watermarks="lowWatermarks" jobid="{{jobid}}", set-node="changeNode(nodeid)")
+  div.main-canvas(job-plan, plan="plan", watermarks="watermarks" jobid="{{jobid}}", set-node="changeNode(nodeid)")
 
   .split#job-panel
 .panel.panel-default.panel-multi(ng-if="plan")

http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
--
diff --git 
a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
index 6b4c6a2..4605b61 100644
--- 
a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
+++ 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade
@@ -23,14 +23,14 @@ table.table.table-body-hover.table-clickable.table-activable
   th Parallelism
   th Status
 
-  tbody(ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid && hasWatermarks(nodeid) }" ng-click="changeNode(v.id)")
+  tbody(ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid }" ng-click="changeNode(v.id)")
 tr(ng-if="v.type == 'regular'")
-
   td.td-long {{ v.name | humanizeText }}
-  td {{ watermarks | lowWatermark:v.id }}
+  td {{ watermarks[v.id]["lowWatermark"] | humanizeWatermark }}
   td {{ v.parallelism }}
-  td 
+  td
 bs-label(status="{{v.status}}") {{v.status}}
-tr(ng-if="nodeid && v.id == nodeid && hasWatermarks(nodeid)")
-  td(colspan="11")
-div(ng-include=" 'partials/jobs/job.plan.node.watermarks.html' ")
+tr(ng-if="nodeid && v.id == nodeid")
+  td(colspan="4")
+div(ng-show="hasWatermark(v.id)" ng-include=" 'partials/jobs/job.plan.node.watermarks.html' ")
+div(ng-show="!hasWatermark(v.id)") No Watermarks

http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade
--
diff --git 
a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade
 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade
index b406a1c..451ccaa 100644
--- 
a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade
+++ 
b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade
@@ -15,13 +15,13 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 
-table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="hasWatermarks(nodeid)")
+table.table.table-hover.table-clickable.table-activable.table-inner
   thead
 tr
-  th id
+  th Subtask
   th Watermark
 
   tbody
-tr(ng-repeat="watermark in watermarksByNode(nodeid)")
-  td {{ watermark.id }}
-  td {{ watermark.value | parseWatermark }}
+tr(ng-repeat="(subtaskIndex, watermark) in watermarks[nodeid]['watermarks']")
+  td {{ subtaskIndex | increment }}
+  

[2/5] flink git commit: [FLINK-3427] [webui] Rebuild web UI

2017-03-08 Thread uce
http://git-wip-us.apache.org/repos/asf/flink/blob/121b12b7/flink-runtime-web/web-dashboard/web/js/vendor.js
--
diff --git a/flink-runtime-web/web-dashboard/web/js/vendor.js 
b/flink-runtime-web/web-dashboard/web/js/vendor.js
index 135358e..ee2e3e3 100644
--- a/flink-runtime-web/web-dashboard/web/js/vendor.js
+++ b/flink-runtime-web/web-dashboard/web/js/vendor.js
@@ -22,9 +22,9 @@ n.left=void 
0!==t.left?t.left:n.left}},color:{get:function(){return s},set:funct
 
g.scale(o)._ticks(t.utils.calcTicksY(Z/36,w)).tickSize(-X,0),v.scale(a)._ticks(t.utils.calcTicksY(Z/36,w)),R?v.tickSize(tt.length?0:-X,0):v.tickSize(J.length?0:-X,0);var
 u=J.length?1:0,c=tt.length&&!Y(tt)?1:0,d=R?c:u,p=R?u:c;ot.select(".nv-focus 
.nv-y1.nv-axis").style("opacity",d),ot.select(".nv-focus 
.nv-y2.nv-axis").style("opacity",p).attr("transform","translate("+r.range()[1]+",0)"),ot.select(".nv-focus
 .nv-y1.nv-axis").transition().duration(P).call(g),ot.select(".nv-focus 
.nv-y2.nv-axis").transition().duration(P).call(v)}var 
G=d3.select(this);t.utils.initSVG(G);var 
X=t.utils.availableWidth(_,G,$),Z=t.utils.availableHeight(M,G,$)-(T?O:0),Q=O-k.top-k.bottom;if(e.update=function(){G.transition().duration(P).call(e)},e.container=this,F.setter(U(w),e.update).getter(H(w)).update(),F.disabled=w.map(function(t){return!!t.disabled}),!q){var
 K;q={};for(K in F)F[K]instanceof 
Array?q[K]=F[K].slice(0):q[K]=F[K]}if(!(w&&(function(t){return
 t.values.length}).length))return t.
 utils.noData(e,G),e;G.selectAll(".nv-noData").remove();var 
J=w.filter(function(t){return!t.disabled&}),tt=w.filter(function(t){return!t.bar});r=J.length&&!R?f.xScale():l.xScale(),i=p.scale(),o=R?l.yScale():f.yScale(),a=R?f.yScale():l.yScale(),s=R?c.yScale():d.yScale(),u=R?d.yScale():c.yScale();var
 
et=w.filter(function(t){return!t.disabled&&(R?!t.bar:t.bar)}).map(function(t){return
 
t.values.map(function(t,e){return{x:C(t,e),y:S(t,e)}})}),nt=w.filter(function(t){return!t.disabled&&(R?t.bar:!t.bar)}).map(function(t){return
 
t.values.map(function(t,e){return{x:C(t,e),y:S(t,e)}})});r.range([0,X]),i.domain(d3.extent(d3.merge(et.concat(nt)),function(t){return
 t.x})).range([0,X]);var 
rt=G.selectAll("g.nv-wrap.nv-linePlusBar").data([w]),it=rt.enter().append("g").attr("class","nvd3
 nv-wrap 
nv-linePlusBar").append("g"),ot=rt.select("g");it.append("g").attr("class","nv-legendWrap");var
 at=it.append("g").attr("class","nv-focus");at.append("g").attr("class","nv-x 
nv-axis"),at.append("g").att
 r("class","nv-y1 nv-axis"),at.append("g").attr("class","nv-y2 
nv-axis"),at.append("g").attr("class","nv-barsWrap"),at.append("g").attr("class","nv-linesWrap");var
 
st=it.append("g").attr("class","nv-context");if(st.append("g").attr("class","nv-x
 nv-axis"),st.append("g").attr("class","nv-y1 
nv-axis"),st.append("g").attr("class","nv-y2 
nv-axis"),st.append("g").attr("class","nv-barsWrap"),st.append("g").attr("class","nv-linesWrap"),st.append("g").attr("class","nv-brushBackground"),st.append("g").attr("class","nv-x
 nv-brush"),A){var 
ut=b.align()?X/2:X,lt=b.align()?ut:0;b.width(ut),ot.select(".nv-legendWrap").datum(w.map(function(t){return
 t.originalKey=void 
0===t.originalKey?t.key:t.originalKey,R?t.key=t.originalKey+(t.bar?W:z):t.key=t.originalKey+(t.bar?z:W),t})).call(b),b.height()>$.top&&($.top=b.height(),Z=t.utils.availableHeight(M,G,$)-O),ot.select(".nv-legendWrap").attr("transform","translate("+lt+","+-$.top+")")}else
 ot.select(".nv-legendWrap").selectAll("*").remove();rt.attr("tran
 
sform","translate("+$.left+","+$.top+")"),ot.select(".nv-context").style("display",T?"initial":"none"),d.width(X).height(Q).color(w.map(function(t,e){return
 
t.color||E(t,e)}).filter(function(t,e){return!w[e].disabled&[e].bar})),c.width(X).height(Q).color(w.map(function(t,e){return
 t.color||E(t,e)}).filter(function(t,e){return!w[e].disabled&&!w[e].bar}));var 
ct=ot.select(".nv-context 
.nv-barsWrap").datum(J.length?J:[{values:[]}]),ft=ot.select(".nv-context 
.nv-linesWrap").datum(Y(tt)?[{values:[]}]:tt.filter(function(t){return!t.disabled}));ot.select(".nv-context").attr("transform","translate(0,"+(Z+$.bottom+k.top)+")"),ct.transition().call(d),ft.transition().call(c),D&&(p._ticks(t.utils.calcTicksX(X/100,w)).tickSize(-Q,0),ot.select(".nv-context
 
.nv-x.nv-axis").attr("transform","translate(0,"+s.range()[0]+")"),ot.select(".nv-context
 
.nv-x.nv-axis").transition().call(p)),N&&(m.scale(s)._ticks(Q/36).tickSize(-X,0),y.scale(u)._ticks(Q/36).tickSize(J.length?0:-X,0),ot.select(".nv-context
  
.nv-y3.nv-axis").style("opacity",J.length?1:0).attr("transform","translate(0,"+i.range()[0]+")"),ot.select(".nv-context
 
.nv-y2.nv-axis").style("opacity",tt.length?1:0).attr("transform","translate("+i.range()[1]+",0)"),ot.select(".nv-context
 .nv-y1.nv-axis").transition().call(m),ot.select(".nv-context 
.nv-y2.nv-axis").transition().call(y)),x.x(i).on("brush",V),j&(j);var 

flink git commit: [FLINK-5722] [table] Add dedicated DataSetDistinct operator.

2017-03-08 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master 338c30a41 -> 7a629fc59


[FLINK-5722] [table] Add dedicated DataSetDistinct operator.

- Uses hash-combiner for a better combine rate.

This closes #3471.
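
For orientation, a minimal (hypothetical) Table API snippet whose optimized plan would contain the new operator, assuming a table "Orders" has been registered in a batch environment:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class DistinctExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
		// distinct() produces an aggregation-free LogicalAggregate,
		// which the new rule translates to DataSetDistinct
		Table distinctUsers = tEnv.scan("Orders").select("user").distinct();
	}
}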


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a629fc5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a629fc5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a629fc5

Branch: refs/heads/master
Commit: 7a629fc59ff206ba51f22e1bf35fe50882e63538
Parents: 338c30a
Author: Fabian Hueske 
Authored: Fri Mar 3 23:17:36 2017 +0100
Committer: twalthr 
Committed: Wed Mar 8 14:14:06 2017 +0100

--
 .../plan/nodes/dataset/DataSetDistinct.scala| 94 
 .../flink/table/plan/rules/FlinkRuleSets.scala  |  1 +
 .../rules/dataSet/DataSetAggregateRule.scala|  9 +-
 .../rules/dataSet/DataSetDistinctRule.scala | 61 +
 .../runtime/aggregate/DistinctReduce.scala  | 26 ++
 .../scala/batch/sql/DistinctAggregateTest.scala | 45 --
 .../batch/sql/QueryDecorrelationTest.scala  | 10 +--
 .../api/scala/batch/sql/SetOperatorsTest.scala  |  5 +-
 .../scala/batch/table/FieldProjectionTest.scala | 10 +--
 9 files changed, 218 insertions(+), 43 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala
new file mode 100644
index 000..14116f1
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.dataset
+
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, 
RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import 
org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.runtime.aggregate.DistinctReduce
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConverters._
+
+/**
+  * DataSet RelNode for a Distinct (LogicalAggregate without aggregation functions).
+  */
+class DataSetDistinct(
+   cluster: RelOptCluster,
+   traitSet: RelTraitSet,
+   input: RelNode,
+   rowRelDataType: RelDataType,
+   ruleDescription: String)
+  extends SingleRel(cluster, traitSet, input)
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
+    new DataSetDistinct(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      rowRelDataType,
+      ruleDescription
+    )
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
+
+    val child = this.getInput
+    val rowCnt = metadata.getRowCount(child)
+    val rowSize = this.estimateRowSize(child.getRowType)
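+    // Calcite cost: makeCost(rowCount, cpu, io); cpu is zero because no
+    // aggregate functions need to be evaluated per row.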
+    // less expensive than DataSetAggregate without aggregates
+    planner.getCostFactory.makeCost(rowCnt, 0, rowCnt * rowSize * 0.9)
+  }
+
+  override def toString: String = {
+s"Distinct(distinct: (${rowTypeToString(rowRelDataType)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw).item("distinct", rowTypeToString(rowRelDataType))
+  }
+
+  def rowTypeToString(rowType: RelDataType): String = {
+    rowType.getFieldList.asScala.map(_.getName).mkString(", ")
+  }
+
+  override 

flink git commit: [FLINK-4326] [scripts] Flink foreground services

2017-03-08 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master e9a5c8629 -> 338c30a41


[FLINK-4326] [scripts] Flink foreground services

Add a "start-foreground" option to the Flink service scripts which does
not daemonize the service nor redirect output.

This closes #3492.
This closes #3351.
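
As a hedged usage sketch (option names taken from the docs diff below; paths
relative to the Flink distribution): start-foreground keeps the service
attached to the terminal and logging to stdout, which is what init systems and
container entrypoints typically expect.

~~~bash
# Run a TaskManager without daemonizing; stop with Ctrl-C or SIGTERM.
bin/taskmanager.sh start-foreground

# Same for a JobManager; "cluster" is the execution mode argument that
# the daemonized start already takes.
bin/jobmanager.sh start-foreground cluster
~~~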


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/338c30a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/338c30a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/338c30a4

Branch: refs/heads/master
Commit: 338c30a41d4ff04ce196bdaeb5251a222dc109c0
Parents: e9a5c86
Author: Greg Hogan 
Authored: Fri Oct 7 16:06:48 2016 -0400
Committer: Ufuk Celebi 
Committed: Wed Mar 8 10:54:12 2017 +0100

--
 docs/setup/cluster_setup.md |  6 +-
 .../src/main/flink-bin/bin/flink-console.sh | 65 
 flink-dist/src/main/flink-bin/bin/jobmanager.sh | 10 ++-
 .../src/main/flink-bin/bin/taskmanager.sh   | 42 +++--
 flink-dist/src/main/flink-bin/bin/zookeeper.sh  | 10 ++-
 .../flink-bin/conf/log4j-console.properties | 39 
 .../src/main/flink-bin/conf/logback-console.xml | 56 +
 7 files changed, 200 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/338c30a4/docs/setup/cluster_setup.md
--
diff --git a/docs/setup/cluster_setup.md b/docs/setup/cluster_setup.md
index 7d3684f..c86e353 100644
--- a/docs/setup/cluster_setup.md
+++ b/docs/setup/cluster_setup.md
@@ -132,18 +132,18 @@ To stop Flink, there is also a `stop-cluster.sh` script.
 
 ### Adding JobManager/TaskManager Instances to a Cluster
 
-You can add both JobManager and TaskManager instances to your running cluster 
with the `bin/taskmanager.sh` and `bin/jobmanager.sh` scripts.
+You can add both JobManager and TaskManager instances to your running cluster 
with the `bin/jobmanager.sh` and `bin/taskmanager.sh` scripts.
 
 #### Adding a JobManager
 
 ~~~bash
-bin/jobmanager.sh (start cluster)|stop|stop-all
+bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
 ~~~
 
 #### Adding a TaskManager
 
 ~~~bash
-bin/taskmanager.sh start|stop|stop-all
+bin/taskmanager.sh start|start-foreground|stop|stop-all
 ~~~
 
 Make sure to call these scripts on the hosts on which you want to start/stop 
the respective instance.

http://git-wip-us.apache.org/repos/asf/flink/blob/338c30a4/flink-dist/src/main/flink-bin/bin/flink-console.sh
--
diff --git a/flink-dist/src/main/flink-bin/bin/flink-console.sh 
b/flink-dist/src/main/flink-bin/bin/flink-console.sh
new file mode 100644
index 000..71c5c35
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/flink-console.sh
@@ -0,0 +1,65 @@
+#!/usr/bin/env bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Start a Flink service as a console application. Must be stopped with Ctrl-C
+# or with SIGTERM by kill or the controlling process.
+USAGE="Usage: flink-console.sh (jobmanager|taskmanager|zookeeper) [args]"
+
+SERVICE=$1
+ARGS=("${@:2}") # get remaining arguments as array
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
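+# Map the requested service name onto the class that is run in the foreground.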
+case $SERVICE in
+    (jobmanager)
+        CLASS_TO_RUN=org.apache.flink.runtime.jobmanager.JobManager
+    ;;
+
+    (taskmanager)
+        CLASS_TO_RUN=org.apache.flink.runtime.taskmanager.TaskManager
+    ;;
+
+    (zookeeper)
+        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
+    ;;
+
+    (*)
+        echo "Unknown service '${SERVICE}'. $USAGE."
+        exit 1
+    ;;
+esac
+
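+# constructFlinkClassPath is presumably provided by config.sh (sourced above).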
+FLINK_TM_CLASSPATH=`constructFlinkClassPath`
+
+log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties"