Author: chirino
Date: Mon Nov 24 09:44:12 2008
New Revision: 720234
URL: http://svn.apache.org/viewvc?rev=720234&view=rev
Log:
- Switched to using camel case in the XML element names to make things
consistent.
- Replaced the asyncReplication property in the ReplicationService with a
minimumReplicas property. When it is set to 0, replication is always
asynchronous; otherwise a sync write waits for minimumReplicas slave
acknowledgements.
- Also removed the map used to track replication requests, since at most one
sync request is issued at a time.
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
activemq/trunk/kahadb/src/test/resources/broker1/ha.xml
activemq/trunk/kahadb/src/test/resources/broker2/ha.xml
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationBrokerService.java
Mon Nov 24 09:44:12 2008
@@ -26,7 +26,7 @@
* he will create the actual BrokerService
*
* @author chirino
- * @org.apache.xbean.XBean element="kahadb-replication-broker"
+ * @org.apache.xbean.XBean element="kahadbReplicationBroker"
*/
public class ReplicationBrokerService extends BrokerService {
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
Mon Nov 24 09:44:12 2008
@@ -23,9 +23,10 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -51,58 +52,61 @@
import org.apache.kahadb.store.KahaDBStore;
import org.apache.kahadb.util.ByteSequence;
-import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
-import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
public class ReplicationMaster implements Service, ClusterListener,
ReplicationTarget {
- private static final Log LOG =
LogFactory.getLog(ReplicationService.class);
+ private static final Log LOG = LogFactory.getLog(ReplicationService.class);
- private final ReplicationService replicationService;
+ private final ReplicationService replicationService;
+
+ private Object serverMutex = new Object() {
+ };
+ private TransportServer server;
+
+ private ArrayList<ReplicationSession> sessions = new
ArrayList<ReplicationSession>();
+
+ private final AtomicInteger nextSnapshotId = new AtomicInteger();
+ private final Object requestMutex = new Object(){};
+ private Location requestLocation;
+ private CountDownLatch requestLatch;
+ private int minimumReplicas;
+
+ public ReplicationMaster(ReplicationService replicationService) {
+ this.replicationService = replicationService;
+ minimumReplicas = replicationService.getMinimumReplicas();
+ }
+
+ public void start() throws Exception {
+ synchronized (serverMutex) {
+ server = TransportFactory.bind(new
URI(replicationService.getUri()));
+ server.setAcceptListener(new TransportAcceptListener() {
+ public void onAccept(Transport transport) {
+ try {
+ synchronized (serverMutex) {
+ ReplicationSession session = new
ReplicationSession(transport);
+ session.start();
+ addSession(session);
+ }
+ } catch (Exception e) {
+ LOG.info("Could not accept replication connection from
slave at " + transport.getRemoteAddress() + ", due to: " + e, e);
+ }
+ }
+
+ public void onAcceptError(Exception e) {
+ LOG.info("Could not accept replication connection: " + e,
e);
+ }
+ });
+ server.start();
+ }
+ replicationService.getStore().getJournal().setReplicationTarget(this);
+ }
- private Object serverMutex = new Object() {};
- private TransportServer server;
-
- private ArrayList<ReplicationSession> sessions = new
ArrayList<ReplicationSession>();
-
- private final AtomicInteger nextSnapshotId = new AtomicInteger();
- private final Map<Location, CountDownLatch> requestMap = new
LinkedHashMap<Location, CountDownLatch>();
-
- public ReplicationMaster(ReplicationService replicationService) {
- this.replicationService = replicationService;
- }
-
- public void start() throws Exception {
- synchronized (serverMutex) {
- server = TransportFactory.bind(new
URI(replicationService.getUri()));
- server.setAcceptListener(new TransportAcceptListener() {
- public void onAccept(Transport transport) {
- try {
- synchronized (serverMutex) {
- ReplicationSession
session = new ReplicationSession(transport);
- session.start();
- addSession(session);
- }
- } catch (Exception e) {
- LOG.info("Could not accept
replication connection from slave at " + transport.getRemoteAddress() + ", due
to: " + e, e);
- }
- }
-
- public void onAcceptError(Exception e) {
- LOG.info("Could not accept replication
connection: " + e, e);
- }
- });
- server.start();
- }
-
replicationService.getStore().getJournal().setReplicationTarget(this);
- }
-
boolean isStarted() {
synchronized (serverMutex) {
- return server!=null;
+ return server != null;
}
}
-
+
public void stop() throws Exception {
replicationService.getStore().getJournal().setReplicationTarget(null);
synchronized (serverMutex) {
@@ -111,24 +115,24 @@
server = null;
}
}
-
+
ArrayList<ReplicationSession> sessionsSnapshot;
synchronized (this.sessions) {
sessionsSnapshot = this.sessions;
}
-
- for (ReplicationSession session: sessionsSnapshot) {
+
+ for (ReplicationSession session : sessionsSnapshot) {
session.stop();
}
}
- protected void addSession(ReplicationSession session) {
- synchronized (sessions) {
- sessions = new ArrayList<ReplicationSession>(sessions);
- sessions.add(session);
+ protected void addSession(ReplicationSession session) {
+ synchronized (sessions) {
+ sessions = new ArrayList<ReplicationSession>(sessions);
+ sessions.add(session);
}
}
-
+
protected void removeSession(ReplicationSession session) {
synchronized (sessions) {
sessions = new ArrayList<ReplicationSession>(sessions);
@@ -136,352 +140,348 @@
}
}
- public void onClusterChange(ClusterState config) {
- // For now, we don't really care about changes in the slave
config..
- }
-
- /**
- * This is called by the Journal so that we can replicate the update to
the
- * slaves.
- */
- public void replicate(Location location, ByteSequence sequence, boolean
sync) {
- ArrayList<ReplicationSession> sessionsSnapshot;
+ public void onClusterChange(ClusterState config) {
+ // For now, we don't really care about changes in the slave config..
+ }
+
+ /**
+ * This is called by the Journal so that we can replicate the update to the
+ * slaves.
+ */
+ public void replicate(Location location, ByteSequence sequence, boolean
sync) {
+ ArrayList<ReplicationSession> sessionsSnapshot;
synchronized (this.sessions) {
// Hurrah for copy on write..
sessionsSnapshot = this.sessions;
}
-
- // We may be configured to always do async replication..
- if ( replicationService.isAsyncReplication() ) {
- sync=false;
- }
- CountDownLatch latch=null;
- if( sync ) {
- latch = new CountDownLatch(1);
- synchronized (requestMap) {
- requestMap.put(location, latch);
- }
- }
-
- ReplicationFrame frame=null;
- for (ReplicationSession session : sessionsSnapshot) {
- if( session.subscribedToJournalUpdates.get() ) {
-
- // Lazy create the frame since we may have not
avilable sessions to send to.
- if( frame == null ) {
- frame = new ReplicationFrame();
+ // We may be able to always async replicate...
+ if (minimumReplicas==0) {
+ sync = false;
+ }
+ CountDownLatch latch = null;
+ if (sync) {
+ latch = new CountDownLatch(minimumReplicas);
+ synchronized (requestMutex) {
+ requestLatch = latch;
+ requestLocation = location;
+ }
+ }
+
+ ReplicationFrame frame = null;
+ for (ReplicationSession session : sessionsSnapshot) {
+ if (session.subscribedToJournalUpdates.get()) {
+
+ // Lazy create the frame since we may have not avilable
sessions
+ // to send to.
+ if (frame == null) {
+ frame = new ReplicationFrame();
frame.setHeader(new
PBHeader().setType(PBType.JOURNAL_UPDATE));
PBJournalUpdate payload = new PBJournalUpdate();
payload.setLocation(ReplicationSupport.convert(location));
payload.setData(new
org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(),
sequence.getLength()));
payload.setSendAck(sync);
frame.setPayload(payload);
- }
+ }
+
+ // TODO: use async send threads so that the frames can be
pushed
+ // out in parallel.
+ try {
+ session.setLastUpdateLocation(location);
+ session.transport.oneway(frame);
+ } catch (IOException e) {
+ session.onException(e);
+ }
+ }
+ }
- // TODO: use async send threads so that the
frames can be pushed out in parallel.
- try {
- session.setLastUpdateLocation(location);
- session.transport.oneway(frame);
- } catch (IOException e) {
- session.onException(e);
- }
- }
- }
-
if (sync) {
try {
int timeout = 500;
- int counter=0;
- while( true ) {
- if( latch.await(timeout, TimeUnit.MILLISECONDS) ) {
- synchronized (requestMap) {
- requestMap.remove(location);
- }
+ int counter = 0;
+ while (true) {
+ if (latch.await(timeout, TimeUnit.MILLISECONDS)) {
return;
}
- if( !isStarted() ) {
+ if (!isStarted()) {
return;
}
counter++;
- if( (counter%10)==0 ) {
- LOG.warn("KahaDB is waiting for slave to come online.
"+(timeout*counter/1000.f)+" seconds have elapsed.");
+ if ((counter % 10) == 0) {
+ LOG.warn("KahaDB is waiting for slave to come online.
" + (timeout * counter / 1000.f) + " seconds have elapsed.");
}
- }
+ }
} catch (InterruptedException ignore) {
}
}
-
- }
-
+
+ }
+
private void ackAllFromTo(Location lastAck, Location newAck) {
- if ( replicationService.isAsyncReplication() ) {
+ Location l;
+ java.util.concurrent.CountDownLatch latch;
+ synchronized (requestMutex) {
+ latch = requestLatch;
+ l = requestLocation;
+ }
+ if( l == null ) {
return;
}
- ArrayList<Entry<Location, CountDownLatch>> entries;
- synchronized (requestMap) {
- entries = new ArrayList<Entry<Location,
CountDownLatch>>(requestMap.entrySet());
- }
- boolean inRange=false;
- for (Entry<Location, CountDownLatch> entry : entries) {
- Location l = entry.getKey();
- if( !inRange ) {
- if( lastAck==null || lastAck.compareTo(l) < 0 ) {
- inRange=true;
- }
- }
- if( inRange ) {
- entry.getValue().countDown();
- if( newAck!=null && l.compareTo(newAck) <= 0 ) {
- return;
- }
+ if (lastAck == null || lastAck.compareTo(l) < 0) {
+ if (newAck != null && l.compareTo(newAck) <= 0) {
+ latch.countDown();
+ return;
}
- }
+ }
}
+ class ReplicationSession implements Service, TransportListener {
- class ReplicationSession implements Service, TransportListener {
-
- private final Transport transport;
- private final AtomicBoolean subscribedToJournalUpdates = new
AtomicBoolean();
+ private final Transport transport;
+ private final AtomicBoolean subscribedToJournalUpdates = new
AtomicBoolean();
private boolean stopped;
-
- private File snapshotFile;
- private HashSet<Integer> journalReplicatedFiles;
- private Location lastAckLocation;
+
+ private File snapshotFile;
+ private HashSet<Integer> journalReplicatedFiles;
+ private Location lastAckLocation;
private Location lastUpdateLocation;
private boolean online;
- public ReplicationSession(Transport transport) {
- this.transport = transport;
- }
+ public ReplicationSession(Transport transport) {
+ this.transport = transport;
+ }
- synchronized public void setLastUpdateLocation(Location
lastUpdateLocation) {
+ synchronized public void setLastUpdateLocation(Location
lastUpdateLocation) {
this.lastUpdateLocation = lastUpdateLocation;
}
public void start() throws Exception {
- transport.setTransportListener(this);
- transport.start();
- }
+ transport.setTransportListener(this);
+ transport.start();
+ }
synchronized public void stop() throws Exception {
- if ( !stopped ) {
- stopped=true;
- deleteReplicationData();
- transport.stop();
- }
- }
+ if (!stopped) {
+ stopped = true;
+ deleteReplicationData();
+ transport.stop();
+ }
+ }
- synchronized private void onJournalUpdateAck(ReplicationFrame
frame, PBJournalLocation location) {
+ synchronized private void onJournalUpdateAck(ReplicationFrame frame,
PBJournalLocation location) {
Location ack = ReplicationSupport.convert(location);
- if( online ) {
+ if (online) {
ackAllFromTo(lastAckLocation, ack);
- }
- lastAckLocation=ack;
- }
-
- synchronized private void onSlaveOnline(ReplicationFrame frame)
{
+ }
+ lastAckLocation = ack;
+ }
+
+ synchronized private void onSlaveOnline(ReplicationFrame frame) {
deleteReplicationData();
- online = true;
- if( lastAckLocation!=null ) {
+ online = true;
+ if (lastAckLocation != null) {
ackAllFromTo(null, lastAckLocation);
}
-
+
}
public void onCommand(Object command) {
- try {
- ReplicationFrame frame = (ReplicationFrame)
command;
- switch (frame.getHeader().getType()) {
- case SLAVE_INIT:
- onSlaveInit(frame, (PBSlaveInit)
frame.getPayload());
- break;
- case SLAVE_ONLINE:
- onSlaveOnline(frame);
- break;
- case FILE_TRANSFER:
- onFileTransfer(frame, (PBFileInfo)
frame.getPayload());
- break;
- case JOURNAL_UPDATE_ACK:
- onJournalUpdateAck(frame,
(PBJournalLocation) frame.getPayload());
- break;
- }
- } catch (Exception e) {
- LOG.warn("Slave request failed: "+e, e);
- failed(e);
- }
- }
-
- public void onException(IOException error) {
- failed(error);
- }
-
- public void failed(Exception error) {
- try {
- stop();
- } catch (Exception ignore) {
- }
- }
-
- public void transportInterupted() {
- }
- public void transportResumed() {
- }
-
- private void deleteReplicationData() {
- if( snapshotFile!=null ) {
- snapshotFile.delete();
- snapshotFile=null;
- }
- if( journalReplicatedFiles!=null ) {
- journalReplicatedFiles=null;
- updateJournalReplicatedFiles();
- }
- }
-
- private void onSlaveInit(ReplicationFrame frame, PBSlaveInit
slaveInit) throws Exception {
-
- // Start sending journal updates to the slave.
- subscribedToJournalUpdates.set(true);
-
- // We could look at the slave state sent in the
slaveInit and decide
- // that a full sync is not needed..
- // but for now we will do a full sync every time.
- ReplicationFrame rc = new ReplicationFrame();
- final PBSlaveInitResponse rcPayload = new
PBSlaveInitResponse();
- rc.setHeader(new
PBHeader().setType(PBType.SLAVE_INIT_RESPONSE));
- rc.setPayload(rcPayload);
-
- // Setup a map of all the files that the slave has
- final HashMap<String, PBFileInfo> slaveFiles = new
HashMap<String, PBFileInfo>();
- for (PBFileInfo info : slaveInit.getCurrentFilesList())
{
- slaveFiles.put(info.getName(), info);
- }
-
-
- final KahaDBStore store = replicationService.getStore();
- store.checkpoint(new Callback() {
- public void execute() throws Exception {
- // This call back is executed once the
checkpoint is
- // completed and all data has been
synced to disk,
- // but while a lock is still held on
the store so
- // that no updates are done while we
are in this
- // method.
-
- KahaDBStore store =
replicationService.getStore();
- if( lastAckLocation==null ) {
- lastAckLocation =
store.getLastUpdatePosition();
- }
-
- int snapshotId =
nextSnapshotId.incrementAndGet();
- File file =
store.getPageFile().getFile();
- File dir =
replicationService.getTempReplicationDir();
- dir.mkdirs();
- snapshotFile = new File(dir,
"snapshot-" + snapshotId);
-
- journalReplicatedFiles = new
HashSet<Integer>();
-
- // Store the list files associated with
the snapshot.
- ArrayList<PBFileInfo> snapshotInfos =
new ArrayList<PBFileInfo>();
- Map<Integer, DataFile> journalFiles =
store.getJournal().getFileMap();
- for (DataFile df :
journalFiles.values()) {
- // Look at what the slave has
so that only the missing bits are transfered.
- String name = "journal-" +
df.getDataFileId();
- PBFileInfo slaveInfo =
slaveFiles.remove(name);
-
- // Use the checksum info to see
if the slave has the file already.. Checksums are less acurrate for
- // small amounts of data.. so
ignore small files.
- if( slaveInfo!=null &&
slaveInfo.getEnd()> 1024*512 ) {
- // If the slave's file
checksum matches what we have..
- if(
ReplicationSupport.checksum(df.getFile(), 0,
slaveInfo.getEnd())==slaveInfo.getChecksum() ) {
- // is Our file
longer? then we need to continue transferring the rest of the file.
- if(
df.getLength() > slaveInfo.getEnd() ) {
-
snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(),
slaveInfo.getEnd(), df.getLength()));
-
journalReplicatedFiles.add(df.getDataFileId());
-
continue;
- } else {
- // No
need to replicate this file.
-
continue;
- }
- }
- }
-
- // If we got here then it means
we need to transfer the whole file.
-
snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), 0,
df.getLength()));
-
journalReplicatedFiles.add(df.getDataFileId());
- }
-
- PBFileInfo info = new PBFileInfo();
- info.setName("database");
- info.setSnapshotId(snapshotId);
- info.setStart(0);
- info.setEnd(file.length());
-
info.setChecksum(ReplicationSupport.copyAndChecksum(file, snapshotFile));
- snapshotInfos.add(info);
-
-
rcPayload.setCopyFilesList(snapshotInfos);
- ArrayList<String> deleteFiles = new
ArrayList<String>();
- slaveFiles.remove("database");
- for (PBFileInfo unused :
slaveFiles.values()) {
-
deleteFiles.add(unused.getName());
- }
-
rcPayload.setDeleteFilesList(deleteFiles);
-
- updateJournalReplicatedFiles();
- }
-
- });
-
- transport.oneway(rc);
- }
-
- private void onFileTransfer(ReplicationFrame frame, PBFileInfo
fileInfo) throws IOException {
- File file =
replicationService.getReplicationFile(fileInfo.getName());
- long payloadSize =
fileInfo.getEnd()-fileInfo.getStart();
-
- if( file.length() < fileInfo.getStart()+payloadSize ) {
- throw new IOException("Requested replication
file dose not have enough data.");
- }
-
- ReplicationFrame rc = new ReplicationFrame();
- rc.setHeader(new
PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(payloadSize));
-
- FileInputStream is = new FileInputStream(file);
- rc.setPayload(is);
- try {
- is.skip(fileInfo.getStart());
- transport.oneway(rc);
- } finally {
- try {
- is.close();
- } catch (Throwable e) {
- }
- }
- }
-
- }
-
- /**
- * Looks at all the journal files being currently replicated and
informs the KahaDB so that
- * it does not delete them while the replication is occuring.
- */
- private void updateJournalReplicatedFiles() {
- HashSet<Integer> files =
replicationService.getStore().getJournalFilesBeingReplicated();
- files.clear();
+ try {
+ ReplicationFrame frame = (ReplicationFrame)command;
+ switch (frame.getHeader().getType()) {
+ case SLAVE_INIT:
+ onSlaveInit(frame, (PBSlaveInit)frame.getPayload());
+ break;
+ case SLAVE_ONLINE:
+ onSlaveOnline(frame);
+ break;
+ case FILE_TRANSFER:
+ onFileTransfer(frame, (PBFileInfo)frame.getPayload());
+ break;
+ case JOURNAL_UPDATE_ACK:
+ onJournalUpdateAck(frame,
(PBJournalLocation)frame.getPayload());
+ break;
+ }
+ } catch (Exception e) {
+ LOG.warn("Slave request failed: " + e, e);
+ failed(e);
+ }
+ }
+
+ public void onException(IOException error) {
+ failed(error);
+ }
+
+ public void failed(Exception error) {
+ try {
+ stop();
+ } catch (Exception ignore) {
+ }
+ }
+
+ public void transportInterupted() {
+ }
+
+ public void transportResumed() {
+ }
+
+ private void deleteReplicationData() {
+ if (snapshotFile != null) {
+ snapshotFile.delete();
+ snapshotFile = null;
+ }
+ if (journalReplicatedFiles != null) {
+ journalReplicatedFiles = null;
+ updateJournalReplicatedFiles();
+ }
+ }
+
+ private void onSlaveInit(ReplicationFrame frame, PBSlaveInit
slaveInit) throws Exception {
+
+ // Start sending journal updates to the slave.
+ subscribedToJournalUpdates.set(true);
+
+ // We could look at the slave state sent in the slaveInit and
decide
+ // that a full sync is not needed..
+ // but for now we will do a full sync every time.
+ ReplicationFrame rc = new ReplicationFrame();
+ final PBSlaveInitResponse rcPayload = new PBSlaveInitResponse();
+ rc.setHeader(new PBHeader().setType(PBType.SLAVE_INIT_RESPONSE));
+ rc.setPayload(rcPayload);
+
+ // Setup a map of all the files that the slave has
+ final HashMap<String, PBFileInfo> slaveFiles = new HashMap<String,
PBFileInfo>();
+ for (PBFileInfo info : slaveInit.getCurrentFilesList()) {
+ slaveFiles.put(info.getName(), info);
+ }
+
+ final KahaDBStore store = replicationService.getStore();
+ store.checkpoint(new Callback() {
+ public void execute() throws Exception {
+ // This call back is executed once the checkpoint is
+ // completed and all data has been synced to disk,
+ // but while a lock is still held on the store so
+ // that no updates are done while we are in this
+ // method.
+
+ KahaDBStore store = replicationService.getStore();
+ if (lastAckLocation == null) {
+ lastAckLocation = store.getLastUpdatePosition();
+ }
+
+ int snapshotId = nextSnapshotId.incrementAndGet();
+ File file = store.getPageFile().getFile();
+ File dir = replicationService.getTempReplicationDir();
+ dir.mkdirs();
+ snapshotFile = new File(dir, "snapshot-" + snapshotId);
+
+ journalReplicatedFiles = new HashSet<Integer>();
+
+ // Store the list files associated with the snapshot.
+ ArrayList<PBFileInfo> snapshotInfos = new
ArrayList<PBFileInfo>();
+ Map<Integer, DataFile> journalFiles =
store.getJournal().getFileMap();
+ for (DataFile df : journalFiles.values()) {
+ // Look at what the slave has so that only the missing
+ // bits are transfered.
+ String name = "journal-" + df.getDataFileId();
+ PBFileInfo slaveInfo = slaveFiles.remove(name);
+
+ // Use the checksum info to see if the slave has the
+ // file already.. Checksums are less acurrate for
+ // small amounts of data.. so ignore small files.
+ if (slaveInfo != null && slaveInfo.getEnd() > 1024 *
512) {
+ // If the slave's file checksum matches what we
+ // have..
+ if (ReplicationSupport.checksum(df.getFile(), 0,
slaveInfo.getEnd()) == slaveInfo.getChecksum()) {
+ // is Our file longer? then we need to continue
+ // transferring the rest of the file.
+ if (df.getLength() > slaveInfo.getEnd()) {
+
snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(),
slaveInfo.getEnd(), df.getLength()));
+
journalReplicatedFiles.add(df.getDataFileId());
+ continue;
+ } else {
+ // No need to replicate this file.
+ continue;
+ }
+ }
+ }
+
+ // If we got here then it means we need to transfer the
+ // whole file.
+ snapshotInfos.add(ReplicationSupport.createInfo(name,
df.getFile(), 0, df.getLength()));
+ journalReplicatedFiles.add(df.getDataFileId());
+ }
+
+ PBFileInfo info = new PBFileInfo();
+ info.setName("database");
+ info.setSnapshotId(snapshotId);
+ info.setStart(0);
+ info.setEnd(file.length());
+ info.setChecksum(ReplicationSupport.copyAndChecksum(file,
snapshotFile));
+ snapshotInfos.add(info);
+
+ rcPayload.setCopyFilesList(snapshotInfos);
+ ArrayList<String> deleteFiles = new ArrayList<String>();
+ slaveFiles.remove("database");
+ for (PBFileInfo unused : slaveFiles.values()) {
+ deleteFiles.add(unused.getName());
+ }
+ rcPayload.setDeleteFilesList(deleteFiles);
+
+ updateJournalReplicatedFiles();
+ }
+
+ });
+
+ transport.oneway(rc);
+ }
+
+ private void onFileTransfer(ReplicationFrame frame, PBFileInfo
fileInfo) throws IOException {
+ File file =
replicationService.getReplicationFile(fileInfo.getName());
+ long payloadSize = fileInfo.getEnd() - fileInfo.getStart();
+
+ if (file.length() < fileInfo.getStart() + payloadSize) {
+ throw new IOException("Requested replication file dose not
have enough data.");
+ }
+
+ ReplicationFrame rc = new ReplicationFrame();
+ rc.setHeader(new
PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(payloadSize));
+
+ FileInputStream is = new FileInputStream(file);
+ rc.setPayload(is);
+ try {
+ is.skip(fileInfo.getStart());
+ transport.oneway(rc);
+ } finally {
+ try {
+ is.close();
+ } catch (Throwable e) {
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Looks at all the journal files being currently replicated and informs
the
+ * KahaDB so that it does not delete them while the replication is
occuring.
+ */
+ private void updateJournalReplicatedFiles() {
+ HashSet<Integer> files =
replicationService.getStore().getJournalFilesBeingReplicated();
+ files.clear();
ArrayList<ReplicationSession> sessionsSnapshot;
synchronized (this.sessions) {
// Hurrah for copy on write..
sessionsSnapshot = this.sessions;
}
-
- for (ReplicationSession session : sessionsSnapshot) {
- if( session.journalReplicatedFiles !=null ) {
- files.addAll(session.journalReplicatedFiles);
- }
- }
- }
-
+
+ for (ReplicationSession session : sessionsSnapshot) {
+ if (session.journalReplicatedFiles != null) {
+ files.addAll(session.journalReplicatedFiles);
+ }
+ }
+ }
+
}
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
Mon Nov 24 09:44:12 2008
@@ -34,7 +34,7 @@
* slave or master facets of the broker.
*
* @author chirino
- * @org.apache.xbean.XBean element="kahadb-replication"
+ * @org.apache.xbean.XBean element="kahadbReplication"
*/
public class ReplicationService implements Service, ClusterListener {
@@ -47,7 +47,7 @@
private File tempReplicationDir;
private String uri;
private ClusterStateManager cluster;
- private boolean asyncReplication=false;
+ private int minimumReplicas=1;
private KahaDBStore store;
@@ -279,12 +279,12 @@
this.cluster = cluster;
}
- public void setAsyncReplication(boolean asyncReplication) {
- this.asyncReplication = asyncReplication;
+ public int getMinimumReplicas() {
+ return minimumReplicas;
}
- public boolean isAsyncReplication() {
- return asyncReplication;
+ public void setMinimumReplicas(int minimumReplicas) {
+ this.minimumReplicas = minimumReplicas;
}
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/replication/zk/ZooKeeperClusterStateManager.java
Mon Nov 24 09:44:12 2008
@@ -48,7 +48,7 @@
/**
*
* @author chirino
- * @org.apache.xbean.XBean element="zookeeper-cluster"
+ * @org.apache.xbean.XBean element="zookeeperCluster"
*/
public class ZooKeeperClusterStateManager implements ClusterStateManager,
Watcher {
private static final Log LOG =
LogFactory.getLog(ZooKeeperClusterStateManager.class);
Modified:
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
(original)
+++
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/replication/ReplicationTest.java
Mon Nov 24 09:44:12 2008
@@ -49,7 +49,7 @@
StaticClusterStateManager cluster = new
StaticClusterStateManager();
ReplicationService rs1 = new ReplicationService();
- rs1.setAsyncReplication(true);
+ rs1.setMinimumReplicas(0);
rs1.setUri(BROKER1_REPLICATION_ID);
rs1.setCluster(cluster);
rs1.setDirectory(new File("target/replication-test/broker1"));
@@ -57,7 +57,7 @@
rs1.start();
ReplicationService rs2 = new ReplicationService();
- rs2.setAsyncReplication(true);
+ rs2.setMinimumReplicas(0);
rs2.setUri(BROKER2_REPLICATION_ID);
rs2.setCluster(cluster);
rs2.setDirectory(new File("target/replication-test/broker2"));
Modified: activemq/trunk/kahadb/src/test/resources/broker1/ha.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/resources/broker1/ha.xml?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/resources/broker1/ha.xml (original)
+++ activemq/trunk/kahadb/src/test/resources/broker1/ha.xml Mon Nov 24 09:44:12
2008
@@ -27,21 +27,21 @@
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
- <kahadb-replication-broker xmlns="http://activemq.apache.org/schema/kahadb">
+ <kahadbReplicationBroker xmlns="http://activemq.apache.org/schema/kahadb">
<replicationService>
- <kahadb-replication
+ <kahadbReplication
directory="target/kaha-data/broker1"
brokerURI="xbean:broker1/ha-broker.xml"
uri="kdbr://localhost:6001"
- asyncReplication="true">
+ minimumReplicas="0">
<cluster>
- <zookeeper-cluster
uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq"
password=""/>
+ <zookeeperCluster
uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq"
password=""/>
</cluster>
- </kahadb-replication>
+ </kahadbReplication>
</replicationService>
- </kahadb-replication-broker>
+ </kahadbReplicationBroker>
</beans>
<!-- END SNIPPET: example -->
Modified: activemq/trunk/kahadb/src/test/resources/broker2/ha.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/resources/broker2/ha.xml?rev=720234&r1=720233&r2=720234&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/resources/broker2/ha.xml (original)
+++ activemq/trunk/kahadb/src/test/resources/broker2/ha.xml Mon Nov 24 09:44:12
2008
@@ -27,21 +27,21 @@
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
- <kahadb-replication-broker xmlns="http://activemq.apache.org/schema/kahadb">
+ <kahadbReplicationBroker xmlns="http://activemq.apache.org/schema/kahadb">
<replicationService>
- <kahadb-replication
+ <kahadbReplication
directory="target/kaha-data-broker2"
brokerURI="xbean:broker2/ha-broker.xml"
uri="kdbr://localhost:6002"
- asyncReplication="true">
+ minimumReplicas="0">
<cluster>
- <zookeeper-cluster
uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq"
password=""/>
+ <zookeeperCluster
uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq"
password=""/>
</cluster>
- </kahadb-replication>
+ </kahadbReplication>
</replicationService>
- </kahadb-replication-broker>
+ </kahadbReplicationBroker>
</beans>
<!-- END SNIPPET: example -->