[flink] branch master updated: [FLINK-12177][coordination] Remove legacy FlinkUntypedActor

2019-04-15 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 734686f  [FLINK-12177][coordination] Remove legacy FlinkUntypedActor
734686f is described below

commit 734686ff29c520d930f24c72dc069f18fd1674b7
Author: tison 
AuthorDate: Sat Apr 13 12:17:02 2019 +0800

[FLINK-12177][coordination] Remove legacy FlinkUntypedActor
---
 .../flink/runtime/akka/FlinkUntypedActor.java  | 150 ---
 .../flink/runtime/akka/FlinkUntypedActorTest.java  | 160 -
 2 files changed, 310 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
deleted file mode 100644
index e0279b3..000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.akka;
-
-import 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-import akka.actor.UntypedActor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.UUID;
-
-/**
- * Base class for Flink's actors implemented with Java. Actors inheriting from 
this class
- * automatically log received messages when the debug log level is activated. 
Furthermore,
- * they filter out {@link LeaderSessionMessage} with the wrong leader session 
ID. If a message
- * of type {@link RequiresLeaderSessionID} without being wrapped in a 
LeaderSessionMessage is
- * detected, then an Exception is thrown.
- *
- * In order to implement the actor behavior, an implementing subclass has 
to override the method
- * handleMessage, which defines how messages are processed. Furthermore, the 
subclass has to provide
- * a leader session ID option which is returned by getLeaderSessionID.
- */
-public abstract class FlinkUntypedActor extends UntypedActor {
-
-   //CHECKSTYLE.OFF: MemberNameCheck - re-enable after 
JobManager/TaskManager refactoring in FLIP-6?
-   protected final Logger LOG = LoggerFactory.getLogger(getClass());
-   //CHECKSTYLE.ON: MemberNameCheck
-
-   /**
-* This method is called by Akka if a new message has arrived for the 
actor. It logs the
-* processing time of the incoming message if the logging level is set 
to trace. After logging
-* the handleLeaderSessionID method is called.
-*
-* Important: This method cannot be overridden. The actor specific 
message handling logic is
-* implemented by the method handleMessage.
-*
-* @param message Incoming message
-* @throws Exception
-*/
-   @Override
-   public final void onReceive(Object message) throws Exception {
-   if (LOG.isTraceEnabled()) {
-   LOG.trace("Received message {} at {} from {}.", 
message, getSelf().path(), getSender());
-
-   long start = System.nanoTime();
-
-   handleLeaderSessionID(message);
-
-   long duration = (System.nanoTime() - start) / 1_000_000;
-
-   LOG.trace("Handled message {} in {} ms from {}.", 
message, duration, getSender());
-   } else {
-   handleLeaderSessionID(message);
-   }
-   }
-
-   /**
-* This method filters out {@link LeaderSessionMessage} whose leader 
session ID is not equal
-* to the actor's leader session ID. If a message of type {@link 
RequiresLeaderSessionID}
-* arrives, then an Exception is thrown, because these messages have to 
be wrapped in a
-* {@link LeaderSessionMessage}.
-*
-* @param message Incoming message
-* @throws Exception
-*/
-   private void handleLeaderSessionID(Object message) throws Exception {
-   if (message instanceof LeaderSessionMessage) {
-

[flink] branch master updated: [hotfix] Delete empty TaskManager.scala file

2019-04-15 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 851ac7d  [hotfix] Delete empty TaskManager.scala file
851ac7d is described below

commit 851ac7d1c0e9661e9b75faaef994bb270a9f112f
Author: Stefan Richter 
AuthorDate: Mon Apr 15 18:36:39 2019 +0200

[hotfix] Delete empty TaskManager.scala file
---
 .../src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala | 0
 1 file changed, 0 insertions(+), 0 deletions(-)

diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
deleted file mode 100644
index e69de29..000



[flink] branch master updated: [hotfix] Remove unused method ExecutionGraph#restoreLatestCheckpointedState

2019-04-15 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 32f0ad7  [hotfix] Remove unused method 
ExecutionGraph#restoreLatestCheckpointedState
32f0ad7 is described below

commit 32f0ad79be4b18774e1506db56ae59658ec211e1
Author: Stefan Richter 
AuthorDate: Mon Apr 15 17:28:02 2019 +0200

[hotfix] Remove unused method ExecutionGraph#restoreLatestCheckpointedState
---
 .../flink/runtime/executiongraph/ExecutionGraph.java | 20 
 1 file changed, 20 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 5e44e94..56e31a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1291,26 +1291,6 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
}
 
/**
-* Restores the latest checkpointed state.
-*
-* The recovery of checkpoints might block. Make sure that calls to 
this method don't
-* block the job manager actor and run asynchronously.
-*
-* @param errorIfNoCheckpoint Fail if there is no checkpoint available
-* @param allowNonRestoredState Allow to skip checkpoint state that 
cannot be mapped
-* to the ExecutionGraph vertices (if the checkpoint contains state for 
a
-* job vertex that is not part of this ExecutionGraph).
-*/
-   public void restoreLatestCheckpointedState(boolean errorIfNoCheckpoint, 
boolean allowNonRestoredState) throws Exception {
-   assertRunningInJobMasterMainThread();
-   synchronized (progressLock) {
-   if (checkpointCoordinator != null) {
-   
checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), 
errorIfNoCheckpoint, allowNonRestoredState);
-   }
-   }
-   }
-
-   /**
 * Returns the serializable {@link ArchivedExecutionConfig}.
 *
 * @return ArchivedExecutionConfig which may be null in case of errors



[flink] branch master updated: [FLINK-11884][table] Ported calculated table validation on top of Expressions

2019-04-15 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new f726468  [FLINK-11884][table] Ported calculated table validation on 
top of Expressions
f726468 is described below

commit f72646889b5f3afaf6c61645140ca7ae6a552096
Author: Dawid Wysakowicz 
AuthorDate: Mon Apr 1 16:03:01 2019 +0200

[FLINK-11884][table] Ported calculated table validation on top of 
Expressions
---
 .../rules/ResolveCallByArgumentsRule.java  |  15 +-
 .../table/operations/CalculatedTableFactory.java   | 155 +
 .../org/apache/flink/table/expressions/call.scala  |  19 +++
 .../functions/TemporalTableFunctionImpl.scala  |  14 +-
 .../functions/utils/UserDefinedFunctionUtils.scala |  53 ---
 .../table/operations/OperationTreeBuilder.scala|  18 +--
 .../flink/table/plan/logical/operators.scala   |  32 +
 .../LogicalCorrelateToTemporalTableJoinRule.scala  |  23 +--
 .../api/stream/table/TemporalTableJoinTest.scala   |  37 +++--
 .../table/validation/CorrelateValidationTest.scala |   3 +-
 10 files changed, 242 insertions(+), 127 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
index af3b4e3..bebb683 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java
@@ -27,6 +27,8 @@ import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.InputTypeSpec;
 import org.apache.flink.table.expressions.PlannerExpression;
 import org.apache.flink.table.typeutils.TypeCoercion;
+import org.apache.flink.table.validate.ValidationFailure;
+import org.apache.flink.table.validate.ValidationResult;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -84,9 +86,16 @@ final class ResolveCallByArgumentsRule implements 
ResolverRule {
 
private Expression validateArguments(CallExpression call, 
PlannerExpression plannerCall) {
if (!plannerCall.valid()) {
-   throw new 
ValidationException(String.format("Invalid arguments [%s] for call: %s",
-   call.getChildren(),
-   call));
+   final String errorMessage;
+   ValidationResult validationResult = 
plannerCall.validateInput();
+   if (validationResult instanceof 
ValidationFailure) {
+   errorMessage = ((ValidationFailure) 
validationResult).message();
+   } else {
+   errorMessage = String.format("Invalid 
arguments %s for function: %s",
+   call.getChildren(),
+   
call.getFunctionDefinition().getName());
+   }
+   throw new ValidationException(errorMessage);
}
 
return call;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
new file mode 100644
index 000..849d196
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types

[flink] 03/06: [FLINK-11952][2/4] Introduce basic plugin mechanism for Flink

2019-04-15 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0c4953c4c5ff583d88f1686cd96fd7e7be9d8f11
Author: Stefan Richter 
AuthorDate: Tue Apr 9 10:37:03 2019 +0200

[FLINK-11952][2/4] Introduce basic plugin mechanism for Flink

The mechanism uses child-first classloading and creates classloaders from 
jars that are discovered
from a directory hierarchy.
---
 .../apache/flink/core/fs/FileSystemFactory.java|  12 +-
 .../flink/core/fs/UnsupportedSchemeFactory.java|   6 -
 .../core/fs/local/LocalFileSystemFactory.java  |   6 -
 .../core/plugin/DirectoryBasedPluginFinder.java| 103 
 .../Plugin.java}   |  33 ++---
 .../apache/flink/core/plugin/PluginDescriptor.java |  67 ++
 .../org/apache/flink/core/plugin/PluginFinder.java |  37 ++
 .../org/apache/flink/core/plugin/PluginLoader.java |  94 ++
 .../apache/flink/core/plugin/PluginManager.java|  77 
 .../org/apache/flink/core/plugin/PluginUtils.java  |  54 
 .../TemporaryClassLoaderContext.java}  |  33 ++---
 .../plugin/DirectoryBasedPluginFinderTest.java | 137 +
 .../plugin/TemporaryClassLoaderContextTest.java|  46 +++
 .../testutils/EntropyInjectingTestFileSystem.java  |   5 -
 .../org/apache/flink/testutils/TestFileSystem.java |  10 +-
 .../flink/runtime/fs/maprfs/MapRFsFactory.java |   6 -
 flink-tests/pom.xml|  31 +
 .../src/test/assembly/test-plugin-a-assembly.xml   |  43 +++
 .../src/test/assembly/test-plugin-b-assembly.xml   |  43 +++
 .../org/apache/flink/test/plugin/OtherTestSpi.java |  28 +
 .../apache/flink/test/plugin/PluginLoaderTest.java |  71 +++
 .../flink/test/plugin/PluginManagerTest.java   | 105 
 .../apache/flink/test/plugin/PluginTestBase.java   |  54 
 .../java/org/apache/flink/test/plugin/TestSpi.java |  28 +
 .../test/plugin/jar/plugina/DynamicClassA.java |  31 ++---
 .../test/plugin/jar/plugina/TestServiceA.java  |  34 +++--
 .../test/plugin/jar/pluginb/OtherTestServiceB.java |  30 ++---
 .../test/plugin/jar/pluginb/TestServiceB.java  |  30 ++---
 .../plugin-a/org.apache.flink.test.plugin.TestSpi  |  16 +++
 .../org.apache.flink.test.plugin.OtherTestSpi  |  16 +++
 .../plugin-b/org.apache.flink.test.plugin.TestSpi  |  16 +++
 31 files changed, 1130 insertions(+), 172 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
index 8a35471..eecf6f1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.core.fs;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.Plugin;
 
 import java.io.IOException;
 import java.net.URI;
@@ -31,7 +32,7 @@ import java.net.URI;
  * creating file systems via {@link #create(URI)}.
  */
 @PublicEvolving
-public interface FileSystemFactory {
+public interface FileSystemFactory extends Plugin {
 
/**
 * Gets the scheme of the file system created by this factory.
@@ -39,15 +40,6 @@ public interface FileSystemFactory {
String getScheme();
 
/**
-* Applies the given configuration to this factory. All future file 
system
-* instantiations via {@link #create(URI)} should take the 
configuration into
-* account.
-*
-* @param config The configuration to apply.
-*/
-   void configure(Configuration config);
-
-   /**
 * Creates a new file system for the given file system URI.
 * The URI describes the type of file system (via its scheme) and 
optionally the
 * authority (for example the host) of the file system.
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
index c2cb2d5..e873e63 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java
@@ -19,7 +19,6 @@
 package org.apache.flink.core.fs;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.Configuration;
 
 import javax.annotation.Nullable;
 
@@ -54,11 +53,6 @@ class UnsupportedSchemeFactory implements FileSystemFactory {
}
 
@Override
-   public void configure(Configuration config) {
-   // nothing to do here
-   }
-
-   @Override
public FileSystem create(URI fsUri) throws IOExc

[flink] 01/06: [hotfix] Remove unused exception from FileSystem#initialize

2019-04-15 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5aee179cb6677aad1e1b6f1808873167c4a05789
Author: Stefan Richter 
AuthorDate: Fri Mar 15 15:02:51 2019 +0100

[hotfix] Remove unused exception from FileSystem#initialize
---
 .../src/main/java/org/apache/flink/client/cli/CliFrontend.java | 10 ++
 .../src/main/java/org/apache/flink/core/fs/FileSystem.java |  2 +-
 .../apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java |  7 +--
 .../apache/flink/runtime/webmonitor/history/HistoryServer.java |  6 +-
 .../org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java | 10 ++
 .../apache/flink/runtime/taskexecutor/TaskManagerRunner.java   |  8 +---
 .../apache/flink/table/client/gateway/local/LocalExecutor.java |  7 +--
 .../org/apache/flink/yarn/AbstractYarnClusterDescriptor.java   |  7 +--
 8 files changed, 10 insertions(+), 47 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 6b20e78..c7cbe9b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -67,7 +67,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.UndeclaredThrowableException;
@@ -124,16 +123,11 @@ public class CliFrontend {
 
public CliFrontend(
Configuration configuration,
-   List> customCommandLines) throws 
Exception {
+   List> customCommandLines) {
this.configuration = Preconditions.checkNotNull(configuration);
this.customCommandLines = 
Preconditions.checkNotNull(customCommandLines);
 
-   try {
-   FileSystem.initialize(this.configuration);
-   } catch (IOException e) {
-   throw new Exception("Error while setting the default " +
-   "filesystem scheme from configuration.", e);
-   }
+   FileSystem.initialize(this.configuration);
 
this.customCommandLineOptions = new Options();
 
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index d451109..e7a3765 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -251,7 +251,7 @@ public abstract class FileSystem {
 *
 * @param config the configuration from where to fetch the parameter.
 */
-   public static void initialize(Configuration config) throws IOException, 
IllegalConfigurationException {
+   public static void initialize(Configuration config) throws 
IllegalConfigurationException {
LOCK.lock();
try {
// make sure file systems are re-instantiated after 
re-configuration
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
index cc1289f..98d2157 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
@@ -40,7 +40,6 @@ import org.apache.commons.cli.PosixParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Map;
 
@@ -86,11 +85,7 @@ public class MesosTaskExecutorRunner {
final Map envs = System.getenv();
 
// configure the filesystems
-   try {
-   FileSystem.initialize(configuration);
-   } catch (IOException e) {
-   throw new IOException("Error while configuring the 
filesystems.", e);
-   }
+   FileSystem.initialize(configuration);
 
// tell akka to die in case of an error
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, 
true);
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index a93fe93..8d5183b 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flin

[flink] 04/06: [FLINK-11952][3/4] Integrate plugin mechanism with FileSystem

2019-04-15 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c7b141b4b8767c9b7c8c72ef21055fd65908e848
Author: Stefan Richter 
AuthorDate: Fri Mar 22 14:57:09 2019 +0100

[FLINK-11952][3/4] Integrate plugin mechanism with FileSystem
---
 .../java/org/apache/flink/core/fs/FileSystem.java  | 109 ++---
 1 file changed, 76 insertions(+), 33 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index e7a3765..d159e70 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.core.fs.local.LocalFileSystemFactory;
+import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -44,12 +45,14 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ServiceLoader;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -217,11 +220,8 @@ public abstract class FileSystem {
/** Cache for file systems, by scheme + authority. */
private static final HashMap CACHE = new HashMap<>();
 
-   /** All available file system factories. */
-   private static final List RAW_FACTORIES = 
loadFileSystems();
-
/** Mapping of file system schemes to the corresponding factories,
-* populated in {@link FileSystem#initialize(Configuration)}. */
+* populated in {@link FileSystem#initialize(Configuration, 
PluginManager)}. */
private static final HashMap FS_FACTORIES = 
new HashMap<>();
 
/** The default factory that is used when no scheme matches. */
@@ -249,17 +249,57 @@ public abstract class FileSystem {
 * A file path of {@code '/user/USERNAME/in.txt'} is interpreted as
 * {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}.
 *
+* @deprecated use {@link #initialize(Configuration, PluginManager)} 
instead.
+*
 * @param config the configuration from where to fetch the parameter.
 */
+   @Deprecated
public static void initialize(Configuration config) throws 
IllegalConfigurationException {
+   initializeWithoutPlugins(config);
+   }
+
+   private static void initializeWithoutPlugins(Configuration config) 
throws IllegalConfigurationException {
+   initialize(config, null);
+   }
+
+   /**
+* Initializes the shared file system settings.
+*
+* The given configuration is passed to each file system factory to 
initialize the respective
+* file systems. Because the configuration of file systems may be 
different subsequent to the call
+* of this method, this method clears the file system instance cache.
+*
+* This method also reads the default file system URI from the 
configuration key
+* {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME}. All calls to {@link 
FileSystem#get(URI)} where
+* the URI has no scheme will be interpreted as relative to that URI.
+* As an example, assume the default file system URI is set to {@code 
'hdfs://localhost:9000/'}.
+* A file path of {@code '/user/USERNAME/in.txt'} is interpreted as
+* {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}.
+*
+* @param config the configuration from where to fetch the parameter.
+* @param pluginManager optional plugin manager that is used to 
initialize filesystems provided as plugins.
+*/
+   public static void initialize(
+   Configuration config,
+   PluginManager pluginManager) throws 
IllegalConfigurationException {
+
LOCK.lock();
try {
// make sure file systems are re-instantiated after 
re-configuration
CACHE.clear();
FS_FACTORIES.clear();
 
+   Collection>> 
factorySuppliers = new ArrayList<>(2);
+   factorySuppliers.add(() -> 
ServiceLoader.load(FileSystemFactory.class).iterator());
+
+   if (pluginManager != null) {
+   factorySuppliers.add(() -> 
pluginManager.load(FileSystemFactory.class));
+   }
+
+

[flink] branch master updated (2c503c2 -> 703dc8d)

2019-04-15 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 2c503c2  [FLINK-11884][table] Ported sort & limit validation on top of 
Expressions
 new 5aee179  [hotfix] Remove unused exception from FileSystem#initialize
 new 1a10fbe  [FLINK-11952][1/3] Make ChildFirstClassLoader a top-level 
class in flink-core
 new 0c4953c  [FLINK-11952][2/4] Introduce basic plugin mechanism for Flink
 new c7b141b  [FLINK-11952][3/4] Integrate plugin mechanism with FileSystem
 new c6307b5  [FLINK-11952][4/4] Integrate plugin mechanism with FileSystem 
initialization in process entry points
 new 703dc8d  [hotfix] Use TemporaryClassLoaderContext in other appropriate 
places in the codebase

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/flink/client/cli/CliFrontend.java   |  13 +-
 .../java/org/apache/flink/core/fs/FileSystem.java  | 111 -
 .../apache/flink/core/fs/FileSystemFactory.java|  12 +-
 .../flink/core/fs/UnsupportedSchemeFactory.java|   6 -
 .../core/fs/local/LocalFileSystemFactory.java  |   6 -
 .../core/plugin/DirectoryBasedPluginFinder.java| 103 
 .../Plugin.java}   |  33 ++---
 .../apache/flink/core/plugin/PluginDescriptor.java |  67 ++
 .../org/apache/flink/core/plugin/PluginFinder.java |  37 ++
 .../org/apache/flink/core/plugin/PluginLoader.java |  94 ++
 .../apache/flink/core/plugin/PluginManager.java|  77 
 .../org/apache/flink/core/plugin/PluginUtils.java  |  54 
 .../TemporaryClassLoaderContext.java}  |  33 ++---
 .../apache/flink/util/ChildFirstClassLoader.java   | 123 ++
 .../java/org/apache/flink/util/LambdaUtil.java |  19 +--
 .../plugin/DirectoryBasedPluginFinderTest.java | 137 +
 .../plugin/TemporaryClassLoaderContextTest.java|  46 +++
 .../testutils/EntropyInjectingTestFileSystem.java  |   5 -
 .../org/apache/flink/testutils/TestFileSystem.java |  10 +-
 .../flink/runtime/fs/maprfs/MapRFsFactory.java |   6 -
 .../mesos/entrypoint/MesosTaskExecutorRunner.java  |  10 +-
 .../runtime/webmonitor/history/HistoryServer.java  |   9 +-
 .../runtime/entrypoint/ClusterEntrypoint.java  |  14 +--
 .../librarycache/FlinkUserCodeClassLoaders.java| 103 +---
 .../runtime/taskexecutor/TaskManagerRunner.java|  11 +-
 .../flink/runtime/taskmanager/TaskManager.scala|   0
 .../client/gateway/local/ExecutionContext.java |   7 +-
 .../table/client/gateway/local/LocalExecutor.java  |  10 +-
 flink-tests/pom.xml|  31 +
 .../src/test/assembly/test-plugin-a-assembly.xml   |  43 +++
 .../src/test/assembly/test-plugin-b-assembly.xml   |  43 +++
 .../org/apache/flink/test/plugin/OtherTestSpi.java |  28 +
 .../apache/flink/test/plugin/PluginLoaderTest.java |  71 +++
 .../flink/test/plugin/PluginManagerTest.java   | 105 
 .../apache/flink/test/plugin/PluginTestBase.java   |  54 
 .../java/org/apache/flink/test/plugin/TestSpi.java |  28 +
 .../test/plugin/jar/plugina/DynamicClassA.java |  31 ++---
 .../test/plugin/jar/plugina/TestServiceA.java  |  34 +++--
 .../test/plugin/jar/pluginb/OtherTestServiceB.java |  30 ++---
 .../test/plugin/jar/pluginb/TestServiceB.java  |  30 ++---
 .../plugin-a/org.apache.flink.test.plugin.TestSpi  |  16 +++
 .../org.apache.flink.test.plugin.OtherTestSpi  |  16 +++
 .../plugin-b/org.apache.flink.test.plugin.TestSpi  |  16 +++
 .../flink/yarn/AbstractYarnClusterDescriptor.java  |  10 +-
 .../apache/flink/yarn/YarnTaskExecutorRunner.java  |   6 +-
 45 files changed, 1373 insertions(+), 375 deletions(-)
 create mode 100644 
flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinder.java
 copy 
flink-core/src/main/java/org/apache/flink/core/{fs/local/LocalFileSystemFactory.java
 => plugin/Plugin.java} (57%)
 create mode 100644 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginDescriptor.java
 create mode 100644 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginFinder.java
 create mode 100644 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java
 create mode 100644 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java
 create mode 100644 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
 copy 
flink-core/src/main/java/org/apache/flink/core/{fs/local/LocalFileSystemFactory.java
 => plugin/TemporaryClassLoaderContext.java} (51%)
 create mode 100644 
flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.j

[flink] 05/06: [FLINK-11952][4/4] Integrate plugin mechanism with FileSystem initialization in process entry points

2019-04-15 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6307b59902c21744ceb57cb34ee04d0b660cb5d
Author: Stefan Richter 
AuthorDate: Tue Apr 9 10:38:01 2019 +0200

[FLINK-11952][4/4] Integrate plugin mechanism with FileSystem 
initialization in process entry points

This integration currently still does not provide a proper plugin root 
folder because this
requires more changes, in particular FLINK-11952.

This closes #8038.
---
 .../src/main/java/org/apache/flink/client/cli/CliFrontend.java  | 5 -
 .../org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java  | 5 -
 .../org/apache/flink/runtime/webmonitor/history/HistoryServer.java  | 5 -
 .../java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java | 6 +-
 .../org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java| 5 -
 .../scala/org/apache/flink/runtime/taskmanager/TaskManager.scala| 0
 .../org/apache/flink/table/client/gateway/local/LocalExecutor.java  | 5 -
 .../java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java   | 5 -
 .../src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java | 6 +-
 9 files changed, 34 insertions(+), 8 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index c7cbe9b..4c4da69 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -39,6 +39,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -80,6 +81,7 @@ import java.util.Comparator;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -127,7 +129,8 @@ public class CliFrontend {
this.configuration = Preconditions.checkNotNull(configuration);
this.customCommandLines = 
Preconditions.checkNotNull(customCommandLines);
 
-   FileSystem.initialize(this.configuration);
+   //TODO provide plugin path.
+   FileSystem.initialize(this.configuration, 
PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
 
this.customCommandLineOptions = new Options();
 
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
index 98d2157..e2d0161 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java
@@ -21,6 +21,7 @@ package org.apache.flink.mesos.entrypoint;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * The entry point for running a TaskManager in a Mesos container.
@@ -85,7 +87,8 @@ public class MesosTaskExecutorRunner {
final Map envs = System.getenv();
 
// configure the filesystems
-   FileSystem.initialize(configuration);
+   //TODO provide plugin path.
+   FileSystem.initialize(configuration, 
PluginUtils.createPluginManagerFromRootFolder(Optional.empty()));
 
// tell akka to die in case of an error
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, 
true);
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 8d5183b..0bb98e8 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ 

[flink] 02/06: [FLINK-11952][1/3] Make ChildFirstClassLoader a top-level class in flink-core

2019-04-15 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a10fbef644ad32a3358711bfa5a167118186482
Author: Stefan Richter 
AuthorDate: Thu Mar 14 14:50:18 2019 +0100

[FLINK-11952][1/3] Make ChildFirstClassLoader a top-level class in 
flink-core
---
 .../apache/flink/util/ChildFirstClassLoader.java   | 123 +
 .../librarycache/FlinkUserCodeClassLoaders.java| 103 +
 2 files changed, 125 insertions(+), 101 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java 
b/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java
new file mode 100644
index 000..3942b1b
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A variant of the URLClassLoader that first loads from the URLs and only 
after that from the parent.
+ *
+ * {@link #getResourceAsStream(String)} uses {@link #getResource(String)} 
internally so we
+ * don't override that.
+ */
+public final class ChildFirstClassLoader extends URLClassLoader {
+
+   /**
+* The classes that should always go through the parent ClassLoader. 
This is relevant
+* for Flink classes, for example, to avoid loading Flink classes that 
cross the
+* user-code/system-code barrier in the user-code ClassLoader.
+*/
+   private final String[] alwaysParentFirstPatterns;
+
+   public ChildFirstClassLoader(URL[] urls, ClassLoader parent, String[] 
alwaysParentFirstPatterns) {
+   super(urls, parent);
+   this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;
+   }
+
+   @Override
+   protected synchronized Class loadClass(
+   String name, boolean resolve) throws ClassNotFoundException {
+
+   // First, check if the class has already been loaded
+   Class c = findLoadedClass(name);
+
+   if (c == null) {
+   // check whether the class should go parent-first
+   for (String alwaysParentFirstPattern : 
alwaysParentFirstPatterns) {
+   if (name.startsWith(alwaysParentFirstPattern)) {
+   return super.loadClass(name, resolve);
+   }
+   }
+
+   try {
+   // check the URLs
+   c = findClass(name);
+   } catch (ClassNotFoundException e) {
+   // let URLClassLoader do it, which will 
eventually call the parent
+   c = super.loadClass(name, resolve);
+   }
+   }
+
+   if (resolve) {
+   resolveClass(c);
+   }
+
+   return c;
+   }
+
+   @Override
+   public URL getResource(String name) {
+   // first, try and find it via the URLClassloader
+   URL urlClassLoaderResource = findResource(name);
+
+   if (urlClassLoaderResource != null) {
+   return urlClassLoaderResource;
+   }
+
+   // delegate to super
+   return super.getResource(name);
+   }
+
+   @Override
+   public Enumeration getResources(String name) throws IOException {
+   // first get resources from URLClassloader
+   Enumeration urlClassLoaderResources = findResources(name);
+
+   final List result = new ArrayList<>();
+
+   while (urlClassLoaderResources.hasMoreElements()) {
+   result.add(urlClassLoaderResources.nextElement());
+   }
+
+   // get parent urls
+   Enumerati

[flink] 06/06: [hotfix] Use TemporaryClassLoaderContext in other appropriate places in the codebase

2019-04-15 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 703dc8dab1ee35b2532b472eec40fdc7dd124cc3
Author: Stefan Richter 
AuthorDate: Mon Apr 8 17:38:26 2019 +0200

[hotfix] Use TemporaryClassLoaderContext in other appropriate places in the 
codebase
---
 .../main/java/org/apache/flink/util/LambdaUtil.java   | 19 +++
 .../table/client/gateway/local/ExecutionContext.java  |  7 ++-
 2 files changed, 5 insertions(+), 21 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
index fdb6943..163b583 100644
--- a/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.util;
 
+import org.apache.flink.core.plugin.TemporaryClassLoaderContext;
 import org.apache.flink.util.function.SupplierWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 import org.apache.flink.util.function.ThrowingRunnable;
@@ -76,16 +77,9 @@ public final class LambdaUtil {
final ClassLoader cl,
final ThrowingRunnable r) throws E {
 
-   final Thread currentThread = Thread.currentThread();
-   final ClassLoader oldClassLoader = 
currentThread.getContextClassLoader();
-
-   try {
-   currentThread.setContextClassLoader(cl);
+   try (TemporaryClassLoaderContext tmpCl = new 
TemporaryClassLoaderContext(cl)) {
r.run();
}
-   finally {
-   currentThread.setContextClassLoader(oldClassLoader);
-   }
}
 
/**
@@ -99,15 +93,8 @@ public final class LambdaUtil {
final ClassLoader cl,
final SupplierWithException s) throws E {
 
-   final Thread currentThread = Thread.currentThread();
-   final ClassLoader oldClassLoader = 
currentThread.getContextClassLoader();
-
-   try {
-   currentThread.setContextClassLoader(cl);
+   try (TemporaryClassLoaderContext tmpCl = new 
TemporaryClassLoaderContext(cl)) {
return s.get();
}
-   finally {
-   currentThread.setContextClassLoader(oldClassLoader);
-   }
}
 }
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index dab6bbe..1649c23 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -29,6 +29,7 @@ import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.plugin.TemporaryClassLoaderContext;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -185,12 +186,8 @@ public class ExecutionContext {
 * Executes the given supplier using the execution context's 
classloader as thread classloader.
 */
public  R wrapClassLoader(Supplier supplier) {
-   final ClassLoader previousClassloader = 
Thread.currentThread().getContextClassLoader();
-   Thread.currentThread().setContextClassLoader(classLoader);
-   try {
+   try (TemporaryClassLoaderContext tmpCl = new 
TemporaryClassLoaderContext(classLoader)){
return supplier.get();
-   } finally {
-   
Thread.currentThread().setContextClassLoader(previousClassloader);
}
}
 



[flink] branch master updated: [FLINK-11884][table] Ported sort & limit validation on top of Expressions

2019-04-15 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 2c503c2  [FLINK-11884][table] Ported sort & limit validation on top of 
Expressions
2c503c2 is described below

commit 2c503c2488f786ae684fa956b23d4e2f8e1797de
Author: Dawid Wysakowicz 
AuthorDate: Mon Apr 1 15:17:25 2019 +0200

[FLINK-11884][table] Ported sort & limit validation on top of Expressions
---
 .../expressions/BuiltInFunctionDefinitions.java|   2 +
 .../table/operations/SortOperationFactory.java | 118 +
 .../flink/table/operations/SortOperationUtils.java |  71 -
 .../table/operations/OperationTreeBuilder.scala|  12 +--
 .../flink/table/plan/logical/operators.scala   |  23 +---
 5 files changed, 128 insertions(+), 98 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java
index db6fcd3..801fad2 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java
@@ -333,6 +333,8 @@ public final class BuiltInFunctionDefinitions {
WINDOW_START, WINDOW_END, PROCTIME, ROWTIME
));
 
+   public static final List ORDERING = 
Arrays.asList(ORDER_ASC, ORDER_DESC);
+
public static List getDefinitions() {
final Field[] fields = 
BuiltInFunctionDefinitions.class.getFields();
final List list = new 
ArrayList<>(fields.length);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java
new file mode 100644
index 000..8540f7b
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionBridge;
+import org.apache.flink.table.expressions.PlannerExpression;
+import org.apache.flink.table.plan.logical.Limit;
+import org.apache.flink.table.plan.logical.LogicalNode;
+import org.apache.flink.table.plan.logical.Sort;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDERING;
+import static 
org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDER_ASC;
+
+/**
+ * Utility class for creating a valid {@link Sort} operation.
+ */
+@Internal
+public class SortOperationFactory {
+
+   private final boolean isStreaming;
+   private final ExpressionBridge expressionBridge;
+   private final OrderWrapper orderWrapper = new OrderWrapper();
+
+   public SortOperationFactory(
+   ExpressionBridge expressionBridge,
+   boolean isStreaming) {
+   this.expressionBridge = expressionBridge;
+   this.isStreaming = isStreaming;
+   }
+
+   /**
+* Creates a valid {@link Sort} operation.
+*
+* NOTE: if the collation is not explicitly specified for any 
expression, it is wrapped in a
+* default ascending order
+*
+* @param orders expressions describing order,
+* @param child relational expression on top of which to apply the sort 
operation
+* @return valid so

[flink] branch master updated: [FLINK-12154][network] Remove legacy fields for SingleInputGate (#8136)

2019-04-15 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c7ef6db  [FLINK-12154][network] Remove legacy fields for 
SingleInputGate (#8136)
c7ef6db is described below

commit c7ef6db71a807cf21cc1ef3a13f4d95520dfc0fe
Author: zhijiang 
AuthorDate: Mon Apr 15 19:01:08 2019 +0800

[FLINK-12154][network] Remove legacy fields for SingleInputGate (#8136)

This is preparatory work for FLINK-11726.

SingleInputGate#create no longer takes the unused ExecutionAttemptID parameter,
and the SingleInputGate constructor no longer takes the unused TaskIOMetricGroup
parameter. createSingleInputGate is introduced so that related tests can reuse
the process of creating a SingleInputGate.
---
 .../partition/consumer/SingleInputGate.java|  5 +--
 .../org/apache/flink/runtime/taskmanager/Task.java |  1 -
 .../runtime/io/network/NetworkEnvironmentTest.java | 18 ++--
 ...editBasedPartitionRequestClientHandlerTest.java | 10 ++---
 .../netty/PartitionRequestClientHandlerTest.java   | 25 +--
 .../network/netty/PartitionRequestClientTest.java  |  6 +--
 .../network/partition/InputChannelTestUtils.java   | 24 ++
 .../network/partition/InputGateConcurrentTest.java | 33 ++
 .../network/partition/InputGateFairnessTest.java   | 52 +++---
 .../partition/consumer/LocalInputChannelTest.java  | 30 +
 .../partition/consumer/RemoteInputChannelTest.java | 36 +--
 .../partition/consumer/SingleInputGateTest.java| 16 ++-
 .../partition/consumer/TestSingleInputGate.java| 21 +
 .../partition/consumer/UnionInputGateTest.java | 24 ++
 .../StreamNetworkBenchmarkEnvironment.java |  1 -
 15 files changed, 87 insertions(+), 215 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index e7822fe..9c0196e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -193,7 +192,6 @@ public class SingleInputGate implements InputGate {
int consumedSubpartitionIndex,
int numberOfInputChannels,
TaskActions taskActions,
-   TaskIOMetricGroup metrics,
boolean isCreditBased) {
 
this.owningTaskName = checkNotNull(owningTaskName);
@@ -664,7 +662,6 @@ public class SingleInputGate implements InputGate {
public static SingleInputGate create(
String owningTaskName,
JobID jobId,
-   ExecutionAttemptID executionId,
InputGateDeploymentDescriptor igdd,
NetworkEnvironment networkEnvironment,
TaskEventPublisher taskEventPublisher,
@@ -683,7 +680,7 @@ public class SingleInputGate implements InputGate {
 
final SingleInputGate inputGate = new SingleInputGate(
owningTaskName, jobId, consumedResultId, 
consumedPartitionType, consumedSubpartitionIndex,
-   icdd.length, taskActions, metrics, 
networkConfig.isCreditBased());
+   icdd.length, taskActions, 
networkConfig.isCreditBased());
 
// Create the input channels. There is one input channel for 
each consumed partition.
final InputChannel[] inputChannels = new 
InputChannel[icdd.length];
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index b3d952d..5e7174c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -404,7 +404,6 @@ public class Task implements Runnable, TaskActions, 
CheckpointListener {
SingleInputGate gate = SingleInputGate.create(
taskNameWithSubtaskAndId,
jobId,
-

[flink] branch master updated: [FLINK-11884][table] Ported alias validation on top of Expressions

2019-04-15 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 807bce2  [FLINK-11884][table] Ported alias validation on top of 
Expressions
807bce2 is described below

commit 807bce2ff7be7f2c3ca09aab9a01363b8eb72a3b
Author: Dawid Wysakowicz 
AuthorDate: Mon Apr 1 15:06:44 2019 +0200

[FLINK-11884][table] Ported alias validation on top of Expressions
---
 .../table/operations/AliasOperationUtils.java  | 110 +
 .../table/operations/OperationTreeBuilder.scala|   6 +-
 .../flink/table/plan/logical/operators.scala   |  30 +-
 3 files changed, 114 insertions(+), 32 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java
new file mode 100644
index 000..ba8e87a
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor;
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.UnresolvedReferenceExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Utility class for creating valid alias expressions that can be later used 
as a projection.
+ */
+@Internal
+public final class AliasOperationUtils {
+
+   private static final AliasLiteralValidator aliasLiteralValidator = new 
AliasLiteralValidator();
+   private static final String ALL_REFERENCE = "*";
+
+   /**
+* Creates a list of valid alias expressions. Resulting expression 
might still contain
+* {@link UnresolvedReferenceExpression}.
+*
+* @param aliases aliases to validate
+* @param child relational operation on top of which to apply the 
aliases
+* @return validated list of aliases
+*/
+   public static List createAliasList(List 
aliases, TableOperation child) {
+   TableSchema childSchema = child.getTableSchema();
+
+   if (aliases.size() > childSchema.getFieldCount()) {
+   throw new ValidationException("Aliasing more fields 
than we actually have.");
+   }
+
+   List fieldAliases = aliases.stream()
+   .map(f -> f.accept(aliasLiteralValidator))
+   .collect(Collectors.toList());
+
+   String[] childNames = childSchema.getFieldNames();
+   return IntStream.range(0, childNames.length)
+   .mapToObj(idx -> {
+   UnresolvedReferenceExpression oldField = new 
UnresolvedReferenceExpression(childNames[idx]);
+   if (idx < fieldAliases.size()) {
+   ValueLiteralExpression alias = 
fieldAliases.get(idx);
+   return new 
CallExpression(BuiltInFunctionDefinitions.AS, Arrays.asList(oldField, alias));
+   } else {
+   return oldField;
+   }
+   }).collect(Collectors.toList());
+   }
+
+   private static class AliasLiteralValidator extends 
ApiExpressionDefaultVisitor {
+
+   @Override
+  

[flink] branch master updated: [FLINK-12168][table-planner-blink] Support e2e limit, sortLimit, rank, union in blink batch (#8156)

2019-04-15 Thread kurt
This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 068a0ed  [FLINK-12168][table-planner-blink] Support e2e limit, 
sortLimit, rank, union in blink batch (#8156)
068a0ed is described below

commit 068a0ede2463db9c92e4dc097a26796bd97fbffa
Author: Jingsong Lee 
AuthorDate: Mon Apr 15 16:56:34 2019 +0800

[FLINK-12168][table-planner-blink] Support e2e limit, sortLimit, rank, 
union in blink batch (#8156)
---
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   |   3 +-
 .../codegen/sort/ComparatorCodeGenerator.scala |  84 
 .../codegen/{ => sort}/SortCodeGenerator.scala |  33 +
 .../plan/nodes/physical/batch/BatchExecLimit.scala |  42 +-
 .../plan/nodes/physical/batch/BatchExecRank.scala  |  71 ++-
 .../plan/nodes/physical/batch/BatchExecSort.scala  |   2 +-
 .../nodes/physical/batch/BatchExecSortLimit.scala  |  58 -
 .../physical/batch/BatchExecSortMergeJoin.scala|   3 +-
 .../plan/nodes/physical/batch/BatchExecUnion.scala |  21 ++-
 .../flink/table/codegen/SortCodeGeneratorTest.java |   1 +
 .../apache/flink/table/util/BaseRowTestUtil.java   |   5 +
 .../flink/table/runtime/batch/sql/CalcITCase.scala |   3 -
 .../table/runtime/batch/sql/CorrelateITCase.scala  |   1 -
 .../table/runtime/batch/sql/CorrelateITCase2.scala |   4 +-
 .../table/runtime/batch/sql/DecimalITCase.scala|  53 ++--
 .../table/runtime/batch/sql/LimitITCase.scala  | 120 +
 .../flink/table/runtime/batch/sql/RankITCase.scala | 142 +
 .../table/runtime/batch/sql/SortLimitITCase.scala  | 129 +++
 .../table/runtime/batch/sql/UnionITCase.scala  | 141 
 .../batch/sql/agg/AggregateITCaseBase.scala|   1 -
 .../table/runtime/batch/sql/join/JoinITCase.scala  |   1 -
 .../runtime/batch/sql/join/OuterJoinITCase.scala   |   5 +-
 .../flink/table/runtime/utils/BatchTestBase.scala  |  26 +++-
 .../flink/table/runtime/sort/LimitOperator.java|  63 +
 .../flink/table/runtime/sort/RankOperator.java | 112 
 .../table/runtime/sort/SortLimitOperator.java  | 110 
 26 files changed, 1167 insertions(+), 67 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashAggCodeGenHelper.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashAggCodeGenHelper.scala
index 64aeb81..f968c06 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -25,7 +25,8 @@ import org.apache.flink.table.`type`.{InternalType, RowType}
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.codegen.CodeGenUtils.{binaryRowFieldSetAccess, 
binaryRowSetNull}
 import 
org.apache.flink.table.codegen.agg.batch.AggCodeGenHelper.buildAggregateArgsMapping
-import org.apache.flink.table.codegen.{CodeGenUtils, CodeGeneratorContext, 
ExprCodeGenerator, GenerateUtils, GeneratedExpression, OperatorCodeGenerator, 
SortCodeGenerator}
+import org.apache.flink.table.codegen.sort.SortCodeGenerator
+import org.apache.flink.table.codegen.{CodeGenUtils, CodeGeneratorContext, 
ExprCodeGenerator, GenerateUtils, GeneratedExpression, OperatorCodeGenerator}
 import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, GenericRow, 
JoinedRow}
 import org.apache.flink.table.expressions.{CallExpression, Expression, 
ExpressionVisitor, FieldReferenceExpression, ResolvedAggInputReference, 
RexNodeConverter, SymbolExpression, TypeLiteralExpression, 
UnresolvedReferenceExpression, ValueLiteralExpression}
 import 
org.apache.flink.table.functions.aggfunctions.DeclarativeAggregateFunction
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/ComparatorCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/ComparatorCodeGenerator.scala
new file mode 100644
index 000..02c3778
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/ComparatorCodeGenerator.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.or