Build failed in Jenkins: flink-snapshot-deployment #403

2017-03-13 Thread Apache Jenkins Server
See 


--
[...truncated 170.33 KB...]
[ERROR] public static void rethrowIOException(Throwable t) throws IOException {
[ERROR] ^
[ERROR] 
:342:
 warning: no description for @throws
[ERROR] * @throws IOException
[ERROR] ^
[ERROR] 
:343:
 warning: no description for @throws
[ERROR] * @throws ClassNotFoundException
[ERROR] ^
[ERROR] 
:363:
 warning: no description for @throws
[ERROR] * @throws IOException
[ERROR] ^
[ERROR] 
:364:
 warning: no description for @throws
[ERROR] * @throws ClassNotFoundException
[ERROR] ^
[ERROR] 
:248:
 warning: no @param for closeables
[ERROR] public static void closeAllQuietly(Iterable closeables) {
[ERROR] ^
[ERROR] 
:259:
 warning: no @param for closeable
[ERROR] public static void closeQuietly(AutoCloseable closeable) {
[ERROR] ^
[ERROR] 
:77:
 warning: no @param for hostPort
[ERROR] public static URL getCorrectHostnamePort(String hostPort) {
[ERROR] ^
[ERROR] 
:56:
 warning: no @param for <T>
[ERROR] public static <T> T checkNotNull(T reference) {
[ERROR] ^
[ERROR] 
:73:
 warning: no @param for <T>
[ERROR] public static <T> T checkNotNull(T reference, @Nullable String errorMessage) {
[ERROR] ^
[ERROR] 
:99:
 warning: no @param for <T>
[ERROR] public static <T> T checkNotNull(T reference,
[ERROR] ^
[ERROR] 
:37:
 warning: no description for @throws
[ERROR] * @throws IllegalArgumentException
[ERROR] ^
[ERROR] 
:61:
 warning: no description for @throws
[ERROR] * @throws IllegalArgumentException
[ERROR] ^
[ERROR] 
:85:
 warning: no description for @throws
[ERROR] * @throws IllegalArgumentException
[ERROR] ^
[ERROR] 
:87:
 warning: no @param for logger
[ERROR] public static long getLong(Properties config, String key, long defaultValue, Logger logger) {
[ERROR] ^
[ERROR] 
:69:
 error: reference not found
[ERROR] * @see org.apache.commons.codec.binary.Hex#encodeHexString(byte[])
[ERROR] ^
[ERROR] 
:32:
 warning: no description for @param
[ERROR] * @param visitable
[ERROR] ^
[ERROR] 
:39:
 warning: no description for @param
[ERROR] * @param visitable
[ERROR] ^
[ERROR] 
:112:
 warning: no description for @throws
[ERROR] * @throws IOException
[ERROR] ^
[ERROR] 
:271:
 warning: no @throws for java.io.IOException
[ERROR] public static void setDefaultScheme(Configuration config) throws IOException {
[ERROR] ^
[ERROR] 
:472:
 warning: no @throws for java.io.IOException
[ERROR] public abstract void 
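
Most of the output above is the Maven Javadoc plugin's strict doclint checks failing the deployment build: every @param and @throws tag needs a description, and every @see reference must resolve. As a hedged illustration of the fix pattern (the body is illustrative, not the actual Flink source), the closeQuietly warning would be silenced like this:

/**
 * Closes the given resource, suppressing any exception thrown while closing.
 *
 * @param closeable the resource to close; if null, this call is a no-op
 */
public static void closeQuietly(AutoCloseable closeable) {
    if (closeable != null) {
        try {
            closeable.close();
        } catch (Throwable ignored) {
            // deliberately swallowed: this is the "quiet" variant
        }
    }
}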

flink git commit: [FLINK-6007] Allow key removal from within the watermark callback.

2017-03-13 Thread kkloudas
Repository: flink
Updated Branches:
  refs/heads/master 5f08e5359 -> 14c1941d8


[FLINK-6007] Allow key removal from within the watermark callback.

When deleting a key from the InternalWatermarkCallbackService, the
deleted key is put into a separate set, and the actual deletion
happens only after the iteration over all keys has finished. To avoid
having to checkpoint the deletion set, pending deletions are also
flushed upon checkpointing.
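
A minimal Java sketch of the deferred-deletion pattern described above, with illustrative names (this is not the actual InternalWatermarkCallbackService code):

import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

public class KeyedCallbackRegistry<K> {

    private final Set<K> registeredKeys = new HashSet<>();
    private final Set<K> pendingDeletions = new HashSet<>();

    public void registerKey(K key) {
        registeredKeys.add(key);
    }

    /** Safe to call from within the watermark callback while keys are being iterated. */
    public void deleteKey(K key) {
        // Defer the removal: mutating 'registeredKeys' here would break
        // the iteration in onWatermark().
        pendingDeletions.add(key);
    }

    /** Invokes the callback for every key, then applies the deferred deletions. */
    public void onWatermark(Consumer<K> callback) {
        for (K key : registeredKeys) {
            callback.accept(key); // may call deleteKey(key)
        }
        flushPendingDeletions();
    }

    /** Also called before checkpointing, so the deletion set never has to be checkpointed. */
    public void flushPendingDeletions() {
        registeredKeys.removeAll(pendingDeletions);
        pendingDeletions.clear();
    }
}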


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14c1941d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14c1941d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14c1941d

Branch: refs/heads/master
Commit: 14c1941d8eaa583eb8f7eeb5478e605850c0d355
Parents: 5f08e53
Author: kl0u 
Authored: Wed Mar 8 20:18:18 2017 +0100
Committer: kl0u 
Committed: Mon Mar 13 17:29:03 2017 +0100

--
 .../api/operators/AbstractStreamOperator.java   |   6 +
 .../operators/InternalTimeServiceManager.java   |   5 +
 .../InternalWatermarkCallbackService.java   | 112 +-
 .../operators/AbstractStreamOperatorTest.java   | 117 +++
 .../util/AbstractStreamOperatorTestHarness.java |   9 ++
 5 files changed, 222 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 6e6b147..ef23be9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -957,4 +957,10 @@ public abstract class AbstractStreamOperator
return timeServiceManager == null ? 0 :
timeServiceManager.numEventTimeTimers();
}
+
+   @VisibleForTesting
+   public int numKeysForWatermarkCallback() {
+   return timeServiceManager == null ? 0 :
+   timeServiceManager.numKeysForWatermarkCallback();
+   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
index 71ffbd2..0b60232 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@@ -188,4 +188,9 @@ class InternalTimeServiceManager {
}
return count;
}
+
+   @VisibleForTesting
+   public int numKeysForWatermarkCallback() {
+   return watermarkCallbackService.numKeysForWatermarkCallback();
+   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/14c1941d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java
index a4263e4..9a43853 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalWatermarkCallbackService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -38,8 +39,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * The watermark callback service allows to register a {@link OnWatermarkCallback OnWatermarkCallback}
  * and multiple keys, for which the callback will be invoked every time

flink git commit: [FLINK-6025] [core] Add Flink's own JavaSerializer for Kryo serialization

2017-03-13 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/release-1.2 1ea252a41 -> b7d288fe2


[FLINK-6025] [core] Add Flink's own JavaSerializer for Kryo serialization

This commit adds a reimplemented JavaSerializer to be registered with
Kryo. This is due to a known issue with Kryo's JavaSerializer, which may
use the wrong classloader for deserialization.

Instead of registering Kryo's JavaSerializer for Throwables, Flink now
registers the reimplemented JavaSerializer. Users who run into
ClassNotFoundExceptions while using Kryo's JavaSerializer for their own
types are also advised to switch to Flink's JavaSerializer.

This closes #3518.
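
From a user's perspective, switching serializers is a one-line registration change. A hedged sketch, assuming a hypothetical user type MyCustomType; addDefaultKryoSerializer is the usual ExecutionConfig registration hook:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Register Flink's classloader-aware JavaSerializer instead of
// com.esotericsoftware.kryo.serializers.JavaSerializer for the type.
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, JavaSerializer.class);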


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7d288fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7d288fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7d288fe

Branch: refs/heads/release-1.2
Commit: b7d288fe27d718a19ba06c18cf5cd8c21b167260
Parents: 1ea252a
Author: Tzu-Li (Gordon) Tai 
Authored: Sun Mar 12 22:46:27 2017 +0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Mon Mar 13 23:44:29 2017 +0800

--
 docs/dev/custom_serializers.md  | 12 +++
 .../typeutils/runtime/kryo/JavaSerializer.java  | 82 
 .../typeutils/runtime/kryo/KryoSerializer.java  |  5 +-
 .../apache/flink/util/InstantiationUtil.java|  4 +-
 4 files changed, 98 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b7d288fe/docs/dev/custom_serializers.md
--
diff --git a/docs/dev/custom_serializers.md b/docs/dev/custom_serializers.md
index 2b72ca0..ddfc2ee 100644
--- a/docs/dev/custom_serializers.md
+++ b/docs/dev/custom_serializers.md
@@ -109,4 +109,16 @@ For Google Protobuf you need the following Maven dependency:
 
 Please adjust the versions of both libraries as needed.
 
+### Issue with using Kryo's `JavaSerializer` 
 
+If you register Kryo's `JavaSerializer` for your custom type, you may
+encounter `ClassNotFoundException`s even though your custom type class is
+included in the submitted user code jar. This is due to a known issue with
+Kryo's `JavaSerializer`, which may use the wrong classloader.
+
+In this case, you should use `org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer`
+instead to resolve the issue. This is a reimplemented `JavaSerializer` in Flink
+that makes sure the user code classloader is used.
+
+Please refer to [FLINK-6025](https://issues.apache.org/jira/browse/FLINK-6025)
+for more details.

http://git-wip-us.apache.org/repos/asf/flink/blob/b7d288fe/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
new file mode 100644
index 000..a51647c
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.util.ObjectMap;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * This is a reimplementation of Kryo's {@link com.esotericsoftware.kryo.serializers.JavaSerializer},
+ * that additionally makes sure the {@link ObjectInputStream} used for deserialization
+ * specifically uses Kryo's registered classloader.
+ *
+ * Flink maintains this reimplementation due to a 
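
The Javadoc is cut off by the archive here, but the essence of the technique is to resolve classes against Kryo's registered classloader during Java deserialization. A rough sketch of that idea (not the actual Flink implementation, which goes through InstantiationUtil):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

// Sketch: look classes up via Kryo's registered (user-code) classloader, so
// user types are found even when the default lookup would miss them.
static ObjectInputStream classLoaderAwareStream(final Kryo kryo, Input input) throws IOException {
    return new ObjectInputStream(input) {
        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            return Class.forName(desc.getName(), false, kryo.getClassLoader());
        }
    };
}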

flink git commit: [FLINK-6025] [core] Add Flink's own JavaSerializer for Kryo serialization

2017-03-13 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/release-1.1 01703e60e -> e50bf6506


[FLINK-6025] [core] Add Flink's own JavaSerializer for Kryo serialization

This commit adds a reimplemented JavaSerializer to be registered with
Kryo. This is due to a known issue with Kryo's JavaSerializer, which may
use the wrong classloader for deserialization.

Instead of registering Kryo's JavaSerializer for Throwables, Flink now
registers the reimplemented JavaSerializer. Users who run into
ClassNotFoundExceptions while using Kryo's JavaSerializer for their own
types are also advised to switch to Flink's JavaSerializer.

This closes #3519.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e50bf650
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e50bf650
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e50bf650

Branch: refs/heads/release-1.1
Commit: e50bf65065722ea02a40406c2c65eed3d65a814a
Parents: 01703e6
Author: Tzu-Li (Gordon) Tai 
Authored: Sun Mar 12 22:46:27 2017 +0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Mon Mar 13 23:42:38 2017 +0800

--
 docs/apis/best_practices.md | 13 
 .../typeutils/runtime/kryo/JavaSerializer.java  | 82 
 .../typeutils/runtime/kryo/KryoSerializer.java  |  5 +-
 .../apache/flink/util/InstantiationUtil.java|  4 +-
 4 files changed, 99 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e50bf650/docs/apis/best_practices.md
--
diff --git a/docs/apis/best_practices.md b/docs/apis/best_practices.md
index 7ae1b64..7e0c3e4 100644
--- a/docs/apis/best_practices.md
+++ b/docs/apis/best_practices.md
@@ -270,6 +270,19 @@ For Google Protobuf you need the following Maven dependency:
 
 Please adjust the versions of both libraries as needed.
 
+### Issue with using Kryo's `JavaSerializer` 
+  
+If you register Kryo's `JavaSerializer` for your custom type, you may
+encounter `ClassNotFoundException`s even though your custom type class is
+included in the submitted user code jar. This is due to a known issue with
+Kryo's `JavaSerializer`, which may use the wrong classloader.
+
+In this case, you should use `org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer`
+instead to resolve the issue. This is a reimplemented `JavaSerializer` in Flink
+that makes sure the user code classloader is used.
+
+Please refer to [FLINK-6025](https://issues.apache.org/jira/browse/FLINK-6025)
+for more details.
 
 ## Using Logback instead of Log4j
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e50bf650/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
new file mode 100644
index 000..a51647c
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.util.ObjectMap;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * This is a reimplementation of Kryo's {@link com.esotericsoftware.kryo.serializers.JavaSerializer},
+ * that additionally makes sure the {@link ObjectInputStream} used for deserialization
+ * specifically uses Kryo's registered classloader.
+ *
+ * Flink maintains this 

[1/2] flink git commit: [FLINK-6025] [core] Add Flink's own JavaSerializer for Kryo serialization

2017-03-13 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/master fcd264a70 -> 5f08e5359


[FLINK-6025] [core] Add Flink's own JavaSerializer for Kryo serialization

This commit adds a reimplemented JavaSerializer to be registered with
Kryo. This is due to a known issue with Kryo's JavaSerializer, which may
use the wrong classloader for deserialization.

Instead of registering Kryo's JavaSerializer for Throwables, Flink now
registers the reimplemented JavaSerializer. Users who run into
ClassNotFoundExceptions while using Kryo's JavaSerializer for their own
types are also advised to switch to Flink's JavaSerializer.

This closes #3517.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2143172
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2143172
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2143172

Branch: refs/heads/master
Commit: f2143172feca2925832c8b26c3c9fbbb92ecd48f
Parents: fcd264a
Author: Tzu-Li (Gordon) Tai 
Authored: Sun Mar 12 22:46:27 2017 +0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Mon Mar 13 23:31:33 2017 +0800

--
 docs/dev/custom_serializers.md  | 12 +++
 .../typeutils/runtime/kryo/JavaSerializer.java  | 82 
 .../typeutils/runtime/kryo/KryoSerializer.java  |  5 +-
 .../apache/flink/util/InstantiationUtil.java|  4 +-
 4 files changed, 98 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f2143172/docs/dev/custom_serializers.md
--
diff --git a/docs/dev/custom_serializers.md b/docs/dev/custom_serializers.md
index 2b72ca0..ddfc2ee 100644
--- a/docs/dev/custom_serializers.md
+++ b/docs/dev/custom_serializers.md
@@ -109,4 +109,16 @@ For Google Protobuf you need the following Maven dependency:
 
 Please adjust the versions of both libraries as needed.
 
+### Issue with using Kryo's `JavaSerializer` 
 
+If you register Kryo's `JavaSerializer` for your custom type, you may
+encounter `ClassNotFoundException`s even though your custom type class is
+included in the submitted user code jar. This is due to a known issue with
+Kryo's `JavaSerializer`, which may use the wrong classloader.
+
+In this case, you should use `org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer`
+instead to resolve the issue. This is a reimplemented `JavaSerializer` in Flink
+that makes sure the user code classloader is used.
+
+Please refer to [FLINK-6025](https://issues.apache.org/jira/browse/FLINK-6025)
+for more details.

http://git-wip-us.apache.org/repos/asf/flink/blob/f2143172/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
new file mode 100644
index 000..a51647c
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/JavaSerializer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime.kryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.util.ObjectMap;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * This is a reimplementation of Kryo's {@link com.esotericsoftware.kryo.serializers.JavaSerializer},
+ * that additionally makes sure the {@link ObjectInputStream} used for deserialization
+ * specifically uses Kryo's registered classloader.
+ *
+ * Flink maintains this reimplementation due to a known issue 

[2/2] flink git commit: [FLINK-3123] [kafka] Allow custom specific start offsets for FlinkKafkaConsumer

2017-03-13 Thread tzulitai
[FLINK-3123] [kafka] Allow custom specific start offsets for FlinkKafkaConsumer

This closes #2687.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f08e535
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f08e535
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f08e535

Branch: refs/heads/master
Commit: 5f08e53592ebd29cfcd8ee486fcfd6229b82aa69
Parents: f214317
Author: Tzu-Li (Gordon) Tai 
Authored: Fri Mar 10 21:11:42 2017 +0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Mon Mar 13 23:38:13 2017 +0800

--
 docs/dev/connectors/kafka.md|  35 ++-
 .../connectors/kafka/Kafka010ITCase.java|   4 +
 .../connectors/kafka/Kafka08ITCase.java |   9 +-
 .../connectors/kafka/Kafka09ITCase.java |   4 +
 .../kafka/FlinkKafkaConsumerBase.java   | 101 ++-
 .../connectors/kafka/config/StartupMode.java|   9 +-
 .../KafkaConsumerPartitionAssignmentTest.java   |  33 --
 .../connectors/kafka/KafkaConsumerTestBase.java |  82 +--
 8 files changed, 251 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/docs/dev/connectors/kafka.md
--
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 06e40b2..6d58b0c 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -220,7 +220,40 @@ All versions of the Flink Kafka Consumer have the above explicit configuration methods
  record. Under these modes, committed offsets in Kafka will be ignored and
  not used as starting positions.
  
-Note that these settings do not affect the start position when the job is
+You can also specify the exact offsets the consumer should start from for each partition:
+
+
+
+{% highlight java %}
+Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
+
+myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
+{% endhighlight %}
+
+
+{% highlight scala %}
+val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
+specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)
+
+myConsumer.setStartFromSpecificOffsets(specificStartOffsets)
+{% endhighlight %}
+
+
+
+The above example configures the consumer to start from the specified offsets for
+partitions 0, 1, and 2 of topic `myTopic`. The offset values should be the
+next record that the consumer should read for each partition. Note that
+if the consumer needs to read a partition which does not have a specified
+offset within the provided offsets map, it will fall back to the default
+group offsets behaviour (i.e. `setStartFromGroupOffsets()`) for that
+particular partition.
+
+Note that these start position configuration methods do not affect the start position when the job is
 automatically restored from a failure or manually restored using a savepoint.
 On restore, the start position of each Kafka partition is determined by the
 offsets stored in the savepoint or checkpoint
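
Putting the documented pieces together, an end-to-end setup could look like the hedged sketch below; the 0.10 consumer class, broker address, and offsets are illustrative assumptions:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");

FlinkKafkaConsumer010<String> myConsumer =
    new FlinkKafkaConsumer010<>("myTopic", new SimpleStringSchema(), props);

// Start partitions 0, 1 and 2 of 'myTopic' at the given offsets; any other
// partition falls back to the group-offsets behaviour described above.
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

env.addSource(myConsumer).print();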

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index a375fb6..2085169 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -147,6 +147,10 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
runStartFromGroupOffsets();
}
 
+   @Test(timeout = 60000)
+   public void testStartFromSpecificOffsets() throws Exception {
+   runStartFromSpecificOffsets();
+   }
 
// --- offset committing ---
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5f08e535/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java

flink git commit: [FLINK-5971] [flip-6] Add timeout for registered jobs on the ResourceManager

2017-03-13 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 04aee61d8 -> fcd264a70


[FLINK-5971] [flip-6] Add timeout for registered jobs on the ResourceManager

This PR introduces a timeout for inactive jobs on the ResourceManager. A job
is inactive if there is no active leader known for it. If a job times out, it
is removed from the ResourceManager. Additionally, this PR removes the
dependency of the JobLeaderIdService on the RunningJobsRegistry.

Fix YarnFlinkApplicationMasterRunner to use correct arguments for
JobLeaderIdService

Fix race condition in JobLeaderIdListener#cancelTimeout

This closes #3488.
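
A minimal Java sketch of the timeout guard this describes, with illustrative names; the actual JobLeaderIdService keeps one such timeout per registered job:

import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.apache.flink.api.common.JobID;

public class JobTimeoutGuard {

    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    // Identifies the currently valid timeout; cleared or replaced on leader changes.
    private volatile UUID currentTimeoutId;

    /** Called when no active leader is known for the job. */
    public void scheduleTimeout(final JobID jobId, long timeoutMillis, final Consumer<JobID> removeJob) {
        final UUID timeoutId = UUID.randomUUID();
        currentTimeoutId = timeoutId;
        scheduler.schedule(() -> {
            // A late-firing timeout becomes a no-op if it was cancelled or
            // superseded in the meantime: the race condition mentioned above.
            if (timeoutId.equals(currentTimeoutId)) {
                removeJob.accept(jobId);
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Called when an active leader becomes known again. */
    public void cancelTimeout() {
        currentTimeoutId = null;
    }
}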


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcd264a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcd264a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcd264a7

Branch: refs/heads/master
Commit: fcd264a707d3dd8ef4247825752c8639732c943c
Parents: 04aee61
Author: Till Rohrmann 
Authored: Mon Mar 6 16:57:43 2017 +0100
Committer: Till Rohrmann 
Committed: Mon Mar 13 15:03:18 2017 +0100

--
 .../apache/flink/configuration/AkkaOptions.java |   7 +
 .../configuration/ResourceManagerOptions.java   |  40 +++
 .../resourcemanager/JobLeaderIdActions.java |   8 +-
 .../resourcemanager/JobLeaderIdService.java | 119 +---
 .../resourcemanager/ResourceManager.java|   6 +-
 .../ResourceManagerConfiguration.java   |  48 ++--
 .../resourcemanager/ResourceManagerRunner.java  |   5 +-
 .../resourcemanager/JobLeaderIdServiceTest.java | 269 +++
 .../resourcemanager/ResourceManagerHATest.java  |  10 +-
 .../ResourceManagerJobMasterTest.java   |  10 +-
 .../ResourceManagerTaskExecutorTest.java|  10 +-
 .../slotmanager/SlotProtocolTest.java   |  21 +-
 .../taskexecutor/TaskExecutorITCase.java|  10 +-
 .../yarn/YarnFlinkApplicationMasterRunner.java  |   5 +-
 14 files changed, 498 insertions(+), 70 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
index 7e4c2b7..97b209e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java
@@ -29,6 +29,13 @@ import org.apache.flink.annotation.PublicEvolving;
 public class AkkaOptions {
 
/**
+* Timeout for Akka ask calls.
+*/
+   public static final ConfigOption<String> AKKA_ASK_TIMEOUT = ConfigOptions
+   .key("akka.ask.timeout")
+   .defaultValue("10 s");
+
+   /**
 * The Akka tcp connection timeout.
 */
public static final ConfigOption<String> AKKA_TCP_TIMEOUT = ConfigOptions

http://git-wip-us.apache.org/repos/asf/flink/blob/fcd264a7/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
new file mode 100644
index 000..6a09f19
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * The set of configuration options relating to the ResourceManager
+ */
+@PublicEvolving
+public class ResourceManagerOptions {
+
+   /**
+* Timeout for jobs which don't have a job manager as leader assigned.
+*/
+   public static final ConfigOption 

flink git commit: [FLINK-5882] [table] TableFunction (UDTF) should support variable types and variable arguments

2017-03-13 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master 9b179beae -> 04aee61d8


[FLINK-5882] [table] TableFunction (UDTF) should support variable types and variable arguments

This closes #3407.
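
In user code the feature amounts to declaring an eval method with a var-args parameter. A hedged Java sketch analogous to the Scala VarArgsFunc0 used in the tests:

import org.apache.flink.table.functions.TableFunction;

// Emits one row per argument; the call site may now pass any number of
// String operands (including column references).
public class VarArgsFunc0 extends TableFunction<String> {

    public void eval(String... values) {
        for (String value : values) {
            collect(value);
        }
    }
}

It is registered and applied like any other table function, e.g. via tableEnv.registerFunction("VarArgsFunc0", new VarArgsFunc0()) followed by a join, as in the test below.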


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04aee61d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04aee61d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04aee61d

Branch: refs/heads/master
Commit: 04aee61d86f9ba30715c133380560739282feb81
Parents: 9b179be
Author: Zhuoluo Yang 
Authored: Tue Mar 7 12:02:46 2017 +0800
Committer: twalthr 
Committed: Mon Mar 13 10:55:10 2017 +0100

--
 .../codegen/calls/TableFunctionCallGen.scala| 17 ++---
 .../DataSetUserDefinedFunctionITCase.scala  | 37 
 .../DataStreamUserDefinedFunctionITCase.scala   | 34 --
 .../table/utils/UserDefinedTableFunctions.scala | 11 +-
 4 files changed, 91 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/04aee61d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
index 890b6bd..ba90292 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/TableFunctionCallGen.scala
@@ -44,14 +44,21 @@ class TableFunctionCallGen(
   codeGenerator: CodeGenerator,
   operands: Seq[GeneratedExpression])
 : GeneratedExpression = {
-// determine function signature
-val matchingSignature = getSignature(tableFunction, signature)
+// determine function method
+val matchingMethod = getEvalMethod(tableFunction, signature)
   .getOrElse(throw new CodeGenException("No matching signature found."))
+val matchingSignature = matchingMethod.getParameterTypes
+
+// zip for variable signatures
+var paramToOperands = matchingSignature.zip(operands)
+if (operands.length > matchingSignature.length) {
+  operands.drop(matchingSignature.length).foreach(op =>
+paramToOperands = paramToOperands :+ 
(matchingSignature.last.getComponentType, op)
+  )
+}
 
 // convert parameters for function (output boxing)
-val parameters = matchingSignature
-.zip(operands)
-.map { case (paramClass, operandExpr) =>
+val parameters = paramToOperands.map { case (paramClass, operandExpr) =>
   if (paramClass.isPrimitive) {
 operandExpr
   } else if (ClassUtils.isPrimitiveWrapper(paramClass)

http://git-wip-us.apache.org/repos/asf/flink/blob/04aee61d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
--
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
index 33b2439..20bbf8b 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
@@ -31,6 +31,7 @@ import org.apache.flink.table.utils._
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.apache.flink.types.Row
+import org.junit.Assert._
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -277,6 +278,42 @@ class DataSetUserDefinedFunctionITCase(
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  @Test
+  def testTableFunctionWithVariableArguments(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+val varArgsFunc0 = new VarArgsFunc0
+tableEnv.registerFunction("VarArgsFunc0", varArgsFunc0)
+
+val result = testData(env)
+  .toTable(tableEnv, 'a, 'b, 'c)
+  .select('c)
+  .join(varArgsFunc0("1", "2", 'c))
+
+val expected = "Anna#44,1\n" +
+  "Anna#44,2\n" +
+  "Anna#44,Anna#44\n" +
+  "Jack#22,1\n" +
+  "Jack#22,2\n" +
+  "Jack#22,Jack#22\n" +
+ 

flink git commit: [FLINK-5881] [table] ScalarFunction (UDF) should support variable types and variable arguments

2017-03-13 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master 354a13edf -> 9b179beae


[FLINK-5881] [table] ScalarFunction (UDF) should support variable types and variable arguments

This closes #3389.
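
As with its table-function counterpart (FLINK-5882 above), this lets a ScalarFunction declare a var-args eval method. A hedged Java sketch with illustrative names:

import org.apache.flink.table.functions.ScalarFunction;

// Concatenates any number of arguments; the planner matches calls with
// arbitrarily many operands against the trailing var-args parameter.
public class ConcatFunc extends ScalarFunction {

    public String eval(String... parts) {
        return String.join("#", parts);
    }
}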


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b179bea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b179bea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b179bea

Branch: refs/heads/master
Commit: 9b179beaea2b623ad3637e417f6d8014b696d038
Parents: 354a13e
Author: Zhuoluo Yang 
Authored: Wed Feb 22 18:53:34 2017 +0800
Committer: twalthr 
Committed: Mon Mar 13 10:29:35 2017 +0100

--
 .../codegen/calls/ScalarFunctionCallGen.scala   | 17 ++--
 .../functions/utils/ScalarSqlFunction.scala | 26 --
 .../utils/UserDefinedFunctionUtils.scala| 74 +--
 .../java/utils/UserDefinedScalarFunctions.java  | 20 +
 .../UserDefinedScalarFunctionTest.scala | 95 +++-
 .../utils/UserDefinedScalarFunctions.scala  | 36 
 6 files changed, 229 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/9b179bea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
index 7ff18eb..b0b4e09 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
@@ -43,15 +43,22 @@ class ScalarFunctionCallGen(
   codeGenerator: CodeGenerator,
   operands: Seq[GeneratedExpression])
 : GeneratedExpression = {
-// determine function signature and result class
-val matchingSignature = getSignature(scalarFunction, signature)
+// determine function method and result class
+val matchingMethod = getEvalMethod(scalarFunction, signature)
   .getOrElse(throw new CodeGenException("No matching signature found."))
+val matchingSignature = matchingMethod.getParameterTypes
 val resultClass = getResultTypeClass(scalarFunction, matchingSignature)
 
+// zip for variable signatures
+var paramToOperands = matchingSignature.zip(operands)
+if (operands.length > matchingSignature.length) {
+  operands.drop(matchingSignature.length).foreach(op =>
+paramToOperands = paramToOperands :+ 
(matchingSignature.last.getComponentType, op)
+  )
+}
+
 // convert parameters for function (output boxing)
-val parameters = matchingSignature
-.zip(operands)
-.map { case (paramClass, operandExpr) =>
+val parameters = paramToOperands.map { case (paramClass, operandExpr) =>
   if (paramClass.isPrimitive) {
 operandExpr
   } else if (ClassUtils.isPrimitiveWrapper(paramClass)

http://git-wip-us.apache.org/repos/asf/flink/blob/9b179bea/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
index dc6d41f..e2cd272 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
@@ -113,9 +113,15 @@ object ScalarSqlFunction {
   .getParameterTypes(foundSignature)
   .map(typeFactory.createTypeFromTypeInfo)
 
-inferredTypes.zipWithIndex.foreach {
-  case (inferredType, i) =>
-operandTypes(i) = inferredType
+for (i <- operandTypes.indices) {
+  if (i < inferredTypes.length - 1) {
+operandTypes(i) = inferredTypes(i)
+  } else if (null != inferredTypes.last.getComponentType) {
+// last argument is a collection, the array type
+operandTypes(i) = inferredTypes.last.getComponentType
+  } else {
+operandTypes(i) = inferredTypes.last
+  }
 }
   }
 }
@@ -137,8 +143,18 @@ object ScalarSqlFunction {
   }
 
   override def getOperandCountRange: SqlOperandCountRange = {
-val signatureLengths = signatures.map(_.length)
-