abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/605
Change subject: Support Sending Messages Alongside Frame Data
......................................................................
Support Sending Messages Alongside Frame Data
This change supports sending messages with records. The tuple appender
reserves 100 bytes for a message. Before sending the frame, it appends
the message in the last tuple position. The message is read from the
task context as the shared object between different operators in the
pipeline. The first use of this feature will be within feeds to request
acks for at-least-once semantics.
Change-Id: Iaa23e9f8a909ddcafc1c3ee95181092eb04ee1ad
---
M
asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
M asterix-app/src/test/resources/logging.properties
M
asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
M
asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
A
asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
M
asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
15 files changed, 85 insertions(+), 22 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/05/605/1
diff --git
a/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
b/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index cd0d21a..dbb8b70 100644
---
a/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++
b/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -372,7 +372,7 @@
}
case SUBSCRIBE_FEED: {
ILogicalOperator insertOp = new
InsertDeleteOperator(targetDatasource, varRef, varRefsForLoading,
- InsertDeleteOperator.Kind.INSERT, false);
+ InsertDeleteOperator.Kind.INSERT, false, true);
insertOp.getInputs().add(new
MutableObject<ILogicalOperator>(assign));
leafOperator = new SinkOperator();
leafOperator.getInputs().add(new
MutableObject<ILogicalOperator>(insertOp));
diff --git a/asterix-app/src/test/resources/logging.properties
b/asterix-app/src/test/resources/logging.properties
index 1253c5e..2630a24 100644
--- a/asterix-app/src/test/resources/logging.properties
+++ b/asterix-app/src/test/resources/logging.properties
@@ -79,8 +79,8 @@
org.apache.asterix.test.level = INFO
-#org.apache.asterix.level = FINE
-#org.apache.hyracks.algebricks.level = FINE
-#org.apache.hyracks.level = INFO
+org.apache.asterix.level = FINE
+org.apache.hyracks.algebricks.level = FINE
+org.apache.hyracks.level = INFO
org.apache.asterix.test = INFO
org.apache.asterix.installer.test = INFO
diff --git
a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
index 27f4fcb..afd8920 100644
---
a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
+++
b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
@@ -21,7 +21,7 @@
import java.util.Map;
import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -38,7 +38,7 @@
public void configure(Map<String, String> configuration) throws
HyracksDataException;
- public void initialize(IHyracksCommonContext ctx, IFrameWriter
frameWriter) throws HyracksDataException;
+ public void initialize(IHyracksTaskContext ctx, IFrameWriter frameWriter)
throws HyracksDataException;
public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
index 116ec09..5deaef0 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
@@ -28,7 +28,7 @@
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -69,7 +69,7 @@
}
@Override
- public void initialize(IHyracksCommonContext ctx, IFrameWriter writer)
throws HyracksDataException {
+ public void initialize(IHyracksTaskContext ctx, IFrameWriter writer)
throws HyracksDataException {
this.appender = new FrameTupleAppender();
this.frame = new VSizeFrame(ctx);
appender.reset(frame, true);
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 2a4eaf9..9323e83 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -53,6 +53,7 @@
tupleForwarder.addTuple(tb);
}
} catch (Throwable th) {
+ th.printStackTrace();
hde = new HyracksDataException(th);
}
try {
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index b46a338..e6deef5 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -22,14 +22,16 @@
import org.apache.asterix.common.parse.ITupleForwarder;
import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
public class FeedTupleForwarder implements ITupleForwarder {
@@ -43,10 +45,14 @@
}
@Override
- public void initialize(IHyracksCommonContext ctx, IFrameWriter writer)
throws HyracksDataException {
+ public void initialize(IHyracksTaskContext ctx, IFrameWriter writer)
throws HyracksDataException {
this.frame = new VSizeFrame(ctx);
this.writer = writer;
this.appender = new FrameTupleAppender(frame);
+ // Set null feed message
+ ctx.setObject(MessagingFrameTupleAppender.KEY_LENGTH, new
MutableInt(1));
+ ctx.setObject(MessagingFrameTupleAppender.KEY_MESSAGE, new byte[100]);
+ ctx.setObject(MessagingFrameTupleAppender.KEY_SLOTS, new int[] { 0 });
}
@Override
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
index 36d41b4..eefc8c2 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
@@ -24,7 +24,7 @@
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -42,7 +42,7 @@
}
@Override
- public void initialize(IHyracksCommonContext ctx, IFrameWriter writer)
throws HyracksDataException {
+ public void initialize(IHyracksTaskContext ctx, IFrameWriter writer)
throws HyracksDataException {
this.appender = new FrameTupleAppender();
this.frame = new VSizeFrame(ctx);
this.writer = writer;
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
index 99cc3d1..186ca80 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
@@ -24,7 +24,7 @@
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -50,7 +50,7 @@
}
@Override
- public void initialize(IHyracksCommonContext ctx, IFrameWriter writer)
throws HyracksDataException {
+ public void initialize(IHyracksTaskContext ctx, IFrameWriter writer)
throws HyracksDataException {
this.appender = new FrameTupleAppender();
this.frame = new VSizeFrame(ctx);
this.writer = writer;
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
index 967dc3e..8249fa6 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
@@ -27,6 +27,7 @@
import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
/**
* Represents the feed runtime that collects feed tuples from another feed.
@@ -40,14 +41,16 @@
private final ISubscribableRuntime sourceRuntime; // Runtime that
provides the data
private final Map<String, String> feedPolicy; // Policy
associated with the feed
private FeedFrameCollector frameCollector; // Collector that
can be plugged into a frame distributor
+ private final IHyracksTaskContext ctx;
public CollectionRuntime(FeedConnectionId connectionId, FeedRuntimeId
runtimeId,
FeedRuntimeInputHandler inputSideHandler, IFrameWriter
outputSideWriter, ISubscribableRuntime sourceRuntime,
- Map<String, String> feedPolicy) {
+ Map<String, String> feedPolicy, IHyracksTaskContext ctx) {
super(runtimeId, inputSideHandler, outputSideWriter);
this.connectionId = connectionId;
this.sourceRuntime = sourceRuntime;
this.feedPolicy = feedPolicy;
+ this.ctx = ctx;
}
public State waitTillCollectionOver() throws InterruptedException {
@@ -93,4 +96,7 @@
return frameCollector;
}
+ public IHyracksTaskContext getCtx() {
+ return ctx;
+ }
}
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index fd6fcb3..1801a4f 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -26,16 +26,19 @@
import org.apache.asterix.external.feed.dataflow.FrameDistributor;
import org.apache.asterix.external.feed.management.FeedId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
public class IngestionRuntime extends SubscribableRuntime {
private final IAdapterRuntimeManager adapterRuntimeManager;
+ private final IHyracksTaskContext ctx;
public IngestionRuntime(FeedId feedId, FeedRuntimeId runtimeId,
DistributeFeedFrameWriter feedWriter,
- RecordDescriptor recordDesc, IAdapterRuntimeManager
adaptorRuntimeManager) {
+ RecordDescriptor recordDesc, IAdapterRuntimeManager
adaptorRuntimeManager, IHyracksTaskContext ctx) {
super(feedId, runtimeId, null, feedWriter, recordDesc);
this.adapterRuntimeManager = adaptorRuntimeManager;
+ this.ctx = ctx;
}
@Override
@@ -51,6 +54,8 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Subscribed feed collection [" + collectionRuntime +
"] to " + this);
}
+ ctx.setObject("feedId", feedId);
+ collectionRuntime.getCtx().setSharedObject(ctx.getSharedObject());
}
@Override
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index 8916af6..7901f03 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -146,7 +146,7 @@
policyAccessor, false, tupleAccessor, recordDesc, feedManager,
nPartitions);
collectRuntime = new CollectionRuntime(connectionId, runtimeId,
inputSideHandler, outputSideWriter,
- sourceRuntime, feedPolicy);
+ sourceRuntime, feedPolicy, ctx);
feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId,
collectRuntime);
sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
}
@@ -180,7 +180,7 @@
new FrameTupleAccessor(recordDesc), recordDesc, feedManager,
nPartitions);
collectRuntime = new CollectionRuntime(connectionId, runtimeId,
inputSideHandler, wrapper, sourceRuntime,
- feedPolicy);
+ feedPolicy, ctx);
feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId,
collectRuntime);
recordDesc = sourceRuntime.getRecordDescriptor();
sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index b31f2bf..9398fa1 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -28,11 +28,12 @@
import org.apache.asterix.external.api.IAdapterFactory;
import org.apache.asterix.external.api.IAdapterRuntimeManager;
import org.apache.asterix.external.api.IAdapterRuntimeManager.State;
+import org.apache.asterix.external.api.IFeedAdapter;
import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
import org.apache.asterix.external.feed.api.ISubscriberRuntime;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
import org.apache.asterix.external.feed.management.FeedId;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -40,7 +41,6 @@
import org.apache.asterix.external.feed.runtime.CollectionRuntime;
import org.apache.asterix.external.feed.runtime.IngestionRuntime;
import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
-import org.apache.asterix.external.api.IFeedAdapter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -103,7 +103,7 @@
SubscribableFeedRuntimeId runtimeId = new
SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
partition);
ingestionRuntime = new IngestionRuntime(feedId, runtimeId,
feedFrameWriter, recordDesc,
- adapterRuntimeManager);
+ adapterRuntimeManager, ctx);
feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
feedFrameWriter.open();
} else {
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 3c4c9ad..f22d029 100644
---
a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -33,12 +33,15 @@
import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
import org.apache.asterix.external.feed.runtime.FeedRuntime;
import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
import
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
public class FeedMetaStoreNodePushable extends
AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -87,6 +90,8 @@
private final String operandId;
private FeedRuntimeInputHandler inputSideHandler;
+
+ private byte[] message = new
byte[MessagingFrameTupleAppender.MAX_MESSAGE_SIZE];
public FeedMetaStoreNodePushable(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider,
int partition, int nPartitions, IOperatorDescriptor coreOperator,
FeedConnectionId feedConnectionId,
@@ -161,6 +166,7 @@
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
try {
+ processFeedMessage(buffer);
inputSideHandler.nextFrame(buffer);
} catch (Exception e) {
e.printStackTrace();
@@ -168,6 +174,16 @@
}
}
+ private void processFeedMessage(ByteBuffer buffer) {
+ // read the message and reduce the number of tuples
+ fta.reset(buffer);
+ int tc = fta.getTupleCount() - 1;
+ int offset = fta.getTupleStartOffset(tc);
+ int len = fta.getTupleLength(tc);
+ System.arraycopy(buffer.array(), offset, message, 0, len);
+ IntSerDeUtils.putInt(buffer.array(),
FrameHelper.getTupleCountOffset(buffer.capacity()), tc);
+ }
+
@Override
public void fail() throws HyracksDataException {
if (LOGGER.isLoggable(Level.WARNING)) {
diff --git
a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
new file mode 100644
index 0000000..7822a0d
--- /dev/null
+++
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+public class FeedMessageUtils {
+ public enum MessageType {
+ NULL,
+ ACK_REQUEST
+ }
+
+ public static final byte NULL_FEED_MESSAGE = 0;
+ public static final byte ACK_REQ_FEED_MESSAGE = 1;
+}
diff --git
a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 5346bf2..c293038 100644
---
a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++
b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -83,7 +83,7 @@
try {
parser.configure(configuration, outputType);
parser.setInputStream(in);
- forwarder.initialize(ctx, writer);
+ forwarder.initialize((IHyracksTaskContext) ctx,
writer);
while (true) {
tb.reset();
if (!parser.parse(tb.getDataOutput())) {
--
To view, visit https://asterix-gerrit.ics.uci.edu/605
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iaa23e9f8a909ddcafc1c3ee95181092eb04ee1ad
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>