Jenkins build is back to normal : flink-snapshot-deployment #658

2017-11-23 Thread Apache Jenkins Server




flink git commit: [hotfix] Always explicitly set hadoop.version in create_binary_release

2017-11-23 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/release-1.4 13631b961 -> 3b58038d6


[hotfix] Always explicitly set hadoop.version in create_binary_release

Before this change, the "hadoop2" profile would create a binary release for
whatever happened to be the default hadoop.version.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b58038d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b58038d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b58038d

Branch: refs/heads/release-1.4
Commit: 3b58038d65cabdfd9da16150d4328227ca6afba1
Parents: 13631b9
Author: Aljoscha Krettek 
Authored: Mon Nov 20 18:06:11 2017 +0100
Committer: Aljoscha Krettek 
Committed: Thu Nov 23 18:09:26 2017 +0100

--
 tools/releasing/create_binary_release.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3b58038d/tools/releasing/create_binary_release.sh
--
diff --git a/tools/releasing/create_binary_release.sh 
b/tools/releasing/create_binary_release.sh
index 42b5a8e..3b1508e 100755
--- a/tools/releasing/create_binary_release.sh
+++ b/tools/releasing/create_binary_release.sh
@@ -78,7 +78,7 @@ cd ..
 
 
 if [ "$SCALA_VERSION" == "none" ] && [ "$HADOOP_VERSION" == "none" ]; then
-  make_binary_release "hadoop2" "" "2.11"
+  make_binary_release "hadoop24" "-Dhadoop.version=2.4.1" "2.11"
   make_binary_release "hadoop26" "-Dhadoop.version=2.6.5" "2.11"
   make_binary_release "hadoop27" "-Dhadoop.version=2.7.3" "2.11"
   make_binary_release "hadoop28" "-Dhadoop.version=2.8.0" "2.11"
@@ -87,7 +87,7 @@ then
   make_binary_release "hadoop2" "-Dhadoop.version=$HADOOP_VERSION" "2.11"
 elif [ "$SCALA_VERSION" != none ] && [ "$HADOOP_VERSION" == "none" ]
 then
-  make_binary_release "hadoop2" "" "$SCALA_VERSION"
+  make_binary_release "hadoop24" "-Dhadoop.version=2.4.1" "$SCALA_VERSION"
   make_binary_release "hadoop26" "-Dhadoop.version=2.6.5" "$SCALA_VERSION"
   make_binary_release "hadoop27" "-Dhadoop.version=2.7.3" "$SCALA_VERSION"
   make_binary_release "hadoop28" "-Dhadoop.version=2.8.0" "$SCALA_VERSION"
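
The body of make_binary_release is not shown in this excerpt. As a rough, hypothetical sketch (not the actual script), its second argument can be thought of as extra Maven flags, so always passing an explicit -Dhadoop.version pins the Hadoop dependency instead of relying on the POM default:

# Hypothetical sketch only -- the real make_binary_release in
# tools/releasing/create_binary_release.sh also handles naming, packaging and checksums.
make_binary_release() {
  local name=$1    # release label, e.g. "hadoop24"
  local flags=$2   # extra Maven flags, e.g. "-Dhadoop.version=2.4.1" (now never empty)
  local scala=$3   # Scala version, e.g. "2.11"

  # An explicit -Dhadoop.version overrides the POM property, so the resulting
  # binary no longer depends on whatever the default hadoop.version happens to be.
  mvn clean package -DskipTests $flags
}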



flink git commit: [hotfix] Exclude python-source.zip from checkstyle

2017-11-23 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master d86c6b6bb -> 54dd91603


[hotfix] Exclude python-source.zip from checkstyle

This was causing the snapshot deployment to fail.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54dd9160
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54dd9160
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54dd9160

Branch: refs/heads/master
Commit: 54dd91603f80aa49000795b9dd4a53896c6da464
Parents: d86c6b6
Author: Aljoscha Krettek 
Authored: Thu Nov 23 17:57:50 2017 +0100
Committer: Aljoscha Krettek 
Committed: Thu Nov 23 17:58:35 2017 +0100

--
 tools/maven/suppressions.xml | 4 
 1 file changed, 4 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/54dd9160/tools/maven/suppressions.xml
--
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index a2de9e8..50a56ce 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -42,4 +42,8 @@ under the License.

+   
+   
 
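The two "+" lines above lost their XML payload in this archive (the tags were swallowed by the renderer). For orientation only, a checkstyle suppression that excludes a zip artifact from all checks typically looks like the sketch below; the exact file pattern and surrounding lines added by this commit are an assumption, not recovered content:

<?xml version="1.0"?>
<!DOCTYPE suppressions PUBLIC
    "-//Puppy Crawl//DTD Suppressions 1.1//EN"
    "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
<suppressions>
  <!-- Hypothetical entry: skip all checkstyle checks for the bundled python-source.zip. -->
  <suppress files=".*python-source\.zip" checks=".*"/>
</suppressions>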



[2/2] flink git commit: [FLINK-8118] [table] Allow to specify the offsets of KafkaTableSources

2017-11-23 Thread fhueske
[FLINK-8118] [table] Allow to specify the offsets of KafkaTableSources

This closes #5056.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fb24581
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fb24581
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fb24581

Branch: refs/heads/release-1.4
Commit: 2fb24581a1775084e3be8c2575c129d250f39313
Parents: 828ef09
Author: Xingcan Cui 
Authored: Thu Nov 23 00:00:39 2017 +0800
Committer: Fabian Hueske 
Committed: Thu Nov 23 15:34:22 2017 +0100

--
 docs/dev/table/sourceSinks.md   |  50 +++-
 .../kafka/Kafka010AvroTableSource.java  |   2 +-
 .../kafka/Kafka010JsonTableSource.java  |   2 +-
 .../connectors/kafka/Kafka010TableSource.java   |   2 +-
 .../kafka/Kafka011AvroTableSource.java  |   2 +-
 .../kafka/Kafka011JsonTableSource.java  |   2 +-
 .../connectors/kafka/Kafka011TableSource.java   |   2 +-
 .../kafka/Kafka08AvroTableSource.java   |   2 +-
 .../kafka/Kafka08JsonTableSource.java   |   2 +-
 .../connectors/kafka/Kafka08TableSource.java|   2 +-
 .../kafka/Kafka09AvroTableSource.java   |   2 +-
 .../kafka/Kafka09JsonTableSource.java   |   2 +-
 .../connectors/kafka/Kafka09TableSource.java|   2 +-
 .../connectors/kafka/KafkaAvroTableSource.java  |   2 +-
 .../connectors/kafka/KafkaJsonTableSource.java  |   2 +-
 .../connectors/kafka/KafkaTableSource.java  | 126 ++-
 .../kafka/KafkaTableSourceTestBase.java |  44 +++
 17 files changed, 230 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/docs/dev/table/sourceSinks.md
--
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 7387358..aaf23bc 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -145,7 +145,7 @@ val source: TableSource[_] = 
Kafka010JsonTableSource.builder()
 
 
 
-* **Missing Field Handling** By default, a missing JSON field is set to 
`null`. You can enable strict JSON parsing that will cancel the source (and 
query) if a field is missing.
+* **Missing Field Handling:** By default, a missing JSON field is set to 
`null`. You can enable strict JSON parsing that will cancel the source (and 
query) if a field is missing.
 
 
 
@@ -169,6 +169,30 @@ val source: TableSource[_] = 
Kafka010JsonTableSource.builder()
 
 
 
+* **Specify the start reading position:** By default, the table source will 
start reading data from the committed group offsets in Zookeeper or Kafka 
brokers. You can specify other start positions via the builder's methods, which 
correspond to the configurations in section [Kafka Consumers Start Position 
Configuration](../connectors/kafka.html#kafka-consumers-start-position-configuration).
+
+
+
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .startReadingFromEarliest()
+  .build();
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .startReadingFromEarliest()
+  .build()
+{% endhighlight %}
+
+
+
 {% top %}
 
 ### KafkaAvroTableSource
@@ -265,6 +289,30 @@ val source: TableSource[_] = 
Kafka010AvroTableSource.builder()
 
 
 
+* **Specify the start reading position:** By default, the table source will 
start reading data from the committed group offsets in Zookeeper or Kafka 
brokers. You can specify other start positions via the builder's methods, which 
correspond to the configurations in section [Kafka Consumers Start Position 
Configuration](../connectors/kafka.html#kafka-consumers-start-position-configuration).
+
+
+
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .startReadingFromEarliest()
+  .build();
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .startReadingFromEarliest()
+  .build()
+{% endhighlight %}
+
+
+
 {% top %}
 
 ### Configuring a Processing Time Attribute

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb24581/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
 

[1/2] flink git commit: [FLINK-8118] [table] Improve KafkaTableSource documentation

2017-11-23 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/release-1.4 828ef09b0 -> 13631b961


[FLINK-8118] [table] Improve KafkaTableSource documentation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/13631b96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/13631b96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/13631b96

Branch: refs/heads/release-1.4
Commit: 13631b9617d32e46eba51c9125019ec5e77c39f3
Parents: 2fb2458
Author: twalthr 
Authored: Thu Nov 23 14:30:15 2017 +0100
Committer: Fabian Hueske 
Committed: Thu Nov 23 15:34:22 2017 +0100

--
 docs/dev/table/sourceSinks.md   | 56 ++--
 .../kafka/KafkaTableSourceTestBase.java |  5 +-
 2 files changed, 31 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/13631b96/docs/dev/table/sourceSinks.md
--
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index aaf23bc..2b10278 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -63,7 +63,7 @@ A `KafkaJsonTableSource` is created and configured using a 
builder. The followin
 
 {% highlight java %}
 // create builder
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -80,7 +80,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 {% highlight scala %}
 // create builder
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -108,7 +108,7 @@ Map mapping = new HashMap<>();
 mapping.put("sensorId", "id");
 mapping.put("temperature", "temp");
 
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   // set Table schema
   .withSchema(TableSchema.builder()
@@ -126,7 +126,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   // set Table schema
   .withSchema(TableSchema.builder()
@@ -150,7 +150,7 @@ val source: TableSource[_] = 
Kafka010JsonTableSource.builder()
 
 
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   // configure missing field behavior
   .failOnMissingField(true)
@@ -160,7 +160,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   // configure missing field behavior
   .failOnMissingField(true)
@@ -174,20 +174,20 @@ val source: TableSource[_] = 
Kafka010JsonTableSource.builder()
 
 
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   // start reading from the earliest offset
-  .startReadingFromEarliest()
+  .fromEarliest()
   .build();
 {% endhighlight %}
 
 
 
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   // start reading from the earliest offset
-  .startReadingFromEarliest()
+  .fromEarliest()
   .build()
 {% endhighlight %}
 
@@ -205,7 +205,7 @@ A `KafkaAvroTableSource` is created and configured using a 
builder. The followin
 
 {% highlight java %}
 // create builder
-TableSource source = Kafka010AvroTableSource.builder()
+KafkaTableSource source = Kafka010AvroTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -224,7 +224,7 @@ TableSource source = Kafka010AvroTableSource.builder()
 
 {% highlight scala %}
 // create builder
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -256,7 +256,7 @@ Map mapping = new HashMap<>();
 mapping.put("sensorId", "id");
 mapping.put("temperature", "temp");
 
-TableSource source = Kafka010AvroTableSource.builder()
+KafkaTableSource source = Kafka010AvroTableSource.builder()
   // ...
   // set Table schema
   .withSchema(TableSchema.builder()
@@ -264,15 +264,15 @@ TableSource source = Kafka010AvroTableSource.builder()
 
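Putting the two FLINK-8118 commits together, the documented builder usage after this rename looks roughly like the sketch below. Only forTopic, withKafkaProperties, withSchema, fromEarliest and build are taken from the diffs above; the field names, field types (BasicTypeInfo here) and Kafka properties are placeholders:

import java.util.Properties;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
import org.apache.flink.table.api.TableSchema;

public class KafkaTableSourceStartPositionExample {

    public static void main(String[] args) {
        // Placeholder consumer properties; adjust for the actual cluster.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "sensor-group");

        // Start reading from the earliest offset instead of the committed
        // group offsets (the default behavior).
        KafkaTableSource source = Kafka010JsonTableSource.builder()
            .forTopic("sensors")
            .withKafkaProperties(props)
            .withSchema(TableSchema.builder()
                .field("sensorId", BasicTypeInfo.LONG_TYPE_INFO)
                .field("temperature", BasicTypeInfo.DOUBLE_TYPE_INFO)
                .build())
            .fromEarliest()
            .build();
    }
}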

flink git commit: [FLINK-5465] [streaming] Wait for pending timer threads to finish or to exceed a time limit in exceptional stream task shutdown.

2017-11-23 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/release-1.4 e100861f8 -> 828ef09b0


[FLINK-5465] [streaming] Wait for pending timer threads to finish or to exceed 
a time limit in exceptional stream task shutdown.

This closes #5058.

(cherry picked from commit d86c6b6)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/828ef09b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/828ef09b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/828ef09b

Branch: refs/heads/release-1.4
Commit: 828ef09b09f872107b412501774b42efaf6caa37
Parents: e100861
Author: Stefan Richter 
Authored: Wed Nov 22 17:52:35 2017 +0100
Committer: Stefan Richter 
Committed: Thu Nov 23 15:25:01 2017 +0100

--
 .../configuration/TimerServiceOptions.java  | 38 
 .../runtime/tasks/ProcessingTimeService.java| 12 
 .../streaming/runtime/tasks/StreamTask.java | 18 +-
 .../tasks/SystemProcessingTimeService.java  |  6 ++
 .../tasks/TestProcessingTimeService.java|  6 ++
 .../tasks/SystemProcessingTimeServiceTest.java  | 65 
 6 files changed, 142 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/828ef09b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
new file mode 100644
index 000..835adce
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Timer service configuration options.
+ */
+@PublicEvolving
+public class TimerServiceOptions {
+
+   /**
+* This configures how long we wait for the {@link 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService}
+* to finish all pending timer threads when the stream task performs a 
failover shutdown. See FLINK-5465.
+*/
+   public static final ConfigOption 
TIMER_SERVICE_TERMINATION_AWAIT_MS = ConfigOptions
+   .key("timerservice.exceptional.shutdown.timeout")
+   .defaultValue(7500L);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/828ef09b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index b238252..2516299 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Defines the current processing time and handles all related actions,
@@ -93,4 +94,15 @@ public abstract class ProcessingTimeService {
 * will result in a hard exception.
 */
public abstract void shutdownService();
+
+   /**
+* Shuts down and clean up the timer service provider hard and 
immediately. This does wait
+* for all timers to complete or until the time limit is exceeded. Any 
call to
+* {@link #registerTimer(long, 

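The new option above is a plain ConfigOption with key "timerservice.exceptional.shutdown.timeout" and a default of 7500 ms. A minimal sketch of resolving it through a Flink Configuration; how StreamTask actually consumes the value is not visible in this truncated excerpt:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.configuration.TimerServiceOptions;

public class TimerShutdownTimeoutExample {

    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Override the 7500 ms default; without this line, getLong() below
        // returns the default declared in TimerServiceOptions.
        conf.setLong("timerservice.exceptional.shutdown.timeout", 10000L);

        long timeoutMs = conf.getLong(TimerServiceOptions.TIMER_SERVICE_TERMINATION_AWAIT_MS);
        System.out.println("Timer service shutdown timeout: " + timeoutMs + " ms");
    }
}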
flink git commit: [FLINK-5465] [streaming] Wait for pending timer threads to finish or to exceed a time limit in exceptional stream task shutdown.

2017-11-23 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/master fda2c9ff6 -> d86c6b6bb


[FLINK-5465] [streaming] Wait for pending timer threads to finish or to exceed 
a time limit in exceptional stream task shutdown.

This closes #5058.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d86c6b6b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d86c6b6b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d86c6b6b

Branch: refs/heads/master
Commit: d86c6b6bb32adee9d4b5c9098340a34e8a8a7f1d
Parents: fda2c9f
Author: Stefan Richter 
Authored: Wed Nov 22 17:52:35 2017 +0100
Committer: Stefan Richter 
Committed: Thu Nov 23 15:23:43 2017 +0100

--
 .../configuration/TimerServiceOptions.java  | 38 
 .../runtime/tasks/ProcessingTimeService.java| 12 
 .../streaming/runtime/tasks/StreamTask.java | 18 +-
 .../tasks/SystemProcessingTimeService.java  |  6 ++
 .../tasks/TestProcessingTimeService.java|  6 ++
 .../tasks/SystemProcessingTimeServiceTest.java  | 65 
 6 files changed, 142 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d86c6b6b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
new file mode 100644
index 000..835adce
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/configuration/TimerServiceOptions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/**
+ * Timer service configuration options.
+ */
+@PublicEvolving
+public class TimerServiceOptions {
+
+   /**
+* This configures how long we wait for the {@link 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService}
+* to finish all pending timer threads when the stream task performs a 
failover shutdown. See FLINK-5465.
+*/
+   public static final ConfigOption 
TIMER_SERVICE_TERMINATION_AWAIT_MS = ConfigOptions
+   .key("timerservice.exceptional.shutdown.timeout")
+   .defaultValue(7500L);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d86c6b6b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index b238252..2516299 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Defines the current processing time and handles all related actions,
@@ -93,4 +94,15 @@ public abstract class ProcessingTimeService {
 * will result in a hard exception.
 */
public abstract void shutdownService();
+
+   /**
+* Shuts down and clean up the timer service provider hard and 
immediately. This does wait
+* for all timers to complete or until the time limit is exceeded. Any 
call to
+* {@link #registerTimer(long, ProcessingTimeCallback)} will result in 
a hard exception 

flink git commit: [hotfix][docs] Improve Kafka exactly-once docs

2017-11-23 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/release-1.4 62bf00189 -> e100861f8


[hotfix][docs] Improve Kafka exactly-once docs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e100861f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e100861f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e100861f

Branch: refs/heads/release-1.4
Commit: e100861f84fe60ec6bb8172bb5a3cc453640fdb3
Parents: 62bf001
Author: Piotr Nowojski 
Authored: Thu Nov 23 13:08:43 2017 +0100
Committer: Aljoscha Krettek 
Committed: Thu Nov 23 15:02:49 2017 +0100

--
 docs/dev/connectors/kafka.md | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e100861f/docs/dev/connectors/kafka.md
--
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index ad4cc2f..5376d5b 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -538,7 +538,10 @@ chosen by passing appropriate `semantic` parameter to the 
`FlinkKafkaProducer011
  be duplicated.
  * `Semantic.AT_LEAST_ONCE` (default setting): similar to 
`setFlushOnCheckpoint(true)` in
  `FlinkKafkaProducer010`. This guarantees that no records will be lost 
(although they can be duplicated).
- * `Semantic.EXACTLY_ONCE`: uses Kafka transactions to provide exactly-once 
semantic.
+ * `Semantic.EXACTLY_ONCE`: uses Kafka transactions to provide exactly-once 
semantic. Whenever you write
+ to Kafka using transactions, do not forget about setting desired 
`isolation.level` (`read_committed`
+ or `read_uncommitted` - the latter one is the default value) for any 
application consuming records
+ from Kafka.
 
 
   Attention: Depending on your Kafka configuration, even 
after Kafka acknowledges

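The added paragraph is about downstream applications rather than the producer itself. A minimal sketch of a Flink job that only reads records from committed Kafka transactions; the topic, bootstrap servers and group id are placeholders:

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReadCommittedConsumerExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "downstream-app");          // placeholder
        // Only see records of committed Kafka transactions, i.e. records written by a
        // FlinkKafkaProducer011 with Semantic.EXACTLY_ONCE; Kafka's default is read_uncommitted.
        props.setProperty("isolation.level", "read_committed");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer011<String> consumer =
            new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props);
        env.addSource(consumer).print();
        env.execute("read-committed consumer example");
    }
}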


flink git commit: [hotfix][docs] Improve Kafka exactly-once docs

2017-11-23 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master 7ff3f373a -> fda2c9ff6


[hotfix][docs] Improve Kafka exactly-once docs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fda2c9ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fda2c9ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fda2c9ff

Branch: refs/heads/master
Commit: fda2c9ff6080c4e776fb36b76d574bcd3777dd7b
Parents: 7ff3f37
Author: Piotr Nowojski 
Authored: Thu Nov 23 13:08:43 2017 +0100
Committer: Aljoscha Krettek 
Committed: Thu Nov 23 15:02:26 2017 +0100

--
 docs/dev/connectors/kafka.md | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/fda2c9ff/docs/dev/connectors/kafka.md
--
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index ad4cc2f..5376d5b 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -538,7 +538,10 @@ chosen by passing appropriate `semantic` parameter to the 
`FlinkKafkaProducer011
  be duplicated.
  * `Semantic.AT_LEAST_ONCE` (default setting): similar to 
`setFlushOnCheckpoint(true)` in
  `FlinkKafkaProducer010`. This guarantees that no records will be lost 
(although they can be duplicated).
- * `Semantic.EXACTLY_ONCE`: uses Kafka transactions to provide exactly-once 
semantic.
+ * `Semantic.EXACTLY_ONCE`: uses Kafka transactions to provide exactly-once 
semantic. Whenever you write
+ to Kafka using transactions, do not forget about setting desired 
`isolation.level` (`read_committed`
+ or `read_uncommitted` - the latter one is the default value) for any 
application consuming records
+ from Kafka.
 
 
   Attention: Depending on your Kafka configuration, even 
after Kafka acknowledges



[1/2] flink git commit: [FLINK-8118] [table] Fix documentation mistakes

2017-11-23 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master 458c909ca -> 7ff3f373a


[FLINK-8118] [table] Fix documentation mistakes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ff3f373
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ff3f373
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ff3f373

Branch: refs/heads/master
Commit: 7ff3f373a353f9ed8cd1973f2bde172ab7bd992d
Parents: 4083c70
Author: twalthr 
Authored: Thu Nov 23 14:30:15 2017 +0100
Committer: twalthr 
Committed: Thu Nov 23 14:49:14 2017 +0100

--
 docs/dev/table/sourceSinks.md   | 56 ++--
 .../kafka/KafkaTableSourceTestBase.java |  5 +-
 2 files changed, 31 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7ff3f373/docs/dev/table/sourceSinks.md
--
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index aaf23bc..2b10278 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -63,7 +63,7 @@ A `KafkaJsonTableSource` is created and configured using a 
builder. The followin
 
 {% highlight java %}
 // create builder
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -80,7 +80,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 {% highlight scala %}
 // create builder
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -108,7 +108,7 @@ Map mapping = new HashMap<>();
 mapping.put("sensorId", "id");
 mapping.put("temperature", "temp");
 
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   // set Table schema
   .withSchema(TableSchema.builder()
@@ -126,7 +126,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   // set Table schema
   .withSchema(TableSchema.builder()
@@ -150,7 +150,7 @@ val source: TableSource[_] = 
Kafka010JsonTableSource.builder()
 
 
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   // configure missing field behavior
   .failOnMissingField(true)
@@ -160,7 +160,7 @@ TableSource source = Kafka010JsonTableSource.builder()
 
 
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   // configure missing field behavior
   .failOnMissingField(true)
@@ -174,20 +174,20 @@ val source: TableSource[_] = 
Kafka010JsonTableSource.builder()
 
 
 {% highlight java %}
-TableSource source = Kafka010JsonTableSource.builder()
+KafkaTableSource source = Kafka010JsonTableSource.builder()
   // ...
   // start reading from the earliest offset
-  .startReadingFromEarliest()
+  .fromEarliest()
   .build();
 {% endhighlight %}
 
 
 
 {% highlight scala %}
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // ...
   // start reading from the earliest offset
-  .startReadingFromEarliest()
+  .fromEarliest()
   .build()
 {% endhighlight %}
 
@@ -205,7 +205,7 @@ A `KafkaAvroTableSource` is created and configured using a 
builder. The followin
 
 {% highlight java %}
 // create builder
-TableSource source = Kafka010AvroTableSource.builder()
+KafkaTableSource source = Kafka010AvroTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -224,7 +224,7 @@ TableSource source = Kafka010AvroTableSource.builder()
 
 {% highlight scala %}
 // create builder
-val source: TableSource[_] = Kafka010JsonTableSource.builder()
+val source: KafkaTableSource = Kafka010JsonTableSource.builder()
   // set Kafka topic
   .forTopic("sensors")
   // set Kafka consumer properties
@@ -256,7 +256,7 @@ Map mapping = new HashMap<>();
 mapping.put("sensorId", "id");
 mapping.put("temperature", "temp");
 
-TableSource source = Kafka010AvroTableSource.builder()
+KafkaTableSource source = Kafka010AvroTableSource.builder()
   // ...
   // set Table schema
   .withSchema(TableSchema.builder()
@@ -264,15 +264,15 @@ TableSource source = Kafka010AvroTableSource.builder()
 .field("temperature", 

[2/2] flink git commit: [FLINK-8118] [table] Allow to specify the offsets of KafkaTableSources

2017-11-23 Thread twalthr
[FLINK-8118] [table] Allow to specify the offsets of KafkaTableSources

This closes #5056.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4083c70d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4083c70d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4083c70d

Branch: refs/heads/master
Commit: 4083c70dc88e0022daaab807d67b922d426fb533
Parents: 458c909
Author: Xingcan Cui 
Authored: Thu Nov 23 00:00:39 2017 +0800
Committer: twalthr 
Committed: Thu Nov 23 14:49:14 2017 +0100

--
 docs/dev/table/sourceSinks.md   |  50 +++-
 .../kafka/Kafka010AvroTableSource.java  |   2 +-
 .../kafka/Kafka010JsonTableSource.java  |   2 +-
 .../connectors/kafka/Kafka010TableSource.java   |   2 +-
 .../kafka/Kafka011AvroTableSource.java  |   2 +-
 .../kafka/Kafka011JsonTableSource.java  |   2 +-
 .../connectors/kafka/Kafka011TableSource.java   |   2 +-
 .../kafka/Kafka08AvroTableSource.java   |   2 +-
 .../kafka/Kafka08JsonTableSource.java   |   2 +-
 .../connectors/kafka/Kafka08TableSource.java|   2 +-
 .../kafka/Kafka09AvroTableSource.java   |   2 +-
 .../kafka/Kafka09JsonTableSource.java   |   2 +-
 .../connectors/kafka/Kafka09TableSource.java|   2 +-
 .../connectors/kafka/KafkaAvroTableSource.java  |   2 +-
 .../connectors/kafka/KafkaJsonTableSource.java  |   2 +-
 .../connectors/kafka/KafkaTableSource.java  | 126 ++-
 .../kafka/KafkaTableSourceTestBase.java |  44 +++
 17 files changed, 230 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/docs/dev/table/sourceSinks.md
--
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 7387358..aaf23bc 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -145,7 +145,7 @@ val source: TableSource[_] = 
Kafka010JsonTableSource.builder()
 
 
 
-* **Missing Field Handling** By default, a missing JSON field is set to 
`null`. You can enable strict JSON parsing that will cancel the source (and 
query) if a field is missing.
+* **Missing Field Handling:** By default, a missing JSON field is set to 
`null`. You can enable strict JSON parsing that will cancel the source (and 
query) if a field is missing.
 
 
 
@@ -169,6 +169,30 @@ val source: TableSource[_] = 
Kafka010JsonTableSource.builder()
 
 
 
+* **Specify the start reading position:** By default, the table source will 
start reading data from the committed group offsets in Zookeeper or Kafka 
brokers. You can specify other start positions via the builder's methods, which 
correspond to the configurations in section [Kafka Consumers Start Position 
Configuration](../connectors/kafka.html#kafka-consumers-start-position-configuration).
+
+
+
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .startReadingFromEarliest()
+  .build();
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .startReadingFromEarliest()
+  .build()
+{% endhighlight %}
+
+
+
 {% top %}
 
 ### KafkaAvroTableSource
@@ -265,6 +289,30 @@ val source: TableSource[_] = 
Kafka010AvroTableSource.builder()
 
 
 
+* **Specify the start reading position:** By default, the table source will 
start reading data from the committed group offsets in Zookeeper or Kafka 
brokers. You can specify other start positions via the builder's methods, which 
correspond to the configurations in section [Kafka Consumers Start Position 
Configuration](../connectors/kafka.html#kafka-consumers-start-position-configuration).
+
+
+
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .startReadingFromEarliest()
+  .build();
+{% endhighlight %}
+
+
+
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .startReadingFromEarliest()
+  .build()
+{% endhighlight %}
+
+
+
 {% top %}
 
 ### Configuring a Processing Time Attribute

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
 

[2/3] flink git commit: [FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each checkpoint

2017-11-23 Thread aljoscha
[FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each checkpoint

Previously, the following scenario was faulty with a producer pool of 2:

1. started transaction 1 with producerA, wrote record 42
2. checkpoint 1 triggered, pre-committing txn1, started txn2 with producerB, wrote record 43
3. checkpoint 1 completed, committing txn1, returning producerA to the pool
4. checkpoint 2 triggered, committing txn2, started txn3 with producerA, wrote record 44
5. crash
6. recover to checkpoint 1, txn1 from producerA found in "pendingCommitTransactions", attempting to recoverAndCommit(txn1)
7. unfortunately, txn1 and txn3 from the same producer are identical from the Kafka broker's perspective, and thus txn3 is committed as well

The result is that both records 42 and 44 are committed.

With this fix, txn3 will have different producerId/epoch counters than txn1 after re-initialization.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27564c33
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27564c33
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27564c33

Branch: refs/heads/release-1.4
Commit: 27564c33955d8e53f0275a5b43d4b2415ba86547
Parents: 736b908
Author: Piotr Nowojski 
Authored: Wed Nov 22 15:53:08 2017 +0100
Committer: Aljoscha Krettek 
Committed: Thu Nov 23 14:45:07 2017 +0100

--
 .../connectors/kafka/FlinkKafkaProducer011.java | 85 ++--
 .../kafka/FlinkKafkaProducer011ITCase.java  | 65 +++
 2 files changed, 91 insertions(+), 59 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/27564c33/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 0c741f5..b14e487 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -66,7 +66,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -223,16 +222,11 @@ public class FlinkKafkaProducer011
private final int kafkaProducersPoolSize;
 
/**
-* Available transactional ids.
+* Pool of available transactional ids.
 */
private final BlockingDeque availableTransactionalIds = new 
LinkedBlockingDeque<>();
 
/**
-* Pool of KafkaProducers objects.
-*/
-   private transient Optional producersPool = 
Optional.empty();
-
-   /**
 * Flag controlling whether we are writing the Flink record's timestamp 
into Kafka.
 */
private boolean writeTimestampToKafka = false;
@@ -599,12 +593,6 @@ public class FlinkKafkaProducer011
catch (Exception e) {
asyncException = ExceptionUtils.firstOrSuppressed(e, 
asyncException);
}
-   try {
-   producersPool.ifPresent(pool -> pool.close());
-   }
-   catch (Exception e) {
-   asyncException = ExceptionUtils.firstOrSuppressed(e, 
asyncException);
-   }
// make sure we propagate pending errors
checkErroneous();
}
@@ -615,7 +603,7 @@ public class FlinkKafkaProducer011
protected KafkaTransactionState beginTransaction() throws 
FlinkKafka011Exception {
switch (semantic) {
case EXACTLY_ONCE:
-   FlinkKafkaProducer producer = 
createOrGetProducerFromPool();
+   FlinkKafkaProducer producer = 
createTransactionalProducer();
producer.beginTransaction();
return new 
KafkaTransactionState(producer.getTransactionalId(), producer);
case AT_LEAST_ONCE:
@@ -631,21 +619,6 @@ public class FlinkKafkaProducer011
}
}
 
-   private FlinkKafkaProducer 
createOrGetProducerFromPool() throws FlinkKafka011Exception {
-   FlinkKafkaProducer producer = 
getProducersPool().poll();
-   if (producer 

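For context, a sketch of constructing the exactly-once producer that this fix hardens. Semantic.EXACTLY_ONCE and the serialization classes appear elsewhere in these commits, but the exact constructor signature used here is an assumption; the transactional producer pool and its per-checkpoint re-initialization are handled internally and need no user code:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ExactlyOnceProducerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

        // Assumed constructor: (topic, keyed serialization schema, producer config, semantic).
        // With EXACTLY_ONCE, each checkpoint writes through a Kafka transaction; this fix
        // re-initializes the transactional producer so that a transaction recovered from an
        // old checkpoint cannot be confused with a newer one from the same pooled producer.
        FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
            "my-topic",                                            // placeholder topic
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            props,
            FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
    }
}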
[1/3] flink git commit: [hotfix][kafka] Throw FlinkKafkaProducer011Exception with error codes instead of generic Exception

2017-11-23 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/release-1.4 8a052bf09 -> 62bf00189


[hotfix][kafka] Throw FlinkKafkaProducer011Exception with error codes instead 
of generic Exception


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/736b9088
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/736b9088
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/736b9088

Branch: refs/heads/release-1.4
Commit: 736b9088dcee64a1d3b19575f29a80c377f94fb8
Parents: 8a052bf
Author: Piotr Nowojski 
Authored: Wed Nov 22 11:37:48 2017 +0100
Committer: Aljoscha Krettek 
Committed: Thu Nov 23 14:45:00 2017 +0100

--
 .../kafka/FlinkKafka011ErrorCode.java   | 26 
 .../kafka/FlinkKafka011Exception.java   | 42 
 .../connectors/kafka/FlinkKafkaProducer011.java | 22 +-
 3 files changed, 81 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/736b9088/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
new file mode 100644
index 000..4f5de4f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+/**
+ * Error codes used in {@link FlinkKafka011Exception}.
+ */
+public enum FlinkKafka011ErrorCode {
+   PRODUCERS_POOL_EMPTY,
+   EXTERNAL_ERROR
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/736b9088/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
new file mode 100644
index 000..6b16e53
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception used by {@link FlinkKafkaProducer011} and {@link 
FlinkKafkaConsumer011}.
+ */
+public class FlinkKafka011Exception extends FlinkException {
+
+   private final FlinkKafka011ErrorCode errorCode;
+
+   public FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String 
message) {
+   super(message);
+   this.errorCode = errorCode;
+   }
+
+   

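A brief sketch of how a caller might react to the error code; the getErrorCode() accessor is an assumption, since the diff above is truncated before the end of the class:

import org.apache.flink.streaming.connectors.kafka.FlinkKafka011ErrorCode;
import org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception;

public class ErrorCodeHandlingExample {

    // Assumes FlinkKafka011Exception exposes its error code via getErrorCode().
    static void handle(FlinkKafka011Exception e) {
        switch (e.getErrorCode()) {
            case PRODUCERS_POOL_EMPTY:
                // e.g. restart with a larger transactional producer pool
                break;
            case EXTERNAL_ERROR:
                // an error reported by the Kafka client itself
                break;
        }
    }

    public static void main(String[] args) {
        handle(new FlinkKafka011Exception(
            FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY, "no producers available"));
    }
}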
[3/3] flink git commit: [hotfix][kafka] Remove unused method in kafka tests

2017-11-23 Thread aljoscha
[hotfix][kafka] Remove unused method in kafka tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62bf0018
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62bf0018
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62bf0018

Branch: refs/heads/release-1.4
Commit: 62bf00189b136dd34c5af5d6181c33c1415f16b9
Parents: 27564c3
Author: Piotr Nowojski 
Authored: Wed Nov 22 15:55:20 2017 +0100
Committer: Aljoscha Krettek 
Committed: Thu Nov 23 14:45:16 2017 +0100

--
 .../kafka/FlinkKafkaProducer011ITCase.java  | 16 
 1 file changed, 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/62bf0018/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index a32c7f8..85735c8 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -29,12 +29,7 @@ import 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
-
 import kafka.server.KafkaServer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.junit.Before;
 import org.junit.Test;
@@ -553,17 +548,6 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
}
}
 
-   private void assertRecord(String topicName, String expectedKey, String 
expectedValue) {
-   try (KafkaConsumer kafkaConsumer = new 
KafkaConsumer<>(extraProperties)) {
-   
kafkaConsumer.subscribe(Collections.singletonList(topicName));
-   ConsumerRecords records = 
kafkaConsumer.poll(1);
-
-   ConsumerRecord record = 
Iterables.getOnlyElement(records);
-   assertEquals(expectedKey, record.key());
-   assertEquals(expectedValue, record.value());
-   }
-   }
-
private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) 
throws Exception {
try {
autoCloseable.close();



[1/3] flink git commit: [hotfix][kafka] Remove unused method in kafka tests

2017-11-23 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master ccf917de2 -> 458c909ca


[hotfix][kafka] Remove unused method in kafka tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/458c909c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/458c909c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/458c909c

Branch: refs/heads/master
Commit: 458c909caf6f3ab1a6ed90c2508eacf686d1d101
Parents: f214e7d
Author: Piotr Nowojski 
Authored: Wed Nov 22 15:55:20 2017 +0100
Committer: Aljoscha Krettek 
Committed: Thu Nov 23 14:44:18 2017 +0100

--
 .../kafka/FlinkKafkaProducer011ITCase.java  | 16 
 1 file changed, 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/458c909c/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index a32c7f8..85735c8 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -29,12 +29,7 @@ import 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
-
 import kafka.server.KafkaServer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.junit.Before;
 import org.junit.Test;
@@ -553,17 +548,6 @@ public class FlinkKafkaProducer011ITCase extends 
KafkaTestBase {
}
}
 
-   private void assertRecord(String topicName, String expectedKey, String 
expectedValue) {
-   try (KafkaConsumer kafkaConsumer = new 
KafkaConsumer<>(extraProperties)) {
-   
kafkaConsumer.subscribe(Collections.singletonList(topicName));
-   ConsumerRecords records = 
kafkaConsumer.poll(1);
-
-   ConsumerRecord record = 
Iterables.getOnlyElement(records);
-   assertEquals(expectedKey, record.key());
-   assertEquals(expectedValue, record.value());
-   }
-   }
-
private void closeIgnoringProducerFenced(AutoCloseable autoCloseable) 
throws Exception {
try {
autoCloseable.close();



[2/3] flink git commit: [FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each checkpoint

2017-11-23 Thread aljoscha
[FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each checkpoint

Previously, the following scenario was faulty with a producer pool of 2:

1. started transaction 1 with producerA, wrote record 42
2. checkpoint 1 triggered, pre-committing txn1, started txn2 with producerB, wrote record 43
3. checkpoint 1 completed, committing txn1, returning producerA to the pool
4. checkpoint 2 triggered, committing txn2, started txn3 with producerA, wrote record 44
5. crash
6. recover to checkpoint 1, txn1 from producerA found in "pendingCommitTransactions", attempting to recoverAndCommit(txn1)
7. unfortunately, txn1 and txn3 from the same producer are identical from the Kafka broker's perspective, and thus txn3 is committed as well

The result is that both records 42 and 44 are committed.

With this fix, txn3 will have different producerId/epoch counters than txn1 after re-initialization.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f214e7d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f214e7d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f214e7d8

Branch: refs/heads/master
Commit: f214e7d8ed5a38a8b56f12cf8d7bc7dd0ba31189
Parents: 2ac32c5
Author: Piotr Nowojski 
Authored: Wed Nov 22 15:53:08 2017 +0100
Committer: Aljoscha Krettek 
Committed: Thu Nov 23 14:44:18 2017 +0100

--
 .../connectors/kafka/FlinkKafkaProducer011.java | 85 ++--
 .../kafka/FlinkKafkaProducer011ITCase.java  | 65 +++
 2 files changed, 91 insertions(+), 59 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f214e7d8/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 0c741f5..b14e487 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -66,7 +66,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -223,16 +222,11 @@ public class FlinkKafkaProducer011
private final int kafkaProducersPoolSize;
 
/**
-* Available transactional ids.
+* Pool of available transactional ids.
 */
private final BlockingDeque availableTransactionalIds = new 
LinkedBlockingDeque<>();
 
/**
-* Pool of KafkaProducers objects.
-*/
-   private transient Optional producersPool = 
Optional.empty();
-
-   /**
 * Flag controlling whether we are writing the Flink record's timestamp 
into Kafka.
 */
private boolean writeTimestampToKafka = false;
@@ -599,12 +593,6 @@ public class FlinkKafkaProducer011
catch (Exception e) {
asyncException = ExceptionUtils.firstOrSuppressed(e, 
asyncException);
}
-   try {
-   producersPool.ifPresent(pool -> pool.close());
-   }
-   catch (Exception e) {
-   asyncException = ExceptionUtils.firstOrSuppressed(e, 
asyncException);
-   }
// make sure we propagate pending errors
checkErroneous();
}
@@ -615,7 +603,7 @@ public class FlinkKafkaProducer011
protected KafkaTransactionState beginTransaction() throws 
FlinkKafka011Exception {
switch (semantic) {
case EXACTLY_ONCE:
-   FlinkKafkaProducer producer = 
createOrGetProducerFromPool();
+   FlinkKafkaProducer producer = 
createTransactionalProducer();
producer.beginTransaction();
return new 
KafkaTransactionState(producer.getTransactionalId(), producer);
case AT_LEAST_ONCE:
@@ -631,21 +619,6 @@ public class FlinkKafkaProducer011
}
}
 
-   private FlinkKafkaProducer 
createOrGetProducerFromPool() throws FlinkKafka011Exception {
-   FlinkKafkaProducer producer = 
getProducersPool().poll();
-   if (producer == 

[3/3] flink git commit: [hotfix][kafka] Throw FlinkKafkaProducer011Exception with error codes instead of generic Exception

2017-11-23 Thread aljoscha
[hotfix][kafka] Throw FlinkKafkaProducer011Exception with error codes instead 
of generic Exception


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ac32c59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ac32c59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ac32c59

Branch: refs/heads/master
Commit: 2ac32c596bfaa833beefefd8de85c504e2d8d623
Parents: ccf917d
Author: Piotr Nowojski 
Authored: Wed Nov 22 11:37:48 2017 +0100
Committer: Aljoscha Krettek 
Committed: Thu Nov 23 14:44:18 2017 +0100

--
 .../kafka/FlinkKafka011ErrorCode.java   | 26 
 .../kafka/FlinkKafka011Exception.java   | 42 
 .../connectors/kafka/FlinkKafkaProducer011.java | 22 +-
 3 files changed, 81 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2ac32c59/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
new file mode 100644
index 000..4f5de4f
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+/**
+ * Error codes used in {@link FlinkKafka011Exception}.
+ */
+public enum FlinkKafka011ErrorCode {
+   PRODUCERS_POOL_EMPTY,
+   EXTERNAL_ERROR
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac32c59/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
new file mode 100644
index 000..6b16e53
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception used by {@link FlinkKafkaProducer011} and {@link 
FlinkKafkaConsumer011}.
+ */
+public class FlinkKafka011Exception extends FlinkException {
+
+   private final FlinkKafka011ErrorCode errorCode;
+
+   public FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String 
message) {
+   super(message);
+   this.errorCode = errorCode;
+   }
+
+   public FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String 
message, 

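The two new files above introduce an exception that carries an error code, so callers of FlinkKafkaProducer011 can branch on the failure category instead of inspecting a generic Exception's message. Below is a minimal, self-contained sketch of that pattern; the class, enum, and helper names are illustrative stand-ins and deliberately do not reuse the Flink classes, whose accessor methods are not fully shown above.

import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Self-contained sketch (illustration only) of the error-code-in-exception pattern:
 * a typed exception plus an enum replaces string matching on generic exceptions.
 */
public class ErrorCodeExceptionSketch {

    enum ErrorCode { PRODUCERS_POOL_EMPTY, EXTERNAL_ERROR }

    static class CodedException extends Exception {
        private final ErrorCode errorCode;

        CodedException(ErrorCode errorCode, String message) {
            super(message);
            this.errorCode = errorCode;
        }

        ErrorCode getErrorCode() {
            return errorCode;
        }
    }

    // A caller that needs a producer; throws a coded exception when the pool is exhausted.
    static String acquireProducer(Deque<String> pool) throws CodedException {
        String producer = pool.poll();
        if (producer == null) {
            throw new CodedException(ErrorCode.PRODUCERS_POOL_EMPTY,
                    "Too many ongoing transactions for the configured pool size.");
        }
        return producer;
    }

    public static void main(String[] args) {
        try {
            acquireProducer(new ArrayDeque<>());
        } catch (CodedException e) {
            // Callers can react to the specific failure category instead of parsing messages.
            switch (e.getErrorCode()) {
                case PRODUCERS_POOL_EMPTY:
                    System.err.println("Pool exhausted: " + e.getMessage());
                    break;
                case EXTERNAL_ERROR:
                    System.err.println("External failure: " + e.getMessage());
                    break;
            }
        }
    }
}
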
flink git commit: [FLINK-6505] Proactively cleanup local FS for RocksDBKeyedStateBackend on startup

2017-11-23 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/release-1.4 35517f129 -> 8a052bf09


[FLINK-6505] Proactively cleanup local FS for RocksDBKeyedStateBackend on 
startup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8a052bf0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8a052bf0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8a052bf0

Branch: refs/heads/release-1.4
Commit: 8a052bf0948d92d6fccc4d1c6c4bd2aa459032c9
Parents: 35517f1
Author: Bowen Li 
Authored: Tue Oct 10 07:31:17 2017 +0200
Committer: Stefan Richter 
Committed: Thu Nov 23 13:51:41 2017 +0100

--
 .../state/RocksDBKeyedStateBackend.java | 28 ++--
 1 file changed, 14 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8a052bf0/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f67daab..9185ad0 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -235,20 +235,14 @@ public class RocksDBKeyedStateBackend extends 
AbstractKeyedStateBackend {
this.instanceBasePath = 
Preconditions.checkNotNull(instanceBasePath);
this.instanceRocksDBPath = new File(instanceBasePath, "db");
 
-   if (!instanceBasePath.exists()) {
-   if (!instanceBasePath.mkdirs()) {
-   throw new IOException("Could not create RocksDB 
data directory.");
-   }
+   if (instanceBasePath.exists()) {
+   // Clear the base directory when the backend is created
+   // in case something crashed and the backend never 
reached dispose()
+   cleanInstanceBasePath();
}
 
-   // clean it, this will remove the last part of the path but 
RocksDB will recreate it
-   try {
-   if (instanceRocksDBPath.exists()) {
-   LOG.warn("Deleting already existing db 
directory {}.", instanceRocksDBPath);
-   FileUtils.deleteDirectory(instanceRocksDBPath);
-   }
-   } catch (IOException e) {
-   throw new IOException("Error cleaning RocksDB data 
directory.", e);
+   if (!instanceBasePath.mkdirs()) {
+   throw new IOException("Could not create RocksDB data 
directory.");
}
 
this.keyGroupPrefixBytes = getNumberOfKeyGroups() > 
(Byte.MAX_VALUE + 1) ? 2 : 1;
@@ -312,10 +306,16 @@ public class RocksDBKeyedStateBackend extends 
AbstractKeyedStateBackend {
IOUtils.closeQuietly(dbOptions);
IOUtils.closeQuietly(columnOptions);
 
+   cleanInstanceBasePath();
+   }
+
+   private void cleanInstanceBasePath() {
+   LOG.info("Deleting existing instance base directory {}.", 
instanceBasePath);
+
try {
FileUtils.deleteDirectory(instanceBasePath);
-   } catch (IOException ioex) {
-   LOG.info("Could not delete instace base path for 
RocksDB: " + instanceBasePath, ioex);
+   } catch (IOException ex) {
+   LOG.warn("Could not delete instance base path for 
RocksDB: " + instanceBasePath, ex);
}
}
 



flink git commit: [FLINK-6505] Proactively cleanup local FS for RocksDBKeyedStateBackend on startup

2017-11-23 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/master 200612ee0 -> ccf917de2


[FLINK-6505] Proactively cleanup local FS for RocksDBKeyedStateBackend on startup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ccf917de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ccf917de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ccf917de

Branch: refs/heads/master
Commit: ccf917de23ac94b032da11fb536d778f0566792f
Parents: 200612e
Author: Bowen Li 
Authored: Mon Oct 9 22:31:17 2017 -0700
Committer: Stefan Richter 
Committed: Thu Nov 23 12:38:58 2017 +0100

--
 .../state/RocksDBKeyedStateBackend.java | 28 ++--
 1 file changed, 14 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/ccf917de/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index f67daab..9185ad0 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -235,20 +235,14 @@ public class RocksDBKeyedStateBackend extends 
AbstractKeyedStateBackend {
this.instanceBasePath = 
Preconditions.checkNotNull(instanceBasePath);
this.instanceRocksDBPath = new File(instanceBasePath, "db");
 
-   if (!instanceBasePath.exists()) {
-   if (!instanceBasePath.mkdirs()) {
-   throw new IOException("Could not create RocksDB 
data directory.");
-   }
+   if (instanceBasePath.exists()) {
+   // Clear the base directory when the backend is created
+   // in case something crashed and the backend never 
reached dispose()
+   cleanInstanceBasePath();
}
 
-   // clean it, this will remove the last part of the path but 
RocksDB will recreate it
-   try {
-   if (instanceRocksDBPath.exists()) {
-   LOG.warn("Deleting already existing db 
directory {}.", instanceRocksDBPath);
-   FileUtils.deleteDirectory(instanceRocksDBPath);
-   }
-   } catch (IOException e) {
-   throw new IOException("Error cleaning RocksDB data 
directory.", e);
+   if (!instanceBasePath.mkdirs()) {
+   throw new IOException("Could not create RocksDB data 
directory.");
}
 
this.keyGroupPrefixBytes = getNumberOfKeyGroups() > 
(Byte.MAX_VALUE + 1) ? 2 : 1;
@@ -312,10 +306,16 @@ public class RocksDBKeyedStateBackend extends 
AbstractKeyedStateBackend {
IOUtils.closeQuietly(dbOptions);
IOUtils.closeQuietly(columnOptions);
 
+   cleanInstanceBasePath();
+   }
+
+   private void cleanInstanceBasePath() {
+   LOG.info("Deleting existing instance base directory {}.", 
instanceBasePath);
+
try {
FileUtils.deleteDirectory(instanceBasePath);
-   } catch (IOException ioex) {
-   LOG.info("Could not delete instace base path for 
RocksDB: " + instanceBasePath, ioex);
+   } catch (IOException ex) {
+   LOG.warn("Could not delete instance base path for 
RocksDB: " + instanceBasePath, ex);
}
}
 



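Both branch commits above apply the same idea: wipe the backend's local working directory when the backend is created, in case a previous run crashed before dispose() ever ran, and delete it again on dispose(), logging rather than failing if the best-effort deletion does not succeed. The sketch below shows the shape of that lifecycle using plain java.nio instead of Flink's FileUtils; the class name and the /tmp path are assumptions for illustration, not the backend's actual code.

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/**
 * Stand-alone sketch (illustration only) of the cleanup-on-startup pattern:
 * clear leftover local state before use, recreate the directory, and remove it
 * again on dispose() on a best-effort basis.
 */
public class LocalStateDirSketch {

    private final File instanceBasePath;

    LocalStateDirSketch(File instanceBasePath) throws IOException {
        this.instanceBasePath = instanceBasePath;

        if (instanceBasePath.exists()) {
            // A previous instance may have crashed before reaching dispose();
            // clear whatever it left behind before writing new state.
            cleanInstanceBasePath();
        }
        if (!instanceBasePath.mkdirs()) {
            throw new IOException("Could not create local state directory: " + instanceBasePath);
        }
    }

    void dispose() {
        // Best effort: failures are logged rather than propagated.
        cleanInstanceBasePath();
    }

    private void cleanInstanceBasePath() {
        try (Stream<Path> paths = Files.walk(instanceBasePath.toPath())) {
            // Delete children before parents so directories are empty when removed.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException e) {
            System.err.println("Could not delete local state directory " + instanceBasePath + ": " + e);
        }
    }

    public static void main(String[] args) throws IOException {
        LocalStateDirSketch backend = new LocalStateDirSketch(new File("/tmp/example-state-dir"));
        backend.dispose();
    }
}
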
[2/2] flink-web git commit: Rebuild website

2017-11-23 Thread aljoscha
Rebuild website


Project: http://git-wip-us.apache.org/repos/asf/flink-web/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink-web/commit/389b00e6
Tree: http://git-wip-us.apache.org/repos/asf/flink-web/tree/389b00e6
Diff: http://git-wip-us.apache.org/repos/asf/flink-web/diff/389b00e6

Branch: refs/heads/asf-site
Commit: 389b00e6779a24404f88707a7c4048546087039d
Parents: 3df7f59
Author: Aljoscha Krettek 
Authored: Thu Nov 23 12:08:13 2017 +0100
Committer: Aljoscha Krettek 
Committed: Thu Nov 23 12:08:13 2017 +0100

--
 content/blog/feed.xml | 9 +
 content/news/2017/11/22/release-1.4-and-1.5-timeline.html | 9 +
 2 files changed, 10 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink-web/blob/389b00e6/content/blog/feed.xml
--
diff --git a/content/blog/feed.xml b/content/blog/feed.xml
index f39fda1..b60a59d 100644
--- a/content/blog/feed.xml
+++ b/content/blog/feed.xml
@@ -20,7 +20,7 @@ in the hands of users./p
 
 pThis post will describe how the community plans to get there and the 
rationale behind the approach./p
 
-h2 
id=coming-soon-major-changes-to-the-flinks-runtimeComing soon: 
Major Changes to the Flink’s Runtime/h2
+h2 id=coming-soon-major-changes-to-flinks-runtimeComing 
soon: Major Changes to Flink’s Runtime/h2
 
 pThere are 3 significant improvements to the Apache Flink engine that 
the community has nearly
 completed and that will have a meaningful impact on Flink’s operability and 
performance./p
@@ -33,9 +33,10 @@ completed and that will have a meaningful impact on 
Flink’s operability and pe
 
 pNext, we’ll go through each of these improvements in more 
detail./p
 
-h2 
id=reworking-flinks-deployment-model-and-distributed-processReworking
 Flink’s Deployment Model and Distributed Process/h2
+h2 
id=reworking-flinks-deployment-model-and-distributed-processingReworking
 Flink’s Deployment Model and Distributed Processing/h2
 
-pa 
href=https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077FLIP-6/a;
 is an initiative
+pa 
href=https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077FLIP-6/a;
 (FLIP is short for
+FLink Improvement Proposal and FLIPs are proposals for bigger changes to 
Flink) is an initiative
 that’s been in the works for more than a year and represents a major 
refactor of Flink’s deployment
 model and distributed process. The underlying motivation for FLIP-6 was the 
fact that Flink is being
 adopted by a wider range of developer communities–both developers coming 
from the big data and
@@ -71,7 +72,7 @@ driven network I/O and application-level flow control, 
ensuring that Flink will
 network capacity, as well as credit-based flow control which offers more 
fine-grained backpressuring
 for improved checkpoint alignments./p
 
-pIn our testing (a 
href=https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-nico-kruber-building-a-network-stack-for-optimal-throughput-lowlatency-tradeoffssee
 slide 26 here/a),
+pIn our testing (a 
href=https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-nico-kruber-building-a-network-stack-for-optimal-throughput-lowlatency-tradeoffs#26see
 slide 26 here/a),
 we’ve seen a substantial improvement in latency using event-driven network 
I/O, and the community
 is also doing work to make sure we’re able to provide this increase in speed 
without a measurable
 throughput tradeoff./p

http://git-wip-us.apache.org/repos/asf/flink-web/blob/389b00e6/content/news/2017/11/22/release-1.4-and-1.5-timeline.html
--
diff --git a/content/news/2017/11/22/release-1.4-and-1.5-timeline.html 
b/content/news/2017/11/22/release-1.4-and-1.5-timeline.html
index 8079574..8c2eda9 100644
--- a/content/news/2017/11/22/release-1.4-and-1.5-timeline.html
+++ b/content/news/2017/11/22/release-1.4-and-1.5-timeline.html
@@ -153,7 +153,7 @@ in the hands of users.
 
 This post will describe how the community plans to get there and the 
rationale behind the approach.
 
-Coming soon: Major 
Changes to the Flink’s Runtime
+Coming soon: Major 
Changes to Flink’s Runtime
 
 There are 3 significant improvements to the Apache Flink engine that the 
community has nearly
 completed and that will have a meaningful impact on Flink’s operability and 
performance.
@@ -166,9 +166,10 @@ completed and that will have a meaningful impact on 
Flink’s operability and pe
 
 Next, we’ll go through each of these improvements in more detail.
 
-Reworking 
Flink’s Deployment Model and Distributed Process
+Reworking 
Flink’s Deployment Model and Distributed Processing
 

[1/2] flink-web git commit: Fix small issues in 1.4 to 1.5 blog post

2017-11-23 Thread aljoscha
Repository: flink-web
Updated Branches:
  refs/heads/asf-site c2b139f18 -> 389b00e67


Fix small issues in 1.4 to 1.5 blog post


Project: http://git-wip-us.apache.org/repos/asf/flink-web/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink-web/commit/3df7f594
Tree: http://git-wip-us.apache.org/repos/asf/flink-web/tree/3df7f594
Diff: http://git-wip-us.apache.org/repos/asf/flink-web/diff/3df7f594

Branch: refs/heads/asf-site
Commit: 3df7f594ce905540e47e095704d08b34e1b89811
Parents: c2b139f
Author: Aljoscha Krettek 
Authored: Thu Nov 23 12:07:28 2017 +0100
Committer: Aljoscha Krettek 
Committed: Thu Nov 23 12:07:28 2017 +0100

--
 _posts/2017-11-21-release-1.4-and-1.5-timeline.md | 9 +
 1 file changed, 5 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink-web/blob/3df7f594/_posts/2017-11-21-release-1.4-and-1.5-timeline.md
--
diff --git a/_posts/2017-11-21-release-1.4-and-1.5-timeline.md 
b/_posts/2017-11-21-release-1.4-and-1.5-timeline.md
index 776cc07..b6712ae 100644
--- a/_posts/2017-11-21-release-1.4-and-1.5-timeline.md
+++ b/_posts/2017-11-21-release-1.4-and-1.5-timeline.md
@@ -27,7 +27,7 @@ in the hands of users.
 
 This post will describe how the community plans to get there and the rationale 
behind the approach.
 
-## Coming soon: Major Changes to the Flink’s Runtime
+## Coming soon: Major Changes to Flink’s Runtime
 
 There are 3 significant improvements to the Apache Flink engine that the 
community has nearly
 completed and that will have a meaningful impact on Flink’s operability and 
performance.
@@ -38,9 +38,10 @@ completed and that will have a meaningful impact on 
Flink’s operability and pe
 
 Next, we’ll go through each of these improvements in more detail.
 
-## Reworking Flink’s Deployment Model and Distributed Process
+## Reworking Flink’s Deployment Model and Distributed Processing
 
-[FLIP-6](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077)
 is an initiative
+[FLIP-6](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077)
 (FLIP is short for
+FLink Improvement Proposal and FLIPs are proposals for bigger changes to 
Flink) is an initiative
 that’s been in the works for more than a year and represents a major 
refactor of Flink’s deployment
 model and distributed process. The underlying motivation for FLIP-6 was the 
fact that Flink is being
 adopted by a wider range of developer communities--both developers coming from 
the big data and
@@ -74,7 +75,7 @@ driven network I/O and application-level flow control, 
ensuring that Flink will
 network capacity, as well as credit-based flow control which offers more 
fine-grained backpressuring
 for improved checkpoint alignments.
 
-In our testing ([see slide 26 
here](https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-nico-kruber-building-a-network-stack-for-optimal-throughput-lowlatency-tradeoffs)),
+In our testing ([see slide 26 
here](https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-nico-kruber-building-a-network-stack-for-optimal-throughput-lowlatency-tradeoffs#26)),
 we’ve seen a substantial improvement in latency using event-driven network 
I/O, and the community
 is also doing work to make sure we’re able to provide this increase in speed 
without a measurable
 throughput tradeoff.