Hi,
I have been working on DERBY-2921 (Replication: Add a network service that connects the master and slave Derby instances) and have developed a preliminary patch for it. I was unable to attach it to JIRA all afternoon today, so I am sending it as an attachment to this email, along with an explanation and some questions that came up during the implementation, so that anyone interested can take a look at it.
I will attach the patch to the issue once JIRA allows it. I will only be able to do that on Monday.
Narayanan
Index: java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageTransmit.java
===================================================================
--- java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageTransmit.java (revision 0)
+++ java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageTransmit.java (revision 0)
@@ -0,0 +1,212 @@
+/*
+
+   Derby - Class org.apache.derby.impl.services.replication.net.ReplicationMessageTransmit
+
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to you under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+ */
+
+package org.apache.derby.impl.services.replication.net;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.ConnectException;
+import java.net.InetAddress;
+
+import java.net.Socket;
+import java.net.UnknownHostException;
+import javax.net.SocketFactory;
+
+import org.apache.derby.iapi.error.StandardException;
+import org.apache.derby.iapi.reference.SQLState;
+
+/**
+ * A class used for sending Replication Messages to a server running
+ * on another database.
+ */
+public class ReplicationMessageTransmit {
+
+    /**
+     * The socket representing the connection with the
+     * replication peer.
+     */
+    private Socket s;
+
+    /**
+     * The host name of the peer to connect to.
+     */
+    private String hostName;
+
+    /**
+     * The port number of the peer to connect to.
+     */
+    private int portNumber;
+
+    /**
+     * The Object Output Stream from the socket that
+     * can be used to send the Message to the server.
+     */
+    private ObjectOutputStream oos;
+
+    /**
+     * Use this as the default port number if the port number
+     * is not mentioned.
+     */
+    private final int DEFAULT_PORT_NO = 8001;
+
+    /**
+     * Constructor for the <code>ReplicationMessageTransmit</code> class.
+     * Initialize with the supplied <code>hostName</code> and the
+     * <code>portNumber</code> and attempt to open a socket connection.
+     *
+     * @param hostName a String that represents the host name
+     *                 of the machine to which the connection
+     *                 must be established.
+     * @param portNumber an integer that represents the port at which
+     *                   the receiver will be listening for
+     *                   connections.
+     * @throws StandardException
+     */
+    public ReplicationMessageTransmit (String hostName, int portNumber)
+        throws StandardException {
+        this.hostName = hostName;
+        this.portNumber = portNumber;
+        //Call connect to initialize the socket connection.
+        connect ();
+    }
+
+    /**
+     * Initialize with default values for <code>hostName</code>,
+     * <code>portNumber</code>
+     *
+     * @throws StandardException.
+     */
+    private void defaultNetworkValues () throws StandardException {
+        //use localhost as the host name and 8001 as the
+        //default port number if the user does not
+        //supply the port number.
+        hostName = "localhost";
+        portNumber = DEFAULT_PORT_NO;
+    }
+
+    /**
+     * Open a socket connection and get the <code>ObjectOutputStream</code>
+     * from this connection.
+     *
+     * @throws StandardException
+     */
+    private void getSocketAndInputOutputStreams () throws StandardException {
+        //The socket is opened in a privileged block
+        try {
+            //use the OpenSocketAction class to return a socket.
+            //This class can be later extended if ssl needs to
+            //be plugged into this layer.
+            s = (java.net.Socket) java.security.AccessController.doPrivileged
+                (new OpenSocketAction (hostName, portNumber));
+        } catch (java.security.PrivilegedActionException e) {
+            //If an exception occurs during the opening of the
+            //socket, bundle the exception in a StandardException
+            //and throw it.
+            Exception openSocketException = e.getException ();
+            if (openSocketException instanceof IOException) {
+                throw StandardException.newException
+                    (SQLState.REPLICATION_CONNECT_SOCKET_EXCEPTION,
+                    openSocketException);
+            } else {
+                throw StandardException.newException
+                    (SQLState.REPLICATION_UNEXPECTED_EXCEPTION,
+                    openSocketException);
+            }
+        }
+
+        try {
+            //Get the OutputStream from the socket
+            oos = new ObjectOutputStream (s.getOutputStream ());
+        } catch(IOException e) {
+            //An exception has occurred during the initialization
+            //of the ObjectOutputStream. Bundle this in a
+            //StandardException and throw it back.
+            throw StandardException.newException
+                (SQLState.REPLICATION_CONNECT_UNABLE_TO_OPEN_SOCKET_STREAM, e);
+        }
+    }
+
+    /**
+     * checks if the <code>hostName</code> and the <code>portNumber</code>
+     * have valid values and if yes opens a socket connection using them,
+     * If valid values are not present it initializes them to default valid
+     * values and gets a connection.
+     *
+     * @throws StandardException
+     */
+    private void connect () throws StandardException {
+        //check if valid values have been
+        //supplied for the hostName and the
+        //portNumber. If no then use the default
+        //values.
+        if(hostName == null || portNumber == -1) {
+            defaultNetworkValues ();
+        }
+        //Open the socket connection.
+        getSocketAndInputOutputStreams ();
+    }
+
+    /**
+     * Sends a <code>ReplicationMessage</code>.
+     *
+     * @param message The request
+     * @throws StandardException
+     */
+    public void send (ReplicationMessage message)
+        throws StandardException {
+        ReplicationMessage answer = null;
+
+        try {
+            writeMessage (message);
+        } catch (StandardException se) {
+            //If the IOException has occurred due to the socket connection
+            //being closed, try to reconnect once more. If a connection is
+            //established attempt to write again. Otherwise throw an
+            //exception indicating connection failure.
+            connect ();
+            writeMessage (message);
+        } catch (Exception e) {
+            throw StandardException.newException
+                (SQLState.REPLICATION_UNEXPECTED_EXCEPTION,
+                e);
+        }
+    }
+
+    /**
+     * write the message to the <code>ObjectOutputStream</code>
+     * obtained from the <code>socket</code>.
+     *
+     * @throws StandardException.
+     */
+    private void writeMessage(Object message) throws StandardException {
+        try {
+            oos.reset ();
+            //write the data to the server.
+            oos.writeObject (message);
+            oos.flush ();
+        }
+        catch(IOException ioe) {
+            throw StandardException.newException
+                (SQLState.REPLICATION_CONNECT_SOCKET_EXCEPTION, ioe);
+        }
+    }
+}
Index: java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageReceive.java
===================================================================
--- java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageReceive.java (revision 0)
+++ java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageReceive.java (revision 0)
@@ -0,0 +1,229 @@
+/*
+
+   Derby - Class org.apache.derby.impl.services.replication.net.ReplicationMessageReceive
+
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to you under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+ */
+package org.apache.derby.impl.services.replication.net;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+import javax.net.ServerSocketFactory;
+
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.derby.iapi.error.StandardException;
+import org.apache.derby.iapi.reference.SQLState;
+
+/**
+ * This class is the Receiver (viz. Socket server or listener) part of the
+ * network communication. It receives the message from the master and
+ * performs the appropriate action depending on the type of the message.
+ */
+public class ReplicationMessageReceive extends Thread {
+
+    /**
+     * The Hostname at which the server must be listening.
+     */
+    private String hostName;
+
+    /**
+     * The port number at which the server must be started.
+     */
+    private int portNumber = -1;
+
+    /**
+     * The InputStream obtained from the Socket.
+     */
+    private ObjectInputStream ois;
+
+    /**
+     * The server socket.
+     */
+    ServerSocket serverSocket = null;
+
+    /**
+     * The InetAddress of the server
+     */
+    private InetAddress hostAddress;
+
+    /**
+     * Use this as the default port number if the port number
+     * is not mentioned.
+     */
+    private final int DEFAULT_PORT_NO = 8001;
+
+    /**
+     * Constructor initializes the receiver with a
+     * <code>hostName</code> and a <code>portNumber</code>
+     * at which the socket server needs to be started.
+     *
+     * @param hostAddress the InetAddress of the host
+     * @param portNumber the portNumber of the host
+     *
+     * @throws StandardException
+     */
+    public ReplicationMessageReceive (InetAddress hostAddress,
+        int portNumber) throws StandardException {
+        this.hostAddress = hostAddress;
+        this.portNumber = portNumber;
+        try{
+            //create a ServerSocket at the specified host name and the
+            //port number.
+            serverSocket =
+                (ServerSocket)
+                AccessController.doPrivileged (new PrivilegedExceptionAction () {
+                public Object run () throws IOException {
+                    return createServerSocket ();
+                }
+            });
+        } catch (PrivilegedActionException e) {
+            //Get the exception thrown and wrap it in
+            //a StandardException before throwing it
+            //to the user.
+            Exception e1 = e.getException ();
+
+            if (e1 instanceof UnknownHostException) {
+                throw StandardException.newException (
+                    SQLState.REPLICATION_UNKNOWN_HOST, e1, hostName);
+            } else if (e1 instanceof IOException) {
+                throw StandardException.newException
+                    (SQLState.REPLICATION_CONNECT_SOCKET_EXCEPTION, e1);
+            } else {
+                throw StandardException.newException
+                    (SQLState.REPLICATION_UNEXPECTED_EXCEPTION, e1);
+            }
+        } catch (Exception e) {
+            throw StandardException.newException
+                (SQLState.REPLICATION_UNEXPECTED_EXCEPTION, e);
+        }
+    }
+
+    /**
+     * Initialize with default values for <code>hostName</code>,
+     * <code>portNumber</code>.
+     *
+     * @throws StandardException.
+     */
+    private void defaultNetworkValues () throws StandardException {
+        //use localhost as the host name and 8001 as the
+        //default port number if the user does not
+        //supply the port number.
+        try {
+            hostAddress = InetAddress.getLocalHost ();
+        } catch(UnknownHostException uhe) {
+            throw StandardException.newException (
+                SQLState.REPLICATION_UNKNOWN_HOST, uhe, hostAddress);
+        }
+        portNumber = DEFAULT_PORT_NO;
+    }
+
+    /**
+     * Open a <code>ServerSocket</code> connection at the given
+     * <code>hostName</code> and the <code>portNumber</code>.
+     *
+     * @throws an IOException.
+     */
+    private ServerSocket createServerSocket () throws IOException {
+        ServerSocketFactory sf =
+            ServerSocketFactory.getDefault ();
+        return sf.createServerSocket (portNumber, 0, hostAddress);
+
+    }
+
+    /**
+     * Waits on the <code>socket</code> to listen to connections
+     * from the master.
+     */
+    public void run () {
+        Socket client = null;
+        while(true) {
+            try {
+                //Start listening on the socket and accepting the connections
+                client =
+                    (Socket)
+                    AccessController.doPrivileged (new PrivilegedExceptionAction () {
+                    public Object run () throws IOException {
+                        return serverSocket.accept ();
+                    }
+                });
+            } catch (PrivilegedActionException e) {
+                Exception e1 = e.getException ();
+                throw new RuntimeException(StandardException.newException
+                    (SQLState.REPLICATION_UNEXPECTED_EXCEPTION, e1));
+            }
+
+            try {
+                //Get the ObjectinputStream from the socket.
+                ois = new ObjectInputStream (client.getInputStream ());
+            } catch(IOException e) {
+                throw new RuntimeException(StandardException.newException
+                    (SQLState.REPLICATION_CONNECT_UNABLE_TO_OPEN_SOCKET_STREAM,
+                    e));
+            }
+
+            //start waiting on the InputStream for the packets.
+            blockingRead ();
+        }
+    }
+
+    /**
+     * wait on the <code>InputStream</code> from the <code>socket</code>
+     * to read packets from the master.
+     */
+    public void blockingRead () {
+        while (true) {
+            try {
+                //Read the message received from the master.
+                ReplicationMessage message =
+                    (ReplicationMessage) ois.readObject ();
+
+                switch (message.getType ()) {
+                    case ReplicationMessage.TYPE_LOG: {
+                        //Code for successful reception of log
+                        //will be added later.
+                        break;
+                    }
+                    default:{
+                        break;
+                    }
+                }
+            } catch (IOException e) {
+                //An exception has occurred on the InputStream.
+                //Do not do anything. Return to the run method
+                //so that it can try to listen on the socket
+                //again
+                return;
+            } catch (ClassNotFoundException e) {
+                //return back to the listener on the
+                //socket.
+                return;
+            } catch(Exception e) {
+                //return back to the listener.
+                return;
+            }
+        }
+    }
+}
Index: java/engine/org/apache/derby/impl/services/replication/net/OpenSocketAction.java
===================================================================
--- java/engine/org/apache/derby/impl/services/replication/net/OpenSocketAction.java (revision 0)
+++ java/engine/org/apache/derby/impl/services/replication/net/OpenSocketAction.java (revision 0)
@@ -0,0 +1,74 @@
+/*
+
+   Derby - Class org.apache.derby.impl.services.replication.net.OpenSocketAction
+
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+*/
+
+package org.apache.derby.impl.services.replication.net;
+
+import javax.net.SocketFactory;
+
+/**
+ * Used to open a socket connection. This class can later be enhanced
+ * to support SSL connection across the network. This class has been
+ * designed along the same lines as client/net/OpenSocketAction.
+ */
+public class OpenSocketAction implements
+    java.security.PrivilegedExceptionAction {
+    //HostName of the server to connect to.
+    private String server_;
+    //Port number of the server to connect to.
+    private int port_;
+
+    /**
+     * The constructor that initializes the <code>hostName</code>
+     * and the <code>portNumber</code> to which the <code>socket</code>
+     * connection has to be opened.
+     *
+     * @param server a String that represents the hostName.
+     * @param port an Integer that represents a port number.
+     *
+     */
+    public OpenSocketAction(String server, int port) {
+        server_ = server;
+        port_ = port;
+    }
+
+    /**
+     * Create a <code>socket</code> and return the <code>socket</code>.
+     *
+     * @return an Object that contains the socket that has
+     *         been created.
+     *
+     * @throws java.net.UnknownHostException
+     *         java.io.IOException
+     *         java.security.NoSuchAlgorithmException
+     *         java.security.KeyManagementException
+     *
+     */
+
+    public Object run()
+        throws java.net.UnknownHostException,
+               java.io.IOException,
+               java.security.NoSuchAlgorithmException,
+               java.security.KeyManagementException {
+
+        SocketFactory sf = SocketFactory.getDefault();
+        return sf.createSocket(server_, port_);
+    }
+}
Index: java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessage.java
===================================================================
--- java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessage.java (revision 0)
+++ java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessage.java (revision 0)
@@ -0,0 +1,73 @@
+/*
+
+   Derby - Class org.apache.derby.impl.services.replication.net.ReplicationMessage
+
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to you under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+ */
+
+package org.apache.derby.impl.services.replication.net;
+
+import java.io.Serializable;
+
+/**
+ * Represents the message that will be exchanged
+ * between the master and the slave.
+ */
+public class ReplicationMessage implements Serializable{
+
+    // The message
+    private Object message;
+    // The type of the message
+    private int type;
+
+    // Constants that are indicative of the
+    //contents of the message.
+    public static final int TYPE_LOG = 0;
+    public static final int TYPE_ACK = 1;
+    public static final int TYPE_ERROR = 2;
+
+    /**
+     * Constructor used to set the <code>type</code> and <code>message</code>.
+     *
+     * @param type The type of this message.
+     *             Must be one of the constants of this class
+     * @param message The message. Can be any object
+     */
+    public ReplicationMessage (int type, Object message){
+        this.type = type;
+        this.message = message;
+    }
+
+    /**
+     * Get the <code>message</code> <code>Object</code>
+     *
+     * @return The object contained in the message
+     */
+    public Object getMessage (){
+        return message;
+    }
+
+    /**
+     * Get the type of this <code>ReplicationMessage</code>
+     *
+     *
+     * @return The type
+     */
+    public int getType (){
+        return type;
+    }
+}
Index: java/engine/org/apache/derby/loc/messages.xml
===================================================================
--- java/engine/org/apache/derby/loc/messages.xml (revision 560122)
+++ java/engine/org/apache/derby/loc/messages.xml (working copy)
@@ -7459,7 +7459,36 @@
             </msg>
         </family>
+
+        <family>
+            <title>Class XR - Derby Replication exceptions</title>
+            <msg>
+                <name>XR001</name>
+                <text>Socket Exception: '{0}'.</text>
+                <arg>error</arg>
+            </msg>
+
+            <msg>
+                <name>XR002</name>
+                <text>Unable to open stream on socket: '{0}'.</text>
+                <arg>error</arg>
+            </msg>
+            <msg>
+                <name>XR003</name>
+                <text>Unable to resolve the host name '{1}' Exception: '{0}'.</text>
+                <arg>error</arg>
+                <arg>hostName</arg>
+            </msg>
+
+            <msg>
+                <name>XR004</name>
+                <text>Unexpected exception during replication network communication initiation: '{0}'.</text>
+                <arg>error</arg>
+            </msg>
+
+        </family>
+
     </section>
 </messages>
Index: java/shared/org/apache/derby/shared/common/reference/SQLState.java
===================================================================
--- java/shared/org/apache/derby/shared/common/reference/SQLState.java (revision 560122)
+++ java/shared/org/apache/derby/shared/common/reference/SQLState.java (working copy)
@@ -1616,7 +1616,17 @@
     // system severity
     String SHUTDOWN_DATABASE = "08006.D";
-
+
+    //Used when a failure occurs while connecting between the
+    //master and the slave
+    //The SQLState begins with XR and runs sequentially.
+    //X - indicates it is derby specific
+    //R - indicates it is Related to replication.
+    String REPLICATION_CONNECT_SOCKET_EXCEPTION = "XR001";
+    String REPLICATION_CONNECT_UNABLE_TO_OPEN_SOCKET_STREAM = "XR002";
+    String REPLICATION_UNKNOWN_HOST = "XR003";
+    String REPLICATION_UNEXPECTED_EXCEPTION = "XR004";
+
     //the following 2 exceptions are internal and never get seen by the user.
     String CLOSE_REQUEST = "close.C.1"; // no message in messages.properties as it is never printed
A java/engine/org/apache/derby/impl/services/replication
A java/engine/org/apache/derby/impl/services/replication/net
A java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageTransmit.java
A java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageReceive.java
A java/engine/org/apache/derby/impl/services/replication/net/OpenSocketAction.java
A java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessage.java
M java/engine/org/apache/derby/loc/messages.xml
M java/shared/org/apache/derby/shared/common/reference/SQLState.java
This patch contains the implementation of the socket server and the socket client that will be used for communication between the master and the slave during replication. The code will not be in derby.jar for now, but it will be compiled into the classes directory. The classes will be used by the replication master service, which will hold a reference to these network classes. Once the master service is in place, they will automatically be picked up and bundled into derby.jar. Dependency detection is currently done by a class called classlister.java, which builds the file derby.list containing the list of all the classes that will be in derby.jar.

Please find below a file-by-file explanation of the classes that have been added or modified.

A java/engine/org/apache/derby/impl/services/replication
A java/engine/org/apache/derby/impl/services/replication/net

The directory that contains the network classes. As pointed out in DERBY-2977, a replication master controller will be started as a service. This replication service and all the related classes will be placed in the services package under a sub-package called replication. Since this issue handles the network utility classes that will be used by the replication framework, they are placed under a sub-package of replication called net.

A java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageTransmit.java

The transmitter is a client socket that takes care of sending messages to the receiver. It will be used by the master to send log records and other messages to the slave.

A java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessageReceive.java

The receiver is a server socket that takes care of receiving the messages sent by the transmitter. It will be used in the slave to receive the log records and other messages. It will receive the messages and pass them on to the other classes, which will perform the appropriate action with them.

A java/engine/org/apache/derby/impl/services/replication/net/OpenSocketAction.java

This class has been designed along the same lines as org.apache.derby.client.net.OpenSocketAction, the only difference being that it cannot create SSL-enabled sockets for now. If this is required, it can be added at a later stage along the lines of the original class.

A java/engine/org/apache/derby/impl/services/replication/net/ReplicationMessage.java

The generic message unit that is sent between the master and the slave.

M java/engine/org/apache/derby/loc/messages.xml
M java/shared/org/apache/derby/shared/common/reference/SQLState.java

The SQLStates for the exceptions that are thrown when an error occurs during replication, and their corresponding messages.
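To make the transmitter/receiver interaction easier to picture, here is a minimal stand-alone sketch of the same idea: a serializable message is written to an ObjectOutputStream on a client socket and read back from an ObjectInputStream on the accepting side. It is not the patch code. The class LoopbackSketch, the nested Message class and the roundTrip method are hypothetical names used only for illustration, there is no privileged block or StandardException handling, and it uses newer Java syntax than the patch itself.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical stand-alone sketch; not part of the patch.
class LoopbackSketch {

    // Simplified stand-in for ReplicationMessage: a type plus a payload.
    static class Message implements Serializable {
        private static final long serialVersionUID = 1L;
        static final int TYPE_LOG = 0;
        final int type;
        final Object payload;

        Message(int type, Object payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    // Sends one message over the loopback interface and returns
    // the object that the receiving side deserialized.
    static Message roundTrip(Message outgoing) throws Exception {
        // Port 0 asks the operating system for any free port.
        try (ServerSocket server =
                 new ServerSocket(0, 0, InetAddress.getLoopbackAddress())) {
            final Message[] received = new Message[1];
            final Exception[] failure = new Exception[1];

            // Receiver side: accept one connection and read one object.
            Thread receiver = new Thread(() -> {
                try (Socket peer = server.accept();
                     ObjectInputStream in =
                         new ObjectInputStream(peer.getInputStream())) {
                    received[0] = (Message) in.readObject();
                } catch (IOException | ClassNotFoundException e) {
                    failure[0] = e;
                }
            });
            receiver.start();

            // Transmitter side: connect, write the object and flush.
            try (Socket socket =
                     new Socket(server.getInetAddress(), server.getLocalPort());
                 ObjectOutputStream out =
                     new ObjectOutputStream(socket.getOutputStream())) {
                out.writeObject(outgoing);
                out.flush();
            }

            receiver.join();
            if (failure[0] != null) {
                throw failure[0];
            }
            return received[0];
        }
    }

    public static void main(String[] args) throws Exception {
        Message echoed =
            roundTrip(new Message(Message.TYPE_LOG, "log record bytes"));
        System.out.println(echoed.type + " " + echoed.payload);
    }
}
```

In the patch the two sides live in separate classes and separate Derby instances. The sketch collapses them into one method only so that it can be run on a single machine.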
Some questions from this implementation for which I do not have an answer:

1) Replication uses port number 8001 by default if the user does not specify a port number. The number 8001 was chosen arbitrarily. Is it OK to do it this way? If not, how should the port be chosen?

2) When an error occurs in the transmitter or the receiver, it is wrapped in a StandardException and thrown with a SQLState of XR001, XR002, etc. The logic behind this choice was:
   X - Derby specific (I understood this from a scan of SQLState.java).
   R - Replication specific.
   The last three digits are sequential.
   Is this the correct way of assigning the SQLState? Is it OK to wrap the exceptions in a StandardException and throw them?

3) In the receiver, the accept() method on the socket is called from the run method. We cannot throw checked exceptions from a run method, so this has been handled by wrapping the exception in a RuntimeException and throwing that. The way this is done in impl/drda/ClientThread.java does not offer a clue as to how to do it here, because there console messages are printed, which would not be possible in this case. Is RuntimeException the way out here? Is there a better way of doing this?
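For question 3, one alternative I could imagine, shown here only as a sketch and not as what the patch does, is to let the listener thread record the exception and stop, so that whoever owns the thread can ask why it stopped. The names ListenerSketch, Step and getFailure are hypothetical. Step merely stands in for the accept() and read calls so that the example can run without a socket.

```java
import java.io.IOException;

// Hypothetical sketch; not part of the patch.
class ListenerSketch extends Thread {

    // Stand-in for the socket work done in run() (accept, read, ...).
    interface Step {
        void execute() throws Exception;
    }

    private final Step step;

    // volatile so that the owner reliably sees the value
    // written by the listener thread.
    private volatile Exception failure;

    ListenerSketch(Step step) {
        this.step = step;
    }

    @Override
    public void run() {
        try {
            step.execute();
        } catch (Exception e) {
            // Remember why the listener stopped instead of
            // throwing a RuntimeException out of run().
            failure = e;
        }
    }

    // Returns the exception that stopped the listener,
    // or null if no exception was recorded.
    Exception getFailure() {
        return failure;
    }

    public static void main(String[] args) throws InterruptedException {
        ListenerSketch listener = new ListenerSketch(() -> {
            throw new IOException("accept failed");
        });
        listener.start();
        listener.join();
        System.out.println(listener.getFailure());
    }
}
```

With this shape the owning service decides what to do with the failure, for example wrapping it in a StandardException on its own thread. Whether that fits the planned replication master service is an open question.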