[FLINK-1882] Removed RemoteCollector classes This closes #985
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/100e8c5f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/100e8c5f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/100e8c5f Branch: refs/heads/master Commit: 100e8c5ff9f6d25b3d5db326a5f31b9c4432e334 Parents: 5546a1e Author: zentol <s.mo...@web.de> Authored: Sat Jul 25 15:17:55 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Wed Aug 5 14:46:35 2015 +0200 ---------------------------------------------------------------------- .../RemoteCollectorOutputFormatExample.java | 114 ---------- .../flink/api/java/io/RemoteCollector.java | 46 ---- .../api/java/io/RemoteCollectorConsumer.java | 26 --- .../flink/api/java/io/RemoteCollectorImpl.java | 228 ------------------- .../java/io/RemoteCollectorOutputFormat.java | 175 -------------- 5 files changed, 589 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java deleted file mode 100644 index f524718..0000000 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.examples.java.misc; - -import java.util.HashSet; -import java.util.Set; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.io.RemoteCollectorConsumer; -import org.apache.flink.api.java.io.RemoteCollectorImpl; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.util.Collector; - -/** - * Implements the "WordCount" program that computes a simple word occurrence - * histogram over some sample data and collects the results with an - * implementation of a {@link RemoteCollectorConsumer}. - */ -@SuppressWarnings("serial") -public class RemoteCollectorOutputFormatExample { - - public static void main(String[] args) throws Exception { - - /** - * We create a remote {@link ExecutionEnvironment} here, because this - * OutputFormat is designed for use in a distributed setting. For local - * use you should consider using the {@link LocalCollectionOutputFormat - * <T>}. 
- */ - final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("<remote>", 6124, - "/path/to/your/file.jar"); - - // get input data - DataSet<String> text = env.fromElements( - "To be, or not to be,--that is the question:--", - "Whether 'tis nobler in the mind to suffer", - "The slings and arrows of outrageous fortune", - "Or to take arms against a sea of troubles,"); - - DataSet<Tuple2<String, Integer>> counts = - // split up the lines in pairs (2-tuples) containing: (word,1) - text.flatMap(new LineSplitter()) - // group by the tuple field "0" and sum up tuple field "1" - .groupBy(0).aggregate(Aggregations.SUM, 1); - - // emit result - RemoteCollectorImpl.collectLocal(counts, - new RemoteCollectorConsumer<Tuple2<String, Integer>>() { - // user defined IRemoteCollectorConsumer - @Override - public void collect(Tuple2<String, Integer> element) { - System.out.println("word/occurrences:" + element); - } - }); - - // local collection to store results in - Set<Tuple2<String, Integer>> collection = new HashSet<Tuple2<String, Integer>>(); - // collect results from remote in local collection - RemoteCollectorImpl.collectLocal(counts, collection); - - // execute program - env.execute("WordCount Example with RemoteCollectorOutputFormat"); - - System.out.println(collection); - - RemoteCollectorImpl.shutdownAll(); - } - - // - // User Functions - // - - /** - * Implements the string tokenizer that splits sentences into words as a - * user-defined FlatMapFunction. The function takes a line (String) and - * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String, - * Integer>). 
- */ - public static final class LineSplitter implements - FlatMapFunction<String, Tuple2<String, Integer>> { - - @Override - public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { - // normalize and split the line - String[] tokens = value.toLowerCase().split("\\W+"); - - // emit the pairs - for (String token : tokens) { - if (token.length() > 0) { - out.collect(new Tuple2<String, Integer>(token, 1)); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java deleted file mode 100644 index bcfc332..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.api.java.io; - -import org.apache.flink.api.java.DataSet; - -import java.rmi.Remote; -import java.rmi.RemoteException; - -/** - * This interface is the counterpart to the {@link RemoteCollectorOutputFormat} - * and implementations will receive remote results through the collect function. - * - * @param <T> - * The type of the records the collector will receive - * - * @deprecated Results are retrieved through {@link org.apache.flink.api.common.accumulators.Accumulator} - * and the {@link DataSet#collect()} method respectively. - */ -@Deprecated -public interface RemoteCollector<T> extends Remote { - - public void collect(T element) throws RemoteException; - - public RemoteCollectorConsumer<T> getConsumer() throws RemoteException; - - public void setConsumer(RemoteCollectorConsumer<T> consumer) - throws RemoteException; - -} http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java deleted file mode 100644 index 439c6af..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.io; - -/** - * This interface describes consumers of {@link RemoteCollector} implementations. - */ -public interface RemoteCollectorConsumer<T> { - public void collect(T element); -} http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java deleted file mode 100644 index 2d080ab..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.io; - -import java.net.Inet4Address; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.ServerSocket; -import java.rmi.AccessException; -import java.rmi.AlreadyBoundException; -import java.rmi.NotBoundException; -import java.rmi.Remote; -import java.rmi.RemoteException; -import java.rmi.registry.LocateRegistry; -import java.rmi.registry.Registry; -import java.rmi.server.UnicastRemoteObject; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Enumeration; -import java.util.List; -import java.util.UUID; - -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.operators.DataSink; - -/** - * This class provides a counterpart implementation for the - * {@link RemoteCollectorOutputFormat}. - * - * @deprecated Results are retrieved through {@link org.apache.flink.api.common.accumulators.Accumulator} - * and the {@link DataSet#collect()} method respectively. - */ - -@Deprecated -public class RemoteCollectorImpl<T> extends UnicastRemoteObject implements - RemoteCollector<T> { - - private static final long serialVersionUID = 1L; - - /** - * Instance of an implementation of a {@link RemoteCollectorConsumer}. This - * instance will get the records passed. - */ - - private RemoteCollectorConsumer<T> consumer; - - /** - * This list stores all created {@link Registry}s to unbind and unexport all - * exposed {@link Remote} objects ({@link RemoteCollectorConsumer} in our - * case) in the shutdown phase. - */ - private static List<Registry> registries = new ArrayList<Registry>(); - - /** - * This factory method creates an instance of the - * {@link RemoteCollectorImpl} and binds it in the local RMI - * {@link Registry}. 
- * - * @param port - * The port where the local colector is listening. - * @param consumer - * The consumer instance. - * @param rmiId - * An ID to register the collector in the RMI registry. - */ - public static <T> void createAndBind(Integer port, RemoteCollectorConsumer<T> consumer, String rmiId) { - RemoteCollectorImpl<T> collectorInstance = null; - - try { - collectorInstance = new RemoteCollectorImpl<T>(); - - Registry registry; - - registry = LocateRegistry.createRegistry(port); - registry.bind(rmiId, collectorInstance); - - registries.add(registry); - } catch (RemoteException e) { - e.printStackTrace(); - } catch (AlreadyBoundException e) { - e.printStackTrace(); - } - - collectorInstance.setConsumer(consumer); - } - - /** - * Writes a DataSet to a {@link RemoteCollectorConsumer} through an - * {@link RemoteCollector} remotely called from the - * {@link RemoteCollectorOutputFormat}.<br/> - * - * @return The DataSink that writes the DataSet. - */ - public static <T> DataSink<T> collectLocal(DataSet<T> source, - RemoteCollectorConsumer<T> consumer) { - // if the RMI parameter was not set by the user make a "good guess" - String ip = System.getProperty("java.rmi.server.hostname"); - if (ip == null) { - Enumeration<NetworkInterface> networkInterfaces = null; - try { - networkInterfaces = NetworkInterface.getNetworkInterfaces(); - } catch (Throwable t) { - throw new RuntimeException(t); - } - while (networkInterfaces.hasMoreElements()) { - NetworkInterface networkInterface = (NetworkInterface) networkInterfaces - .nextElement(); - Enumeration<InetAddress> inetAddresses = networkInterface - .getInetAddresses(); - while (inetAddresses.hasMoreElements()) { - InetAddress inetAddress = (InetAddress) inetAddresses - .nextElement(); - if (!inetAddress.isLoopbackAddress() - && inetAddress instanceof Inet4Address) { - ip = inetAddress.getHostAddress(); - System.setProperty("java.rmi.server.hostname", ip); - } - } - } - } - - // get some random free port - Integer 
randomPort = 0; - try { - ServerSocket tmp = new ServerSocket(0); - randomPort = tmp.getLocalPort(); - tmp.close(); - } catch (Throwable t) { - throw new RuntimeException(t); - } - - // create an ID for this output format instance - String rmiId = String.format("%s-%s", RemoteCollectorOutputFormat.class.getName(), UUID.randomUUID()); - - // create the local listening object and bind it to the RMI registry - RemoteCollectorImpl.createAndBind(randomPort, consumer, rmiId); - - // create and configure the output format - OutputFormat<T> remoteCollectorOutputFormat = new RemoteCollectorOutputFormat<T>(ip, randomPort, rmiId); - - // create sink - return source.output(remoteCollectorOutputFormat); - } - - /** - * Writes a DataSet to a local {@link Collection} through an - * {@link RemoteCollector} and a standard {@link RemoteCollectorConsumer} - * implementation remotely called from the - * {@link RemoteCollectorOutputFormat}.<br/> - * - * @param source the source data set - * @param collection the local collection - */ - public static <T> void collectLocal(DataSet<T> source, - Collection<T> collection) { - final Collection<T> synchronizedCollection = Collections - .synchronizedCollection(collection); - collectLocal(source, new RemoteCollectorConsumer<T>() { - @Override - public void collect(T element) { - synchronizedCollection.add(element); - } - }); - } - - /** - * Necessary private default constructor. - * - * @throws RemoteException - */ - private RemoteCollectorImpl() throws RemoteException { - super(); - } - - /** - * This method is called by the remote to collect records. 
- */ - @Override - public void collect(T element) throws RemoteException { - this.consumer.collect(element); - } - - @Override - public RemoteCollectorConsumer<T> getConsumer() { - return this.consumer; - } - - @Override - public void setConsumer(RemoteCollectorConsumer<T> consumer) { - this.consumer = consumer; - } - - /** - * This method unbinds and unexports all exposed {@link Remote} objects - * - * @throws AccessException - * @throws RemoteException - * @throws NotBoundException - */ - public static void shutdownAll() throws AccessException, RemoteException, NotBoundException { - for (Registry registry : registries) { - for (String id : registry.list()) { - Remote remote = registry.lookup(id); - registry.unbind(id); - UnicastRemoteObject.unexportObject(remote, true); - } - - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java deleted file mode 100644 index 3fe5cef..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.io; - -import java.io.IOException; -import java.rmi.AccessException; -import java.rmi.NotBoundException; -import java.rmi.RemoteException; -import java.rmi.registry.LocateRegistry; -import java.rmi.registry.Registry; - -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.configuration.Configuration; - -/** - * An output format that sends results through JAVA RMI to an - * {@link RemoteCollector} implementation. The client has to provide an - * implementation of {@link RemoteCollector} and has to write it's plan's output - * into an instance of {@link RemoteCollectorOutputFormat}. Further in the - * client's VM parameters -Djava.rmi.server.hostname should be set to the own IP - * address. - * - * @deprecated Results are retrieved through {@link org.apache.flink.api.common.accumulators.Accumulator} - * and the {@link DataSet#collect()} method respectively. 
- */ -@Deprecated -public class RemoteCollectorOutputFormat<T> implements OutputFormat<T> { - - private static final long serialVersionUID = 1922744224032398102L; - - /** - * The reference of the {@link RemoteCollector} object - */ - private transient RemoteCollector<T> remoteCollector; - - transient private Registry registry; - - /** - * Config parameter for the remote's port number - */ - public static final String PORT = "port"; - /** - * Config parameter for the remote's address - */ - public static final String REMOTE = "remote"; - /** - * An id used necessary for Java RMI - */ - public static final String RMI_ID = "rmiId"; - - private String remote; - - private int port; - - private String rmiId; - - /** - * Create a new {@link RemoteCollectorOutputFormat} instance. The remote and - * port for this output are by default localhost:8888 but can be configured - * via a {@link Configuration} object. - * - * @see RemoteCollectorOutputFormat#REMOTE - * @see RemoteCollectorOutputFormat#PORT - */ - public RemoteCollectorOutputFormat() { - this("localhost", 8888, null); - } - - /** - * Creates a new {@link RemoteCollectorOutputFormat} instance for the - * specified remote and port. - * - * @param rmiId - */ - public RemoteCollectorOutputFormat(String remote, int port, String rmiId) { - super(); - this.remote = remote; - this.port = port; - this.rmiId = rmiId; - - if (this.remote == null) { - throw new IllegalStateException(String.format( - "No remote configured for %s.", this)); - } - - if (this.rmiId == null) { - throw new IllegalStateException(String.format( - "No registry ID configured for %s.", this)); - } - } - - @Override - /** - * This method receives the Configuration object, where the fields "remote" and "port" must be set. 
- */ - public void configure(Configuration parameters) { - this.remote = parameters.getString(REMOTE, this.remote); - this.port = parameters.getInteger(PORT, this.port); - this.rmiId = parameters.getString(RMI_ID, this.rmiId); - - if (this.remote == null) { - throw new IllegalStateException(String.format( - "No remote configured for %s.", this)); - } - - if (this.rmiId == null) { - throw new IllegalStateException(String.format( - "No registry ID configured for %s.", this)); - } - } - - @SuppressWarnings("unchecked") - @Override - public void open(int taskNumber, int numTasks) throws IOException { - // get the remote's RMI Registry - try { - registry = LocateRegistry.getRegistry(this.remote, this.port); - } catch (RemoteException e) { - throw new IllegalStateException(e); - } - - // try to get an intance of an IRemoteCollector implementation - try { - this.remoteCollector = (RemoteCollector<T>) registry - .lookup(this.rmiId); - } catch (AccessException e) { - throw new IllegalStateException(e); - } catch (RemoteException e) { - throw new IllegalStateException(e); - } catch (NotBoundException e) { - throw new IllegalStateException(e); - } - } - - /** - * This method forwards records simply to the remote's - * {@link RemoteCollector} implementation - */ - @Override - public void writeRecord(T record) throws IOException { - remoteCollector.collect(record); - } - - /** - * This method unbinds the reference of the implementation of - * {@link RemoteCollector}. - */ - @Override - public void close() throws IOException { - } - - @Override - public String toString() { - return "RemoteCollectorOutputFormat(" + remote + ":" + port + ", " - + rmiId + ")"; - } - -}