[flink] branch master updated: [FLINK-11817][docs] Replace fold example in DataStream API Tutorial

2019-03-20 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 96fd4c5  [FLINK-11817][docs] Replace fold example in DataStream API Tutorial
96fd4c5 is described below

commit 96fd4c5b6c4c781eb5c91c7d9d66029b9eb64be8
Author: leesf <490081...@qq.com>
AuthorDate: Wed Mar 20 18:40:34 2019 +0800

[FLINK-11817][docs] Replace fold example in DataStream API Tutorial
---
 docs/tutorials/datastream_api.md | 62 +---
 1 file changed, 46 insertions(+), 16 deletions(-)

diff --git a/docs/tutorials/datastream_api.md b/docs/tutorials/datastream_api.md
index b0964ee..7f69abe 100644
--- a/docs/tutorials/datastream_api.md
+++ b/docs/tutorials/datastream_api.md
@@ -178,18 +178,33 @@ that we want to aggregate the sum of edited bytes for every five seconds:
 {% highlight java %}
 DataStream<Tuple2<String, Long>> result = keyedEdits
 .timeWindow(Time.seconds(5))
-.fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
+.aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
 @Override
-public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
-acc.f0 = event.getUser();
-acc.f1 += event.getByteDiff();
-return acc;
+public Tuple2<String, Long> createAccumulator() {
+return new Tuple2<>("", 0L);
+}
+
+@Override
+public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
+accumulator.f0 = value.getUser();
+accumulator.f1 += value.getByteDiff();
+return accumulator;
+}
+
+@Override
+public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
+return accumulator;
+}
+
+@Override
+public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
+return new Tuple2<>(a.f0, a.f1 + b.f1);
 }
 });
 {% endhighlight %}
 
 The first call, `.timeWindow()`, specifies that we want to have tumbling (non-overlapping) windows
-of five seconds. The second call specifies a *Fold transformation* on each window slice for
+of five seconds. The second call specifies a *Aggregate transformation* on each window slice for
 each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
 difference of every edit in that time window for a user. The resulting Stream now contains
 a `Tuple2<String, Long>` for every user which gets emitted every five seconds.
@@ -212,7 +227,7 @@ The complete code so far is this:
 {% highlight java %}
 package wikiedits;
 
-import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -240,13 +255,28 @@ public class WikipediaAnalysis {
 
 DataStream<Tuple2<String, Long>> result = keyedEdits
   .timeWindow(Time.seconds(5))
-  .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
+  .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Long>, Tuple2<String, Long>>() {
 @Override
-public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
-  acc.f0 = event.getUser();
-  acc.f1 += event.getByteDiff();
-  return acc;
-}
+   public Tuple2<String, Long> createAccumulator() {
+ return new Tuple2<>("", 0L);
+   }
+
+   @Override
+   public Tuple2<String, Long> add(WikipediaEditEvent value, Tuple2<String, Long> accumulator) {
+ accumulator.f0 = value.getUser();
+ accumulator.f1 += value.getByteDiff();
+  return accumulator;
+   }
+
+   @Override
+   public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
+ return accumulator;
+   }
+
+   @Override
+   public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
+ return new Tuple2<>(a.f0, a.f1 + b.f1);
+   }
   });
 
 result.print();
@@ -368,9 +398,9 @@ The output of that command should look similar to this, if everything went accor
 03/08/2016 15:09:27 Job execution switched to status RUNNING.
 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
 03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
-03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
-03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
-03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), 
FoldingStateDescriptor{name=window-contents, defaultValue=(,0), 
serializer=null}
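
Note on the commit above: the change swaps a two-argument fold for a four-method aggregate (createAccumulator, add, getResult, merge). The following is a minimal plain-Java sketch of how those four methods cooperate, written without any Flink dependency. The `Aggregator` interface, the `Edit` and `Acc` classes, and the `aggregate` helper are hypothetical stand-ins invented for this illustration; they are not Flink's own types, and the real windowing runtime is not modelled.

```java
public class AggregateSketch {

    /** Hypothetical stand-in for a four-method aggregate contract (not Flink's interface). */
    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    /** Hypothetical edit event: a user name and a signed byte difference. */
    static final class Edit {
        final String user;
        final long byteDiff;

        Edit(String user, long byteDiff) {
            this.user = user;
            this.byteDiff = byteDiff;
        }
    }

    /** Mutable accumulator playing the role of the (user, bytes) tuple in the diff. */
    static final class Acc {
        String user = "";
        long bytes = 0L;
    }

    /** Mirrors the logic of the anonymous class in the diff: remember the user, sum the bytes. */
    static final class SumBytes implements Aggregator<Edit, Acc, Acc> {
        @Override
        public Acc createAccumulator() {
            return new Acc();
        }

        @Override
        public Acc add(Edit value, Acc accumulator) {
            accumulator.user = value.user;
            accumulator.bytes += value.byteDiff;
            return accumulator;
        }

        @Override
        public Acc getResult(Acc accumulator) {
            return accumulator;
        }

        @Override
        public Acc merge(Acc a, Acc b) {
            Acc merged = new Acc();
            merged.user = a.user;
            merged.bytes = a.bytes + b.bytes;
            return merged;
        }
    }

    /** Feeds a batch of edits through one accumulator, as a single window would. */
    static Acc aggregate(Edit... edits) {
        SumBytes function = new SumBytes();
        Acc accumulator = function.createAccumulator();
        for (Edit edit : edits) {
            accumulator = function.add(edit, accumulator);
        }
        return function.getResult(accumulator);
    }

    public static void main(String[] args) {
        Acc first = aggregate(new Edit("alice", 120), new Edit("alice", -20));
        Acc second = aggregate(new Edit("alice", 5));
        Acc merged = new SumBytes().merge(first, second);
        System.out.println(merged.user + " " + merged.bytes); // prints "alice 105"
    }
}
```

Compared with the fold variant, the initial value moves from a call argument into createAccumulator(), and merge() describes how two partial results are combined.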

[flink] branch master updated: [FLINK-11692][metrics] Add proxy support to datadog reporter

2019-03-20 Thread chesnay

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 2f2790c  [FLINK-11692][metrics] Add proxy support to datadog reporter
2f2790c is described below

commit 2f2790ca1354a9a166dcdd1b8a713b5917aae9a9
Author: Scott Mitchell <15306097+scott-mitch...@users.noreply.github.com>
AuthorDate: Wed Mar 20 06:48:49 2019 -0400

[FLINK-11692][metrics] Add proxy support to datadog reporter
---
 docs/monitoring/metrics.md |  4 +++
 .../flink/metrics/datadog/DatadogHttpClient.java   | 22 --
 .../flink/metrics/datadog/DatadogHttpReporter.java |  8 -
 .../metrics/datadog/DatadogHttpClientTest.java | 35 --
 4 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index ee7426a..4f93a01 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -760,6 +760,8 @@ Parameters:
 
 - `apikey` - the Datadog API key
 - `tags` - (optional) the global tags that will be applied to metrics when sending to Datadog. Tags should be separated by comma only
+- `proxyHost` - (optional) The proxy host to use when sending to Datadog.
+- `proxyPort` - (optional) The proxy port to use when sending to Datadog, defaults to 8080.
 
 Example configuration:
 
@@ -768,6 +770,8 @@ Example configuration:
 metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
 metrics.reporter.dghttp.apikey: xxx
 metrics.reporter.dghttp.tags: myflinkapp,prod
+metrics.reporter.dghttp.proxyHost: my.web.proxy.com
+metrics.reporter.dghttp.proxyPort: 8080
 
 {% endhighlight %}
 
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
index 9125629..b9772a3 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpClient.java
@@ -32,6 +32,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -51,16 +53,24 @@ public class DatadogHttpClient {
private final OkHttpClient client;
private final String apiKey;
 
-   public DatadogHttpClient(String dgApiKey) {
+   private final String proxyHost;
+   private final int proxyPort;
+
+   public DatadogHttpClient(String dgApiKey, String dgProxyHost, int dgProxyPort) {
if (dgApiKey == null || dgApiKey.isEmpty()) {
throw new IllegalArgumentException("Invalid API key:" + dgApiKey);
}
-
apiKey = dgApiKey;
+   proxyHost = dgProxyHost;
+   proxyPort = dgProxyPort;
+
+   Proxy proxy = getProxy();
+
client = new OkHttpClient.Builder()
.connectTimeout(TIMEOUT, TimeUnit.SECONDS)
.writeTimeout(TIMEOUT, TimeUnit.SECONDS)
.readTimeout(TIMEOUT, TimeUnit.SECONDS)
+   .proxy(proxy)
.build();
 
seriesUrl = String.format(SERIES_URL_FORMAT, apiKey);
@@ -68,6 +78,14 @@ public class DatadogHttpClient {
validateApiKey();
}
 
+   Proxy getProxy() {
+   if (proxyHost == null) {
+   return Proxy.NO_PROXY;
+   } else {
+   return new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
+   }
+   }
+
private void validateApiKey() {
Request r = new Request.Builder().url(validateUrl).get().build();
 
diff --git a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
index 5462b32..29d9497 100644
--- a/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
+++ b/flink-metrics/flink-metrics-datadog/src/main/java/org/apache/flink/metrics/datadog/DatadogHttpReporter.java
@@ -56,6 +56,8 @@ public class DatadogHttpReporter implements MetricReporter, Scheduled {
private List<String> configTags;
 
public static final String API_KEY = "apikey";
+   public static final String PROXY_HOST = "proxyHost";
+   public static final String PROXY_PORT = "proxyPort";
public static final String TAGS = "tags";
 
@Override
@@ -102,7 +104,11 @@ public class Datadog
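
Note on the commit above: the new getProxy() method falls back to a direct connection when no proxy host is configured and otherwise builds an HTTP proxy from host and port. A minimal standalone sketch of that decision, using only java.net classes, could look like the following. The class name `ProxySelectionSketch` and the static `proxyFor` helper are hypothetical names chosen for this illustration; the reporter-side code that reads `proxyHost` and `proxyPort` from the configuration is cut off in this email and is not reproduced here.

```java
import java.net.InetSocketAddress;
import java.net.Proxy;

public class ProxySelectionSketch {

    /**
     * Returns Proxy.NO_PROXY when no host is given, otherwise an HTTP proxy
     * pointing at host:port. Same branching as getProxy() in the diff.
     */
    static Proxy proxyFor(String proxyHost, int proxyPort) {
        if (proxyHost == null) {
            return Proxy.NO_PROXY;
        }
        return new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
    }

    public static void main(String[] args) {
        // No host configured: direct connection.
        System.out.println(proxyFor(null, 8080).type()); // prints "DIRECT"
        // Host configured (an IP literal here, so no DNS lookup is needed).
        System.out.println(proxyFor("127.0.0.1", 8080).type()); // prints "HTTP"
    }
}
```

Only a null host selects the direct connection in this sketch; an empty string would still be passed to InetSocketAddress, which matches the null-only check visible in the diff.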

[flink] 01/02: [hotfix][runtime] Delete unused methods from ExecutionGraph

2019-03-20 Thread gary

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d24e20b8b1ac8bf5207ac8f153eeead914bed834
Author: Gary Yao 
AuthorDate: Wed Mar 13 18:44:32 2019 +0100

[hotfix][runtime] Delete unused methods from ExecutionGraph

Delete methods:
  - getRequiredClasspaths()
  - getRequiredJarFiles()
---
 .../runtime/executiongraph/ExecutionGraph.java  | 21 -
 1 file changed, 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ff7e123..93e54e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -83,7 +83,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -602,26 +601,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
//  Properties and Status of the Execution Graph
// 

 
-   /**
-* Returns a list of BLOB keys referring to the JAR files required to run this job.
-*
-* @return list of BLOB keys referring to the JAR files required to run this job
-*/
-   public Collection<PermanentBlobKey> getRequiredJarFiles() {
-   return jobInformation.getRequiredJarFileBlobKeys();
-   }
-
-   /**
-* Returns a list of classpaths referring to the directories/JAR files required to run this job.
-*
-* @return list of classpaths referring to the directories/JAR files required to run this job
-*/
-   public Collection<URL> getRequiredClasspaths() {
-   return jobInformation.getRequiredClasspathURLs();
-   }
-
-   // 

-
public void setJsonPlan(String jsonPlan) {
this.jsonPlan = jsonPlan;
}



[flink] branch master updated (2f2790c -> 21cff1d)

2019-03-20 Thread gary

gary pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 2f2790c  [FLINK-11692][metrics] Add proxy support to datadog reporter
 new d24e20b  [hotfix][runtime] Delete unused methods from ExecutionGraph
 new 21cff1d  [hotfix][runtime] Delete unused interface ExecutionStatusListener

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/executiongraph/ExecutionGraph.java | 48 ---
 .../executiongraph/ExecutionStatusListener.java| 54 -
 .../executiongraph/StatusListenerMessenger.java| 70 --
 .../runtime/messages/ExecutionGraphMessages.scala  | 20 ---
 4 files changed, 192 deletions(-)
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/StatusListenerMessenger.java



[flink] 02/02: [hotfix][runtime] Delete unused interface ExecutionStatusListener

2019-03-20 Thread gary

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 21cff1d00d4866614c005e005000b4e8ad783142
Author: Gary Yao 
AuthorDate: Wed Mar 13 18:50:06 2019 +0100

[hotfix][runtime] Delete unused interface ExecutionStatusListener

This closes #7977.
---
 .../runtime/executiongraph/ExecutionGraph.java | 27 -
 .../executiongraph/ExecutionStatusListener.java| 54 -
 .../executiongraph/StatusListenerMessenger.java| 70 --
 .../runtime/messages/ExecutionGraphMessages.scala  | 20 ---
 4 files changed, 171 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 93e54e9..5e44e94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -212,9 +212,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
 * (such as from RUNNING to FINISHED). */
private final List<JobStatusListener> jobStatusListeners;
 
-   /** Listeners that receive messages whenever a single task execution changes its status. */
-   private final List<ExecutionStatusListener> executionListeners;
-
/** The implementation that decides how to recover the failures of tasks. */
private final FailoverStrategy failoverStrategy;
 
@@ -409,7 +406,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
this.currentExecutions = new ConcurrentHashMap<>(16);
 
this.jobStatusListeners  = new CopyOnWriteArrayList<>();
-   this.executionListeners = new CopyOnWriteArrayList<>();
 
this.stateTimestamps = new long[JobStatus.values().length];
this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();
@@ -1758,12 +1754,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
}
}
 
-   public void registerExecutionListener(ExecutionStatusListener listener) {
-   if (listener != null) {
-   executionListeners.add(listener);
-   }
-   }
-
private void notifyJobStatusChange(JobStatus newState, Throwable error) {
if (jobStatusListeners.size() > 0) {
final long timestamp = System.currentTimeMillis();
@@ -1784,23 +1774,6 @@ public class ExecutionGraph implements AccessExecutionGraph {
final ExecutionState newExecutionState,
final Throwable error) {
 
-   if (executionListeners.size() > 0) {
-   final ExecutionJobVertex vertex = execution.getVertex().getJobVertex();
-   final String message = error == null ? null : ExceptionUtils.stringifyException(error);
-   final long timestamp = System.currentTimeMillis();
-
-   for (ExecutionStatusListener listener : executionListeners) {
-   try {
-   listener.executionStatusChanged(
-   getJobID(), vertex.getJobVertexId(), vertex.getJobVertex().getName(),
-   vertex.getParallelism(), execution.getParallelSubtaskIndex(),
-   execution.getAttemptId(), newExecutionState, timestamp, message);
-   } catch (Throwable t) {
-   LOG.warn("Error while notifying ExecutionStatusListener", t);
-   }
-   }
-   }
-
// see what this means for us. currently, the first FAILED state means -> FAILED
if (newExecutionState == ExecutionState.FAILED) {
final Throwable ex = error != null ? error : new FlinkException("Unknown Error (missing cause)");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java
deleted file mode 100644
index 6fb5a1a..000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionStatusListener.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in comp

[flink] branch master updated (21cff1d -> b5c5499)

2019-03-20 Thread srichter

srichter pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 21cff1d  [hotfix][runtime] Delete unused interface ExecutionStatusListener
 new aee1f18  [hotfix] Fix inactive branch in TtlStateTestBase
 new 284e448  [FLINK-11980] Improve efficiency of iterating KeySelectionListener on notification
 new b5c5499  [hotfix] Remove unused method in AbstractKeyedStateBackend

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/state/AbstractKeyedStateBackend.java | 20 +++-
 .../flink/runtime/state/ttl/TtlStateTestBase.java|  2 +-
 2 files changed, 8 insertions(+), 14 deletions(-)



[flink] 03/03: [hotfix] Remove unused method in AbstractKeyedStateBackend

2019-03-20 Thread srichter

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b5c5499789f4e2ab21c4a05bf3da8ce7408b9663
Author: Stefan Richter 
AuthorDate: Wed Mar 20 12:35:04 2019 +0100

[hotfix] Remove unused method in AbstractKeyedStateBackend
---
 .../org/apache/flink/runtime/state/AbstractKeyedStateBackend.java | 8 
 1 file changed, 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 062092e..ab4cbf7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -206,12 +204,6 @@ public abstract class AbstractKeyedStateBackend<K> implements
return keySerializerProvider.currentSchemaSerializer();
}
 
-   public TypeSerializerSchemaCompatibility<K> checkKeySerializerSchemaCompatibility(
-   TypeSerializerSnapshot<K> previousKeySerializerSnapshot) {
-
-   return keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(previousKeySerializerSnapshot);
-   }
-
/**
 * @see KeyedStateBackend
 */



[flink] 01/03: [hotfix] Fix inactive branch in TtlStateTestBase

2019-03-20 Thread srichter

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aee1f182e8fbd311713d1661ab61755b069b1a6c
Author: Stefan Richter 
AuthorDate: Tue Mar 19 15:00:50 2019 +0100

[hotfix] Fix inactive branch in TtlStateTestBase
---
 .../test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index e6d5ba3..5c92e67 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -515,7 +515,7 @@ public abstract class TtlStateTestBase {
// trigger more cleanup by doing something out side of INC_CLEANUP_ALL_KEYS
for (int i = INC_CLEANUP_ALL_KEYS; i < INC_CLEANUP_ALL_KEYS * 2; i++) {
sbetc.setCurrentKey(Integer.toString(i));
-   if (i / 2 == 0) {
+   if (i % 2 == 0) {
ctx().get();
} else {
ctx().update(ctx().updateEmpty);



[flink] 02/03: [FLINK-11980] Improve efficiency of iterating KeySelectionListener on notification

2019-03-20 Thread srichter

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 284e4486e57270c63b4f08d56599b1b4f97006c7
Author: Stefan Richter 
AuthorDate: Tue Mar 19 15:01:06 2019 +0100

[FLINK-11980] Improve efficiency of iterating KeySelectionListener on notification

KeySelectionListener was introduced for incremental TTL state cleanup as a
driver of the cleanup process. Listeners are notified whenever the current key
in the backend is set (i.e. for every event). The current implementation of the
collection that holds the listener is a HashSet, iterated via forEach on each
key change. This method comes with the overhead of creating temporary objects,
e.g. iterators, on every invocation and even if there is no listener
registered. We should rather [...]

This closes #8020.
---
 .../flink/runtime/state/AbstractKeyedStateBackend.java   | 12 +++-
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index e28aeef..062092e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -36,9 +36,8 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -62,7 +61,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
private K currentKey;
 
/** Listeners to changes of keyed context ({@link #currentKey}). */
-   private final Set<KeySelectionListener<K>> keySelectionListeners;
+   private final ArrayList<KeySelectionListener<K>> keySelectionListeners;
 
/** The key group of the currently active key. */
private int currentKeyGroup;
@@ -142,7 +141,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
this.executionConfig = executionConfig;
this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
this.ttlTimeProvider = Preconditions.checkNotNull(ttlTimeProvider);
-   this.keySelectionListeners = new HashSet<>();
+   this.keySelectionListeners = new ArrayList<>(1);
}
 
private static StreamCompressionDecorator determineStreamCompression(ExecutionConfig executionConfig) {
@@ -183,7 +182,10 @@ public abstract class AbstractKeyedStateBackend<K> implements
}
 
private void notifyKeySelected(K newKey) {
-   keySelectionListeners.forEach(listener -> listener.keySelected(newKey));
+   // we prefer a for-loop over other iteration schemes for performance reasons here.
+   for (int i = 0; i < keySelectionListeners.size(); ++i) {
+   keySelectionListeners.get(i).keySelected(newKey);
+   }
}
 
@Override
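
Note on the commit above: the change replaces a HashSet iterated via forEach with an ArrayList walked by index, so that notifying listeners allocates no iterator and costs a single size() check when no listener is registered. A minimal standalone sketch of that pattern follows. The class `KeySelectionSketch`, its nested `Listener` interface and the `register` method are hypothetical names for this illustration and do not reproduce the Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

public class KeySelectionSketch<K> {

    /** Hypothetical listener interface with the single callback used in the diff. */
    interface Listener<K> {
        void keySelected(K newKey);
    }

    // Small initial capacity: the common case is zero or one listener.
    private final ArrayList<Listener<K>> listeners = new ArrayList<>(1);

    void register(Listener<K> listener) {
        listeners.add(listener);
    }

    /** Index-based loop: no iterator object is created, and an empty list exits at once. */
    void notifyKeySelected(K newKey) {
        for (int i = 0; i < listeners.size(); ++i) {
            listeners.get(i).keySelected(newKey);
        }
    }

    public static void main(String[] args) {
        KeySelectionSketch<String> backend = new KeySelectionSketch<>();
        List<String> seen = new ArrayList<>();

        backend.notifyKeySelected("k0"); // no listener yet: nothing happens
        backend.register(seen::add);
        backend.notifyKeySelected("k1");
        backend.notifyKeySelected("k2");

        System.out.println(seen); // prints "[k1, k2]"
    }
}
```

One behavioural difference worth keeping in mind with this design: unlike a HashSet, an ArrayList does not deduplicate, so a listener registered twice would be notified twice in this sketch.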



[flink] 02/03: [FLINK-11980] Improve efficiency of iterating KeySelectionListener on notification

2019-03-20 Thread srichter

srichter pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ae91fd3bb5d1d132885399e7cc259a59459b8c6e
Author: Stefan Richter 
AuthorDate: Tue Mar 19 15:01:06 2019 +0100

[FLINK-11980] Improve efficiency of iterating KeySelectionListener on notification

KeySelectionListener was introduced for incremental TTL state cleanup as
a driver of the cleanup process. Listeners are notified whenever the
current key in the backend is set (i.e. for every event). The current
implementation of the collection that holds the listener is a HashSet,
iterated via forEach on each key change. This method comes with the
overhead of creating temporary objects, e.g. iterators, on every
invocation and even if there is no listener registered. We should rather
use an ArrayList with for-loop iteration in this hot code path to i)
minimize overhead and ii) minimize costs for the very likely case that
there is no listener at all.

This closes #8020.

(cherry picked from commit 284e4486e57270c63b4f08d56599b1b4f97006c7)
---
 .../flink/runtime/state/AbstractKeyedStateBackend.java   | 12 +++-
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index e28aeef..062092e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -36,9 +36,8 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
 import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -62,7 +61,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
private K currentKey;
 
/** Listeners to changes of keyed context ({@link #currentKey}). */
-   private final Set<KeySelectionListener<K>> keySelectionListeners;
+   private final ArrayList<KeySelectionListener<K>> keySelectionListeners;
 
/** The key group of the currently active key. */
private int currentKeyGroup;
@@ -142,7 +141,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
this.executionConfig = executionConfig;
this.keyGroupCompressionDecorator = keyGroupCompressionDecorator;
this.ttlTimeProvider = Preconditions.checkNotNull(ttlTimeProvider);
-   this.keySelectionListeners = new HashSet<>();
+   this.keySelectionListeners = new ArrayList<>(1);
}
 
private static StreamCompressionDecorator determineStreamCompression(ExecutionConfig executionConfig) {
@@ -183,7 +182,10 @@ public abstract class AbstractKeyedStateBackend<K> implements
}
 
private void notifyKeySelected(K newKey) {
-   keySelectionListeners.forEach(listener -> listener.keySelected(newKey));
+   // we prefer a for-loop over other iteration schemes for performance reasons here.
+   for (int i = 0; i < keySelectionListeners.size(); ++i) {
+   keySelectionListeners.get(i).keySelected(newKey);
+   }
}
 
@Override



[flink] 01/03: [hotfix] Fix inactive branch in TtlStateTestBase

2019-03-20 Thread srichter

srichter pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0ced11c8be7ccaf5112f3e050406cf75d5e55637
Author: Stefan Richter 
AuthorDate: Tue Mar 19 15:00:50 2019 +0100

[hotfix] Fix inactive branch in TtlStateTestBase

(cherry picked from commit aee1f182e8fbd311713d1661ab61755b069b1a6c)
---
 .../test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index e6d5ba3..5c92e67 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -515,7 +515,7 @@ public abstract class TtlStateTestBase {
// trigger more cleanup by doing something out side of INC_CLEANUP_ALL_KEYS
for (int i = INC_CLEANUP_ALL_KEYS; i < INC_CLEANUP_ALL_KEYS * 2; i++) {
sbetc.setCurrentKey(Integer.toString(i));
-   if (i / 2 == 0) {
+   if (i % 2 == 0) {
ctx().get();
} else {
ctx().update(ctx().updateEmpty);



[flink] branch release-1.8 updated (f421f32 -> 2ab4ae7)

2019-03-20 Thread srichter

srichter pushed a change to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git.


from f421f32  Update NOTICE-binary with latest changes
 new 0ced11c  [hotfix] Fix inactive branch in TtlStateTestBase
 new ae91fd3  [FLINK-11980] Improve efficiency of iterating KeySelectionListener on notification
 new 2ab4ae7  [hotfix] Remove unused method in AbstractKeyedStateBackend

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/state/AbstractKeyedStateBackend.java | 20 +++-
 .../flink/runtime/state/ttl/TtlStateTestBase.java|  2 +-
 2 files changed, 8 insertions(+), 14 deletions(-)



[flink] 03/03: [hotfix] Remove unused method in AbstractKeyedStateBackend

2019-03-20 Thread srichter

srichter pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2ab4ae7e862f98cffe9be187292cea7c6ed2e05c
Author: Stefan Richter 
AuthorDate: Wed Mar 20 12:35:04 2019 +0100

[hotfix] Remove unused method in AbstractKeyedStateBackend

(cherry picked from commit b5c5499789f4e2ab21c4a05bf3da8ce7408b9663)
---
 .../org/apache/flink/runtime/state/AbstractKeyedStateBackend.java | 8 
 1 file changed, 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 062092e..ab4cbf7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -23,8 +23,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -206,12 +204,6 @@ public abstract class AbstractKeyedStateBackend<K> implements
return keySerializerProvider.currentSchemaSerializer();
}
 
-   public TypeSerializerSchemaCompatibility<K> checkKeySerializerSchemaCompatibility(
-   TypeSerializerSnapshot<K> previousKeySerializerSnapshot) {
-
-   return keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(previousKeySerializerSnapshot);
-   }
-
/**
 * @see KeyedStateBackend
 */



[flink] branch master updated: Minor example typo fix

2019-03-20 Thread rmetzger

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new f9ec828  Minor example typo fix
f9ec828 is described below

commit f9ec828b91fc475f6169a7edce5c4e23f7e4407e
Author: Ankur 
AuthorDate: Tue Mar 19 16:44:39 2019 -0700

Minor example typo fix
---
 docs/dev/batch/dataset_transformations.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/dev/batch/dataset_transformations.md b/docs/dev/batch/dataset_transformations.md
index cf2da3f..112572f 100644
--- a/docs/dev/batch/dataset_transformations.md
+++ b/docs/dev/batch/dataset_transformations.md
@@ -593,7 +593,7 @@ public class DistinctReduce
   String next = t.f1;
 
   // check if strings are different
-  if (com == null || !next.equals(comp)) {
+  if (comp == null || !next.equals(comp)) {
 out.collect(new Tuple2(key, next));
 comp = next;
   }
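
For readers who want to see why the corrected condition matters, here is a minimal stand-alone sketch of the same "emit only when the value changes" logic. It does not use the Flink API: the class name DistinctSketch and the method distinctSorted are hypothetical, and the input is assumed to be sorted and free of null elements, as the group in the documentation example is assumed to be.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone sketch; not the Flink function from the documentation.
final class DistinctSketch {

    // Returns every value that differs from its predecessor.
    // Assumes the input is sorted and contains no null elements.
    static List<String> distinctSorted(List<String> sortedValues) {
        List<String> out = new ArrayList<>();
        String comp = null; // last emitted value; null before the first element
        for (String next : sortedValues) {
            // check if strings are different (the fixed condition from the diff)
            if (comp == null || !next.equals(comp)) {
                out.add(next);
                comp = next;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [a, b, c]
        System.out.println(distinctSorted(Arrays.asList("a", "a", "b", "c", "c")));
    }
}
```

With the misspelled variable the documentation snippet would not compile, because no variable of that name is declared; the sketch above uses the corrected name throughout.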



[flink] branch cron-master-e2e updated: [hotfix][e2e] Enable kinesis profile

2019-03-20 Thread chesnay

chesnay pushed a commit to branch cron-master-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/cron-master-e2e by this push:
 new 135ba12  [hotfix][e2e] Enable kinesis profile
135ba12 is described below

commit 135ba128d093ac48cd6031056b457f2c60788294
Author: zentol 
AuthorDate: Wed Mar 20 15:39:28 2019 +0100

[hotfix][e2e] Enable kinesis profile
---
 .travis.yml | 36 ++--
 1 file changed, 18 insertions(+), 18 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 78d7eac..5c8891d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -50,92 +50,92 @@ matrix:
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -De2e-metrics"
+  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -De2e-metrics -Dinclude-kinesis"
   - SCRIPT="split_misc.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3"
+  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dinclude-kinesis"
   - SCRIPT="split_ha.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3"
+  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dinclude-kinesis"
   - SCRIPT="split_sticky.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3"
+  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dinclude-kinesis"
   - SCRIPT="split_checkpoints.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3"
+  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dinclude-kinesis"
   - SCRIPT="split_container.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3"
+  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dinclude-kinesis"
   - SCRIPT="split_heavy.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -De2e-metrics"
+  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -De2e-metrics -Dinclude-kinesis"
   - SCRIPT="split_misc.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
+  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -Dinclude-kinesis"
   - SCRIPT="split_ha.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
+  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -Dinclude-kinesis"
   - SCRIPT="split_sticky.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
+  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -Dinclude-kinesis"
   - SCRIPT="split_checkpoints.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
+  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -Dinclude-kinesis"
   - SCRIPT="split_container.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
+  - PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -Dinclude-kinesis"
   - SCRIPT="split_heavy.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE="-De2e-metrics"
+  - PROFILE="-De2e-metrics -Dinclude-kinesis"
   - SCRIPT="split_misc_hadoopfree.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE=""
+  - PROFILE="-Dinclude-kinesis"
   - SCRIPT="split_ha.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE=""
+  - PROFILE="-Dinclude-kinesis"
   - SCRIPT="split_sticky.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE=""
+  - PROFILE="-Dinclude-kinesis"
   - SCRIPT="split_checkpoints.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE=""
+  - PROFILE="-Dinclude-kinesis"
   - SCRIPT="split_container.sh"
 - env:
   - REMOTE="apache"
   - BRANCH="master"
-  - PROFILE=""
+  - PROFILE="-Dinclude-kinesis"
   - SCRIPT="split_heavy.sh"
 
 git:



[flink] branch master updated: [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class.

2019-03-20 Thread aljoscha

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 6a66f7a  [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class.
6a66f7a is described below

commit 6a66f7acf370e12ad65ee24293ed47d2c5db225c
Author: klion26 
AuthorDate: Thu Mar 7 15:07:19 2019 +0800

[FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class.

We cannot remove the class StateTtlConfig#TimeCharacteristic and use
org.apache.flink.streaming.api.TimeCharacteristic directly, because
StateTtlConfig is located in the flink-core module and
org.apache.flink.streaming.api.TimeCharacteristic is located in
flink-streaming-java, so we chose to rename StateTtlConfig#TimeCharacteristic.

Changes include:
- Deprecate the StateTtlConfig#TimeCharacteristic class (for backward compatibility).
- Introduce a new class named StateTtlConfig#TtlTimeCharacteristic.
---
 .../flink/api/common/state/StateTtlConfig.java | 62 --
 1 file changed, 46 insertions(+), 16 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
index 5bb44d1..2a78f19 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.state;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.util.Preconditions;
 
@@ -29,8 +30,10 @@ import java.io.Serializable;
 import java.util.EnumMap;
 
 import static org.apache.flink.api.common.state.StateTtlConfig.StateVisibility.NeverReturnExpired;
-import static org.apache.flink.api.common.state.StateTtlConfig.TimeCharacteristic.ProcessingTime;
+import static org.apache.flink.api.common.state.StateTtlConfig.TtlTimeCharacteristic.ProcessingTime;
 import static org.apache.flink.api.common.state.StateTtlConfig.UpdateType.OnCreateAndWrite;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Configuration of state TTL logic.
@@ -41,6 +44,7 @@ import static org.apache.flink.api.common.state.StateTtlConfig.UpdateType.OnCrea
  * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
  * at the cost of an extra byte in the serialized form.
  */
+@PublicEvolving
 public class StateTtlConfig implements Serializable {
 
private static final long serialVersionUID = -7592693245044289793L;
@@ -72,31 +76,41 @@ public class StateTtlConfig implements Serializable {
 
/**
 * This option configures time scale to use for ttl.
+*
+* @deprecated will be removed in a future version in favor of {@link TtlTimeCharacteristic}
 */
+   @Deprecated
public enum TimeCharacteristic {
/** Processing time, see also TimeCharacteristic.ProcessingTime. */
ProcessingTime
}
 
+   /**
+* This option configures time scale to use for ttl.
+*/
+   public enum TtlTimeCharacteristic {
+   /** Processing time, see also org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime. */
+   ProcessingTime
+   }
+
private final UpdateType updateType;
private final StateVisibility stateVisibility;
-   private final TimeCharacteristic timeCharacteristic;
+   private final TtlTimeCharacteristic ttlTimeCharacteristic;
private final Time ttl;
private final CleanupStrategies cleanupStrategies;
 
private StateTtlConfig(
UpdateType updateType,
StateVisibility stateVisibility,
-   TimeCharacteristic timeCharacteristic,
+   TtlTimeCharacteristic ttlTimeCharacteristic,
Time ttl,
CleanupStrategies cleanupStrategies) {
-   this.updateType = Preconditions.checkNotNull(updateType);
-   this.stateVisibility = Preconditions.checkNotNull(stateVisibility);
-   this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic);
-   this.ttl = Preconditions.checkNotNull(ttl);
+   this.updateType = checkNotNull(updateType);
+   this.stateVisibility = checkNotNull(stateVisibility);
+   this.ttlTimeCharacteristic = checkNotNull(ttlTimeCharacteristic);
+   this.ttl = checkNotNull(ttl);
this.cleanupStrategies = cleanupStrategies;
-   Preconditions.checkArgument(ttl.toMilliseconds() > 0,
- 
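
The diff above is cut off in this archive. As a rough illustration of the deprecate-and-rename approach the commit message describes, the sketch below keeps an old nested enum for compatibility and adds a new one next to it. The class TtlConfigSketch and the helper fromDeprecated are hypothetical names chosen for this example; only the enum names TimeCharacteristic and TtlTimeCharacteristic come from the commit, and the real StateTtlConfig may bridge the two differently.

```java
// Hypothetical sketch of a deprecate-and-rename migration for a nested enum.
final class TtlConfigSketch {

    /**
     * Old name, kept so that existing callers still compile.
     *
     * @deprecated use {@link TtlTimeCharacteristic} instead.
     */
    @Deprecated
    enum TimeCharacteristic {
        ProcessingTime
    }

    /** New name that no longer clashes with a same-named class elsewhere. */
    enum TtlTimeCharacteristic {
        ProcessingTime
    }

    // Assumed helper (not taken from the commit): maps an old constant
    // to the new constant with the same name.
    static TtlTimeCharacteristic fromDeprecated(TimeCharacteristic old) {
        return TtlTimeCharacteristic.valueOf(old.name());
    }
}
```

Mapping by constant name keeps the two enums in step as long as every deprecated constant has a same-named counterpart; valueOf throws IllegalArgumentException otherwise, which surfaces a mismatch early.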

[flink] branch master updated (6a66f7a -> 339ca78)

2019-03-20 Thread dwysakowicz

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 6a66f7a  [FLINK-11825][StateBackends] Resolve name class of StateTTL TimeCharacteristic class.
 new 81afd21  [hotfix][table] Move UnresolvedCallExpression & TableReferenceExpression to table-api-java module
 new 5776709  [hotfix][table] Removed unnecessary casts in ApiExpressionUtils
 new 339ca78  [hotfix][table] Introduced UnresolvedFieldReferenceExpression & FieldReferenceExpression expressions

The 16150 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../table/expressions/ApiExpressionUtils.java  |  16 ++-
 .../table/expressions/ApiExpressionVisitor.java|   4 +
 .../expressions/TableReferenceExpression.java  |   0
 .../expressions/UnresolvedCallExpression.java  |   0
 .../UnresolvedFieldReferenceExpression.java}   |  21 ++--
 .../expressions/FieldReferenceExpression.java  |  50 +++---
 .../flink/table/expressions/ExpressionTest.java|   2 +-
 .../flink/table/expressions/RexNodeConverter.java  |   8 +-
 .../flink/table/functions/AvgAggFunction.java  |  10 +-
 .../functions/DeclarativeAggregateFunction.java|  36 +++
 .../table/codegen/agg/DeclarativeAggCodeGen.scala  | 108 +++--
 .../flink/table/api/scala/expressionDsl.scala  |   4 +-
 .../flink/table/expressions/ExpressionBridge.scala |   5 +
 .../expressions/PlannerExpressionConverter.scala   |  35 ---
 .../expressions/PlannerExpressionParserImpl.scala  |   4 +-
 .../flink/table/expressions/KeywordParseTest.scala |  12 +--
 16 files changed, 176 insertions(+), 139 deletions(-)
 rename flink-table/{flink-table-planner => flink-table-api-java}/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java (88%)
 rename flink-table/{flink-table-planner => flink-table-api-java}/src/main/java/org/apache/flink/table/expressions/ApiExpressionVisitor.java (87%)
 copy flink-table/{flink-table-planner => flink-table-api-java}/src/main/java/org/apache/flink/table/expressions/TableReferenceExpression.java (100%)
 rename flink-table/{flink-table-planner => flink-table-api-java}/src/main/java/org/apache/flink/table/expressions/UnresolvedCallExpression.java (100%)
 rename flink-table/{flink-table-planner/src/main/java/org/apache/flink/table/expressions/TableReferenceExpression.java => flink-table-api-java/src/main/java/org/apache/flink/table/expressions/UnresolvedFieldReferenceExpression.java} (72%)



[flink] branch master updated (339ca78 -> 9ffd931)

2019-03-20 Thread chesnay

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 339ca78  [hotfix][table] Introduced UnresolvedFieldReferenceExpression & FieldReferenceExpression expressions
 new 7f66eb5  [FLINK-11786][travis] Simplify stage selection
 new 1c65dcf  [FLINK-11786][travis] Run main profile only on pr/push
 new a5d64e6  [FLINK-11786][travis] Merge cron jobs
 new 9ffd931  [FLINK-11786][travis] Setup notifications

The 16154 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .travis.yml| 209 +++--
 tools/{test_deploy_to_maven.sh => travis/docs.sh}  |  17 +-
 tools/travis/nightly.sh|  82 
 tools/travis/splits/split_checkpoints.sh   |  78 
 .../travis/splits/split_container.sh   |  29 +--
 tools/travis/splits/split_ha.sh|  59 ++
 .../travis/splits/split_heavy.sh   |  31 +--
 tools/travis/splits/split_misc.sh  |  77 
 tools/travis/splits/split_misc_hadoopfree.sh   |  71 +++
 .../travis/splits/split_sticky.sh  |  32 ++--
 tools/travis/stage.sh  |  66 ++-
 tools/travis_controller.sh |  44 +
 tools/travis_mvn_watchdog.sh   |  40 ++--
 13 files changed, 692 insertions(+), 143 deletions(-)
 copy tools/{test_deploy_to_maven.sh => travis/docs.sh} (83%)
 create mode 100755 tools/travis/nightly.sh
 create mode 100755 tools/travis/splits/split_checkpoints.sh
 copy flink-end-to-end-tests/run-single-test.sh => tools/travis/splits/split_container.sh (64%)
 create mode 100755 tools/travis/splits/split_ha.sh
 copy flink-end-to-end-tests/run-single-test.sh => tools/travis/splits/split_heavy.sh (61%)
 create mode 100755 tools/travis/splits/split_misc.sh
 create mode 100755 tools/travis/splits/split_misc_hadoopfree.sh
 copy flink-end-to-end-tests/run-single-test.sh => tools/travis/splits/split_sticky.sh (55%)



[flink] branch ee created (now 9ffd931)

2019-03-20 Thread chesnay

chesnay pushed a change to branch ee
in repository https://gitbox.apache.org/repos/asf/flink.git.


  at 9ffd931  [FLINK-11786][travis] Setup notifications

No new revisions were added by this update.



[flink] branch cron-master-jdk9 deleted (was 6e46ff5)

2019-03-20 Thread chesnay

chesnay pushed a change to branch cron-master-jdk9
in repository https://gitbox.apache.org/repos/asf/flink.git.


 was 6e46ff5  [FLINK-11064] [table] Setup a new flink-table module structure

This change permanently discards the following revisions:

 discard 6e46ff5  [FLINK-11064] [table] Setup a new flink-table module structure
 discard 1ddb338  [hotfix][travis] Remove stray slash
 discard ffdce45  [FLINK-11628][travis] Cache maven
 discard 700e5a6  skip flink-s3-fs-presto
 discard 08446df  Disable flink-tests
 discard 0021bee  skip flink-s3-fs-hadoop
 discard 2c7d5a5  Exclude projects to make build pass with Java 9
 discard 0017c6c  Setup master java 9 test



[flink] branch cron-master-e2e deleted (was 135ba12)

2019-03-20 Thread chesnay

chesnay pushed a change to branch cron-master-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git.


 was 135ba12  [hotfix][e2e] Enable kinesis profile

This change permanently discards the following revisions:

 discard 135ba12  [hotfix][e2e] Enable kinesis profile
 discard 1f2fb9e  [hotfix][travis] Remove stray slash
 discard 76eccb6  [FLINK-11628][travis] Cache maven



[flink] branch cron-master-hadoop24 deleted (was 08d7b51)

2019-03-20 Thread chesnay

chesnay pushed a change to branch cron-master-hadoop24
in repository https://gitbox.apache.org/repos/asf/flink.git.


 was 08d7b51  [hotfix][travis] Remove stray slash

This change permanently discards the following revisions:

 discard 08d7b51  [hotfix][travis] Remove stray slash
 discard 8aed4fa  [FLINK-11628][travis] Cache maven



[flink] branch ee deleted (was 9ffd931)

2019-03-20 Thread chesnay

chesnay pushed a change to branch ee
in repository https://gitbox.apache.org/repos/asf/flink.git.


 was 9ffd931  [FLINK-11786][travis] Setup notifications

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.



[flink] branch master updated: [hotfix][e2e] Enable kinesis profile

2019-03-20 Thread chesnay

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 559b7a2  [hotfix][e2e] Enable kinesis profile
559b7a2 is described below

commit 559b7a26fe865f1e34789c52d254194d26ba69ed
Author: Chesnay Schepler 
AuthorDate: Wed Mar 20 19:27:43 2019 +0100

[hotfix][e2e] Enable kinesis profile
---
 .travis.yml | 36 ++--
 1 file changed, 18 insertions(+), 18 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 7f3d9da..5a070c6 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -222,57 +222,57 @@ jobs:
   name: Documentation links check
 # E2E profile
 - stage: E2E
-  env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -De2e-metrics"
+  env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -De2e-metrics -Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_misc.sh
   name: misc - hadoop 2.8
-- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3"
+- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_ha.sh
   name: ha - hadoop 2.8
-- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3"
+- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_sticky.sh
   name: sticky - hadoop 2.8
-- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3"
+- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_checkpoints.sh
   name: checkpoints - hadoop 2.8
-- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3"
+- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_container.sh
   name: container - hadoop 2.8
-- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3"
+- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_heavy.sh
   name: heavy - hadoop 2.8
-- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -De2e-metrics"
+- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -De2e-metrics -Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_misc.sh
   name: misc - scala 2.12
-- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
+- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_ha.sh
   name: ha - scala 2.12
-- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
+- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_sticky.sh
   name: sticky - scala 2.12
-- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
+- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_checkpoints.sh
   name: checkpoints - scala 2.12
-- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
+- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_container.sh
   name: container - scala 2.12
-- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12"
+- env: PROFILE="-Pinclude-hadoop -Dhadoop.version=2.8.3 -Dscala-2.12 -Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_heavy.sh
   name: heavy - scala 2.12
-- env: PROFILE="-De2e-metrics"
+- env: PROFILE="-De2e-metrics -Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_misc_hadoopfree.sh
   name: misc
-- env: PROFILE=""
+- env: PROFILE="-Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_ha.sh
   name: ha
-- env: PROFILE=""
+- env: PROFILE="-Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_sticky.sh
   name: sticky
-- env: PROFILE=""
+- env: PROFILE="-Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_checkpoints.sh
   name: checkpoints
-- env: PROFILE=""
+- env: PROFILE="-Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_container.sh
   name: container
-- env: PROFILE=""
+- env: PROFILE="-Dinclude-kinesis"
   script: ./tools/travis/nightly.sh split_heavy.sh
   name: heavy



[flink] branch cron-master-scala212 deleted (was a38b5da)

2019-03-20 Thread chesnay

chesnay pushed a change to branch cron-master-scala212
in repository https://gitbox.apache.org/repos/asf/flink.git.


 was a38b5da  [hotfix][travis] Remove stray slash

This change permanently discards the following revisions:

 discard a38b5da  [hotfix][travis] Remove stray slash
 discard 1b8083d  [FLINK-11628][travis] Cache maven
 discard ad747c0  Fix hadoop issue
 discard 458a371  Use Scala 2.12 and more modern Hadoop version



[flink] branch master updated: [FLINK-11971] Fix kubernetes check in end-to-end test

2019-03-20 Thread jincheng

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 56d81e0  [FLINK-11971] Fix kubernetes check in end-to-end test
56d81e0 is described below

commit 56d81e0dcadd07d634f92ac464a49d3313f56621
Author: Aljoscha Krettek 
AuthorDate: Wed Mar 20 16:20:05 2019 +0100

[FLINK-11971] Fix kubernetes check in end-to-end test

Before, this was using string parsing but the output of minikube status
depends on the minikube version and the system.

This now simply uses the status code.

This closes #8024.
---
 flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh b/flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh
index 96320fd..d37deca 100755
--- a/flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh
+++ b/flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh
@@ -40,8 +40,7 @@ function cleanup {
 }
 
 function check_kubernetes_status {
-local status=`minikube status`
-echo ${status} | grep -q "minikube: Running cluster: Running kubectl: Correctly Configured"
+minikube status
 return $?
 }
 
@@ -50,7 +49,8 @@ function start_kubernetes_if_not_running {
 minikube start
 fi
 
-return $(check_kubernetes_status)
+check_kubernetes_status
+return $?
 }
 
 trap cleanup EXIT
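
The idea behind this fix, relying on the exit status of a command rather than matching its printed output, carries over to other languages. The sketch below shows it in Java under a few assumptions: the class ExitStatusCheck and the method succeeds are hypothetical names, and a command that cannot be started at all is treated the same as a non-zero exit.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical sketch: decide success from the exit status only,
// never from the text the command prints.
final class ExitStatusCheck {

    // Runs the command and returns true only if it exits with status 0.
    // A command that cannot be started counts as a failure.
    static boolean succeeds(List<String> command) {
        try {
            Process process = new ProcessBuilder(command)
                    .inheritIO() // pass output through, as the shell script does
                    .start();
            return process.waitFor() == 0;
        } catch (IOException e) {
            return false; // e.g. the executable is not installed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }
}
```

A caller could write succeeds(Arrays.asList("minikube", "status")) to mirror check_kubernetes_status. Because the printed text is never inspected, changes in the output format between minikube versions would not affect the result.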



[flink] branch release-1.8 updated: [FLINK-11971] Fix kubernetes check in end-to-end test

2019-03-20 Thread jincheng

jincheng pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new eb57156  [FLINK-11971] Fix kubernetes check in end-to-end test
eb57156 is described below

commit eb571567ccbf5cb663e91cbbe3d9d8685b6b52fa
Author: Aljoscha Krettek 
AuthorDate: Wed Mar 20 16:20:05 2019 +0100

[FLINK-11971] Fix kubernetes check in end-to-end test

Before, this was using string parsing but the output of minikube status
depends on the minikube version and the system.

This now simply uses the status code.

This closes #8024.
---
 flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh b/flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh
index 96320fd..d37deca 100755
--- a/flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh
+++ b/flink-end-to-end-tests/test-scripts/test_kubernetes_embedded_job.sh
@@ -40,8 +40,7 @@ function cleanup {
 }
 
 function check_kubernetes_status {
-local status=`minikube status`
-echo ${status} | grep -q "minikube: Running cluster: Running kubectl: Correctly Configured"
+minikube status
 return $?
 }
 
@@ -50,7 +49,8 @@ function start_kubernetes_if_not_running {
 minikube start
 fi
 
-return $(check_kubernetes_status)
+check_kubernetes_status
+return $?
 }
 
 trap cleanup EXIT



[flink] branch master updated: [FLINK-11872][table-runtime-blink] update lz4 license file. (#7952)

2019-03-20 Thread kurt

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 5bd7ed4  [FLINK-11872][table-runtime-blink] update lz4 license file. (#7952)
5bd7ed4 is described below

commit 5bd7ed4364497577709e9225f0522adb0b428e7a
Author: Kurt Young 
AuthorDate: Thu Mar 21 08:44:13 2019 +0800

[FLINK-11872][table-runtime-blink] update lz4 license file. (#7952)
---
 flink-table/flink-table-runtime-blink/pom.xml   | 21 +
 .../src/main/resources/META-INF/NOTICE  |  4 
 2 files changed, 25 insertions(+)

diff --git a/flink-table/flink-table-runtime-blink/pom.xml b/flink-table/flink-table-runtime-blink/pom.xml
index 17f200a..458438b 100644
--- a/flink-table/flink-table-runtime-blink/pom.xml
+++ b/flink-table/flink-table-runtime-blink/pom.xml
@@ -93,4 +93,25 @@ under the License.
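 			<scope>test</scope>
 		</dependency>
 	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<configuration>
+							<artifactSet>
+								<includes>
+									<include>org.lz4:lz4-java</include>
+								</includes>
+							</artifactSet>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
 </project>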
diff --git a/flink-table/flink-table-runtime-blink/src/main/resources/META-INF/NOTICE b/flink-table/flink-table-runtime-blink/src/main/resources/META-INF/NOTICE
index 518d83e..d68d0d6 100644
--- a/flink-table/flink-table-runtime-blink/src/main/resources/META-INF/NOTICE
+++ b/flink-table/flink-table-runtime-blink/src/main/resources/META-INF/NOTICE
@@ -3,3 +3,7 @@ Copyright 2014-2019 The Apache Software Foundation
 
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- org.lz4:lz4-java:1.5.0