abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/758
Change subject: Skip Flush messages in KV reader
......................................................................
Skip Flush messages in KV reader
Change-Id: I0e1a714035551771fe8a583a9bbb4098511ff150
---
M asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
1 file changed, 19 insertions(+), 5 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/58/758/1
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
index 4e41357..7307727 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/kv/KVReader.java
@@ -19,6 +19,7 @@
package org.apache.asterix.external.input.record.reader.kv;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.asterix.external.api.IDataFlowController;
@@ -92,10 +93,10 @@
}
private void connect() {
- core.send(new SeedNodesRequest(sourceNodes))
- .timeout(KVReaderFactory.TIMEOUT,
KVReaderFactory.TIME_UNIT).toBlocking().single();
- core.send(new OpenBucketRequest(bucket, password))
- .timeout(KVReaderFactory.TIMEOUT,
KVReaderFactory.TIME_UNIT).toBlocking().single();
+ core.send(new
SeedNodesRequest(sourceNodes)).timeout(KVReaderFactory.TIMEOUT,
KVReaderFactory.TIME_UNIT)
+ .toBlocking().single();
+ core.send(new OpenBucketRequest(bucket,
password)).timeout(KVReaderFactory.TIMEOUT, KVReaderFactory.TIME_UNIT)
+ .toBlocking().single();
this.pushThread = new Thread(new Runnable() {
@Override
public void run() {
@@ -126,11 +127,24 @@
if (dcpRequest instanceof SnapshotMarkerMessage) {
SnapshotMarkerMessage message =
(SnapshotMarkerMessage) dcpRequest;
BucketStreamState oldState =
state.get(message.partition());
+ /*
state.put(new
BucketStreamState(message.partition(), oldState.vbucketUUID(),
message.endSequenceNumber(),
oldState.endSequenceNumber(),
message.endSequenceNumber(),
oldState.snapshotEndSequenceNumber()));
+ */
} else if ((dcpRequest instanceof MutationMessage) ||
(dcpRequest instanceof RemoveMessage)) {
- messages.put(dcpRequest);
+ String key = ((MutationMessage) dcpRequest).key();
+ if (key.startsWith("__")) {
+ String content = ((MutationMessage)
dcpRequest).content()
+ .toString(StandardCharsets.UTF_8);
+ if (content.startsWith("__")) {
+ LOGGER.warn("DCP Control Message Received:
" + content);
+ } else {
+ messages.put(dcpRequest);
+ }
+ } else {
+ messages.put(dcpRequest);
+ }
} else {
LOGGER.warn("Unknown type of DCP messages: " +
dcpRequest);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/758
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0e1a714035551771fe8a583a9bbb4098511ff150
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>