Build failed in Jenkins: flink-snapshot-deployment #1110

2019-02-20 Thread Apache Jenkins Server
See 


--
[...truncated 81.43 KB...]
[WARNING] maven-shade-plugin has detected that some class files are
[WARNING] present in two or more JARs. When this happens, only one
[WARNING] single version of the class is copied to the uber jar.
[WARNING] Usually this is not harmful and you can skip these warnings,
[WARNING] otherwise try to manually exclude artifacts based on
[WARNING] mvn dependency:tree -Ddetail=true and the above output.
[WARNING] See http://maven.apache.org/plugins/maven-shade-plugin/
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing 

 with 

[INFO] Dependency-reduced POM written at: 

[INFO] 
[INFO] >>> maven-source-plugin:2.2.1:jar (attach-sources) > generate-sources @ 
flink-shaded-hadoop2 >>>
[INFO] 
[INFO] --- maven-checkstyle-plugin:2.17:check (validate) @ flink-shaded-hadoop2 
---
[INFO] 
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-maven-version) @ 
flink-shaded-hadoop2 ---
[INFO] 
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-maven) @ 
flink-shaded-hadoop2 ---
[INFO] 
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-versions) @ 
flink-shaded-hadoop2 ---
[INFO] 
[INFO] --- directory-maven-plugin:0.1:highest-basedir (directories) @ 
flink-shaded-hadoop2 ---
[INFO] Highest basedir set to: 

[INFO] 
[INFO] <<< maven-source-plugin:2.2.1:jar (attach-sources) < generate-sources @ 
flink-shaded-hadoop2 <<<
[INFO] 
[INFO] --- maven-source-plugin:2.2.1:jar (attach-sources) @ 
flink-shaded-hadoop2 ---
[INFO] Building jar: 

[INFO] 
[INFO] --- maven-javadoc-plugin:2.9.1:jar (attach-javadocs) @ 
flink-shaded-hadoop2 ---
[INFO] 
[INFO] --- maven-surefire-plugin:2.22.1:test (integration-tests) @ 
flink-shaded-hadoop2 ---
[INFO] Tests are skipped.
[INFO] 
[INFO] --- maven-install-plugin:2.5.2:install (default-install) @ 
flink-shaded-hadoop2 ---
[INFO] Installing 

 to 
/home/jenkins/.m2/repository/org/apache/flink/flink-shaded-hadoop2/1.8-SNAPSHOT-2.4.1/flink-shaded-hadoop2-1.8-SNAPSHOT-2.4.1.jar
[INFO] Installing 

 to 
/home/jenkins/.m2/repository/org/apache/flink/flink-shaded-hadoop2/1.8-SNAPSHOT-2.4.1/flink-shaded-hadoop2-1.8-SNAPSHOT-2.4.1.pom
[INFO] Installing 

 to 
/home/jenkins/.m2/repository/org/apache/flink/flink-shaded-hadoop2/1.8-SNAPSHOT-2.4.1/flink-shaded-hadoop2-1.8-SNAPSHOT-2.4.1-sources.jar
[INFO] 
[INFO] --- maven-deploy-plugin:2.8.2:deploy (default-deploy) @ 
flink-shaded-hadoop2 ---
[INFO] Uploading: 
https://repository.apache.org/service/local/staging/deploy/maven2/org/apache/flink/flink-shaded-hadoop2/1.8-SNAPSHOT-2.4.1/flink-shaded-hadoop2-1.8-SNAPSHOT-2.4.1.jar
[INFO] Uploading: 
https://repository.apache.org/service/local/staging/deploy/maven2/org/apache/flink/flink-shaded-hadoop2/1.8-SNAPSHOT-2.4.1/flink-shaded-hadoop2-1.8-SNAPSHOT-2.4.1.pom
[WARNING] Encountered issue during deployment: Failed to deploy artifacts: 
Could not transfer artifact 
org.apache.flink:flink-shaded-hadoop2:jar:1.8-SNAPSHOT-2.4.1 from/to 
apache.releases.https 
(https://repository.apache.org/service/local/staging/deploy/maven2): Failed to 
transfer file: 
https://repository.apache.org/service/local/staging/deploy/maven2/org/apache/flink/flink-shaded-hadoop2/1.8-SNAPSHOT-2.4.1/flink-shaded-hadoop2-1.8-SNAPSHOT-2.4.1.jar.
 Return code is: 401, ReasonPhrase: Unauthorized.
[INFO] Retrying deployment attempt 2 of 10
[INFO] Uploading: 
https://repository.apache.org/service/local/staging/deploy/maven2/org/apache/flink/flink-shaded-hadoop2/1.8-SNAPSHOT-2.4.1/flink-shaded-hadoop2-1.8-SNAPSHOT-2.4.1.jar
[INFO] Uploading: 
https://repository.apache.org/service/local/staging/deploy/maven2/org/apache/flink/flink-shaded-hadoop2/1.8-SNAPSHOT-2.4.1/flink-shaded-hadoop2-1.8-SNAPSHOT-2.4.1.pom
[WARNING] Encountered issue during deployment: 

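The deploy plugin's behavior above ("Retrying deployment attempt 2 of 10") is a standard bounded-retry loop. A minimal sketch in plain Java — the `Upload` interface, method names, and attempt count are illustrative stand-ins, not the maven-deploy-plugin's actual internals:

```java
// Hedged sketch of a bounded-retry loop; names are illustrative,
// not the maven-deploy-plugin's real API.
class BoundedRetry {

    @FunctionalInterface
    interface Upload {
        void run() throws Exception; // throws on failures such as HTTP 401
    }

    /**
     * Runs the upload, retrying up to maxAttempts times and
     * rethrowing the last failure if every attempt fails.
     */
    static void deployWithRetry(Upload upload, int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                upload.run();
                return; // success, stop retrying
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    System.out.println("Retrying deployment attempt "
                            + (attempt + 1) + " of " + maxAttempts);
                }
            }
        }
        throw last;
    }
}
```

Note that a 401 Unauthorized is deterministic — it points at missing or stale repository credentials on the build machine — so retrying cannot succeed; the fix is credential configuration, not more attempts.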
buildbot failure in on flink-docs-release-1.7

2019-02-20 Thread buildbot
The Buildbot has detected a new failure on builder flink-docs-release-1.7 while 
building . Full details are available at:
https://ci.apache.org/builders/flink-docs-release-1.7/builds/83

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave2_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-1.7' 
triggered this build
Build Source Stamp: [branch release-1.7] HEAD
Blamelist: 

BUILD FAILED: failed

Sincerely,
 -The Buildbot

[flink] 09/13: [hotfix] Fix checkstyle violations in ArchivedExecutionGraphTest

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a54f8c8604db181ec2452c064fb6aa87f7651b2f
Author: Till Rohrmann 
AuthorDate: Tue Feb 19 15:53:51 2019 +0100

[hotfix] Fix checkstyle violations in ArchivedExecutionGraphTest
---
 .../flink/runtime/executiongraph/ArchivedExecutionGraphTest.java   | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index a646ac5..41a517b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -36,12 +36,12 @@ import 
org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.OptionalFailure;
@@ -67,6 +67,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+/**
+ * Tests for the {@link ArchivedExecutionGraph}.
+ */
 public class ArchivedExecutionGraphTest extends TestLogger {
 
private static ExecutionGraph runtimeGraph;
@@ -117,7 +120,7 @@ public class ArchivedExecutionGraphTest extends TestLogger {
List jobVertices = new ArrayList<>();
jobVertices.add(runtimeGraph.getJobVertex(v1ID));
jobVertices.add(runtimeGraph.getJobVertex(v2ID));
-   
+
CheckpointStatsTracker statsTracker = new 
CheckpointStatsTracker(
0,
jobVertices,



[flink] branch master updated (c763c35 -> 397c9c6)

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from c763c35  [FLINK-11691] Introduce Classloader in the methods of 
StateBackendFactory and ConfigurableStateBackend
 new ad55d89  [hotfix] Use lambda for heartbeat timeout handler in 
ResourceManager
 new 4eabeb9  [hotfix] Null resourceManagerAddress when suspending JobMaster
 new d3b1d18  [hotfix] Fix checkstyle violation in JobMaster
 new c58a784  [hotfix] Fix checkstyle violations in 
ExecutionGraphSuspendTest
 new f3758f3  [hotfix] Remove unused methods from TaskManagerGateway
 new b9cd88a  [hotfix][tests] Remove mocking from ExecutionGraphSuspendTest
 new 7d41825  [hotfix] Fix checkstyle violation in ExecutionGraph
 new 6fa35ea  [hotfix] Fix checkstyle violations in JobStatus
 new a54f8c8  [hotfix] Fix checkstyle violations in 
ArchivedExecutionGraphTest
 new c9e392b  [FLINK-11537] Make ExecutionGraph#suspend terminate 
ExecutionGraph atomically
 new db98eab  [hotfix][tests] Remove mocking from 
ExecutionGraphSchedulingTest
 new ce49a90  [FLINK-11678] Change RestfulGateway#requestJob return type to 
CompletableFuture
 new 397c9c6  [FLINK-11678] Let ExecutionGraphCache store only 
ArchivedExecutionGraphs

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/client/cli/CliFrontendListTest.java  |   4 +-
 .../flink/runtime/executiongraph/Execution.java| 104 +--
 .../runtime/executiongraph/ExecutionGraph.java |  77 +--
 .../runtime/executiongraph/ExecutionJobVertex.java |  19 ++-
 .../runtime/executiongraph/ExecutionVertex.java|   4 +
 .../apache/flink/runtime/jobgraph/JobStatus.java   |  33 +++--
 .../jobmanager/slots/ActorTaskManagerGateway.java  |  37 --
 .../jobmanager/slots/TaskManagerGateway.java   |  44 ---
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   3 +-
 .../runtime/jobmaster/RpcTaskManagerGateway.java   |  33 -
 .../runtime/resourcemanager/ResourceManager.java   |  19 ++-
 .../rest/handler/legacy/ExecutionGraphCache.java   |  57 +++--
 .../flink/runtime/webmonitor/RestfulGateway.java   |  10 +-
 .../executiongraph/ArchivedExecutionGraphTest.java |   8 +-
 ...ncurrentFailoverStrategyExecutionGraphTest.java |   6 +-
 .../ExecutionGraphCoLocationRestartTest.java   |   2 +-
 .../ExecutionGraphDeploymentTest.java  |   6 +-
 .../executiongraph/ExecutionGraphRestartTest.java  |  10 +-
 .../ExecutionGraphSchedulingTest.java  |  59 -
 .../executiongraph/ExecutionGraphSuspendTest.java  | 103 ---
 .../executiongraph/ExecutionGraphTestUtils.java|   4 +-
 .../runtime/executiongraph/ExecutionTest.java  |   6 +-
 .../executiongraph/ExecutionVertexCancelTest.java  |   4 +-
 .../ExecutionVertexInputConstraintTest.java|   2 +-
 .../runtime/executiongraph/FailoverRegionTest.java |  14 +-
 .../executiongraph/GlobalModVersionTest.java   |   4 +-
 .../InteractionsCountingTaskManagerGateway.java|  63 +
 .../utils/SimpleAckingTaskManagerGateway.java  |  37 --
 .../flink/runtime/jobmaster/JobMasterTest.java | 141 ++---
 .../handler/legacy/ExecutionGraphCacheTest.java| 104 +--
 .../taskexecutor/TestingTaskExecutorGateway.java   |  17 ++-
 .../TestingTaskExecutorGatewayBuilder.java |  20 ++-
 .../webmonitor/TestingDispatcherGateway.java   |   5 +-
 .../runtime/webmonitor/TestingRestfulGateway.java  |  14 +-
 34 files changed, 428 insertions(+), 645 deletions(-)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/InteractionsCountingTaskManagerGateway.java



[flink] 04/13: [hotfix] Fix checkstyle violations in ExecutionGraphSuspendTest

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c58a784f07d6cac3009636dcd4ec0dac6906908e
Author: Till Rohrmann 
AuthorDate: Tue Feb 19 11:35:42 2019 +0100

[hotfix] Fix checkstyle violations in ExecutionGraphSuspendTest
---
 .../flink/runtime/executiongraph/ExecutionGraphSuspendTest.java  | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index 5377cdc..c0cfc8a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -25,17 +25,16 @@ import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy
 import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
@@ -133,7 +132,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
}
 
/**
-* Suspending from FAILING goes to SUSPENDING and sends no additional 
RPC calls
+* Suspending from FAILING goes to SUSPENDING and sends no additional 
RPC calls.
 */
@Test
public void testSuspendedOutOfFailing() throws Exception {



[flink] 05/13: [hotfix] Remove unused methods from TaskManagerGateway

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f3758f3d2f898706bd3cb40699e3df75e34f112d
Author: Till Rohrmann 
AuthorDate: Tue Feb 19 11:53:00 2019 +0100

[hotfix] Remove unused methods from TaskManagerGateway
---
 .../jobmanager/slots/ActorTaskManagerGateway.java  | 37 --
 .../jobmanager/slots/TaskManagerGateway.java   | 44 --
 .../runtime/jobmaster/RpcTaskManagerGateway.java   | 33 
 .../utils/SimpleAckingTaskManagerGateway.java  | 37 --
 4 files changed, 151 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
index 6b88752..54c11ff 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
@@ -22,18 +22,13 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.messages.StackTrace;
 import org.apache.flink.runtime.messages.StackTraceSampleMessages;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
@@ -71,28 +66,6 @@ public class ActorTaskManagerGateway implements 
TaskManagerGateway {
}
 
@Override
-   public void disconnectFromJobManager(InstanceID instanceId, Exception 
cause) {
-   actorGateway.tell(new Messages.Disconnect(instanceId, cause));
-   }
-
-   @Override
-   public void stopCluster(final ApplicationStatus applicationStatus, 
final String message) {
-   actorGateway.tell(new StopCluster(applicationStatus, message));
-   }
-
-   @Override
-   public CompletableFuture requestStackTrace(final Time 
timeout) {
-   Preconditions.checkNotNull(timeout);
-
-   scala.concurrent.Future stackTraceFuture = 
actorGateway.ask(
-   TaskManagerMessages.SendStackTrace$.MODULE$.get(),
-   new FiniteDuration(timeout.getSize(), 
timeout.getUnit()))
-   
.mapTo(ClassTag$.MODULE$.apply(StackTrace.class));
-
-   return FutureUtils.toJava(stackTraceFuture);
-   }
-
-   @Override
public CompletableFuture 
requestStackTraceSample(
ExecutionAttemptID executionAttemptID,
int sampleId,
@@ -210,16 +183,6 @@ public class ActorTaskManagerGateway implements 
TaskManagerGateway {
}
 
@Override
-   public CompletableFuture requestTaskManagerLog(Time 
timeout) {
-   return 
requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) 
TaskManagerMessages.getRequestTaskManagerLog(), timeout);
-   }
-
-   @Override
-   public CompletableFuture 
requestTaskManagerStdout(Time timeout) {
-   return 
requestTaskManagerLog((TaskManagerMessages.RequestTaskManagerLog) 
TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
-   }
-
-   @Override
public CompletableFuture freeSlot(AllocationID 
allocationId, Throwable cause, Time timeout) {
throw new UnsupportedOperationException("The old TaskManager 
does not support freeing slots");
}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index b2aca32..1a8eda5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -20,16 +20,12 @@ package org.apache.flink.runtime.jobmanager.slots;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;

[flink] 02/13: [hotfix] Null resourceManagerAddress when suspending JobMaster

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4eabeb9ae7e3b96a3a9490aa12f9016ea9ecc768
Author: Till Rohrmann 
AuthorDate: Sun Feb 17 13:19:10 2019 +0100

[hotfix] Null resourceManagerAddress when suspending JobMaster

Nulling the resourceManagerAddress when suspending the JobMaster prevents the
JobMaster from trying to reconnect to the ResourceManager if it receives a
disconnectResourceManager message.
---
 .../src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java  | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 00cd9db..8f51431 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1098,6 +1098,7 @@ public class JobMaster extends 
FencedRpcEndpoint implements JobMast
 
try {
resourceManagerLeaderRetriever.stop();
+   resourceManagerAddress = null;
} catch (Throwable t) {
log.warn("Failed to stop resource manager leader 
retriever when suspending.", t);
}



[flink] 13/13: [FLINK-11678] Let ExecutionGraphCache store only ArchivedExecutionGraphs

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 397c9c60e9d24556945d9c59507bfb0dbd6e31a6
Author: Till Rohrmann 
AuthorDate: Wed Feb 20 12:12:30 2019 +0100

[FLINK-11678] Let ExecutionGraphCache store only ArchivedExecutionGraphs

The ExecutionGraphCache now only stores ArchivedExecutionGraphs since it 
will never receive an
ExecutionGraph from the RestfulGateway.

- Remove ExecutionGraphTest#testCacheInvalidationIfSuspended
- Remove ExecutionGraphTest#testCacheInvalidationIfSwitchToSuspended

This closes #7769.
---
 .../rest/handler/legacy/ExecutionGraphCache.java   | 55 +--
 .../handler/legacy/ExecutionGraphCacheTest.java| 81 --
 .../runtime/webmonitor/TestingRestfulGateway.java  |  1 -
 3 files changed, 19 insertions(+), 118 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
index 7e9e6ee..f634a62 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
@@ -21,21 +21,21 @@ package org.apache.flink.runtime.rest.handler.legacy;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Closeable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Cache for {@link AccessExecutionGraph} which are obtained from the Flink 
cluster. Every cache entry
+ * Cache for {@link ArchivedExecutionGraph} which are obtained from the Flink 
cluster. Every cache entry
  * has an associated time to live after which a new request will trigger the 
reloading of the
- * {@link AccessExecutionGraph} from the cluster.
+ * {@link ArchivedExecutionGraph} from the cluster.
  */
 public class ExecutionGraphCache implements Closeable {
 
@@ -76,12 +76,15 @@ public class ExecutionGraphCache implements Closeable {
 * {@link AccessExecutionGraph} will be requested again after the 
refresh interval has passed
 * or if the graph could not be retrieved from the given gateway.
 *
-* @param jobId identifying the {@link AccessExecutionGraph} to get
-* @param restfulGateway to request the {@link AccessExecutionGraph} 
from
-* @return Future containing the requested {@link AccessExecutionGraph}
+* @param jobId identifying the {@link ArchivedExecutionGraph} to get
+* @param restfulGateway to request the {@link ArchivedExecutionGraph} 
from
+* @return Future containing the requested {@link 
ArchivedExecutionGraph}
 */
public CompletableFuture getExecutionGraph(JobID 
jobId, RestfulGateway restfulGateway) {
+   return getExecutionGraphInternal(jobId, 
restfulGateway).thenApply(Function.identity());
+   }
 
+   private CompletableFuture 
getExecutionGraphInternal(JobID jobId, RestfulGateway restfulGateway) {
Preconditions.checkState(running, "ExecutionGraphCache is no 
longer running");
 
while (true) {
@@ -89,26 +92,12 @@ public class ExecutionGraphCache implements Closeable {
 
final long currentTime = System.currentTimeMillis();
 
-   if (oldEntry != null) {
-   if (currentTime < oldEntry.getTTL()) {
-   final 
CompletableFuture executionGraphFuture = 
oldEntry.getExecutionGraphFuture();
-   if (executionGraphFuture.isDone() && 
!executionGraphFuture.isCompletedExceptionally()) {
-
-   // TODO: Remove once we no 
longer request the actual ExecutionGraph from the JobManager but only the 
ArchivedExecutionGraph
-   try {
-   final 
AccessExecutionGraph executionGraph = executionGraphFuture.get();
-   if 
(executionGraph.getState() != JobStatus.SUSPENDED) {
-   return 
executionGraphFuture;
-   

[flink] 10/13: [FLINK-11537] Make ExecutionGraph#suspend terminate ExecutionGraph atomically

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c9e392b53b48c8ae0b189905a9b4cf878bf741e4
Author: Till Rohrmann 
AuthorDate: Tue Feb 19 14:51:06 2019 +0100

[FLINK-11537] Make ExecutionGraph#suspend terminate ExecutionGraph 
atomically

This commit makes the suspend call transition the ExecutionGraph atomically into
the SUSPENDED state without requiring a round-trip to cancel all tasks properly.
Instead, it simply notifies the TaskExecutors about the suspension and then
transitions the Executions into a terminal state. Moreover, this commit removes
JobStatus#SUSPENDING.

This closes #7756.
---
 .../flink/client/cli/CliFrontendListTest.java  |   4 +-
 .../flink/runtime/executiongraph/Execution.java| 104 +--
 .../runtime/executiongraph/ExecutionGraph.java |  75 +--
 .../runtime/executiongraph/ExecutionJobVertex.java |  19 ++-
 .../runtime/executiongraph/ExecutionVertex.java|   4 +
 .../apache/flink/runtime/jobgraph/JobStatus.java   |   3 -
 .../rest/handler/legacy/ExecutionGraphCache.java   |   6 +-
 .../executiongraph/ArchivedExecutionGraphTest.java |   1 -
 ...ncurrentFailoverStrategyExecutionGraphTest.java |   6 +-
 .../ExecutionGraphCoLocationRestartTest.java   |   2 +-
 .../ExecutionGraphDeploymentTest.java  |   6 +-
 .../executiongraph/ExecutionGraphRestartTest.java  |  10 +-
 .../ExecutionGraphSchedulingTest.java  |   2 +-
 .../executiongraph/ExecutionGraphSuspendTest.java  |  61 +
 .../executiongraph/ExecutionGraphTestUtils.java|   4 +-
 .../runtime/executiongraph/ExecutionTest.java  |   6 +-
 .../executiongraph/ExecutionVertexCancelTest.java  |   4 +-
 .../ExecutionVertexInputConstraintTest.java|   2 +-
 .../runtime/executiongraph/FailoverRegionTest.java |  14 +-
 .../executiongraph/GlobalModVersionTest.java   |   4 +-
 .../flink/runtime/jobmaster/JobMasterTest.java | 141 ++---
 .../handler/legacy/ExecutionGraphCacheTest.java|  21 +--
 .../taskexecutor/TestingTaskExecutorGateway.java   |  17 ++-
 .../TestingTaskExecutorGatewayBuilder.java |  20 ++-
 24 files changed, 260 insertions(+), 276 deletions(-)

diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java
index be37cccd..a8ba7a5 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendListTest.java
@@ -116,9 +116,7 @@ public class CliFrontendListTest extends 
CliFrontendTestBase {

when(clusterClient.listJobs()).thenReturn(CompletableFuture.completedFuture(Arrays.asList(
new JobStatusMessage(new JobID(), "job1", 
JobStatus.RUNNING, 1L),
new JobStatusMessage(new JobID(), "job2", 
JobStatus.CREATED, 1L),
-   new JobStatusMessage(new JobID(), "job3", 
JobStatus.SUSPENDING, 3L),
-   new JobStatusMessage(new JobID(), "job4", 
JobStatus.SUSPENDING, 2L),
-   new JobStatusMessage(new JobID(), "job5", 
JobStatus.FINISHED, 3L)
+   new JobStatusMessage(new JobID(), "job3", 
JobStatus.FINISHED, 3L)
)));
return clusterClient;
}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 6e1e8d2..7874f0b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -701,8 +701,7 @@ public class Execution implements AccessExecution, 
Archiveable suspend() {
+   switch(state) {
+   case RUNNING:
+   case DEPLOYING:
+   case CREATED:
+   case SCHEDULED:
+   if (!cancelAtomically()) {
+   throw new IllegalStateException(
+   String.format("Could not 
directly go to %s from %s.", CANCELED.name(), state.name()));
+   }
+   break;
+   case CANCELING:
+   completeCancelling();
+   break;
+   case FINISHED:
+   case FAILED:
+   sendFailIntermediateResultPartitionsRpcCall();
+   break;
+   case CANCELED:
+   break;
+   default:
+   

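The suspend switch in the truncated patch above can be illustrated with a self-contained state-machine sketch. The enum and transitions below are simplified stand-ins for Flink's `ExecutionState` handling, not the real implementation:

```java
// Hedged sketch of suspend-by-state-switch: go to a terminal state
// atomically instead of waiting for a cancel acknowledgement
// round-trip. Simplified stand-in for Flink's Execution, not the
// actual class.
class SuspendSketch {

    enum State { CREATED, SCHEDULED, DEPLOYING, RUNNING, CANCELING, FINISHED, FAILED, CANCELED }

    private State state = State.CREATED;

    State state() { return state; }

    void deploy() { state = State.RUNNING; }

    /** Suspend transitions directly to terminal CANCELED, no round-trip. */
    void suspend() {
        switch (state) {
            case CREATED:
            case SCHEDULED:
            case DEPLOYING:
            case RUNNING:
            case CANCELING:
                state = State.CANCELED; // go terminal atomically
                break;
            case FINISHED:
            case FAILED:
            case CANCELED:
                break; // already terminal, nothing to do
        }
    }
}
```

The design point of the commit is exactly this: because no acknowledgement is awaited, the whole graph reaches a terminal state in one synchronous pass, which is why the intermediate `JobStatus#SUSPENDING` state could be removed.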
[flink] 06/13: [hotfix][tests] Remove mocking from ExecutionGraphSuspendTest

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b9cd88a79c58810c95b078560168e3ab80d42093
Author: Till Rohrmann 
AuthorDate: Tue Feb 19 12:07:08 2019 +0100

[hotfix][tests] Remove mocking from ExecutionGraphSuspendTest
---
 .../executiongraph/ExecutionGraphSuspendTest.java  | 87 +++---
 1 file changed, 62 insertions(+), 25 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index c0cfc8a..097dfe9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
@@ -29,18 +30,18 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.junit.Assert.assertThat;
 
 /**
  * Validates that suspending out of various states works correctly.
@@ -53,7 +54,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 */
@Test
public void testSuspendedOutOfCreated() throws Exception {
-   final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+   final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway();
final int parallelism = 10;
final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);

@@ -75,7 +76,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 */
@Test
public void testSuspendedOutOfDeploying() throws Exception {
-   final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+   final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway();
final int parallelism = 10;
final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);

@@ -104,7 +105,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 */
@Test
public void testSuspendedOutOfRunning() throws Exception {
-   final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+   final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway();
final int parallelism = 10;
final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);

@@ -136,7 +137,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 */
@Test
public void testSuspendedOutOfFailing() throws Exception {
-   final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+   final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway();
final int parallelism = 10;
final ExecutionGraph eg = createExecutionGraph(gateway, parallelism);

@@ -166,7 +167,7 @@ public class ExecutionGraphSuspendTest extends TestLogger {
 */
@Test
public void testSuspendedOutOfFailed() throws Exception {
-   final TaskManagerGateway gateway = spy(new SimpleAckingTaskManagerGateway());
+   final InteractionsCountingTaskManagerGateway gateway = new InteractionsCountingTaskManagerGateway();
final int parallelism = 10;
final ExecutionGraph eg = 
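The pattern behind this hotfix series — replacing Mockito `spy`/`verify` with a hand-written counting test double — can be sketched as follows. Class and method names are simplified stand-ins, not Flink's actual `InteractionsCountingTaskManagerGateway`:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Test double that counts interactions instead of relying on Mockito's verify(). */
class CountingGateway {
    private final AtomicInteger submitCount = new AtomicInteger();
    private final AtomicInteger cancelCount = new AtomicInteger();

    void submitTask(String taskName) {
        submitCount.incrementAndGet(); // record the interaction; ack immediately
    }

    void cancelTask(String taskName) {
        cancelCount.incrementAndGet();
    }

    int getSubmitTaskCount() { return submitCount.get(); }
    int getCancelTaskCount() { return cancelCount.get(); }
}

public class CountingGatewayDemo {
    public static void main(String[] args) {
        CountingGateway gateway = new CountingGateway();
        gateway.submitTask("map-1");
        gateway.submitTask("map-2");
        gateway.cancelTask("map-1");
        // plain assertEquals-style checks, no mocking framework required
        System.out.println(gateway.getSubmitTaskCount()); // 2
        System.out.println(gateway.getCancelTaskCount()); // 1
    }
}
```

The advantage over `spy` is that the double is a real object with no bytecode manipulation, so the tests stay valid if the gateway interface evolves.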

[flink] 11/13: [hotfix][tests] Remove mocking from ExecutionGraphSchedulingTest

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit db98eabf76fe35201b18a85fee9dbc13e2dd3379
Author: Till Rohrmann 
AuthorDate: Wed Feb 20 10:11:59 2019 +0100

[hotfix][tests] Remove mocking from ExecutionGraphSchedulingTest
---
 .../ExecutionGraphSchedulingTest.java  | 57 
 .../executiongraph/ExecutionGraphSuspendTest.java  | 38 -
 .../InteractionsCountingTaskManagerGateway.java| 63 ++
 3 files changed, 85 insertions(+), 73 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 90adf09..a7c6947 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -49,7 +48,6 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -58,7 +56,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Test;
-import org.mockito.verification.Timeout;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -80,12 +77,6 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for the scheduling of the execution graph. This tests that
@@ -105,7 +96,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
//  Tests
// 

 
-   
/**
 * Tests that with scheduling futures and pipelined deployment, the 
target vertex will
 * not deploy its task before the source vertex does.
@@ -143,8 +133,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger {

//  set up two TaskManager gateways and slots

-   final TaskManagerGateway gatewaySource = createTaskManager();
-   final TaskManagerGateway gatewayTarget = createTaskManager();
+   final InteractionsCountingTaskManagerGateway gatewaySource = createTaskManager();
+   final InteractionsCountingTaskManagerGateway gatewayTarget = createTaskManager();

final SimpleSlot sourceSlot = createSlot(gatewaySource, jobId);
final SimpleSlot targetSlot = createSlot(gatewayTarget, jobId);
@@ -160,15 +150,15 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
// that should not cause a deployment or deployment related failure
targetFuture.complete(targetSlot);

-   verify(gatewayTarget, new Timeout(50, times(0))).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+   assertThat(gatewayTarget.getSubmitTaskCount(), is(0));
assertEquals(JobStatus.RUNNING, eg.getState());

// now supply the source slot
sourceFuture.complete(sourceSlot);

// by now, all deployments should have happened
-   verify(gatewaySource, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
-   verify(gatewayTarget, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+   assertThat(gatewaySource.getSubmitTaskCount(), is(1));
+   
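The scheduling invariant asserted above (completing the target slot must not trigger any deployment until the source slot is also available) can be illustrated with plain `CompletableFuture`s. This is a simplified sketch of the coordination idea, not the `ExecutionGraph`'s actual deployment logic:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class DeploymentOrderDemo {
    public static void main(String[] args) {
        AtomicInteger deployments = new AtomicInteger();
        CompletableFuture<String> sourceSlot = new CompletableFuture<>();
        CompletableFuture<String> targetSlot = new CompletableFuture<>();

        // Deploy both vertices only once *all* slots are available,
        // mirroring the "target must not deploy before source" invariant.
        sourceSlot.thenCombine(targetSlot, (s, t) -> deployments.addAndGet(2));

        targetSlot.complete("target-slot");
        System.out.println(deployments.get()); // still 0: source slot missing

        sourceSlot.complete("source-slot");
        System.out.println(deployments.get()); // 2: both vertices deployed
    }
}
```

Because the combining callback only fires after both futures complete, the test can assert counts synchronously instead of using Mockito's `timeout(...)` polling.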

[flink] 08/13: [hotfix] Fix checkstyle violations in JobStatus

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6fa35eabdc2aaa28ce20e37753f5cc8ab1cac290
Author: Till Rohrmann 
AuthorDate: Tue Feb 19 15:41:22 2019 +0100

[hotfix] Fix checkstyle violations in JobStatus
---
 .../apache/flink/runtime/jobgraph/JobStatus.java   | 32 +++---
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
index c04528e..ef2cf1f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobStatus.java
@@ -29,25 +29,25 @@ public enum JobStatus {
/** Some tasks are scheduled or running, some may be pending, some may be finished. */
RUNNING(TerminalState.NON_TERMINAL),

-   /** The job has failed and is currently waiting for the cleanup to complete */
+   /** The job has failed and is currently waiting for the cleanup to complete. */
FAILING(TerminalState.NON_TERMINAL),
-   
-   /** The job has failed with a non-recoverable task failure */
+
+   /** The job has failed with a non-recoverable task failure. */
FAILED(TerminalState.GLOBALLY),

-   /** Job is being cancelled */
+   /** Job is being cancelled. */
CANCELLING(TerminalState.NON_TERMINAL),
-   
-   /** Job has been cancelled */
+
+   /** Job has been cancelled. */
CANCELED(TerminalState.GLOBALLY),

/** All of the job's tasks have successfully finished. */
FINISHED(TerminalState.GLOBALLY),
-   
-   /** The job is currently undergoing a reset and total restart */
+
+   /** The job is currently undergoing a reset and total restart. */
RESTARTING(TerminalState.NON_TERMINAL),

-   /** The job has been suspended and is currently waiting for the cleanup to complete */
+   /** The job has been suspended and is currently waiting for the cleanup to complete. */
SUSPENDING(TerminalState.NON_TERMINAL),

/**
@@ -58,7 +58,7 @@ public enum JobStatus {

/** The job is currently reconciling and waits for task execution report to recover state. */
RECONCILING(TerminalState.NON_TERMINAL);
-   
+
// 

private enum TerminalState {
@@ -66,9 +66,9 @@ public enum JobStatus {
LOCALLY,
GLOBALLY
}
-   
+
private final TerminalState terminalState;
-   
+
JobStatus(TerminalState terminalState) {
this.terminalState = terminalState;
}
@@ -77,10 +77,10 @@ public enum JobStatus {
 * Checks whether this state is globally terminal. A globally terminal job
 * is complete and cannot fail any more and will not be restarted or recovered by another
 * standby master node.
-* 
+*
 * When a globally terminal state has been reached, all recovery data for the job is
 * dropped from the high-availability services.
-* 
+*
 * @return True, if this job status is globally terminal, false otherwise.
 */
public boolean isGloballyTerminalState() {
@@ -90,11 +90,11 @@ public enum JobStatus {
/**
 * Checks whether this state is locally terminal. Locally terminal refers to the
 * state of a job's execution graph within an executing JobManager. If the execution graph
-* is locally terminal, the JobManager will not continue executing or recovering the job. 
+* is locally terminal, the JobManager will not continue executing or recovering the job.
 *
 * The only state that is locally terminal, but not globally terminal is {@link #SUSPENDED},
 * which is typically entered when the executing JobManager looses its leader status.
-* 
+*
 * @return True, if this job status is terminal, false otherwise.
 */
public boolean isTerminalState() {



[flink] 07/13: [hotfix] Fix checkstyle violation in ExecutionGraph

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7d41825742e1607345243f5b6ba61da09592edf2
Author: Till Rohrmann 
AuthorDate: Tue Feb 19 14:51:49 2019 +0100

[hotfix] Fix checkstyle violation in ExecutionGraph
---
 .../java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 3a414a6..204187c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1028,7 +1028,7 @@ public class ExecutionGraph implements AccessExecutionGraph {

executionMessageBuilder.append("completed exceptionally: " + completionException + "/" + executionFuture);
}

-   if (i < allAllocationFutures.size() - 1 ) {
+   if (i < allAllocationFutures.size() - 1) {

executionMessageBuilder.append(", ");
}
}



[flink] 03/13: [hotfix] Fix checkstyle violation in JobMaster

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d3b1d18065bc7faad62cc584cc78c2f3edc9847e
Author: Till Rohrmann 
AuthorDate: Tue Feb 19 11:31:52 2019 +0100

[hotfix] Fix checkstyle violation in JobMaster
---
 .../src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 8f51431..50c391e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.core.io.InputSplit;



[flink] 01/13: [hotfix] Use lambda for heartbeat timeout handler in ResourceManager

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ad55d8924c6bda919ec4db851151ec03e51d0fe5
Author: Till Rohrmann 
AuthorDate: Sun Feb 17 12:38:52 2019 +0100

[hotfix] Use lambda for heartbeat timeout handler in ResourceManager
---
 .../runtime/resourcemanager/ResourceManager.java  | 19 ---
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 4ef1858..396dd4e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -1172,19 +1172,16 @@ public abstract class ResourceManager

@Override
public void notifyHeartbeatTimeout(final ResourceID resourceID) {
-   runAsync(new Runnable() {
-   @Override
-   public void run() {
-   log.info("The heartbeat of JobManager with id {} timed out.", resourceID);
+   runAsync(() -> {
+   log.info("The heartbeat of JobManager with id {} timed out.", resourceID);

-   if (jmResourceIdRegistrations.containsKey(resourceID)) {
-   JobManagerRegistration jobManagerRegistration = jmResourceIdRegistrations.get(resourceID);
+   if (jmResourceIdRegistrations.containsKey(resourceID)) {
+   JobManagerRegistration jobManagerRegistration = jmResourceIdRegistrations.get(resourceID);

-   if (jobManagerRegistration != null) {
-   closeJobManagerConnection(
-   jobManagerRegistration.getJobID(),
-   new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
-   }
+   if (jobManagerRegistration != null) {
+   closeJobManagerConnection(
+   jobManagerRegistration.getJobID(),
+   new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
}
}
});
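The refactoring in this hunk collapses an anonymous `Runnable` into a behaviourally identical lambda. A minimal illustration — `runAsync` here is a trivial stand-in that runs the task inline, unlike the real RPC endpoint's asynchronous dispatch:

```java
/** Demonstrates that an anonymous Runnable and a lambda are interchangeable. */
public class LambdaRunnableDemo {
    static int timeoutsHandled = 0;

    // Stand-in for RpcEndpoint#runAsync; the real one dispatches to the actor's main thread.
    static void runAsync(Runnable r) {
        r.run();
    }

    public static void main(String[] args) {
        // Before the hotfix: anonymous inner class
        runAsync(new Runnable() {
            @Override
            public void run() {
                timeoutsHandled++;
            }
        });

        // After the hotfix: equivalent lambda, as in the ResourceManager change
        runAsync(() -> timeoutsHandled++);

        System.out.println(timeoutsHandled); // 2
    }
}
```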



[flink] 12/13: [FLINK-11678] Change RestfulGateway#requestJob return type to CompletableFuture

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ce49a9040d255ebda3dd3b4512c9d9b00746ac12
Author: Till Rohrmann 
AuthorDate: Wed Feb 20 11:55:58 2019 +0100

[FLINK-11678] Change RestfulGateway#requestJob return type to 
CompletableFuture
---
 .../org/apache/flink/runtime/webmonitor/RestfulGateway.java | 10 +-
 .../rest/handler/legacy/ExecutionGraphCacheTest.java| 10 +-
 .../flink/runtime/webmonitor/TestingDispatcherGateway.java  |  5 ++---
 .../flink/runtime/webmonitor/TestingRestfulGateway.java | 13 +++--
 4 files changed, 19 insertions(+), 19 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index fb665ab..ab6a781 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.JobResult;
@@ -67,14 +67,14 @@ public interface RestfulGateway extends RpcGateway {
CompletableFuture stopJob(JobID jobId, @RpcTimeout Time timeout);

/**
-* Requests the {@link AccessExecutionGraph} for the given jobId. If there is no such graph, then
+* Requests the {@link ArchivedExecutionGraph} for the given jobId. If there is no such graph, then
 * the future is completed with a {@link FlinkJobNotFoundException}.
 *
-* @param jobId identifying the job whose AccessExecutionGraph is requested
+* @param jobId identifying the job whose {@link ArchivedExecutionGraph} is requested
 * @param timeout for the asynchronous operation
-* @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link FlinkJobNotFoundException}
+* @return Future containing the {@link ArchivedExecutionGraph} for the given jobId, otherwise {@link FlinkJobNotFoundException}
 */
-   CompletableFuture requestJob(JobID jobId, @RpcTimeout Time timeout);
+   CompletableFuture requestJob(JobID jobId, @RpcTimeout Time timeout);
 
/**
 * Requests the {@link JobResult} of a job specified by the given jobId.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
index ff28a4a..bd0e659 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java
@@ -259,7 +259,7 @@ public class ExecutionGraphCacheTest extends TestLogger {
final JobID expectedJobId = new JobID();

final ArchivedExecutionGraph suspendedExecutionGraph = new ArchivedExecutionGraphBuilder().setState(JobStatus.SUSPENDED).build();
-   final ConcurrentLinkedQueue> requestJobAnswers = new ConcurrentLinkedQueue<>();
+   final ConcurrentLinkedQueue> requestJobAnswers = new ConcurrentLinkedQueue<>();

requestJobAnswers.offer(CompletableFuture.completedFuture(suspendedExecutionGraph));
requestJobAnswers.offer(CompletableFuture.completedFuture(expectedExecutionGraph));
@@ -325,8 +325,8 @@ public class ExecutionGraphCacheTest extends TestLogger {
}
}

-   private CountingRestfulGateway createCountingRestfulGateway(JobID jobId, CompletableFuture... accessExecutionGraphs) {
-   final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue<>(Arrays.asList(accessExecutionGraphs));
+   private CountingRestfulGateway createCountingRestfulGateway(JobID jobId, CompletableFuture... accessExecutionGraphs) {
+   final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue<>(Arrays.asList(accessExecutionGraphs));
return new CountingRestfulGateway(
jobId,
ignored -> queue.poll());
@@ -341,13 +341,13 @@ public class ExecutionGraphCacheTest extends TestLogger {

private AtomicInteger numRequestJobCalls = new AtomicInteger(0);

-   private 
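The motivation for FLINK-11678's change — narrowing the return type from a wildcard over `AccessExecutionGraph` to the concrete `ArchivedExecutionGraph` — is that callers gain the concrete type's capabilities without casting. A simplified sketch with hypothetical stand-in types (not the real Flink interfaces):

```java
import java.util.concurrent.CompletableFuture;

interface AccessGraph { String jobName(); }      // stand-in for AccessExecutionGraph
class ArchivedGraph implements AccessGraph {     // stand-in for ArchivedExecutionGraph
    public String jobName() { return "wordcount"; }
    // capability only available on the concrete type, e.g. a serializable snapshot
    byte[] serialize() { return jobName().getBytes(); }
}

public class NarrowedReturnTypeDemo {
    // Before: callers only see the read-only interface behind a wildcard.
    static CompletableFuture<? extends AccessGraph> requestJobOld() {
        return CompletableFuture.completedFuture(new ArchivedGraph());
    }

    // After: the concrete type is exposed, so callers can use its extra
    // capabilities (here, serialize()) without an unchecked cast.
    static CompletableFuture<ArchivedGraph> requestJobNew() {
        return CompletableFuture.completedFuture(new ArchivedGraph());
    }

    public static void main(String[] args) {
        int len = requestJobNew().join().serialize().length;
        System.out.println(len); // 9 ("wordcount")
    }
}
```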

[flink] branch master updated: [FLINK-11691] Introduce Classloader in the methods of StateBackendFactory and ConfigurableStateBackend

2019-02-20 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c763c35  [FLINK-11691] Introduce Classloader in the methods of 
StateBackendFactory and ConfigurableStateBackend
c763c35 is described below

commit c763c353300953f22cad19afe0d987e016a8773d
Author: Yun Tang 
AuthorDate: Thu Feb 21 00:50:46 2019 +0800

[FLINK-11691] Introduce Classloader in the methods of StateBackendFactory 
and ConfigurableStateBackend

This closes #7779.
---
 .../flink/runtime/state/ConfigurableStateBackend.java  |  5 -
 .../flink/runtime/state/StateBackendFactory.java   |  3 ++-
 .../apache/flink/runtime/state/StateBackendLoader.java | 18 +-
 .../flink/runtime/state/filesystem/FsStateBackend.java |  8 
 .../state/filesystem/FsStateBackendFactory.java|  6 +++---
 .../flink/runtime/state/memory/MemoryStateBackend.java | 12 +++-
 .../state/memory/MemoryStateBackendFactory.java|  6 +++---
 .../flink/runtime/state/StateBackendLoadingTest.java   |  2 +-
 .../runtime/state/testutils/BackendForTestStream.java  |  2 +-
 .../contrib/streaming/state/RocksDBStateBackend.java   | 12 +++-
 .../streaming/state/RocksDBStateBackendFactory.java|  4 ++--
 .../streaming/state/RocksDBStateBackendConfigTest.java |  4 ++--
 .../state/RocksDBStateBackendMigrationTest.java|  2 +-
 .../streaming/state/RocksDBStateBackendTest.java   |  2 +-
 .../streaming/state/ttl/RocksDBTtlStateTestBase.java   |  2 +-
 .../flink/streaming/runtime/tasks/StreamTaskTest.java  |  2 +-
 .../runtime/tasks/TaskCheckpointingBehaviourTest.java  |  4 ++--
 17 files changed, 51 insertions(+), 43 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java
index f509e8d..dd48467 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 
 /**
  * An interface for state backends that pick up additional parameters from a 
configuration.
  */
+@Internal
 public interface ConfigurableStateBackend {
 
/**
@@ -37,9 +39,10 @@ public interface ConfigurableStateBackend {
 * Otherwise it typically returns a modified copy.
 *
 * @param config The configuration to pick the values from.
+* @param classLoader The class loader that should be used to load the state backend.
 * @return A reconfigured state backend.
 *
 * @throws IllegalConfigurationException Thrown if the configuration contained invalid entries.
 */
-   StateBackend configure(Configuration config) throws IllegalConfigurationException;
+   StateBackend configure(Configuration config, ClassLoader classLoader) throws IllegalConfigurationException;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
index c0f0976..c9d7ef4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
@@ -40,6 +40,7 @@ public interface StateBackendFactory {
 * Creates the state backend, optionally using the given configuration.
 * 
 * @param config The Flink configuration (loaded by the TaskManager).
+* @param classLoader The class loader that should be used to load the state backend.
 * @return The created state backend. 
 * 
 * @throws IllegalConfigurationException
@@ -47,5 +48,5 @@ public interface StateBackendFactory {
 * @throws IOException
 * If the state backend initialization failed due to an I/O exception
 */
-   T createFromConfig(Configuration config) throws IllegalConfigurationException, IOException;
+   T createFromConfig(Configuration config, ClassLoader classLoader) throws IllegalConfigurationException, IOException;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
index 857dfc1..dbb20b8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java
+++ 
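The shape of the changed factory contract — threading a `ClassLoader` through `createFromConfig` so the backend can resolve user classes — can be sketched with simplified stand-ins for Flink's interfaces (the real signatures take a Flink `Configuration`, not a `Map`, and declare checked exceptions):

```java
import java.util.Collections;
import java.util.Map;

interface StateBackend {}

/** Simplified stand-in for FLINK-11691's factory contract. */
interface StateBackendFactory<T extends StateBackend> {
    T createFromConfig(Map<String, String> config, ClassLoader classLoader);
}

class MemoryBackend implements StateBackend {}

public class ClassLoaderFactoryDemo {
    public static void main(String[] args) {
        StateBackendFactory<MemoryBackend> factory = (config, cl) -> {
            // a real factory could resolve user-code classes via `cl` here,
            // e.g. cl.loadClass(config.get("state.backend.class"))
            return new MemoryBackend();
        };
        StateBackend backend = factory.createFromConfig(
            Collections.singletonMap("state.backend", "memory"),
            Thread.currentThread().getContextClassLoader());
        System.out.println(backend instanceof MemoryBackend); // true
    }
}
```

Passing the class loader explicitly keeps the factory usable from contexts where the thread context class loader is not the user-code class loader.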

[flink] branch master updated: [hotfix][runtime] Fix typo in IOManagerAsync

2019-02-20 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 17ebd5f  [hotfix][runtime] Fix typo in IOManagerAsync
17ebd5f is described below

commit 17ebd5fac8650c6a123d813a3c201ae1ac77f572
Author: leesf <490081...@qq.com>
AuthorDate: Wed Feb 20 22:33:14 2019 +0800

[hotfix][runtime] Fix typo in IOManagerAsync
---
 .../runtime/io/disk/iomanager/IOManagerAsync.java| 20 ++--
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index 2b8e7f3..0c3c8f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -54,14 +54,14 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
// -

/**
-* Constructs a new asynchronous I/O manger, writing files to the system 's temp directory.
+* Constructs a new asynchronous I/O manager, writing files to the system 's temp directory.
 */
public IOManagerAsync() {
this(EnvironmentInformation.getTemporaryFileDirectory());
}

/**
-* Constructs a new asynchronous I/O manger, writing file to the given directory.
+* Constructs a new asynchronous I/O manager, writing file to the given directory.
 * 
 * @param tempDir The directory to write temporary files to.
 */
@@ -70,7 +70,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
}

/**
-* Constructs a new asynchronous I/O manger, writing file round robin across the given directories.
+* Constructs a new asynchronous I/O manager, writing file round robin across the given directories.
 * 
 * @param tempDirs The directories to write temporary files to.
 */
@@ -197,13 +197,13 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
LinkedBlockingQueue returnQueue) throws IOException {
-   checkState(!isShutdown.get(), "I/O-Manger is shut down.");
+   checkState(!isShutdown.get(), "I/O-Manager is shut down.");
return new AsynchronousBlockWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue);
}

@Override
public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException {
-   checkState(!isShutdown.get(), "I/O-Manger is shut down.");
+   checkState(!isShutdown.get(), "I/O-Manager is shut down.");
return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback);
}

@@ -221,27 +221,27 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
LinkedBlockingQueue returnQueue) throws IOException {
-   checkState(!isShutdown.get(), "I/O-Manger is shut down.");
+   checkState(!isShutdown.get(), "I/O-Manager is shut down.");
return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue);
}

@Override
public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID) throws IOException {
-   checkState(!isShutdown.get(), "I/O-Manger is shut down.");
+   checkState(!isShutdown.get(), "I/O-Manager is shut down.");

return new AsynchronousBufferFileWriter(channelID, writers[channelID.getThreadNum()].requestQueue);
}

@Override
public BufferFileReader createBufferFileReader(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException {
-   checkState(!isShutdown.get(), "I/O-Manger is shut down.");
+   checkState(!isShutdown.get(), "I/O-Manager is shut down.");

return new AsynchronousBufferFileReader(channelID, readers[channelID.getThreadNum()].requestQueue, callback);
}

@Override
public BufferFileSegmentReader 

[flink] branch master updated: [FLINK-11677] Remove ResourceManagerRunner

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 0446b67  [FLINK-11677] Remove ResourceManagerRunner
0446b67 is described below

commit 0446b6780e3eb0d677e25d75ac329825a598612d
Author: tison 
AuthorDate: Thu Feb 14 07:24:35 2019 +0800

[FLINK-11677] Remove ResourceManagerRunner

This closes #7699.
---
 .../resourcemanager/ResourceManagerRunner.java | 119 -
 1 file changed, 119 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
deleted file mode 100644
index e3e6d72..000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.entrypoint.ClusterInformation;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.util.AutoCloseableAsync;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Simple {@link StandaloneResourceManager} runner. It instantiates the 
resource manager's services
- * and handles fatal errors by shutting the resource manager down.
- */
-public class ResourceManagerRunner implements FatalErrorHandler, 
AutoCloseableAsync {
-
-   private static final Logger LOG = 
LoggerFactory.getLogger(ResourceManagerRunner.class);
-
-   private final ResourceManagerRuntimeServices 
resourceManagerRuntimeServices;
-
-   private final ResourceManager resourceManager;
-
-   public ResourceManagerRunner(
-   final ResourceID resourceId,
-   final String resourceManagerEndpointId,
-   final Configuration configuration,
-   final RpcService rpcService,
-   final HighAvailabilityServices highAvailabilityServices,
-   final HeartbeatServices heartbeatServices,
-   final MetricRegistry metricRegistry,
-   final ClusterInformation clusterInformation,
-   final JobManagerMetricGroup jobManagerMetricGroup) 
throws Exception {
-
-   Preconditions.checkNotNull(resourceId);
-   Preconditions.checkNotNull(configuration);
-   Preconditions.checkNotNull(rpcService);
-   Preconditions.checkNotNull(highAvailabilityServices);
-   Preconditions.checkNotNull(heartbeatServices);
-   Preconditions.checkNotNull(metricRegistry);
-
-   final ResourceManagerRuntimeServicesConfiguration 
resourceManagerRuntimeServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
-
-   resourceManagerRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
-   resourceManagerRuntimeServicesConfiguration,
-   highAvailabilityServices,
-   rpcService.getScheduledExecutor());
-
-   this.resourceManager = new StandaloneResourceManager(
-   rpcService,
-   resourceManagerEndpointId,
-   resourceId,
-   highAvailabilityServices,
-   heartbeatServices,
-   

[flink] branch master updated: [FLINK-11655][rpc] Remove serializable interface from CallAsync

2019-02-20 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 091f448  [FLINK-11655][rpc] Remove serializable interface from 
CallAsync
091f448 is described below

commit 091f4482d70dc8b6f9de77672346ca063f37ae29
Author: libenchao 
AuthorDate: Wed Feb 20 10:35:40 2019 +0800

[FLINK-11655][rpc] Remove serializable interface from CallAsync

CallAsync is a message which should only be sent to the local actor. Hence it
does not need to be serializable.

This closes #7760.
---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java   | 64 --
 .../flink/runtime/rpc/messages/CallAsync.java  |  6 +-
 .../flink/runtime/rpc/messages/RunAsync.java   |  1 -
 3 files changed, 24 insertions(+), 47 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index ad3b441..0ba2b5a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -362,23 +362,12 @@ class AkkaRpcActor 
extends UntypedActor {
 * @param callAsync Call async message
 */
private void handleCallAsync(CallAsync callAsync) {
-   if (callAsync.getCallable() == null) {
-   final String result = "Received a " + 
callAsync.getClass().getName() + " message with an empty " +
-   "callable field. This indicates that this 
message has been serialized " +
-   "prior to sending the message. The " + 
callAsync.getClass().getName() +
-   " is only supported with local communication.";
-
-   log.warn(result);
-
-   getSender().tell(new Status.Failure(new 
AkkaRpcException(result)), getSelf());
-   } else {
-   try {
-   Object result = callAsync.getCallable().call();
+   try {
+   Object result = callAsync.getCallable().call();
 
-   getSender().tell(new Status.Success(result), 
getSelf());
-   } catch (Throwable e) {
-   getSender().tell(new Status.Failure(e), 
getSelf());
-   }
+   getSender().tell(new Status.Success(result), getSelf());
+   } catch (Throwable e) {
+   getSender().tell(new Status.Failure(e), getSelf());
}
}
 
@@ -389,36 +378,27 @@ class AkkaRpcActor 
extends UntypedActor {
 * @param runAsync Run async message
 */
private void handleRunAsync(RunAsync runAsync) {
-   if (runAsync.getRunnable() == null) {
-   log.warn("Received a {} message with an empty runnable 
field. This indicates " +
-   "that this message has been serialized prior to 
sending the message. The " +
-   "{} is only supported with local 
communication.",
-   runAsync.getClass().getName(),
-   runAsync.getClass().getName());
+   final long timeToRun = runAsync.getTimeNanos();
+   final long delayNanos;
+
+   if (timeToRun == 0 || (delayNanos = timeToRun - 
System.nanoTime()) <= 0) {
+   // run immediately
+   try {
+   runAsync.getRunnable().run();
+   } catch (Throwable t) {
+   log.error("Caught exception while executing 
runnable in main thread.", t);
+   ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+   }
}
else {
-   final long timeToRun = runAsync.getTimeNanos();
-   final long delayNanos;
-
-   if (timeToRun == 0 || (delayNanos = timeToRun - 
System.nanoTime()) <= 0) {
-   // run immediately
-   try {
-   runAsync.getRunnable().run();
-   } catch (Throwable t) {
-   log.error("Caught exception while 
executing runnable in main thread.", t);
-   
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-   }
-   }
-   else {
-   // schedule for later. send a new message after 
the delay, which will then be immediately executed
-  
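The run-now-or-defer decision in the new `handleRunAsync` above can be illustrated with a small plain-Java sketch. This is a simplification: the real actor re-enqueues a `RunAsync` message to itself after the delay rather than using an executor, and the class and field names below are illustrative only.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RunAsyncDemo {
    static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor();

    // timeToRunNanos == 0 means "run immediately"; otherwise it is an absolute
    // System.nanoTime() deadline. Deadlines already in the past also run immediately.
    static void handleRunAsync(long timeToRunNanos, Runnable runnable) {
        final long delayNanos;
        if (timeToRunNanos == 0 || (delayNanos = timeToRunNanos - System.nanoTime()) <= 0) {
            try {
                runnable.run(); // executed directly in the calling (main) thread
            } catch (Throwable t) {
                System.err.println("Caught exception while executing runnable: " + t);
            }
        } else {
            // defer: the real actor sends itself a new RunAsync message after the delay
            SCHEDULER.schedule(runnable, delayNanos, TimeUnit.NANOSECONDS);
        }
    }

    public static void main(String[] args) {
        handleRunAsync(0, () -> System.out.println("ran immediately"));
        SCHEDULER.shutdown();
    }
}
```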

[flink] 03/04: [hotfix] [doc] Add "Avoid Java serialization" to best practices section in "Custom Serialization for Managed State" doc

2019-02-20 Thread tzulitai

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e6544c5a1c1bf092263d28818a0da406fa2d7aa5
Author: Tzu-Li (Gordon) Tai 
AuthorDate: Wed Feb 20 17:22:16 2019 +0800

[hotfix] [doc] Add "Avoid Java serialization" to best practices section in 
"Custom Serialization for Managed State" doc
---
 docs/dev/stream/state/custom_serialization.md | 11 +++
 1 file changed, 11 insertions(+)

diff --git a/docs/dev/stream/state/custom_serialization.md 
b/docs/dev/stream/state/custom_serialization.md
index dd5dcf1..79f05a3 100644
--- a/docs/dev/stream/state/custom_serialization.md
+++ b/docs/dev/stream/state/custom_serialization.md
@@ -403,6 +403,17 @@ the same `TypeSerializerSnapshot` class as their snapshot 
would complicate the i
 This would also be a bad separation of concerns; a single serializer's 
serialization schema,
 configuration, as well as how to restore it, should be consolidated in its own 
dedicated `TypeSerializerSnapshot` class.
 
+ 3. Avoid using Java serialization for serializer snapshot content
+
+Java serialization should not be used at all when writing the contents of a persisted serializer snapshot.
+Take, for example, a serializer that needs to persist the class of its target type as part of its snapshot.
+Information about the class should be persisted by writing the class name, 
instead of directly serializing the class
+using Java. When reading the snapshot, the class name is read and used to
dynamically load the class.
+
+This practice ensures that serializer snapshots can always be safely read. In 
the above example, if the type class
+was persisted using Java serialization, the snapshot may no longer be readable 
once the class implementation has changed
+and is no longer binary compatible according to Java serialization specifics.
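The class-name approach described above can be sketched with plain JDK streams. Note that Flink's own serializers write through `DataOutputView`/`DataInputView` rather than raw streams, and the class and method names below are illustrative only:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ClassNameSnapshotDemo {
    // Persist a class reference by name instead of Java-serializing the Class object.
    static byte[] writeClassName(Class<?> clazz) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeUTF(clazz.getName()); // readable as long as the class name is stable
        }
        return bos.toByteArray();
    }

    // Restore the class by dynamically loading it via the recorded name.
    static Class<?> readClass(byte[] bytes, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return Class.forName(in.readUTF(), false, classLoader);
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] snapshot = writeClassName(java.time.Instant.class);
        Class<?> restored = readClass(snapshot, ClassNameSnapshotDemo.class.getClassLoader());
        System.out.println(restored.getName()); // prints "java.time.Instant"
    }
}
```

Because only the name is written, later changes to the class implementation do not invalidate already-written snapshots, which is exactly the robustness argument made above.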
+
 ## Migrating from deprecated serializer snapshot APIs before Flink 1.7
 
 This section is a guide for API migration from serializers and serializer 
snapshots that existed before Flink 1.7.



[flink] 01/04: [FLINK-11292] [doc] Update document about how to use new CompositeTypeSerializerSnapshot

2019-02-20 Thread tzulitai

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7a3a7f24b4d8b5bf8f62de1df7b54a6c7e0ae555
Author: Yun Tang 
AuthorDate: Sun Jan 13 18:14:03 2019 +0800

[FLINK-11292] [doc] Update document about how to use new 
CompositeTypeSerializerSnapshot
---
 docs/dev/stream/state/custom_serialization.md | 44 +--
 1 file changed, 35 insertions(+), 9 deletions(-)

diff --git a/docs/dev/stream/state/custom_serialization.md 
b/docs/dev/stream/state/custom_serialization.md
index cc727be..fbfee72 100644
--- a/docs/dev/stream/state/custom_serialization.md
+++ b/docs/dev/stream/state/custom_serialization.md
@@ -201,6 +201,10 @@ to the implementation of state serializers and their 
serializer snapshots.
 
 ## Implementation notes and best practices
 
+Apart from the following two points, we also recommend that serializers with nested serializers implement snapshotting
+by subclassing `CompositeTypeSerializerSnapshot`.
+Please refer to the [next section]({{ site.baseurl 
}}/dev/stream/state/custom_serialization.html#extending-compositetypeserializersnapshot-for-snapshots-of-serializers-with-nested-serializers)
 for more details.
+
  1. Flink restores serializer snapshots by instantiating them with their 
classname
 
 A serializer's snapshot, being the single source of truth for how a registered 
state was serialized, serves as an
@@ -223,15 +227,37 @@ the same `TypeSerializerSnapshot` class as their snapshot 
would complicate the i
 This would also be a bad separation of concerns; a single serializer's 
serialization schema,
 configuration, as well as how to restore it, should be consolidated in its own 
dedicated `TypeSerializerSnapshot` class.
 
- 3. Use the `CompositeSerializerSnapshot` utility for serializers that 
contain nested serializers
-
-There may be cases where a `TypeSerializer` relies on other nested 
`TypeSerializer`s; take for example Flink's
-`TupleSerializer`, where it is configured with nested `TypeSerializer`s for 
the tuple fields. In this case,
-the snapshot of the most outer serializer should also contain snapshots of the 
nested serializers.
-
-The `CompositeSerializerSnapshot` can be used specifically for this scenario. 
It wraps the logic of resolving
-the overall schema compatibility check result for the composite serializer.
-For an example of how it should be used, one can refer to Flink's
+### Extending `CompositeTypeSerializerSnapshot` for snapshots of serializers 
with nested serializers
+
+Before further explanation, we refer to a serializer that relies on other nested serializer(s) as the "outer"
+serializer in this context. Examples include `MapSerializer`, `ListSerializer`, `GenericArraySerializer`, etc.
+Considering `MapSerializer`, for example, the map-key and map-value serializers would be the nested serializers,
+while `MapSerializer` itself is the "outer" serializer.
+In this case, the snapshot of the outer serializer should also contain snapshots of the nested serializers.
+Also, note that unlike the other two serializers mentioned above, `GenericArraySerializer` contains some additional
+static information (the class of the component type) that needs to be persisted along with the nested component serializer.
+
+`CompositeTypeSerializerSnapshot` is designed to assist in implementing snapshotting for serializers that delegate
+their serialization to multiple nested serializers. It is also helpful in cases where some extra static information
+needs to be persisted. `CompositeTypeSerializerSnapshot` takes care of the overall schema compatibility check
+for the composite serializer.
+
+When adding a new serializer snapshot as a subclass of 
`CompositeTypeSerializerSnapshot`,
+the following three methods must be implemented:
+ * `#getCurrentOuterSnapshotVersion()`. This method defines the version of
+ the current outer serializer snapshot's written binary format.
+ * `#getNestedSerializers(TypeSerializer)`. Given the outer serializer, 
returns the nested serializers.
+ * `#createOuterSerializerWithNestedSerializers(TypeSerializer[])`.
+ Given the nested serializers, create an instance of the outer serializer.
+
+For serializers needing to contain some extra static information, the 
following two methods must also be implemented:
+ * `#writeOuterSnapshot(DataOutputView)`. This method writes the outer 
serializer snapshot, i.e. any information beyond the nested serializers.
+ The base implementation of this method writes nothing, i.e. it assumes that 
the outer serializer only has nested serializers and no extra information.
+ * `#readOuterSnapshot(int, DataInputView, ClassLoader)`. This method reads 
the outer serializer snapshot,
+ i.e. any information beyond the nested serializers of the outer serializer. 
The base implementation of this method reads nothing,
+ i.e. it assumes that 

[flink] branch master updated (773f711 -> 337e167)

2019-02-20 Thread tzulitai

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 773f711  [FLINK-10569][runtime] Remove Instance usage in 
ExecutionVertexSchedulingTest
 new 7a3a7f2  [FLINK-11292] [doc] Update document about how to use new 
CompositeTypeSerializerSnapshot
 new c0a7182  [FLINK-11292] [doc] Move predefined TypeSerializerSnapshot 
class docs to a top-level subsection
 new e6544c5  [hotfix] [doc] Add "Avoid Java serialization" to best 
practices section in "Custom Serialization for Managed State" doc
 new 337e167  [hotfix] [core] GenericArraySerializerSnapshot should 
implement isOuterSnapshotCompatible

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/stream/state/custom_serialization.md  | 196 -
 .../base/GenericArraySerializerSnapshot.java   |   5 +
 2 files changed, 193 insertions(+), 8 deletions(-)



[flink] 02/04: [FLINK-11292] [doc] Move predefined TypeSerializerSnapshot class docs to a top-level subsection

2019-02-20 Thread tzulitai

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c0a71820dc01d70d710c847dec98100ebdd1fee7
Author: Tzu-Li (Gordon) Tai 
AuthorDate: Wed Feb 20 17:03:51 2019 +0800

[FLINK-11292] [doc] Move predefined TypeSerializerSnapshot class docs to a 
top-level subsection

This commit adds an independent, top-level subsection named "Predefined
convenient TypeSerializerSnapshot classes" to the "Custom Serialization
for Managed State" page.

This section covers details about using the SimpleTypeSerializerSnapshot
and CompositeTypeSerializerSnapshot classes, what they do, and in which
cases they are suitable to be used by a serializer.

This closes #7475.
---
 docs/dev/stream/state/custom_serialization.md | 217 +-
 1 file changed, 180 insertions(+), 37 deletions(-)

diff --git a/docs/dev/stream/state/custom_serialization.md 
b/docs/dev/stream/state/custom_serialization.md
index fbfee72..dd5dcf1 100644
--- a/docs/dev/stream/state/custom_serialization.md
+++ b/docs/dev/stream/state/custom_serialization.md
@@ -199,11 +199,187 @@ to the implementation of state serializers and their 
serializer snapshots.
  5. **Take another savepoint, serializing all state with schema _B_**
   - Same as step 2., but now state bytes are all in schema _B_.
 
-## Implementation notes and best practices
+## Predefined convenient `TypeSerializerSnapshot` classes
+
+Flink provides two abstract base `TypeSerializerSnapshot` classes that can be 
used for typical scenarios:
+`SimpleTypeSerializerSnapshot` and `CompositeTypeSerializerSnapshot`.
+
+Serializers that use these predefined snapshots as their serializer
snapshot must always have their own, independent
+subclass implementation. This corresponds to the best practice of not sharing 
snapshot classes
+across different serializers, which is more thoroughly explained in the next 
section.
+
+### Implementing a `SimpleTypeSerializerSnapshot`
+
+The `SimpleTypeSerializerSnapshot` is intended for serializers that do not 
have any state or configuration,
+essentially meaning that the serialization schema of the serializer is solely 
defined by the serializer's class.
+
+There will only be 2 possible results of the compatibility resolution when 
using the `SimpleTypeSerializerSnapshot`
+as your serializer's snapshot class:
+
+ - `TypeSerializerSchemaCompatibility.compatibleAsIs()`, if the new serializer 
class remains identical, or
+ - `TypeSerializerSchemaCompatibility.incompatible()`, if the new serializer 
class is different from the previous one.
+ 
+Below is an example of how the `SimpleTypeSerializerSnapshot` is used, using 
Flink's `IntSerializer` as an example:
+
+{% highlight java %}
+public class IntSerializerSnapshot extends 
SimpleTypeSerializerSnapshot {
+public IntSerializerSnapshot() {
+super(() -> IntSerializer.INSTANCE);
+}
+}
+{% endhighlight %}
+
+
+The `IntSerializer` has no state or configuration. Its serialization format is defined solely by the serializer
+class itself, and can only be read by another `IntSerializer`.
suits the use case of the
+`SimpleTypeSerializerSnapshot`.
+
+The base super constructor of the `SimpleTypeSerializerSnapshot` expects a `Supplier` of instances
+of the corresponding serializer, regardless of whether the snapshot is currently being restored or being written.
+That supplier is used to create the restore serializer, as well as for type checks to verify that the
+new serializer is of the expected serializer class.
+
+### Implementing a `CompositeTypeSerializerSnapshot`
+
+The `CompositeTypeSerializerSnapshot` is intended for serializers that rely on 
multiple nested serializers for serialization.
+
+Before further explanation, we refer to a serializer that relies on multiple nested serializers as the "outer"
+serializer in this context. Examples include `MapSerializer`, `ListSerializer`, `GenericArraySerializer`, etc.
+Consider the `MapSerializer`, for example - the key and value serializers would be the nested serializers,
+while `MapSerializer` itself is the "outer" serializer.
+
+In this case, the snapshot of the outer serializer should also contain 
snapshots of the nested serializers, so that
+the compatibility of the nested serializers can be independently checked. When 
resolving the compatibility of the
+outer serializer, the compatibility of each nested serializer needs to be 
considered.
+
+`CompositeTypeSerializerSnapshot` is provided to assist in the implementation of snapshots for this kind of
+composite serializer. It deals with reading and writing the nested serializer snapshots, as well as resolving
+the final compatibility result taking into account the compatibility of all nested serializers.
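The resolution of the final result can be sketched in plain Java. This is a simplified, Flink-free illustration of the folding idea only; the actual logic lives inside `CompositeTypeSerializerSnapshot`, and the enum and method names below are stand-ins, not Flink API:

```java
import java.util.Arrays;
import java.util.List;

public class CompositeCompatibilityDemo {
    // Simplified stand-in for Flink's TypeSerializerSchemaCompatibility results.
    enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    // The composite result is the "worst" of the nested results: any incompatible
    // nested serializer makes the whole composite incompatible; otherwise, any
    // nested serializer that requires migration forces migration of the composite.
    static Compatibility resolveComposite(List<Compatibility> nestedResults) {
        Compatibility result = Compatibility.COMPATIBLE_AS_IS;
        for (Compatibility nested : nestedResults) {
            if (nested == Compatibility.INCOMPATIBLE) {
                return Compatibility.INCOMPATIBLE;
            }
            if (nested == Compatibility.COMPATIBLE_AFTER_MIGRATION) {
                result = Compatibility.COMPATIBLE_AFTER_MIGRATION;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(resolveComposite(Arrays.asList(
                Compatibility.COMPATIBLE_AS_IS,
                Compatibility.COMPATIBLE_AFTER_MIGRATION)));
        // prints "COMPATIBLE_AFTER_MIGRATION"
    }
}
```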
+
+Below is an example of how the `CompositeTypeSerializerSnapshot` is 

[flink] 04/04: [hotfix] [core] GenericArraySerializerSnapshot should implement isOuterSnapshotCompatible

2019-02-20 Thread tzulitai

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 337e16782b983e4bd203f670362825d2a0bebe0f
Author: Tzu-Li (Gordon) Tai 
AuthorDate: Wed Feb 20 17:23:44 2019 +0800

[hotfix] [core] GenericArraySerializerSnapshot should implement 
isOuterSnapshotCompatible
---
 .../api/common/typeutils/base/GenericArraySerializerSnapshot.java| 5 +
 1 file changed, 5 insertions(+)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
index d39b58e..bb99ed5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
@@ -77,6 +77,11 @@ public final class GenericArraySerializerSnapshot extends 
CompositeTypeSerial
}
 
@Override
+   protected boolean isOuterSnapshotCompatible(GenericArraySerializer 
newSerializer) {
+   return this.componentClass == newSerializer.getComponentClass();
+   }
+
+   @Override
protected GenericArraySerializer 
createOuterSerializerWithNestedSerializers(TypeSerializer[] 
nestedSerializers) {
@SuppressWarnings("unchecked")
TypeSerializer componentSerializer = (TypeSerializer) 
nestedSerializers[0];



[flink] branch master updated: [FLINK-10569][runtime] Remove Instance usage in ExecutionVertexSchedulingTest

2019-02-20 Thread chesnay

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 773f711  [FLINK-10569][runtime] Remove Instance usage in 
ExecutionVertexSchedulingTest
773f711 is described below

commit 773f711ba45bc7388453980c258c2501ad014fa3
Author: ZiLi Chen 
AuthorDate: Wed Feb 20 18:38:21 2019 +0800

[FLINK-10569][runtime] Remove Instance usage in 
ExecutionVertexSchedulingTest
---
 .../ExecutionVertexSchedulingTest.java | 32 --
 1 file changed, 11 insertions(+), 21 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index abb58c7..855d2a7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -20,14 +20,10 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.DummyActorGateway;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
-import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -36,9 +32,8 @@ import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
-import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 public class ExecutionVertexSchedulingTest extends TestLogger {
@@ -51,11 +46,10 @@ public class ExecutionVertexSchedulingTest extends 
TestLogger {
AkkaUtils.getDefaultTimeout());
 
// a slot that cannot be deployed to
-   final Instance instance = getInstance(new 
ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
-   final SimpleSlot slot = instance.allocateSimpleSlot();
-   
-   slot.releaseSlot();
-   assertTrue(slot.isReleased());
+   final LogicalSlot slot = new TestingLogicalSlot();
+   slot.releaseSlot(new Exception("Test Exception"));
+
+   assertFalse(slot.isAlive());
 
CompletableFuture future = new 
CompletableFuture<>();
future.complete(slot);
@@ -81,11 +75,10 @@ public class ExecutionVertexSchedulingTest extends 
TestLogger {
AkkaUtils.getDefaultTimeout());
 
// a slot that cannot be deployed to
-   final Instance instance = getInstance(new 
ActorTaskManagerGateway(DummyActorGateway.INSTANCE));
-   final SimpleSlot slot = instance.allocateSimpleSlot();
+   final LogicalSlot slot = new TestingLogicalSlot();
+   slot.releaseSlot(new Exception("Test Exception"));
 
-   slot.releaseSlot();
-   assertTrue(slot.isReleased());
+   assertFalse(slot.isAlive());
 
final CompletableFuture future = new 
CompletableFuture<>();
 
@@ -118,12 +111,9 @@ public class ExecutionVertexSchedulingTest extends 
TestLogger {
final ExecutionVertex vertex = new ExecutionVertex(ejv, 
0, new IntermediateResult[0],
AkkaUtils.getDefaultTimeout());
 
-   final Instance instance = getInstance(new 
ActorTaskManagerGateway(
-   new 
ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext(;
-   final SimpleSlot slot = instance.allocateSimpleSlot();
+   final LogicalSlot slot = new TestingLogicalSlot();
 
-   CompletableFuture future = new 
CompletableFuture<>();
-   future.complete(slot);
+   

[flink] branch master updated: [hotfix][docs] Fix typo in sourceSinks.md.

2019-02-20 Thread dwysakowicz

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 21c9ec0  [hotfix][docs] Fix typo in sourceSinks.md.
21c9ec0 is described below

commit 21c9ec061c43fb427e5da02e2c25c58247844fe5
Author: wenhuitang 
AuthorDate: Wed Feb 20 16:42:20 2019 +0800

[hotfix][docs] Fix typo in sourceSinks.md.
---
 docs/dev/table/sourceSinks.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 483ef20..b27d42d 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -413,7 +413,7 @@ AppendStreamTableSink implements TableSink {
 {% highlight scala %}
 AppendStreamTableSink[T] extends TableSink[T] {
 
-  def emitDataStream(dataStream: DataStream): Unit
+  def emitDataStream(dataStream: DataStream[T]): Unit
 }
 {% endhighlight %}
 



svn commit: r32568 - /release/flink/flink-1.7.1/

2019-02-20 Thread tzulitai
Author: tzulitai
Date: Wed Feb 20 10:02:57 2019
New Revision: 32568

Log:
Remove outdated Apache Flink 1.7.1 release

Removed:
release/flink/flink-1.7.1/



[flink] branch master updated: [hotfix][tests] Fix reference in PackagedProgramTest

2019-02-20 Thread chesnay

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 04db556  [hotfix][tests] Fix reference in PackagedProgramTest
04db556 is described below

commit 04db556dce3206e5037b3e1ecbc202637b796b2b
Author: TANG Wen-hui <42363767+wenhuit...@users.noreply.github.com>
AuthorDate: Wed Feb 20 17:28:35 2019 +0800

[hotfix][tests] Fix reference in PackagedProgramTest
---
 .../test/java/org/apache/flink/client/program/PackagedProgramTest.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
index 355e663..c95b609 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/PackagedProgramTest.java
@@ -35,7 +35,7 @@ import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
 
 /**
- * Tests for the {@link PackagedProgramTest}.
+ * Tests for the {@link PackagedProgram}.
  */
 public class PackagedProgramTest {
 



[flink] branch master updated: [FLINK-11663] Remove control flow break point from Execution#releaseAssignedResource

2019-02-20 Thread trohrmann

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 76ecd7a  [FLINK-11663] Remove control flow break point from 
Execution#releaseAssignedResource
76ecd7a is described below

commit 76ecd7abd239d6690e5e6ea9afe8262c7a389f26
Author: Till Rohrmann 
AuthorDate: Tue Feb 19 16:59:38 2019 +0100

[FLINK-11663] Remove control flow break point from 
Execution#releaseAssignedResource

Since LogicalSlot#releaseSlot returns a future which is always completed 
from the main thread
it is no longer necessary to merge the future back into the main thread by 
using FutureUtils#
whenCompleteAsyncIfNotDone. The advantage is that LogicalSlot#releaseSlot 
will atomically release
the assigned Execution.

This closes #7755.
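The reasoning above can be illustrated with plain `CompletableFuture` (a simplified, Flink-free sketch): a non-async `whenComplete` callback attached to an already-completed future runs synchronously in the calling thread, so no extra hop back to the main thread is needed.

```java
import java.util.concurrent.CompletableFuture;

public class WhenCompleteDemo {
    public static void main(String[] args) {
        // Models a release future that is already completed from the
        // caller's ("main") thread, as LogicalSlot#releaseSlot guarantees.
        CompletableFuture<Void> releaseFuture = CompletableFuture.completedFuture(null);

        final String caller = Thread.currentThread().getName();
        final StringBuilder callbackThread = new StringBuilder();

        // Non-async whenComplete: the callback executes directly in the
        // completing/calling thread, so the follow-up work stays on that thread.
        releaseFuture.whenComplete((ignored, throwable) ->
                callbackThread.append(Thread.currentThread().getName()));

        System.out.println(callbackThread.toString().equals(caller)); // prints "true"
    }
}
```

With `whenCompleteAsync(callback, executor)`, by contrast, the callback would be re-scheduled on the given executor, which is the "merge back into the main thread" step the commit removes.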
---
 .../flink/runtime/executiongraph/Execution.java|  7 ++--
 .../executiongraph/ExecutionGraphRestartTest.java  | 10 +++--
 .../runtime/executiongraph/ExecutionTest.java  | 45 ++
 3 files changed, 55 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 91b3d2d..6e1e8d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -1307,10 +1307,9 @@ public class Execution implements AccessExecution, 
Archiveable {
+   slot.releaseSlot(cause)
+   .whenComplete((Object ignored, Throwable 
throwable) -> {
+   
jobMasterMainThreadExecutor.assertRunningInMainThread();
if (throwable != null) {

releaseFuture.completeExceptionally(throwable);
} else {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index a5477fe..6192426 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -195,6 +195,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
new InfiniteDelayRestartStrategy(),
scheduler);
 
+   
executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+
JobVertex jobVertex = new JobVertex("NoOpInvokable");
jobVertex.setInvokableClass(NoOpInvokable.class);
jobVertex.setParallelism(NUM_TASKS);
@@ -212,8 +214,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
// Kill the instance and wait for the job to restart
instance.markDead();
 
-   Assert.assertEquals(JobStatus.RESTARTING, 
executionGraph.getState());
-
assertEquals(JobStatus.RESTARTING, executionGraph.getState());
 
// If we fail when being in RESTARTING, then we should try to 
restart again
@@ -797,7 +797,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
}
 
private static ExecutionGraph newExecutionGraph(RestartStrategy 
restartStrategy, SlotProvider slotProvider) throws IOException {
-   return new ExecutionGraph(
+   final ExecutionGraph executionGraph = new ExecutionGraph(
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
new JobID(),
@@ -807,6 +807,10 @@ public class ExecutionGraphRestartTest extends TestLogger {
AkkaUtils.getDefaultTimeout(),
restartStrategy,
slotProvider);
+
+   
executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+   return executionGraph;
}
 
private void restartAfterFailure(ExecutionGraph eg, FiniteDuration 
timeout, boolean haltAfterRestart) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 4881384..d69b0c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -39,6 +39,7 @@ import 

[flink] branch master updated: [FLINK-9803] Drop canEqual() from TypeSerializer

2019-02-20 Thread aljoscha

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 09bb7bb  [FLINK-9803] Drop canEqual() from TypeSerializer
09bb7bb is described below

commit 09bb7bbc0f2535dab90c59a3362dfe53a70055ef
Author: Aljoscha Krettek 
AuthorDate: Fri Feb 8 15:10:38 2019 +0100

[FLINK-9803] Drop canEqual() from TypeSerializer

This change removes TypeSerializer.canEqual() because it is not useful
on that class hierarchy. Serializers can only compare equal on an exact
match, not with different serializers up and down the hierarchy.

This adds a serialVersionUID to InstantSerializer, to ensure that it
doesn't change from removing the canEqual() method.

Plus we also remove canEqual() from PojoField, where it is not needed.
---
 .../connectors/kafka/FlinkKafkaProducer011.java | 10 --
 .../streaming/connectors/kafka/FlinkKafkaProducer.java  | 10 --
 .../api/java/typeutils/runtime/WritableSerializer.java  |  7 +--
 .../flink/api/common/typeutils/CompositeSerializer.java | 15 +++
 .../flink/api/common/typeutils/TypeDeserializer.java|  8 
 .../api/common/typeutils/TypeDeserializerAdapter.java   |  4 
 .../flink/api/common/typeutils/TypeSerializer.java  |  8 
 .../common/typeutils/UnloadableDummyTypeSerializer.java |  5 -
 .../api/common/typeutils/base/BigDecSerializer.java |  5 -
 .../api/common/typeutils/base/BigIntSerializer.java |  5 -
 .../api/common/typeutils/base/BooleanSerializer.java|  5 -
 .../common/typeutils/base/BooleanValueSerializer.java   |  5 -
 .../flink/api/common/typeutils/base/ByteSerializer.java |  5 -
 .../api/common/typeutils/base/ByteValueSerializer.java  |  5 -
 .../flink/api/common/typeutils/base/CharSerializer.java |  5 -
 .../api/common/typeutils/base/CharValueSerializer.java  |  5 -
 .../flink/api/common/typeutils/base/DateSerializer.java |  5 -
 .../api/common/typeutils/base/DoubleSerializer.java |  5 -
 .../common/typeutils/base/DoubleValueSerializer.java|  5 -
 .../flink/api/common/typeutils/base/EnumSerializer.java |  7 +--
 .../api/common/typeutils/base/FloatSerializer.java  |  5 -
 .../api/common/typeutils/base/FloatValueSerializer.java |  5 -
 .../common/typeutils/base/GenericArraySerializer.java   |  8 +---
 .../api/common/typeutils/base/InstantSerializer.java|  8 +++-
 .../flink/api/common/typeutils/base/IntSerializer.java  |  5 -
 .../api/common/typeutils/base/IntValueSerializer.java   |  5 -
 .../flink/api/common/typeutils/base/ListSerializer.java |  5 -
 .../flink/api/common/typeutils/base/LongSerializer.java |  5 -
 .../api/common/typeutils/base/LongValueSerializer.java  |  5 -
 .../flink/api/common/typeutils/base/MapSerializer.java  |  5 -
 .../api/common/typeutils/base/NullValueSerializer.java  |  5 -
 .../api/common/typeutils/base/ShortSerializer.java  |  5 -
 .../api/common/typeutils/base/ShortValueSerializer.java |  5 -
 .../api/common/typeutils/base/SqlDateSerializer.java|  5 -
 .../api/common/typeutils/base/SqlTimeSerializer.java|  5 -
 .../common/typeutils/base/SqlTimestampSerializer.java   |  5 -
 .../api/common/typeutils/base/StringSerializer.java |  5 -
 .../common/typeutils/base/StringValueSerializer.java|  5 -
 .../common/typeutils/base/TypeSerializerSingleton.java  |  8 +---
 .../flink/api/common/typeutils/base/VoidSerializer.java |  5 -
 .../base/array/BooleanPrimitiveArraySerializer.java |  5 -
 .../base/array/BytePrimitiveArraySerializer.java|  5 -
 .../base/array/CharPrimitiveArraySerializer.java|  5 -
 .../base/array/DoublePrimitiveArraySerializer.java  |  5 -
 .../base/array/FloatPrimitiveArraySerializer.java   |  5 -
 .../base/array/IntPrimitiveArraySerializer.java |  5 -
 .../base/array/LongPrimitiveArraySerializer.java|  5 -
 .../base/array/ShortPrimitiveArraySerializer.java   |  5 -
 .../typeutils/base/array/StringArraySerializer.java |  5 -
 .../org/apache/flink/api/java/typeutils/PojoField.java  |  6 +-
 .../java/typeutils/runtime/CopyableValueSerializer.java | 11 ++-
 .../api/java/typeutils/runtime/EitherSerializer.java|  8 +---
 .../api/java/typeutils/runtime/NullableSerializer.java  |  6 --
 .../api/java/typeutils/runtime/PojoSerializer.java  |  8 +---
 .../flink/api/java/typeutils/runtime/RowSerializer.java |  7 +--
 .../api/java/typeutils/runtime/Tuple0Serializer.java| 11 ---
 .../api/java/typeutils/runtime/TupleSerializerBase.java |  8 +---
 .../api/java/typeutils/runtime/ValueSerializer.java |  7 +--
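The equality semantics described in the commit message can be sketched in plain Java. These are hypothetical stubs, not the actual Flink classes: `StubSerializer` and `SubSerializer` only illustrate the exact-match `equals()` that makes a `canEqual()` handshake unnecessary, and the pinned `serialVersionUID` shows the reason `InstantSerializer` received one.

```java
import java.io.Serializable;

// Hypothetical stand-in for a TypeSerializer after FLINK-9803.
class StubSerializer implements Serializable {
    // Pinning serialVersionUID keeps the serialized form stable even though
    // a method (canEqual()) was removed from the class.
    private static final long serialVersionUID = 1L;

    @Override
    public boolean equals(Object obj) {
        // Exact class match only: no canEqual() handshake across the hierarchy.
        return obj != null && obj.getClass() == getClass();
    }

    @Override
    public int hashCode() {
        return getClass().hashCode();
    }
}

// A subclass is never equal to its parent under exact-match semantics.
class SubSerializer extends StubSerializer {
    private static final long serialVersionUID = 1L;
}

public class CanEqualSketch {
    public static void main(String[] args) {
        StubSerializer a = new StubSerializer();
        System.out.println(a.equals(new StubSerializer())); // true: same class
        System.out.println(a.equals(new SubSerializer()));  // false: subclass
    }
}
```

Comparing `getClass()` on both sides keeps `equals()` symmetric, which is what the removed `canEqual()` protocol was originally meant to guarantee.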
 

[flink-web] 01/04: [FLINK-11666] Add Amazon Kinesis Data Analytics to poweredby.zh.md for Chinese

2019-02-20 Thread jark

jark pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 2f9522f104d1261cf6b30d033ede001a6ab349b1
Author: XuQianJin-Stars 
AuthorDate: Wed Feb 20 13:17:23 2019 +0800

[FLINK-11666] Add Amazon Kinesis Data Analytics to poweredby.zh.md for 
Chinese

This closes #173
---
 content/zh/index.html | 10 --
 content/zh/poweredby.html |  4 
 index.zh.md   | 10 --
 poweredby.zh.md   |  4 
 4 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/content/zh/index.html b/content/zh/index.html
index 8f012c2..f22781c 100644
--- a/content/zh/index.html
+++ b/content/zh/index.html
@@ -288,9 +288,9 @@
 
 
   
-
+
   
- 
+
 
 
   
@@ -342,6 +342,12 @@
 
 
   
+
+  
+ 
+
+
+  
 
   
 
diff --git a/content/zh/poweredby.html b/content/zh/poweredby.html
index d2124ae..5704bf0 100644
--- a/content/zh/poweredby.html
+++ b/content/zh/poweredby.html
@@ -166,6 +166,10 @@
  全球最大的零售商阿里巴巴(Alibaba)使用 Flink 的分支版本 Blink 来优化实时搜索排名。<a href="https://data-artisans.com/blog/blink-flink-alibaba-search" target="_blank">阅读更多有关 Flink 在阿里巴巴扮演角色的信息</a>
   
   
+
+ Amazon Kinesis Data Analytics 是一种用于流处理的完全托管云服务,它部分地使用 Apache Flink 来增加其 Java 应用程序功能。
+  
+  
 
 BetterCloud 是一个多 SaaS 管理平台,它使用 Flink 从 SaaS 应用程序活动中获取近乎实时的智能。<a href="https://www.youtube.com/watch?v=_yHds9SvMfE&list=PLDX4T_cnKjD2UC6wJr_wRbIvtlMtkc-n2&index=10" target="_blank">请参阅 BetterCloud 在 Flink Forward SF 2017 上的分享</a>
   
diff --git a/index.zh.md b/index.zh.md
index 3b963dd..a4b7ac0 100755
--- a/index.zh.md
+++ b/index.zh.md
@@ -146,9 +146,9 @@ layout: base
 
 
   
-
+
   
- 
+
 
 
   
@@ -200,6 +200,12 @@ layout: base
 
 
   
+
+  
+ 
+
+
+  
 
   
 
diff --git a/poweredby.zh.md b/poweredby.zh.md
index c34326b..14344ca 100755
--- a/poweredby.zh.md
+++ b/poweredby.zh.md
@@ -20,6 +20,10 @@ Apache Flink 为全球许多公司和企业的关键业务提供支持。在这
  全球最大的零售商阿里巴巴(Alibaba)使用 Flink 的分支版本 Blink 来优化实时搜索排名。<a href='https://data-artisans.com/blog/blink-flink-alibaba-search' target='_blank'>阅读更多有关 Flink 在阿里巴巴扮演角色的信息</a>
   
   
+
+ Amazon Kinesis Data Analytics 是一种用于流处理的完全托管云服务,它部分地使用 Apache Flink 来增加其 Java 应用程序功能。
+  
+  
 
 BetterCloud 是一个多 SaaS 管理平台,它使用 Flink 从 SaaS 应用程序活动中获取近乎实时的智能。<a href='https://www.youtube.com/watch?v=_yHds9SvMfE&list=PLDX4T_cnKjD2UC6wJr_wRbIvtlMtkc-n2&index=10' target='_blank'>请参阅 BetterCloud 在 Flink Forward SF 2017 上的分享</a>
   



[flink-web] 03/04: [FLINK-11564] Translate the "How To Contribute" page into Chinese

2019-02-20 Thread jark

jark pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit bf1374cf5cf7ebe6c5bebc5b79d4208a74e0b409
Author: XuQianJin-Stars 
AuthorDate: Mon Feb 11 15:52:21 2019 +0800

[FLINK-11564] Translate the "How To Contribute" page into Chinese

This closes #155
---
 how-to-contribute.zh.md | 131 +++-
 1 file changed, 64 insertions(+), 67 deletions(-)

diff --git a/how-to-contribute.zh.md b/how-to-contribute.zh.md
index 95abf04..a91195a 100644
--- a/how-to-contribute.zh.md
+++ b/how-to-contribute.zh.md
@@ -4,146 +4,143 @@ title: "如何参与贡献"
 
 
 
-Apache Flink is developed by an open and friendly community. Everybody is 
cordially welcome to join the community and contribute to Apache Flink. There 
are several ways to interact with the community and to contribute to Flink 
including asking questions, filing bug reports, proposing new features, joining 
discussions on the mailing lists, contributing code or documentation, improving 
the website, or testing release candidates.
+Apache Flink 社区是一个开放友好的社区。我们诚挚地欢迎每个人加入社区并为 Apache Flink 做出贡献。有许多方式可以参与社区并为 
Flink 做出贡献,包括提问、提交错误报告、提出新功能、加入邮件列表上的讨论、贡献代码或文档、改进网站、以及测试候选版本。
 
 {% toc %}
 
-## Ask questions!
+## 问问题!
 
-The Apache Flink community is eager to help and to answer your questions. We 
have a [user mailing list]({{ site.baseurl }}/community.html#mailing-lists ) 
and watch Stack Overflow on the 
[[apache-flink]](http://stackoverflow.com/questions/tagged/apache-flink) tag.
+Apache Flink 社区非常乐意帮助并回答你的问题。我们提供了[用户邮件列表]({{ site.baseurl 
}}/zh/community.html#mailing-lists)并在 Stack Overflow 网站上创建了 
[[apache-flink]](http://stackoverflow.com/questions/tagged/apache-flink) 标签。
 
 -
 
-## File a bug report
+## 提交错误报告
 
-Please let us know if you experienced a problem with Flink and file a bug 
report. Open [Flink's Jira](http://issues.apache.org/jira/browse/FLINK), log in 
if necessary, and click on the red **Create** button at the top. Please give 
detailed information about the problem you encountered and, if possible, add a 
description that helps to reproduce the problem. Thank you very much.
+如果你在使用 Flink 时遇到了问题,请告知我们并提交错误报告。打开 [Flink 
Jira](http://issues.apache.org/jira/browse/FLINK),登录并点击顶部红色的 **Create** 
按钮。请提供你遇到的问题的详细信息,如果可能,请添加有助于重现问题的描述。非常感谢。
 
 -
 
-## Propose an improvement or a new feature
+## 提出改进或新功能
 
-Our community is constantly looking for feedback to improve Apache Flink. If 
you have an idea how to improve Flink or have a new feature in mind that would 
be beneficial for Flink users, please open an issue in [Flink's 
Jira](http://issues.apache.org/jira/browse/FLINK). The improvement or new 
feature should be described in appropriate detail and include the scope and its 
requirements if possible. Detailed information is important for a few reasons:
+我们的社区一直在寻找反馈来改进 Apache Flink。如果你对如何改进 Flink 有一个想法或者想到了一个新功能,这绝对能帮到 Flink 用户,请在 
[Flink Jira](http://issues.apache.org/jira/browse/FLINK) 
中提交一个问题。改进或新功能最好能详细描述下,并尽可能地加上支持的范围和需求。详细信息很重要,原因如下:
 
-- It ensures your requirements are met when the improvement or feature is 
implemented.
-- It helps to estimate the effort and to design a solution that addresses your 
needs.
-- It allow for constructive discussions that might arise around this issue.
+- 它可确保在实现改进或功能时满足你的需求。
+- 它有助于估算工作量并设计满足你需求的解决方案。
+- 它允许围绕这个问题展开建设性的讨论。
 
-Detailed information is also required, if you plan to contribute the 
improvement or feature you proposed yourself. Please read the [Contribute 
code]({{ site.base }}/contribute-code.html) guide in this case as well.
+如果你计划自己贡献改进或功能,也需要提供详细信息。在这种情况下,请阅读[贡献代码]({{ site.base 
}}/zh/contribute-code.html)指南。
 
+在开始实现之前,我们建议首先与社区就是否需要新功能以及如何实现新功能达成共识。某些功能可能超出了项目的范围,最好尽早发现。
 
-We recommend to first reach consensus with the community on whether a new 
feature is required and how to implement a new feature, before starting with 
the implementation. Some features might be out of scope of the project, and 
it's best to discover this early.
-
-For very big features that change Flink in a fundamental way we have another 
process in place:
-[Flink Improvement 
Proposals](https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals).
 If you are interested you can propose a new feature there or follow the
-discussions on existing proposals.
+对于从根本上改变 Flink 的非常大的功能,我们有另一个流程:[Flink 改进提案(Flink Improvement Proposals, 
FLIP)](https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals)。如果你有兴趣,可以在那里提出新功能,或者加入现有提案的讨论。
 
 -
 
-## Help others and join the discussions
+## 帮助他人并加入讨论
 
-Most communication in the Apache Flink community happens on two mailing lists:
+Apache Flink 社区中的大多数通信都发生在两个邮件列表中:
 
-- The user mailing list `u...@flink.apache.org` is the place where users of 
Apache Flink ask questions and seek help or advice. Joining the user list and 
helping other users is a very good way to contribute 

[flink-web] branch asf-site updated (2917889 -> 483d732)

2019-02-20 Thread jark

jark pushed a change to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git.


from 2917889  Add Amazon Kinesis Data Analytics to poweredby.md
 new 2f9522f  [FLINK-11666] Add Amazon Kinesis Data Analytics to 
poweredby.zh.md for Chinese
 new baab1ca  [FLINK-11559] Translate the "FAQ" page into Chinese
 new bf1374c  [FLINK-11564] Translate the "How To Contribute" page into 
Chinese
 new 483d732  Rebuild website

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 content/zh/faq.html   |  87 ++
 content/zh/how-to-contribute.html | 150 +++---
 content/zh/index.html |  10 ++-
 content/zh/poweredby.html |   4 +
 faq.zh.md |  69 --
 how-to-contribute.zh.md   | 131 -
 index.zh.md   |  10 ++-
 poweredby.zh.md   |   4 +
 8 files changed, 229 insertions(+), 236 deletions(-)



[flink-web] 02/04: [FLINK-11559] Translate the "FAQ" page into Chinese

2019-02-20 Thread jark

jark pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit baab1cafa73b395f5180034026b27c7deb9e89c1
Author: 宝牛 
AuthorDate: Wed Feb 13 14:05:15 2019 +0800

[FLINK-11559] Translate the "FAQ" page into Chinese

This closes #165
---
 faq.zh.md | 69 +++
 1 file changed, 29 insertions(+), 40 deletions(-)

diff --git a/faq.zh.md b/faq.zh.md
index f93e5c8..73ad12e 100755
--- a/faq.zh.md
+++ b/faq.zh.md
@@ -22,69 +22,58 @@ under the License.
 
 
 
-The following questions are frequently asked with regard to the Flink project 
**in general**. 
+以下这些是 Flink 项目中经常会被问到的**常见**问题。
 
-If you have further questions, make sure to consult the 
[documentation]({{site.docs-stable}}) or [ask the community]({{ site.baseurl 
}}/gettinghelp.html).
+如果你还有其他问题,请先查阅[文档]({{site.docs-stable}})或[咨询社区]({{ site.baseurl 
}}/zh/gettinghelp.html)。
 
 {% toc %}
 
 
-# General
+# 常见问题
 
-## Is Apache Flink only for (near) real-time processing use cases?
+## Apache Flink 仅适用于(近)实时处理场景吗?
 
-Flink is a very general system for data processing and data-driven 
applications with *data streams* as
-the core building block. These data streams can be streams of real-time data, 
or stored streams of historic data.
-For example, in Flink's view a file is a stored stream of bytes. Because of 
that, Flink
-supports both real-time data processing and applications, as well as batch 
processing applications.
+Flink 是一个非常通用的系统,它以 *数据流* 为核心,用于数据处理和数据驱动的应用程序。这些数据流可以是实时数据流或存储的历史数据流。例如,Flink 
认为文件是存储的字节流。因此,Flink 同时支持实时数据处理和批处理应用程序。
 
-Streams can be *unbounded* (have no end, events continuously keep coming) or 
be *bounded* (streams have a beginning
-and an end). For example, a Twitter feed or a stream of events from a message 
queue are generally unbounded streams,
-whereas a stream of bytes from a file is a bounded stream.
+流可以是 *无界的* (不会结束,源源不断地发生事件)或 *有界的* (流有开始和结束)。例如,来自消息队列的 Twitter 
信息流或事件流通常是无界的流,而来自文件的字节流是有界的流。
 
-## If everything is a stream, why are there a DataStream and a DataSet API in 
Flink?
+## 如果一切都是流,为什么 Flink 中同时有 DataStream 和 DataSet API?
 
-Bounded streams are often more efficient to process than unbounded streams. 
Processing unbounded streams of events
-in (near) real-time requires the system to be able to immediately act on 
events and to produce intermediate
-results (often with low latency). Processing bounded streams usually does not 
require producing low latency results, because the data is a while old
-anyway (in relative terms). That allows Flink to process the data in a simple 
and more efficient way.
+处理有界流的数据通常比无界流更有效。在(近)实时要求的系统中,处理无限的事件流要求系统能够立即响应事件并产生中间结果(通常具有低延迟)。处理有界流通常不需要产生低延迟结果,因为无论如何数据都有点旧(相对而言)。这样
 Flink 就能以更加简单有效的方式去处理数据。
 
-The *DataStream* API captures the continuous processing of unbounded and 
bounded streams, with a model that supports
-low latency results and flexible reaction to events and time (including event 
time).
+*DataStream* API 基于一个支持低延迟和对事件和时间(包括事件时间)灵活反应的模型,用来连续处理无界流和有界流。
 
-The *DataSet* API has techniques that often speed up the processing of bounded 
data streams. In the future, the community
-plans to combine these optimizations with the techniques in the DataStream API.
+*DataSet* API 具有通常可加速有界数据流处理的技术。在未来,社区计划将这些优化与 DataStream API 中的技术相结合。
 
-## How does Flink relate to the Hadoop Stack?
+## Flink 与 Hadoop 软件栈是什么关系?
 
-Flink is independent of [Apache Hadoop](https://hadoop.apache.org/) and runs 
without any Hadoop dependencies.
+Flink 独立于[Apache Hadoop](https://hadoop.apache.org/),且能在没有任何 Hadoop 依赖的情况下运行。
 
-However, Flink integrates very well with many Hadoop components, for example, 
*HDFS*, *YARN*, or *HBase*.
-When running together with these components, Flink can use HDFS to read data, 
or write results and checkpoints/snapshots.
-Flink can be easily deployed via YARN and integrates with the YARN and HDFS 
Kerberos security modules.
+但是,Flink 可以很好的集成很多 Hadoop 组件,例如 *HDFS*、*YARN* 或 *HBase*。
+当与这些组件一起运行时,Flink 可以从 HDFS 读取数据,或写入结果和检查点(checkpoint)/快照(snapshot)数据到 HDFS 。
+Flink 还可以通过 YARN 轻松部署,并与 YARN 和 HDFS Kerberos 安全模块集成。
 
-## What other stacks does Flink run in?
+## Flink 运行的其他软件栈是什么?
 
-Users run Flink on [Kubernetes](https://kubernetes.io), 
[Mesos](https://mesos.apache.org/),
-[Docker](https://www.docker.com/), or even as standalone services.
+用户还可以在 [Kubernetes](https://kubernetes.io)、 [Mesos](https://mesos.apache.org/) 
或 [Docker](https://www.docker.com/) 上运行 Flink,甚至可以独立部署。
 
-## What are the prerequisites to use Flink?
+## 使用Flink的先决条件是什么?
 
-  - You need *Java 8* to run Flink jobs/applications.
-  - The Scala API (optional) depends on Scala 2.11.
-  - Highly-available setups with no single point of failure require [Apache 
ZooKeeper](https://zookeeper.apache.org/).
-  - For highly-available stream processing setups that can recover from 
failures, Flink requires some form of distributed 

[flink-web] 04/04: Rebuild website

2019-02-20 Thread jark

jark pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git

commit 483d732103349b336b69d10e50d408630338f38e
Author: 云邪 
AuthorDate: Wed Feb 20 16:39:08 2019 +0800

Rebuild website
---
 content/zh/faq.html   |  87 ++
 content/zh/how-to-contribute.html | 150 +++---
 2 files changed, 112 insertions(+), 125 deletions(-)

diff --git a/content/zh/faq.html b/content/zh/faq.html
index 7a92808..017314b 100644
--- a/content/zh/faq.html
+++ b/content/zh/faq.html
@@ -169,88 +169,77 @@ under the License.
 
 
 
-The following questions are frequently asked with regard to the Flink 
project in general.
+以下这些是 Flink 项目中经常会被问到的常见问题。
 
-If you have further questions, make sure to consult the https://ci.apache.org/projects/flink/flink-docs-release-1.7;>documentation
 or ask the community.
+如果你还有其他问题,请先查阅https://ci.apache.org/projects/flink/flink-docs-release-1.7;>文档或咨询社区。
 
 
 
-  General
-  Is
 Apache Flink only for (near) real-time processing use cases?
-  If
 everything is a stream, why are there a DataStream and a DataSet API in 
Flink?
-  How does Flink 
relate to the Hadoop Stack?
-  What other stacks does 
Flink run in?
-  What are the 
prerequisites to use Flink?
-  What scale does Flink 
support?
-  Is Flink limited to 
in-memory data sets?
+  常见问题
+  Apache 
Flink 仅适用于(近)实时处理场景吗?
+  如果一切都是流,为什么 Flink 中同时有 
DataStream 和 DataSet API?
+  Flink 与 
Hadoop 软件栈是什么关系?
+  Flink 
运行的其他软件栈是什么?
+  使用Flink的先决条件是什么?
+  Flink支持多大的规模?
+  Flink是否仅限于内存数据集?
 
   
-  Common Error Messages
+  常见错误消息
 
 
 
 
-General
+常见问题
 
-Is 
Apache Flink only for (near) real-time processing use cases?
+Apache Flink 仅适用于(近)实时处理场景吗?
 
-Flink is a very general system for data processing and data-driven 
applications with data streams as
-the core building block. These data streams can be streams of real-time data, 
or stored streams of historic data.
-For example, in Flink’s view a file is a stored stream of bytes. Because of 
that, Flink
-supports both real-time data processing and applications, as well as batch 
processing applications.
+Flink 是一个非常通用的系统,它以 数据流 
为核心,用于数据处理和数据驱动的应用程序。这些数据流可以是实时数据流或存储的历史数据流。例如,Flink 认为文件是存储的字节流。因此,Flink 
同时支持实时数据处理和批处理应用程序。
 
-Streams can be unbounded (have no end, events continuously keep 
coming) or be bounded (streams have a beginning
-and an end). For example, a Twitter feed or a stream of events from a message 
queue are generally unbounded streams,
-whereas a stream of bytes from a file is a bounded stream.
+流可以是 无界的 (不会结束,源源不断地发生事件)或 有界的 (流有开始和结束)。例如,来自消息队列的 
Twitter 信息流或事件流通常是无界的流,而来自文件的字节流是有界的流。
 
-If
 everything is a stream, why are there a DataStream and a DataSet API in 
Flink?
+如果一切都是流,为什么 Flink 中同时有 DataStream 和 
DataSet API?
 
-Bounded streams are often more efficient to process than unbounded streams. 
Processing unbounded streams of events
-in (near) real-time requires the system to be able to immediately act on 
events and to produce intermediate
-results (often with low latency). Processing bounded streams usually does not 
require producing low latency results, because the data is a while old
-anyway (in relative terms). That allows Flink to process the data in a simple 
and more efficient way.
+处理有界流的数据通常比无界流更有效。在(近)实时要求的系统中,处理无限的事件流要求系统能够立即响应事件并产生中间结果(通常具有低延迟)。处理有界流通常不需要产生低延迟结果,因为无论如何数据都有点旧(相对而言)。这样
 Flink 就能以更加简单有效的方式去处理数据。
 
-The DataStream API captures the continuous processing of unbounded 
and bounded streams, with a model that supports
-low latency results and flexible reaction to events and time (including event 
time).
+DataStream API 基于一个支持低延迟和对事件和时间(包括事件时间)灵活反应的模型,用来连续处理无界流和有界流。
 
-The DataSet API has techniques that often speed up the processing 
of bounded data streams. In the future, the community
-plans to combine these optimizations with the techniques in the DataStream 
API.
+DataSet API 具有通常可加速有界数据流处理的技术。在未来,社区计划将这些优化与 DataStream API 
中的技术相结合。
 
-How does Flink relate to 
the Hadoop Stack?
+Flink 与 Hadoop 软件栈是什么关系?
 
-Flink is independent of https://hadoop.apache.org/;>Apache 
Hadoop and runs without any Hadoop dependencies.
+Flink 独立于https://hadoop.apache.org/;>Apache Hadoop,且能在没有任何 
Hadoop 依赖的情况下运行。
 
-However, Flink integrates very well with many Hadoop components, for 
example, HDFS, YARN, or HBase.
-When running together with these components, Flink can use HDFS to read data, 
or write results and checkpoints/snapshots.
-Flink can be easily deployed via YARN and integrates with the YARN and HDFS 
Kerberos security modules.
+但是,Flink 可以很好的集成很多 Hadoop 组件,例如 HDFS、YARN 或 
HBase。
+当与这些组件一起运行时,Flink 可以从 HDFS 读取数据,或写入结果和检查点(checkpoint)/快照(snapshot)数据到 HDFS 。
+Flink 还可以通过 YARN 轻松部署,并与 YARN 和 HDFS Kerberos 安全模块集成。
 
-What other stacks does Flink run 
in?
+Flink 运行的其他软件栈是什么?
 
-Users run Flink on https://kubernetes.io;>Kubernetes,