[FLINK-1882] Removed RemotedCollector classes

This closes #985


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/100e8c5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/100e8c5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/100e8c5f

Branch: refs/heads/master
Commit: 100e8c5ff9f6d25b3d5db326a5f31b9c4432e334
Parents: 5546a1e
Author: zentol <s.mo...@web.de>
Authored: Sat Jul 25 15:17:55 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Aug 5 14:46:35 2015 +0200

----------------------------------------------------------------------
 .../RemoteCollectorOutputFormatExample.java     | 114 ----------
 .../flink/api/java/io/RemoteCollector.java      |  46 ----
 .../api/java/io/RemoteCollectorConsumer.java    |  26 ---
 .../flink/api/java/io/RemoteCollectorImpl.java  | 228 -------------------
 .../java/io/RemoteCollectorOutputFormat.java    | 175 --------------
 5 files changed, 589 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
----------------------------------------------------------------------
diff --git 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
 
b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
deleted file mode 100644
index f524718..0000000
--- 
a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.misc;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.io.RemoteCollectorConsumer;
-import org.apache.flink.api.java.io.RemoteCollectorImpl;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over some sample data and collects the results with an
- * implementation of a {@link RemoteCollectorConsumer}.
- */
-@SuppressWarnings("serial")
-public class RemoteCollectorOutputFormatExample {
-
-       public static void main(String[] args) throws Exception {
-
-               /**
-                * We create a remote {@link ExecutionEnvironment} here, 
because this
-                * OutputFormat is designed for use in a distributed setting. 
For local
-                * use you should consider using the {@link 
LocalCollectionOutputFormat
-                * <T>}.
-                */
-               final ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment("<remote>", 6124,
-                               "/path/to/your/file.jar");
-
-               // get input data
-               DataSet<String> text = env.fromElements(
-                               "To be, or not to be,--that is the question:--",
-                               "Whether 'tis nobler in the mind to suffer",
-                               "The slings and arrows of outrageous fortune",
-                               "Or to take arms against a sea of troubles,");
-
-               DataSet<Tuple2<String, Integer>> counts =
-               // split up the lines in pairs (2-tuples) containing: (word,1)
-               text.flatMap(new LineSplitter())
-               // group by the tuple field "0" and sum up tuple field "1"
-                               .groupBy(0).aggregate(Aggregations.SUM, 1);
-
-               // emit result
-               RemoteCollectorImpl.collectLocal(counts,
-                               new RemoteCollectorConsumer<Tuple2<String, 
Integer>>() {
-                                       // user defined IRemoteCollectorConsumer
-                                       @Override
-                                       public void collect(Tuple2<String, 
Integer> element) {
-                                               
System.out.println("word/occurrences:" + element);
-                                       }
-                               });
-
-               // local collection to store results in
-               Set<Tuple2<String, Integer>> collection = new 
HashSet<Tuple2<String, Integer>>();
-               // collect results from remote in local collection
-               RemoteCollectorImpl.collectLocal(counts, collection);
-
-               // execute program
-               env.execute("WordCount Example with 
RemoteCollectorOutputFormat");
-
-               System.out.println(collection);
-               
-               RemoteCollectorImpl.shutdownAll();
-       }
-
-       //
-       // User Functions
-       //
-
-       /**
-        * Implements the string tokenizer that splits sentences into words as a
-        * user-defined FlatMapFunction. The function takes a line (String) and
-        * splits it into multiple pairs in the form of "(word,1)" 
(Tuple2<String,
-        * Integer>).
-        */
-       public static final class LineSplitter implements
-                       FlatMapFunction<String, Tuple2<String, Integer>> {
-
-               @Override
-               public void flatMap(String value, Collector<Tuple2<String, 
Integer>> out) {
-                       // normalize and split the line
-                       String[] tokens = value.toLowerCase().split("\\W+");
-
-                       // emit the pairs
-                       for (String token : tokens) {
-                               if (token.length() > 0) {
-                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java 
b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
deleted file mode 100644
index bcfc332..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import org.apache.flink.api.java.DataSet;
-
-import java.rmi.Remote;
-import java.rmi.RemoteException;
-
-/**
- * This interface is the counterpart to the {@link RemoteCollectorOutputFormat}
- * and implementations will receive remote results through the collect 
function.
- * 
- * @param <T>
- *            The type of the records the collector will receive
- *
- * @deprecated Results are retrieved through {@link 
org.apache.flink.api.common.accumulators.Accumulator}
- * and the {@link DataSet#collect()} method respectively.
- */
-@Deprecated
-public interface RemoteCollector<T> extends Remote {
-
-       public void collect(T element) throws RemoteException;
-
-       public RemoteCollectorConsumer<T> getConsumer() throws RemoteException;
-
-       public void setConsumer(RemoteCollectorConsumer<T> consumer)
-                       throws RemoteException;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
deleted file mode 100644
index 439c6af..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-/**
- * This interface describes consumers of {@link RemoteCollector} 
implementations.
- */
-public interface RemoteCollectorConsumer<T> {
-       public void collect(T element);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
deleted file mode 100644
index 2d080ab..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.ServerSocket;
-import java.rmi.AccessException;
-import java.rmi.AlreadyBoundException;
-import java.rmi.NotBoundException;
-import java.rmi.Remote;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.rmi.server.UnicastRemoteObject;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSink;
-
-/**
- * This class provides a counterpart implementation for the
- * {@link RemoteCollectorOutputFormat}.
- *
- * @deprecated Results are retrieved through {@link 
org.apache.flink.api.common.accumulators.Accumulator}
- * and the {@link DataSet#collect()} method respectively.
- */
-
-@Deprecated
-public class RemoteCollectorImpl<T> extends UnicastRemoteObject implements
-               RemoteCollector<T> {
-
-       private static final long serialVersionUID = 1L;
-
-       /**
-        * Instance of an implementation of a {@link RemoteCollectorConsumer}. 
This
-        * instance will get the records passed.
-        */
-
-       private RemoteCollectorConsumer<T> consumer;
-       
-        /**
-         * This list stores all created {@link Registry}s to unbind and 
unexport all
-         * exposed {@link Remote} objects ({@link RemoteCollectorConsumer} in 
our
-         * case) in the shutdown phase.
-         */
-       private static List<Registry> registries = new ArrayList<Registry>();
-
-       /**
-        * This factory method creates an instance of the
-        * {@link RemoteCollectorImpl} and binds it in the local RMI
-        * {@link Registry}.
-        * 
-        * @param port
-        *            The port where the local colector is listening.
-        * @param consumer
-        *            The consumer instance.
-        * @param rmiId 
-        *                An ID to register the collector in the RMI registry.
-        */
-       public static <T> void createAndBind(Integer port, 
RemoteCollectorConsumer<T> consumer, String rmiId) {
-               RemoteCollectorImpl<T> collectorInstance = null;
-
-               try {
-                       collectorInstance = new RemoteCollectorImpl<T>();
-
-                       Registry registry;
-
-                       registry = LocateRegistry.createRegistry(port);
-                       registry.bind(rmiId, collectorInstance);
-                       
-                       registries.add(registry);
-               } catch (RemoteException e) {
-                       e.printStackTrace();
-               } catch (AlreadyBoundException e) {
-                       e.printStackTrace();
-               }
-
-               collectorInstance.setConsumer(consumer);
-       }
-
-       /**
-        * Writes a DataSet to a {@link RemoteCollectorConsumer} through an
-        * {@link RemoteCollector} remotely called from the
-        * {@link RemoteCollectorOutputFormat}.<br/>
-        * 
-        * @return The DataSink that writes the DataSet.
-        */
-       public static <T> DataSink<T> collectLocal(DataSet<T> source,
-                       RemoteCollectorConsumer<T> consumer) {
-               // if the RMI parameter was not set by the user make a "good 
guess"
-               String ip = System.getProperty("java.rmi.server.hostname");
-               if (ip == null) {
-                       Enumeration<NetworkInterface> networkInterfaces = null;
-                       try {
-                               networkInterfaces = 
NetworkInterface.getNetworkInterfaces();
-                       } catch (Throwable t) {
-                               throw new RuntimeException(t);
-                       }
-                       while (networkInterfaces.hasMoreElements()) {
-                               NetworkInterface networkInterface = 
(NetworkInterface) networkInterfaces
-                                               .nextElement();
-                               Enumeration<InetAddress> inetAddresses = 
networkInterface
-                                               .getInetAddresses();
-                               while (inetAddresses.hasMoreElements()) {
-                                       InetAddress inetAddress = (InetAddress) 
inetAddresses
-                                                       .nextElement();
-                                       if (!inetAddress.isLoopbackAddress()
-                                                       && inetAddress 
instanceof Inet4Address) {
-                                               ip = 
inetAddress.getHostAddress();
-                                               
System.setProperty("java.rmi.server.hostname", ip);
-                                       }
-                               }
-                       }
-               }
-
-               // get some random free port
-               Integer randomPort = 0;
-               try {
-                       ServerSocket tmp = new ServerSocket(0);
-                       randomPort = tmp.getLocalPort();
-                       tmp.close();
-               } catch (Throwable t) {
-                       throw new RuntimeException(t);
-               }
-
-               // create an ID for this output format instance
-               String rmiId = String.format("%s-%s", 
RemoteCollectorOutputFormat.class.getName(), UUID.randomUUID());
-               
-               // create the local listening object and bind it to the RMI 
registry
-               RemoteCollectorImpl.createAndBind(randomPort, consumer, rmiId);
-
-               // create and configure the output format
-               OutputFormat<T> remoteCollectorOutputFormat = new 
RemoteCollectorOutputFormat<T>(ip, randomPort, rmiId);
-
-               // create sink
-               return source.output(remoteCollectorOutputFormat);
-       }
-
-       /**
-        * Writes a DataSet to a local {@link Collection} through an
-        * {@link RemoteCollector} and a standard {@link 
RemoteCollectorConsumer}
-        * implementation remotely called from the
-        * {@link RemoteCollectorOutputFormat}.<br/>
-        * 
-        * @param source the source data set
-        * @param collection the local collection
-        */
-       public static <T> void collectLocal(DataSet<T> source,
-                       Collection<T> collection) {
-               final Collection<T> synchronizedCollection = Collections
-                               .synchronizedCollection(collection);
-               collectLocal(source, new RemoteCollectorConsumer<T>() {
-                       @Override
-                       public void collect(T element) {
-                               synchronizedCollection.add(element);
-                       }
-               });
-       }
-
-       /**
-        * Necessary private default constructor.
-        * 
-        * @throws RemoteException
-        */
-       private RemoteCollectorImpl() throws RemoteException {
-               super();
-       }
-
-       /**
-        * This method is called by the remote to collect records.
-        */
-       @Override
-       public void collect(T element) throws RemoteException {
-               this.consumer.collect(element);
-       }
-
-       @Override
-       public RemoteCollectorConsumer<T> getConsumer() {
-               return this.consumer;
-       }
-
-       @Override
-       public void setConsumer(RemoteCollectorConsumer<T> consumer) {
-               this.consumer = consumer;
-       }
-
-       /**
-        * This method unbinds and unexports all exposed {@link Remote} objects
-        * 
-        * @throws AccessException
-        * @throws RemoteException
-        * @throws NotBoundException
-        */
-       public static void shutdownAll() throws AccessException, 
RemoteException, NotBoundException {
-               for (Registry registry : registries) {
-                       for (String id : registry.list()) {
-                               Remote remote = registry.lookup(id);
-                               registry.unbind(id);
-                               UnicastRemoteObject.unexportObject(remote, 
true);
-                       }
-
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
deleted file mode 100644
index 3fe5cef..0000000
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import java.io.IOException;
-import java.rmi.AccessException;
-import java.rmi.NotBoundException;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * An output format that sends results through JAVA RMI to an
- * {@link RemoteCollector} implementation. The client has to provide an
- * implementation of {@link RemoteCollector} and has to write it's plan's 
output
- * into an instance of {@link RemoteCollectorOutputFormat}. Further in the
- * client's VM parameters -Djava.rmi.server.hostname should be set to the own 
IP
- * address.
- *
- * @deprecated Results are retrieved through {@link 
org.apache.flink.api.common.accumulators.Accumulator}
- * and the {@link DataSet#collect()} method respectively.
- */
-@Deprecated
-public class RemoteCollectorOutputFormat<T> implements OutputFormat<T> {
-
-       private static final long serialVersionUID = 1922744224032398102L;
-
-       /**
-        * The reference of the {@link RemoteCollector} object
-        */
-       private transient RemoteCollector<T> remoteCollector;
-
-       transient private Registry registry;
-
-       /**
-        * Config parameter for the remote's port number
-        */
-       public static final String PORT = "port";
-       /**
-        * Config parameter for the remote's address
-        */
-       public static final String REMOTE = "remote";
-       /**
-        * An id used necessary for Java RMI
-        */
-       public static final String RMI_ID = "rmiId";
-
-       private String remote;
-
-       private int port;
-
-       private String rmiId;
-
-       /**
-        * Create a new {@link RemoteCollectorOutputFormat} instance. The 
remote and
-        * port for this output are by default localhost:8888 but can be 
configured
-        * via a {@link Configuration} object.
-        * 
-        * @see RemoteCollectorOutputFormat#REMOTE
-        * @see RemoteCollectorOutputFormat#PORT
-        */
-       public RemoteCollectorOutputFormat() {
-               this("localhost", 8888, null);
-       }
-
-       /**
-        * Creates a new {@link RemoteCollectorOutputFormat} instance for the
-        * specified remote and port.
-        * 
-        * @param rmiId
-        */
-       public RemoteCollectorOutputFormat(String remote, int port, String 
rmiId) {
-               super();
-               this.remote = remote;
-               this.port = port;
-               this.rmiId = rmiId;
-               
-               if (this.remote == null) {
-                       throw new IllegalStateException(String.format(
-                                       "No remote configured for %s.", this));
-               }
-
-               if (this.rmiId == null) {
-                       throw new IllegalStateException(String.format(
-                                       "No registry ID configured for %s.", 
this));
-               }
-       }
-
-       @Override
-       /**
-        * This method receives the Configuration object, where the fields 
"remote" and "port" must be set.
-        */
-       public void configure(Configuration parameters) {
-               this.remote = parameters.getString(REMOTE, this.remote);
-               this.port = parameters.getInteger(PORT, this.port);
-               this.rmiId = parameters.getString(RMI_ID, this.rmiId);
-
-               if (this.remote == null) {
-                       throw new IllegalStateException(String.format(
-                                       "No remote configured for %s.", this));
-               }
-
-               if (this.rmiId == null) {
-                       throw new IllegalStateException(String.format(
-                                       "No registry ID configured for %s.", 
this));
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void open(int taskNumber, int numTasks) throws IOException {
-               // get the remote's RMI Registry
-               try {
-                       registry = LocateRegistry.getRegistry(this.remote, 
this.port);
-               } catch (RemoteException e) {
-                       throw new IllegalStateException(e);
-               }
-
-               // try to get an intance of an IRemoteCollector implementation
-               try {
-                       this.remoteCollector = (RemoteCollector<T>) registry
-                                       .lookup(this.rmiId);
-               } catch (AccessException e) {
-                       throw new IllegalStateException(e);
-               } catch (RemoteException e) {
-                       throw new IllegalStateException(e);
-               } catch (NotBoundException e) {
-                       throw new IllegalStateException(e);
-               }
-       }
-
-       /**
-        * This method forwards records simply to the remote's
-        * {@link RemoteCollector} implementation
-        */
-       @Override
-       public void writeRecord(T record) throws IOException {
-               remoteCollector.collect(record);
-       }
-
-       /**
-        * This method unbinds the reference of the implementation of
-        * {@link RemoteCollector}.
-        */
-       @Override
-       public void close() throws IOException {
-       }
-
-       @Override
-       public String toString() {
-               return "RemoteCollectorOutputFormat(" + remote + ":" + port + 
", "
-                               + rmiId + ")";
-       }
-
-}

Reply via email to