[flink] branch master updated: [FLINK-12177][coordination] Remove legacy FlinkUntypedActor
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 734686f [FLINK-12177][coordination] Remove legacy FlinkUntypedActor 734686f is described below commit 734686ff29c520d930f24c72dc069f18fd1674b7 Author: tison AuthorDate: Sat Apr 13 12:17:02 2019 +0800 [FLINK-12177][coordination] Remove legacy FlinkUntypedActor --- .../flink/runtime/akka/FlinkUntypedActor.java | 150 --- .../flink/runtime/akka/FlinkUntypedActorTest.java | 160 - 2 files changed, 310 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java deleted file mode 100644 index e0279b3..000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.akka; - -import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; -import org.apache.flink.runtime.messages.RequiresLeaderSessionID; - -import akka.actor.UntypedActor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.UUID; - -/** - * Base class for Flink's actors implemented with Java. Actors inheriting from this class - * automatically log received messages when the debug log level is activated. Furthermore, - * they filter out {@link LeaderSessionMessage} with the wrong leader session ID. If a message - * of type {@link RequiresLeaderSessionID} without being wrapped in a LeaderSessionMessage is - * detected, then an Exception is thrown. - * - * In order to implement the actor behavior, an implementing subclass has to override the method - * handleMessage, which defines how messages are processed. Furthermore, the subclass has to provide - * a leader session ID option which is returned by getLeaderSessionID. - */ -public abstract class FlinkUntypedActor extends UntypedActor { - - //CHECKSTYLE.OFF: MemberNameCheck - re-enable after JobManager/TaskManager refactoring in FLIP-6? - protected final Logger LOG = LoggerFactory.getLogger(getClass()); - //CHECKSTYLE.ON: MemberNameCheck - - /** -* This method is called by Akka if a new message has arrived for the actor. It logs the -* processing time of the incoming message if the logging level is set to debug. After logging -* the handleLeaderSessionID method is called. -* -* Important: This method cannot be overridden. The actor specific message handling logic is -* implemented by the method handleMessage. 
-* -* @param message Incoming message -* @throws Exception -*/ - @Override - public final void onReceive(Object message) throws Exception { - if (LOG.isTraceEnabled()) { - LOG.trace("Received message {} at {} from {}.", message, getSelf().path(), getSender()); - - long start = System.nanoTime(); - - handleLeaderSessionID(message); - - long duration = (System.nanoTime() - start) / 1_000_000; - - LOG.trace("Handled message {} in {} ms from {}.", message, duration, getSender()); - } else { - handleLeaderSessionID(message); - } - } - - /** -* This method filters out {@link LeaderSessionMessage} whose leader session ID is not equal -* to the actors leader session ID. If a message of type {@link RequiresLeaderSessionID} -* arrives, then an Exception is thrown, because these messages have to be wrapped in a -* {@link LeaderSessionMessage}. -* -* @param message Incoming message -* @throws Exception -*/ - private void handleLeaderSessionID(Object message) throws Exception { - if (message instanceof LeaderSessionMessage) { -
[flink] branch master updated: [hotfix] Delete empty TaskManager.scala file
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 851ac7d [hotfix] Delete empty TaskManager.scala file 851ac7d is described below commit 851ac7d1c0e9661e9b75faaef994bb270a9f112f Author: Stefan Richter AuthorDate: Mon Apr 15 18:36:39 2019 +0200 [hotfix] Delete empty TaskManager.scala file --- .../src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala | 0 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala deleted file mode 100644 index e69de29..0000000
[flink] branch master updated: [hotfix] Remove unused method ExecutionGraph#restoreLatestCheckpointedState
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 32f0ad7 [hotfix] Remove unused method ExecutionGraph#restoreLatestCheckpointedState 32f0ad7 is described below commit 32f0ad79be4b18774e1506db56ae59658ec211e1 Author: Stefan Richter AuthorDate: Mon Apr 15 17:28:02 2019 +0200 [hotfix] Remove unused method ExecutionGraph#restoreLatestCheckpointedState --- .../flink/runtime/executiongraph/ExecutionGraph.java | 20 1 file changed, 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 5e44e94..56e31a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -1291,26 +1291,6 @@ public class ExecutionGraph implements AccessExecutionGraph { } /** -* Restores the latest checkpointed state. -* -* The recovery of checkpoints might block. Make sure that calls to this method don't -* block the job manager actor and run asynchronously. -* -* @param errorIfNoCheckpoint Fail if there is no checkpoint available -* @param allowNonRestoredState Allow to skip checkpoint state that cannot be mapped -* to the ExecutionGraph vertices (if the checkpoint contains state for a -* job vertex that is not part of this ExecutionGraph). 
-*/ - public void restoreLatestCheckpointedState(boolean errorIfNoCheckpoint, boolean allowNonRestoredState) throws Exception { - assertRunningInJobMasterMainThread(); - synchronized (progressLock) { - if (checkpointCoordinator != null) { - checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), errorIfNoCheckpoint, allowNonRestoredState); - } - } - } - - /** * Returns the serializable {@link ArchivedExecutionConfig}. * * @return ArchivedExecutionConfig which may be null in case of errors
[flink] branch master updated: [FLINK-11884][table] Ported calculated table validation on top of Expressions
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new f726468 [FLINK-11884][table] Ported calculated table validation on top of Expressions f726468 is described below commit f72646889b5f3afaf6c61645140ca7ae6a552096 Author: Dawid Wysakowicz AuthorDate: Mon Apr 1 16:03:01 2019 +0200 [FLINK-11884][table] Ported calculated table validation on top of Expressions --- .../rules/ResolveCallByArgumentsRule.java | 15 +- .../table/operations/CalculatedTableFactory.java | 155 + .../org/apache/flink/table/expressions/call.scala | 19 +++ .../functions/TemporalTableFunctionImpl.scala | 14 +- .../functions/utils/UserDefinedFunctionUtils.scala | 53 --- .../table/operations/OperationTreeBuilder.scala| 18 +-- .../flink/table/plan/logical/operators.scala | 32 + .../LogicalCorrelateToTemporalTableJoinRule.scala | 23 +-- .../api/stream/table/TemporalTableJoinTest.scala | 37 +++-- .../table/validation/CorrelateValidationTest.scala | 3 +- 10 files changed, 242 insertions(+), 127 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java index af3b4e3..bebb683 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/expressions/rules/ResolveCallByArgumentsRule.java @@ -27,6 +27,8 @@ import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.InputTypeSpec; import org.apache.flink.table.expressions.PlannerExpression; import org.apache.flink.table.typeutils.TypeCoercion; +import 
org.apache.flink.table.validate.ValidationFailure; +import org.apache.flink.table.validate.ValidationResult; import java.util.List; import java.util.stream.Collectors; @@ -84,9 +86,16 @@ final class ResolveCallByArgumentsRule implements ResolverRule { private Expression validateArguments(CallExpression call, PlannerExpression plannerCall) { if (!plannerCall.valid()) { - throw new ValidationException(String.format("Invalid arguments [%s] for call: %s", - call.getChildren(), - call)); + final String errorMessage; + ValidationResult validationResult = plannerCall.validateInput(); + if (validationResult instanceof ValidationFailure) { + errorMessage = ((ValidationFailure) validationResult).message(); + } else { + errorMessage = String.format("Invalid arguments %s for function: %s", + call.getChildren(), + call.getFunctionDefinition().getName()); + } + throw new ValidationException(errorMessage); } return call; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java new file mode 100644 index 000..849d196 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/CalculatedTableFactory.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types
[flink] 03/06: [FLINK-11952][2/4] Introduce basic plugin mechanism for Flink
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 0c4953c4c5ff583d88f1686cd96fd7e7be9d8f11 Author: Stefan Richter AuthorDate: Tue Apr 9 10:37:03 2019 +0200 [FLINK-11952][2/4] Introduce basic plugin mechanism for Flink The mechanism uses child-first classloading and creates classloaders from jars that are discovered from a directory hierarchy. --- .../apache/flink/core/fs/FileSystemFactory.java| 12 +- .../flink/core/fs/UnsupportedSchemeFactory.java| 6 - .../core/fs/local/LocalFileSystemFactory.java | 6 - .../core/plugin/DirectoryBasedPluginFinder.java| 103 .../Plugin.java} | 33 ++--- .../apache/flink/core/plugin/PluginDescriptor.java | 67 ++ .../org/apache/flink/core/plugin/PluginFinder.java | 37 ++ .../org/apache/flink/core/plugin/PluginLoader.java | 94 ++ .../apache/flink/core/plugin/PluginManager.java| 77 .../org/apache/flink/core/plugin/PluginUtils.java | 54 .../TemporaryClassLoaderContext.java} | 33 ++--- .../plugin/DirectoryBasedPluginFinderTest.java | 137 + .../plugin/TemporaryClassLoaderContextTest.java| 46 +++ .../testutils/EntropyInjectingTestFileSystem.java | 5 - .../org/apache/flink/testutils/TestFileSystem.java | 10 +- .../flink/runtime/fs/maprfs/MapRFsFactory.java | 6 - flink-tests/pom.xml| 31 + .../src/test/assembly/test-plugin-a-assembly.xml | 43 +++ .../src/test/assembly/test-plugin-b-assembly.xml | 43 +++ .../org/apache/flink/test/plugin/OtherTestSpi.java | 28 + .../apache/flink/test/plugin/PluginLoaderTest.java | 71 +++ .../flink/test/plugin/PluginManagerTest.java | 105 .../apache/flink/test/plugin/PluginTestBase.java | 54 .../java/org/apache/flink/test/plugin/TestSpi.java | 28 + .../test/plugin/jar/plugina/DynamicClassA.java | 31 ++--- .../test/plugin/jar/plugina/TestServiceA.java | 34 +++-- .../test/plugin/jar/pluginb/OtherTestServiceB.java | 30 ++--- .../test/plugin/jar/pluginb/TestServiceB.java | 30 ++--- 
.../plugin-a/org.apache.flink.test.plugin.TestSpi | 16 +++ .../org.apache.flink.test.plugin.OtherTestSpi | 16 +++ .../plugin-b/org.apache.flink.test.plugin.TestSpi | 16 +++ 31 files changed, 1130 insertions(+), 172 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java index 8a35471..eecf6f1 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java @@ -20,6 +20,7 @@ package org.apache.flink.core.fs; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.plugin.Plugin; import java.io.IOException; import java.net.URI; @@ -31,7 +32,7 @@ import java.net.URI; * creating file systems via {@link #create(URI)}. */ @PublicEvolving -public interface FileSystemFactory { +public interface FileSystemFactory extends Plugin { /** * Gets the scheme of the file system created by this factory. @@ -39,15 +40,6 @@ public interface FileSystemFactory { String getScheme(); /** -* Applies the given configuration to this factory. All future file system -* instantiations via {@link #create(URI)} should take the configuration into -* account. -* -* @param config The configuration to apply. -*/ - void configure(Configuration config); - - /** * Creates a new file system for the given file system URI. * The URI describes the type of file system (via its scheme) and optionally the * authority (for example the host) of the file system. 
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java index c2cb2d5..e873e63 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/UnsupportedSchemeFactory.java @@ -19,7 +19,6 @@ package org.apache.flink.core.fs; import org.apache.flink.annotation.Internal; -import org.apache.flink.configuration.Configuration; import javax.annotation.Nullable; @@ -54,11 +53,6 @@ class UnsupportedSchemeFactory implements FileSystemFactory { } @Override - public void configure(Configuration config) { - // nothing to do here - } - - @Override public FileSystem create(URI fsUri) throws IOExc
[flink] 01/06: [hotfix] Remove unused exception from FileSystem#initialize
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 5aee179cb6677aad1e1b6f1808873167c4a05789 Author: Stefan Richter AuthorDate: Fri Mar 15 15:02:51 2019 +0100 [hotfix] Remove unused exception from FileSystem#initialize --- .../src/main/java/org/apache/flink/client/cli/CliFrontend.java | 10 ++ .../src/main/java/org/apache/flink/core/fs/FileSystem.java | 2 +- .../apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java | 7 +-- .../apache/flink/runtime/webmonitor/history/HistoryServer.java | 6 +- .../org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java | 10 ++ .../apache/flink/runtime/taskexecutor/TaskManagerRunner.java | 8 +--- .../apache/flink/table/client/gateway/local/LocalExecutor.java | 7 +-- .../org/apache/flink/yarn/AbstractYarnClusterDescriptor.java | 7 +-- 8 files changed, 10 insertions(+), 47 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 6b20e78..c7cbe9b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -67,7 +67,6 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileNotFoundException; -import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.UndeclaredThrowableException; @@ -124,16 +123,11 @@ public class CliFrontend { public CliFrontend( Configuration configuration, - List> customCommandLines) throws Exception { + List> customCommandLines) { this.configuration = Preconditions.checkNotNull(configuration); this.customCommandLines = Preconditions.checkNotNull(customCommandLines); - try { - FileSystem.initialize(this.configuration); - } catch (IOException e) { - throw new 
Exception("Error while setting the default " + - "filesystem scheme from configuration.", e); - } + FileSystem.initialize(this.configuration); this.customCommandLineOptions = new Options(); diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index d451109..e7a3765 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -251,7 +251,7 @@ public abstract class FileSystem { * * @param config the configuration from where to fetch the parameter. */ - public static void initialize(Configuration config) throws IOException, IllegalConfigurationException { + public static void initialize(Configuration config) throws IllegalConfigurationException { LOCK.lock(); try { // make sure file systems are re-instantiated after re-configuration diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java index cc1289f..98d2157 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java @@ -40,7 +40,6 @@ import org.apache.commons.cli.PosixParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.util.Map; @@ -86,11 +85,7 @@ public class MesosTaskExecutorRunner { final Map envs = System.getenv(); // configure the filesystems - try { - FileSystem.initialize(configuration); - } catch (IOException e) { - throw new IOException("Error while configuring the filesystems.", e); - } + FileSystem.initialize(configuration); // tell akka to die in case of an error configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index a93fe93..8d5183b 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flin
[flink] 04/06: [FLINK-11952][3/4] Integrate plugin mechanism with FileSystem
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c7b141b4b8767c9b7c8c72ef21055fd65908e848 Author: Stefan Richter AuthorDate: Fri Mar 22 14:57:09 2019 +0100 [FLINK-11952][3/4] Integrate plugin mechanism with FileSystem --- .../java/org/apache/flink/core/fs/FileSystem.java | 109 ++--- 1 file changed, 76 insertions(+), 33 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index e7a3765..d159e70 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -31,6 +31,7 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.core.fs.local.LocalFileSystemFactory; +import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; @@ -44,12 +45,14 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.ServiceLoader; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -217,11 +220,8 @@ public abstract class FileSystem { /** Cache for file systems, by scheme + authority. */ private static final HashMap CACHE = new HashMap<>(); - /** All available file system factories. 
*/ - private static final List RAW_FACTORIES = loadFileSystems(); - /** Mapping of file system schemes to the corresponding factories, -* populated in {@link FileSystem#initialize(Configuration)}. */ +* populated in {@link FileSystem#initialize(Configuration, PluginManager)}. */ private static final HashMap FS_FACTORIES = new HashMap<>(); /** The default factory that is used when no scheme matches. */ @@ -249,17 +249,57 @@ public abstract class FileSystem { * A file path of {@code '/user/USERNAME/in.txt'} is interpreted as * {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}. * +* @deprecated use {@link #initialize(Configuration, PluginManager)} instead. +* * @param config the configuration from where to fetch the parameter. */ + @Deprecated public static void initialize(Configuration config) throws IllegalConfigurationException { + initializeWithoutPlugins(config); + } + + private static void initializeWithoutPlugins(Configuration config) throws IllegalConfigurationException { + initialize(config, null); + } + + /** +* Initializes the shared file system settings. +* +* The given configuration is passed to each file system factory to initialize the respective +* file systems. Because the configuration of file systems may be different subsequent to the call +* of this method, this method clears the file system instance cache. +* +* This method also reads the default file system URI from the configuration key +* {@link CoreOptions#DEFAULT_FILESYSTEM_SCHEME}. All calls to {@link FileSystem#get(URI)} where +* the URI has no scheme will be interpreted as relative to that URI. +* As an example, assume the default file system URI is set to {@code 'hdfs://localhost:9000/'}. +* A file path of {@code '/user/USERNAME/in.txt'} is interpreted as +* {@code 'hdfs://localhost:9000/user/USERNAME/in.txt'}. +* +* @param config the configuration from where to fetch the parameter. 
+* @param pluginManager optional plugin manager that is used to initialized filesystems provided as plugins. +*/ + public static void initialize( + Configuration config, + PluginManager pluginManager) throws IllegalConfigurationException { + LOCK.lock(); try { // make sure file systems are re-instantiated after re-configuration CACHE.clear(); FS_FACTORIES.clear(); + Collection<Supplier<Iterator<FileSystemFactory>>> factorySuppliers = new ArrayList<>(2); + factorySuppliers.add(() -> ServiceLoader.load(FileSystemFactory.class).iterator()); + + if (pluginManager != null) { + factorySuppliers.add(() -> pluginManager.load(FileSystemFactory.class)); + } + +
[flink] branch master updated (2c503c2 -> 703dc8d)
This is an automated email from the ASF dual-hosted git repository. srichter pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 2c503c2 [FLINK-11884][table] Ported sort & limit validation on top of Expressions new 5aee179 [hotfix] Remove unused exception from FileSystem#initialize new 1a10fbe [FLINK-11952][1/3] Make ChildFirstClassLoader a top-level class in flink-core new 0c4953c [FLINK-11952][2/4] Introduce basic plugin mechanism for Flink new c7b141b [FLINK-11952][3/4] Integrate plugin mechanism with FileSystem new c6307b5 [FLINK-11952][4/4] Integrate plugin mechanism with FileSystem initialization in process entry points new 703dc8d [hotfix] Use TemporaryClassLoaderContext in other appropriate places in the codebase The 6 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. 
Summary of changes: .../org/apache/flink/client/cli/CliFrontend.java | 13 +- .../java/org/apache/flink/core/fs/FileSystem.java | 111 - .../apache/flink/core/fs/FileSystemFactory.java| 12 +- .../flink/core/fs/UnsupportedSchemeFactory.java| 6 - .../core/fs/local/LocalFileSystemFactory.java | 6 - .../core/plugin/DirectoryBasedPluginFinder.java| 103 .../Plugin.java} | 33 ++--- .../apache/flink/core/plugin/PluginDescriptor.java | 67 ++ .../org/apache/flink/core/plugin/PluginFinder.java | 37 ++ .../org/apache/flink/core/plugin/PluginLoader.java | 94 ++ .../apache/flink/core/plugin/PluginManager.java| 77 .../org/apache/flink/core/plugin/PluginUtils.java | 54 .../TemporaryClassLoaderContext.java} | 33 ++--- .../apache/flink/util/ChildFirstClassLoader.java | 123 ++ .../java/org/apache/flink/util/LambdaUtil.java | 19 +-- .../plugin/DirectoryBasedPluginFinderTest.java | 137 + .../plugin/TemporaryClassLoaderContextTest.java| 46 +++ .../testutils/EntropyInjectingTestFileSystem.java | 5 - .../org/apache/flink/testutils/TestFileSystem.java | 10 +- .../flink/runtime/fs/maprfs/MapRFsFactory.java | 6 - .../mesos/entrypoint/MesosTaskExecutorRunner.java | 10 +- .../runtime/webmonitor/history/HistoryServer.java | 9 +- .../runtime/entrypoint/ClusterEntrypoint.java | 14 +-- .../librarycache/FlinkUserCodeClassLoaders.java| 103 +--- .../runtime/taskexecutor/TaskManagerRunner.java| 11 +- .../flink/runtime/taskmanager/TaskManager.scala| 0 .../client/gateway/local/ExecutionContext.java | 7 +- .../table/client/gateway/local/LocalExecutor.java | 10 +- flink-tests/pom.xml| 31 + .../src/test/assembly/test-plugin-a-assembly.xml | 43 +++ .../src/test/assembly/test-plugin-b-assembly.xml | 43 +++ .../org/apache/flink/test/plugin/OtherTestSpi.java | 28 + .../apache/flink/test/plugin/PluginLoaderTest.java | 71 +++ .../flink/test/plugin/PluginManagerTest.java | 105 .../apache/flink/test/plugin/PluginTestBase.java | 54 .../java/org/apache/flink/test/plugin/TestSpi.java | 28 + 
.../test/plugin/jar/plugina/DynamicClassA.java | 31 ++--- .../test/plugin/jar/plugina/TestServiceA.java | 34 +++-- .../test/plugin/jar/pluginb/OtherTestServiceB.java | 30 ++--- .../test/plugin/jar/pluginb/TestServiceB.java | 30 ++--- .../plugin-a/org.apache.flink.test.plugin.TestSpi | 16 +++ .../org.apache.flink.test.plugin.OtherTestSpi | 16 +++ .../plugin-b/org.apache.flink.test.plugin.TestSpi | 16 +++ .../flink/yarn/AbstractYarnClusterDescriptor.java | 10 +- .../apache/flink/yarn/YarnTaskExecutorRunner.java | 6 +- 45 files changed, 1373 insertions(+), 375 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginFinder.java copy flink-core/src/main/java/org/apache/flink/core/{fs/local/LocalFileSystemFactory.java => plugin/Plugin.java} (57%) create mode 100644 flink-core/src/main/java/org/apache/flink/core/plugin/PluginDescriptor.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/plugin/PluginFinder.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/plugin/PluginManager.java create mode 100644 flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java copy flink-core/src/main/java/org/apache/flink/core/{fs/local/LocalFileSystemFactory.java => plugin/TemporaryClassLoaderContext.java} (51%) create mode 100644 flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.j
[flink] 05/06: [FLINK-11952][4/4] Integrate plugin mechanism with FileSystem initialization in process entry points
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c6307b59902c21744ceb57cb34ee04d0b660cb5d Author: Stefan Richter AuthorDate: Tue Apr 9 10:38:01 2019 +0200 [FLINK-11952][4/4] Integrate plugin mechanism with FileSystem initialization in process entry points This integration currently still does not provide a proper plugin root folder because this requires more changes, in particular FLINK-11952. This closes #8038. --- .../src/main/java/org/apache/flink/client/cli/CliFrontend.java | 5 - .../org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java | 5 - .../org/apache/flink/runtime/webmonitor/history/HistoryServer.java | 5 - .../java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java | 6 +- .../org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java| 5 - .../scala/org/apache/flink/runtime/taskmanager/TaskManager.scala| 0 .../org/apache/flink/table/client/gateway/local/LocalExecutor.java | 5 - .../java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java | 5 - .../src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java | 6 +- 9 files changed, 34 insertions(+), 8 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index c7cbe9b..4c4da69 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -39,6 +39,7 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import 
org.apache.flink.optimizer.costs.DefaultCostEstimator; @@ -80,6 +81,7 @@ import java.util.Comparator; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -127,7 +129,8 @@ public class CliFrontend { this.configuration = Preconditions.checkNotNull(configuration); this.customCommandLines = Preconditions.checkNotNull(customCommandLines); - FileSystem.initialize(this.configuration); + //TODO provide plugin path. + FileSystem.initialize(this.configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty())); this.customCommandLineOptions = new Options(); diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java index 98d2157..e2d0161 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java @@ -21,6 +21,7 @@ package org.apache.flink.mesos.entrypoint; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginUtils; import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory; import java.lang.reflect.UndeclaredThrowableException; import java.util.Map; +import java.util.Optional; /** * The entry point for running a TaskManager in a Mesos container. 
@@ -85,7 +87,8 @@ public class MesosTaskExecutorRunner { final Map envs = System.getenv(); // configure the filesystems - FileSystem.initialize(configuration); + //TODO provide plugin path. + FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(Optional.empty())); // tell akka to die in case of an error configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index 8d5183b..0bb98e8 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@
[flink] 02/06: [FLINK-11952][1/3] Make ChildFirstClassLoader a top-level class in flink-core
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 1a10fbef644ad32a3358711bfa5a167118186482 Author: Stefan Richter AuthorDate: Thu Mar 14 14:50:18 2019 +0100 [FLINK-11952][1/3] Make ChildFirstClassLoader a top-level class in flink-core --- .../apache/flink/util/ChildFirstClassLoader.java | 123 + .../librarycache/FlinkUserCodeClassLoaders.java| 103 + 2 files changed, 125 insertions(+), 101 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java b/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java new file mode 100644 index 000..3942b1b --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/ChildFirstClassLoader.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.util; + +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.List; + +/** + * A variant of the URLClassLoader that first loads from the URLs and only after that from the parent. + * + * {@link #getResourceAsStream(String)} uses {@link #getResource(String)} internally so we + * don't override that. + */ +public final class ChildFirstClassLoader extends URLClassLoader { + + /** +* The classes that should always go through the parent ClassLoader. This is relevant +* for Flink classes, for example, to avoid loading Flink classes that cross the +* user-code/system-code barrier in the user-code ClassLoader. +*/ + private final String[] alwaysParentFirstPatterns; + + public ChildFirstClassLoader(URL[] urls, ClassLoader parent, String[] alwaysParentFirstPatterns) { + super(urls, parent); + this.alwaysParentFirstPatterns = alwaysParentFirstPatterns; + } + + @Override + protected synchronized Class loadClass( + String name, boolean resolve) throws ClassNotFoundException { + + // First, check if the class has already been loaded + Class c = findLoadedClass(name); + + if (c == null) { + // check whether the class should go parent-first + for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) { + if (name.startsWith(alwaysParentFirstPattern)) { + return super.loadClass(name, resolve); + } + } + + try { + // check the URLs + c = findClass(name); + } catch (ClassNotFoundException e) { + // let URLClassLoader do it, which will eventually call the parent + c = super.loadClass(name, resolve); + } + } + + if (resolve) { + resolveClass(c); + } + + return c; + } + + @Override + public URL getResource(String name) { + // first, try and find it via the URLClassloader + URL urlClassLoaderResource = findResource(name); + + if (urlClassLoaderResource != null) { + return urlClassLoaderResource; + } + + // delegate 
to super + return super.getResource(name); + } + + @Override + public Enumeration getResources(String name) throws IOException { + // first get resources from URLClassloader + Enumeration urlClassLoaderResources = findResources(name); + + final List result = new ArrayList<>(); + + while (urlClassLoaderResources.hasMoreElements()) { + result.add(urlClassLoaderResources.nextElement()); + } + + // get parent urls + Enumerati
[flink] 06/06: [hotfix] Use TemporaryClassLoaderContext in other appropriate places in the codebase
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 703dc8dab1ee35b2532b472eec40fdc7dd124cc3 Author: Stefan Richter AuthorDate: Mon Apr 8 17:38:26 2019 +0200 [hotfix] Use TemporaryClassLoaderContext in other appropriate places in the codebase --- .../main/java/org/apache/flink/util/LambdaUtil.java | 19 +++ .../table/client/gateway/local/ExecutionContext.java | 7 ++- 2 files changed, 5 insertions(+), 21 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java b/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java index fdb6943..163b583 100644 --- a/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/LambdaUtil.java @@ -18,6 +18,7 @@ package org.apache.flink.util; +import org.apache.flink.core.plugin.TemporaryClassLoaderContext; import org.apache.flink.util.function.SupplierWithException; import org.apache.flink.util.function.ThrowingConsumer; import org.apache.flink.util.function.ThrowingRunnable; @@ -76,16 +77,9 @@ public final class LambdaUtil { final ClassLoader cl, final ThrowingRunnable r) throws E { - final Thread currentThread = Thread.currentThread(); - final ClassLoader oldClassLoader = currentThread.getContextClassLoader(); - - try { - currentThread.setContextClassLoader(cl); + try (TemporaryClassLoaderContext tmpCl = new TemporaryClassLoaderContext(cl)) { r.run(); } - finally { - currentThread.setContextClassLoader(oldClassLoader); - } } /** @@ -99,15 +93,8 @@ public final class LambdaUtil { final ClassLoader cl, final SupplierWithException s) throws E { - final Thread currentThread = Thread.currentThread(); - final ClassLoader oldClassLoader = currentThread.getContextClassLoader(); - - try { - currentThread.setContextClassLoader(cl); + try (TemporaryClassLoaderContext tmpCl = new TemporaryClassLoaderContext(cl)) { return 
s.get(); } - finally { - currentThread.setContextClassLoader(oldClassLoader); - } } } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index dab6bbe..1649c23 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -29,6 +29,7 @@ import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.plugin.TemporaryClassLoaderContext; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.costs.DefaultCostEstimator; @@ -185,12 +186,8 @@ public class ExecutionContext { * Executes the given supplier using the execution context's classloader as thread classloader. */ public R wrapClassLoader(Supplier supplier) { - final ClassLoader previousClassloader = Thread.currentThread().getContextClassLoader(); - Thread.currentThread().setContextClassLoader(classLoader); - try { + try (TemporaryClassLoaderContext tmpCl = new TemporaryClassLoaderContext(classLoader)){ return supplier.get(); - } finally { - Thread.currentThread().setContextClassLoader(previousClassloader); } }
[flink] branch master updated: [FLINK-11884][table] Ported sort & limit validation on top of Expressions
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 2c503c2 [FLINK-11884][table] Ported sort & limit validation on top of Expressions 2c503c2 is described below commit 2c503c2488f786ae684fa956b23d4e2f8e1797de Author: Dawid Wysakowicz AuthorDate: Mon Apr 1 15:17:25 2019 +0200 [FLINK-11884][table] Ported sort & limit validation on top of Expressions --- .../expressions/BuiltInFunctionDefinitions.java| 2 + .../table/operations/SortOperationFactory.java | 118 + .../flink/table/operations/SortOperationUtils.java | 71 - .../table/operations/OperationTreeBuilder.scala| 12 +-- .../flink/table/plan/logical/operators.scala | 23 +--- 5 files changed, 128 insertions(+), 98 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java index db6fcd3..801fad2 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/BuiltInFunctionDefinitions.java @@ -333,6 +333,8 @@ public final class BuiltInFunctionDefinitions { WINDOW_START, WINDOW_END, PROCTIME, ROWTIME )); + public static final List ORDERING = Arrays.asList(ORDER_ASC, ORDER_DESC); + public static List getDefinitions() { final Field[] fields = BuiltInFunctionDefinitions.class.getFields(); final List list = new ArrayList<>(fields.length); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java new file mode 100644 index 
000..8540f7b --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/SortOperationFactory.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionBridge; +import org.apache.flink.table.expressions.PlannerExpression; +import org.apache.flink.table.plan.logical.Limit; +import org.apache.flink.table.plan.logical.LogicalNode; +import org.apache.flink.table.plan.logical.Sort; + +import java.util.List; +import java.util.stream.Collectors; + +import static java.util.Collections.singletonList; +import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDERING; +import static org.apache.flink.table.expressions.BuiltInFunctionDefinitions.ORDER_ASC; + +/** + * Utility class for creating a valid {@link Sort} operation. 
+ */ +@Internal +public class SortOperationFactory { + + private final boolean isStreaming; + private final ExpressionBridge expressionBridge; + private final OrderWrapper orderWrapper = new OrderWrapper(); + + public SortOperationFactory( + ExpressionBridge expressionBridge, + boolean isStreaming) { + this.expressionBridge = expressionBridge; + this.isStreaming = isStreaming; + } + + /** +* Creates a valid {@link Sort} operation. +* +* NOTE: if the collation is not explicitly specified for any expression, it is wrapped in a +* default ascending order +* +* @param orders expressions describing order, +* @param child relational expression on top of which to apply the sort operation +* @return valid so
[flink] branch master updated: [FLINK-12154][network] Remove legacy fields for SingleInputGate (#8136)
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c7ef6db [FLINK-12154][network] Remove legacy fields for SingleInputGate (#8136) c7ef6db is described below commit c7ef6db71a807cf21cc1ef3a13f4d95520dfc0fe Author: zhijiang AuthorDate: Mon Apr 15 19:01:08 2019 +0800 [FLINK-12154][network] Remove legacy fields for SingleInputGate (#8136) This work is a preparation for FLINK-11726. In SingleInputGate#create, we could remove unused parameter ExecutionAttemptID. And for the constructor of SingleInputGate, we could remove unused parameter TaskIOMetricGroup. Then we introduce createSingleInputGate for reusing the process of creating SingleInputGate in related tests. --- .../partition/consumer/SingleInputGate.java| 5 +-- .../org/apache/flink/runtime/taskmanager/Task.java | 1 - .../runtime/io/network/NetworkEnvironmentTest.java | 18 ++-- ...editBasedPartitionRequestClientHandlerTest.java | 10 ++--- .../netty/PartitionRequestClientHandlerTest.java | 25 +-- .../network/netty/PartitionRequestClientTest.java | 6 +-- .../network/partition/InputChannelTestUtils.java | 24 ++ .../network/partition/InputGateConcurrentTest.java | 33 ++ .../network/partition/InputGateFairnessTest.java | 52 +++--- .../partition/consumer/LocalInputChannelTest.java | 30 + .../partition/consumer/RemoteInputChannelTest.java | 36 +-- .../partition/consumer/SingleInputGateTest.java| 16 ++- .../partition/consumer/TestSingleInputGate.java| 21 + .../partition/consumer/UnionInputGateTest.java | 24 ++ .../StreamNetworkBenchmarkEnvironment.java | 1 - 15 files changed, 87 insertions(+), 215 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java 
index e7822fe..9c0196e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionLocation; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.event.TaskEvent; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.TaskEventPublisher; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -193,7 +192,6 @@ public class SingleInputGate implements InputGate { int consumedSubpartitionIndex, int numberOfInputChannels, TaskActions taskActions, - TaskIOMetricGroup metrics, boolean isCreditBased) { this.owningTaskName = checkNotNull(owningTaskName); @@ -664,7 +662,6 @@ public class SingleInputGate implements InputGate { public static SingleInputGate create( String owningTaskName, JobID jobId, - ExecutionAttemptID executionId, InputGateDeploymentDescriptor igdd, NetworkEnvironment networkEnvironment, TaskEventPublisher taskEventPublisher, @@ -683,7 +680,7 @@ public class SingleInputGate implements InputGate { final SingleInputGate inputGate = new SingleInputGate( owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex, - icdd.length, taskActions, metrics, networkConfig.isCreditBased()); + icdd.length, taskActions, networkConfig.isCreditBased()); // Create the input channels. There is one input channel for each consumed partition. 
final InputChannel[] inputChannels = new InputChannel[icdd.length]; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index b3d952d..5e7174c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -404,7 +404,6 @@ public class Task implements Runnable, TaskActions, CheckpointListener { SingleInputGate gate = SingleInputGate.create( taskNameWithSubtaskAndId, jobId, -
[flink] branch master updated: [FLINK-11884][table] Ported alias validation on top of Expressions
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 807bce2 [FLINK-11884][table] Ported alias validation on top of Expressions 807bce2 is described below commit 807bce2ff7be7f2c3ca09aab9a01363b8eb72a3b Author: Dawid Wysakowicz AuthorDate: Mon Apr 1 15:06:44 2019 +0200 [FLINK-11884][table] Ported alias validation on top of Expressions --- .../table/operations/AliasOperationUtils.java | 110 + .../table/operations/OperationTreeBuilder.scala| 6 +- .../flink/table/plan/logical/operators.scala | 30 +- 3 files changed, 114 insertions(+), 32 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java new file mode 100644 index 000..ba8e87a --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/operations/AliasOperationUtils.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.Types; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.expressions.ApiExpressionDefaultVisitor; +import org.apache.flink.table.expressions.ApiExpressionUtils; +import org.apache.flink.table.expressions.BuiltInFunctionDefinitions; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.UnresolvedReferenceExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Utility class for creating valid alias expressions that can be later used as a projection. + */ +@Internal +public final class AliasOperationUtils { + + private static final AliasLiteralValidator aliasLiteralValidator = new AliasLiteralValidator(); + private static final String ALL_REFERENCE = "*"; + + /** +* Creates a list of valid alias expressions. Resulting expression might still contain +* {@link UnresolvedReferenceExpression}. 
+* +* @param aliases aliases to validate +* @param child relational operation on top of which to apply the aliases +* @return validated list of aliases +*/ + public static List createAliasList(List aliases, TableOperation child) { + TableSchema childSchema = child.getTableSchema(); + + if (aliases.size() > childSchema.getFieldCount()) { + throw new ValidationException("Aliasing more fields than we actually have."); + } + + List fieldAliases = aliases.stream() + .map(f -> f.accept(aliasLiteralValidator)) + .collect(Collectors.toList()); + + String[] childNames = childSchema.getFieldNames(); + return IntStream.range(0, childNames.length) + .mapToObj(idx -> { + UnresolvedReferenceExpression oldField = new UnresolvedReferenceExpression(childNames[idx]); + if (idx < fieldAliases.size()) { + ValueLiteralExpression alias = fieldAliases.get(idx); + return new CallExpression(BuiltInFunctionDefinitions.AS, Arrays.asList(oldField, alias)); + } else { + return oldField; + } + }).collect(Collectors.toList()); + } + + private static class AliasLiteralValidator extends ApiExpressionDefaultVisitor { + + @Override +
[flink] branch master updated: [FLINK-12168][table-planner-blink] Support e2e limit, sortLimit, rank, union in blink batch (#8156)
This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 068a0ed [FLINK-12168][table-planner-blink] Support e2e limit, sortLimit, rank, union in blink batch (#8156) 068a0ed is described below commit 068a0ede2463db9c92e4dc097a26796bd97fbffa Author: Jingsong Lee AuthorDate: Mon Apr 15 16:56:34 2019 +0800 [FLINK-12168][table-planner-blink] Support e2e limit, sortLimit, rank, union in blink batch (#8156) --- .../codegen/agg/batch/HashAggCodeGenHelper.scala | 3 +- .../codegen/sort/ComparatorCodeGenerator.scala | 84 .../codegen/{ => sort}/SortCodeGenerator.scala | 33 + .../plan/nodes/physical/batch/BatchExecLimit.scala | 42 +- .../plan/nodes/physical/batch/BatchExecRank.scala | 71 ++- .../plan/nodes/physical/batch/BatchExecSort.scala | 2 +- .../nodes/physical/batch/BatchExecSortLimit.scala | 58 - .../physical/batch/BatchExecSortMergeJoin.scala| 3 +- .../plan/nodes/physical/batch/BatchExecUnion.scala | 21 ++- .../flink/table/codegen/SortCodeGeneratorTest.java | 1 + .../apache/flink/table/util/BaseRowTestUtil.java | 5 + .../flink/table/runtime/batch/sql/CalcITCase.scala | 3 - .../table/runtime/batch/sql/CorrelateITCase.scala | 1 - .../table/runtime/batch/sql/CorrelateITCase2.scala | 4 +- .../table/runtime/batch/sql/DecimalITCase.scala| 53 ++-- .../table/runtime/batch/sql/LimitITCase.scala | 120 + .../flink/table/runtime/batch/sql/RankITCase.scala | 142 + .../table/runtime/batch/sql/SortLimitITCase.scala | 129 +++ .../table/runtime/batch/sql/UnionITCase.scala | 141 .../batch/sql/agg/AggregateITCaseBase.scala| 1 - .../table/runtime/batch/sql/join/JoinITCase.scala | 1 - .../runtime/batch/sql/join/OuterJoinITCase.scala | 5 +- .../flink/table/runtime/utils/BatchTestBase.scala | 26 +++- .../flink/table/runtime/sort/LimitOperator.java| 63 + .../flink/table/runtime/sort/RankOperator.java | 112 
.../table/runtime/sort/SortLimitOperator.java | 110 26 files changed, 1167 insertions(+), 67 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashAggCodeGenHelper.scala index 64aeb81..f968c06 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashAggCodeGenHelper.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/HashAggCodeGenHelper.scala @@ -25,7 +25,8 @@ import org.apache.flink.table.`type`.{InternalType, RowType} import org.apache.flink.table.api.TableConfig import org.apache.flink.table.codegen.CodeGenUtils.{binaryRowFieldSetAccess, binaryRowSetNull} import org.apache.flink.table.codegen.agg.batch.AggCodeGenHelper.buildAggregateArgsMapping -import org.apache.flink.table.codegen.{CodeGenUtils, CodeGeneratorContext, ExprCodeGenerator, GenerateUtils, GeneratedExpression, OperatorCodeGenerator, SortCodeGenerator} +import org.apache.flink.table.codegen.sort.SortCodeGenerator +import org.apache.flink.table.codegen.{CodeGenUtils, CodeGeneratorContext, ExprCodeGenerator, GenerateUtils, GeneratedExpression, OperatorCodeGenerator} import org.apache.flink.table.dataformat.{BaseRow, BinaryRow, GenericRow, JoinedRow} import org.apache.flink.table.expressions.{CallExpression, Expression, ExpressionVisitor, FieldReferenceExpression, ResolvedAggInputReference, RexNodeConverter, SymbolExpression, TypeLiteralExpression, UnresolvedReferenceExpression, ValueLiteralExpression} import org.apache.flink.table.functions.aggfunctions.DeclarativeAggregateFunction diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/ComparatorCodeGenerator.scala 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/ComparatorCodeGenerator.scala new file mode 100644 index 000..02c3778 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/ComparatorCodeGenerator.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.or