abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/805
Change subject: Update the Key Value Reader
......................................................................
Update the Key Value Reader
Change-Id: I54f8a313c871428b2468b74c0760e8d28e810699
---
M asterixdb/asterix-external-data/pom.xml
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
4 files changed, 24 insertions(+), 35 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/05/805/1
diff --git a/asterixdb/asterix-external-data/pom.xml
b/asterixdb/asterix-external-data/pom.xml
index 8c59cc4..3a47ba6 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -287,7 +287,7 @@
<dependency>
<groupId>com.couchbase.client</groupId>
<artifactId>core-io</artifactId>
- <version>1.2.3</version>
+ <version>1.3.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
index 185aea0..3f5b531 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
@@ -30,10 +30,7 @@
import org.apache.log4j.Logger;
import com.couchbase.client.core.CouchbaseCore;
-import com.couchbase.client.core.dcp.BucketStreamAggregator;
-import com.couchbase.client.core.dcp.BucketStreamAggregatorState;
-import com.couchbase.client.core.dcp.BucketStreamState;
-import com.couchbase.client.core.dcp.BucketStreamStateUpdatedEvent;
+import com.couchbase.client.core.endpoint.dcp.DCPConnection;
import com.couchbase.client.core.env.DefaultCoreEnvironment;
import com.couchbase.client.core.env.DefaultCoreEnvironment.Builder;
import com.couchbase.client.core.message.cluster.CloseBucketRequest;
@@ -41,27 +38,30 @@
import com.couchbase.client.core.message.cluster.SeedNodesRequest;
import com.couchbase.client.core.message.dcp.DCPRequest;
import com.couchbase.client.core.message.dcp.MutationMessage;
+import com.couchbase.client.core.message.dcp.OpenConnectionRequest;
+import com.couchbase.client.core.message.dcp.OpenConnectionResponse;
import com.couchbase.client.core.message.dcp.RemoveMessage;
import com.couchbase.client.core.message.dcp.SnapshotMarkerMessage;
+import rx.Observable;
import rx.functions.Action1;
public class KVReader implements IRecordReader<DCPRequest> {
private static final Logger LOGGER = Logger.getLogger(KVReader.class);
- private static final MutationMessage POISON_PILL = new
MutationMessage((short) 0, null, null, 0, 0L, 0L, 0, 0, 0L,
- null);
+ private static final MutationMessage POISON_PILL = new MutationMessage(0,
(short) 0, null, null, 0, 0L, 0L, 0, 0,
+ 0L, null);
private final String feedName;
private final short[] vbuckets;
private final String bucket;
private final String password;
private final String[] sourceNodes;
private final Builder builder;
- private final BucketStreamAggregator bucketStreamAggregator;
private final CouchbaseCore core;
private final DefaultCoreEnvironment env;
private final GenericRecord<DCPRequest> record;
private final ArrayBlockingQueue<DCPRequest> messages;
+ private final DCPConnection connection;
private AbstractFeedDataFlowController controller;
private Thread pushThread;
private boolean done = false;
@@ -78,7 +78,8 @@
.autoreleaseAfter(KVReaderFactory.AUTO_RELEASE_AFTER_MILLISECONDS);
this.env = builder.build();
this.core = new CouchbaseCore(env);
- this.bucketStreamAggregator = new BucketStreamAggregator(feedName,
core, bucket);
+ connection = core.<OpenConnectionResponse> send(new
OpenConnectionRequest(feedName, bucket)).toBlocking()
+ .single().connection();
this.record = new GenericRecord<>();
connect();
}
@@ -98,36 +99,24 @@
this.pushThread = new Thread(new Runnable() {
@Override
public void run() {
- KVReader.this.run(bucketStreamAggregator);
+ KVReader.this.run(connection);
}
}, feedName);
pushThread.start();
}
- private void run(BucketStreamAggregator bucketStreamAggregator) {
- BucketStreamAggregatorState state = new BucketStreamAggregatorState();
- for (int i = 0; i < vbuckets.length; i++) {
- state.put(new BucketStreamState(vbuckets[i], 0, 0, 0xffffffff, 0,
0xffffffff));
- }
- state.updates().subscribe(new Action1<BucketStreamStateUpdatedEvent>()
{
- @Override
- public void call(BucketStreamStateUpdatedEvent event) {
- if (event.partialUpdate()) {
- } else {
- }
- }
- });
+ private void run(DCPConnection connection) {
try {
- bucketStreamAggregator.feed(state).toBlocking().forEach(new
Action1<DCPRequest>() {
+ for (int i = 0; i < vbuckets.length; i++) {
+ connection.addStream(vbuckets[i]).toBlocking().single();
+ }
+
connection.subject().takeUntil(Observable.never()).toBlocking().forEach(new
Action1<DCPRequest>() {
@Override
public void call(DCPRequest dcpRequest) {
try {
if (dcpRequest instanceof SnapshotMarkerMessage) {
SnapshotMarkerMessage message =
(SnapshotMarkerMessage) dcpRequest;
- BucketStreamState oldState =
state.get(message.partition());
- state.put(new
BucketStreamState(message.partition(), oldState.vbucketUUID(),
- message.endSequenceNumber(),
oldState.endSequenceNumber(),
- message.endSequenceNumber(),
oldState.snapshotEndSequenceNumber()));
+ LOGGER.info("snapshot DCP message received: " +
message);
} else if ((dcpRequest instanceof MutationMessage) ||
(dcpRequest instanceof RemoveMessage)) {
messages.put(dcpRequest);
} else {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
index bc2a980..6c85b20 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReaderFactory.java
@@ -138,8 +138,8 @@
for (int i = 0; i < vbuckets.length; i++) {
vbuckets[i] = listOfAssignedVBuckets.get(i);
}
- return new KVReader(feedName + ":" + nodeName + ":" + partition,
bucket, password, couchbaseNodes,
- vbuckets, ExternalDataUtils.getQueueSize(configuration));
+ return new KVReader(feedName + ":" + nodeName + ":" + partition,
bucket, password, couchbaseNodes, vbuckets,
+ ExternalDataUtils.getQueueSize(configuration));
}
@Override
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
index b75f26c..43132b6 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVTestReader.java
@@ -132,7 +132,7 @@
if (nextDeleteKey != null) {
final String key = nextDeleteKey;
nextDeleteKey = null;
- return new RemoveMessage(nextDeletePartition, key, cas++,
seq++, 0L, bucket);
+ return new RemoveMessage(0, nextDeletePartition, key, cas++,
seq++, 0L, bucket);
}
}
generateNextDocument();
@@ -141,12 +141,12 @@
final String key = nextUpsertKey;
nextUpsertKey = null;
upsertCounter++;
- return new MutationMessage(nextUpsertPartition, key, byteBuff,
expiration++, seq++, 0, 0, lockTime++,
- cas++, bucket);
+ return new MutationMessage(byteBuff.readableBytes(),
nextUpsertPartition, key, byteBuff, expiration++,
+ seq++, 0, 0, lockTime++, cas++, bucket);
}
}
- return new MutationMessage(assigned.get(counter % assigned.size()),
generateKey(), byteBuff, expiration++,
- seq++, 0, 0, lockTime++, cas++, bucket);
+ return new MutationMessage(byteBuff.readableBytes(),
assigned.get(counter % assigned.size()), generateKey(),
+ byteBuff, expiration++, seq++, 0, 0, lockTime++, cas++,
bucket);
}
private void generateNextDocument() {
--
To view, visit https://asterix-gerrit.ics.uci.edu/805
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I54f8a313c871428b2468b74c0760e8d28e810699
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>