abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/605

Change subject: Support Sending Messages Alongside Frame Data
......................................................................

Support Sending Messages Alongside Frame Data

This change support sending messages with records. The tuple Appender
reserves 100 bytes for a message. Before sending the frame, it appends
The message in the last tuple position. The message is read from the
task context as the shared object between different operators in the
pipeline. The first use of this feature will be within feeds to request
acks for at least once semantics.

Change-Id: Iaa23e9f8a909ddcafc1c3ee95181092eb04ee1ad
---
M 
asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
M asterix-app/src/test/resources/logging.properties
M 
asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
M 
asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
A 
asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
M 
asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
15 files changed, 85 insertions(+), 22 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/05/605/1

diff --git 
a/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
 
b/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index cd0d21a..dbb8b70 100644
--- 
a/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ 
b/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -372,7 +372,7 @@
                 }
                 case SUBSCRIBE_FEED: {
                     ILogicalOperator insertOp = new 
InsertDeleteOperator(targetDatasource, varRef, varRefsForLoading,
-                            InsertDeleteOperator.Kind.INSERT, false);
+                            InsertDeleteOperator.Kind.INSERT, false, true);
                     insertOp.getInputs().add(new 
MutableObject<ILogicalOperator>(assign));
                     leafOperator = new SinkOperator();
                     leafOperator.getInputs().add(new 
MutableObject<ILogicalOperator>(insertOp));
diff --git a/asterix-app/src/test/resources/logging.properties 
b/asterix-app/src/test/resources/logging.properties
index 1253c5e..2630a24 100644
--- a/asterix-app/src/test/resources/logging.properties
+++ b/asterix-app/src/test/resources/logging.properties
@@ -79,8 +79,8 @@
 
 
 org.apache.asterix.test.level = INFO
-#org.apache.asterix.level = FINE
-#org.apache.hyracks.algebricks.level = FINE
-#org.apache.hyracks.level = INFO
+org.apache.asterix.level = FINE
+org.apache.hyracks.algebricks.level = FINE
+org.apache.hyracks.level = INFO
 org.apache.asterix.test = INFO
 org.apache.asterix.installer.test = INFO
diff --git 
a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
 
b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
index 27f4fcb..afd8920 100644
--- 
a/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
+++ 
b/asterix-common/src/main/java/org/apache/asterix/common/parse/ITupleForwarder.java
@@ -21,7 +21,7 @@
 import java.util.Map;
 
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 
@@ -38,7 +38,7 @@
 
     public void configure(Map<String, String> configuration) throws 
HyracksDataException;
 
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter 
frameWriter) throws HyracksDataException;
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter frameWriter) 
throws HyracksDataException;
 
     public void addTuple(ArrayTupleBuilder tb) throws HyracksDataException;
 
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
index 116ec09..5deaef0 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/CounterTimerTupleForwarder.java
@@ -28,7 +28,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -69,7 +69,7 @@
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) 
throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) 
throws HyracksDataException {
         this.appender = new FrameTupleAppender();
         this.frame = new VSizeFrame(ctx);
         appender.reset(frame, true);
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 2a4eaf9..9323e83 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -53,6 +53,7 @@
                 tupleForwarder.addTuple(tb);
             }
         } catch (Throwable th) {
+            th.printStackTrace();
             hde = new HyracksDataException(th);
         }
         try {
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index b46a338..e6deef5 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -22,14 +22,16 @@
 
 import org.apache.asterix.common.parse.ITupleForwarder;
 import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 
 public class FeedTupleForwarder implements ITupleForwarder {
 
@@ -43,10 +45,14 @@
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) 
throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) 
throws HyracksDataException {
         this.frame = new VSizeFrame(ctx);
         this.writer = writer;
         this.appender = new FrameTupleAppender(frame);
+        // Set null feed message
+        ctx.setObject(MessagingFrameTupleAppender.KEY_LENGTH, new 
MutableInt(1));
+        ctx.setObject(MessagingFrameTupleAppender.KEY_MESSAGE, new byte[100]);
+        ctx.setObject(MessagingFrameTupleAppender.KEY_SLOTS, new int[] { 0 });
     }
 
     @Override
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
index 36d41b4..eefc8c2 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FrameFullTupleForwarder.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -42,7 +42,7 @@
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) 
throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) 
throws HyracksDataException {
         this.appender = new FrameTupleAppender();
         this.frame = new VSizeFrame(ctx);
         this.writer = writer;
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
index 99cc3d1..186ca80 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/RateControlledTupleForwarder.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksCommonContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
@@ -50,7 +50,7 @@
     }
 
     @Override
-    public void initialize(IHyracksCommonContext ctx, IFrameWriter writer) 
throws HyracksDataException {
+    public void initialize(IHyracksTaskContext ctx, IFrameWriter writer) 
throws HyracksDataException {
         this.appender = new FrameTupleAppender();
         this.frame = new VSizeFrame(ctx);
         this.writer = writer;
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
index 967dc3e..8249fa6 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/CollectionRuntime.java
@@ -27,6 +27,7 @@
 import org.apache.asterix.external.feed.dataflow.FeedRuntimeInputHandler;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 /**
  * Represents the feed runtime that collects feed tuples from another feed.
@@ -40,14 +41,16 @@
     private final ISubscribableRuntime sourceRuntime;       // Runtime that 
provides the data
     private final Map<String, String> feedPolicy;           // Policy 
associated with the feed
     private FeedFrameCollector frameCollector;              // Collector that 
can be plugged into a frame distributor
+    private final IHyracksTaskContext ctx;
 
     public CollectionRuntime(FeedConnectionId connectionId, FeedRuntimeId 
runtimeId,
             FeedRuntimeInputHandler inputSideHandler, IFrameWriter 
outputSideWriter, ISubscribableRuntime sourceRuntime,
-            Map<String, String> feedPolicy) {
+            Map<String, String> feedPolicy, IHyracksTaskContext ctx) {
         super(runtimeId, inputSideHandler, outputSideWriter);
         this.connectionId = connectionId;
         this.sourceRuntime = sourceRuntime;
         this.feedPolicy = feedPolicy;
+        this.ctx = ctx;
     }
 
     public State waitTillCollectionOver() throws InterruptedException {
@@ -93,4 +96,7 @@
         return frameCollector;
     }
 
+    public IHyracksTaskContext getCtx() {
+        return ctx;
+    }
 }
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index fd6fcb3..1801a4f 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -26,16 +26,19 @@
 import org.apache.asterix.external.feed.dataflow.FrameDistributor;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 
 public class IngestionRuntime extends SubscribableRuntime {
 
     private final IAdapterRuntimeManager adapterRuntimeManager;
+    private final IHyracksTaskContext ctx;
 
     public IngestionRuntime(FeedId feedId, FeedRuntimeId runtimeId, 
DistributeFeedFrameWriter feedWriter,
-            RecordDescriptor recordDesc, IAdapterRuntimeManager 
adaptorRuntimeManager) {
+            RecordDescriptor recordDesc, IAdapterRuntimeManager 
adaptorRuntimeManager, IHyracksTaskContext ctx) {
         super(feedId, runtimeId, null, feedWriter, recordDesc);
         this.adapterRuntimeManager = adaptorRuntimeManager;
+        this.ctx = ctx;
     }
 
     @Override
@@ -51,6 +54,8 @@
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Subscribed feed collection [" + collectionRuntime + 
"] to " + this);
         }
+        ctx.setObject("feedId", feedId);
+        collectionRuntime.getCtx().setSharedObject(ctx.getSharedObject());
     }
 
     @Override
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
index 8916af6..7901f03 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorNodePushable.java
@@ -146,7 +146,7 @@
                 policyAccessor, false, tupleAccessor, recordDesc, feedManager, 
nPartitions);
 
         collectRuntime = new CollectionRuntime(connectionId, runtimeId, 
inputSideHandler, outputSideWriter,
-                sourceRuntime, feedPolicy);
+                sourceRuntime, feedPolicy, ctx);
         
feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, 
collectRuntime);
         sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
     }
@@ -180,7 +180,7 @@
                 new FrameTupleAccessor(recordDesc), recordDesc, feedManager, 
nPartitions);
 
         collectRuntime = new CollectionRuntime(connectionId, runtimeId, 
inputSideHandler, wrapper, sourceRuntime,
-                feedPolicy);
+                feedPolicy, ctx);
         
feedManager.getFeedConnectionManager().registerFeedRuntime(connectionId, 
collectRuntime);
         recordDesc = sourceRuntime.getRecordDescriptor();
         sourceRuntime.subscribeFeed(policyAccessor, collectRuntime);
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index b31f2bf..9398fa1 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -28,11 +28,12 @@
 import org.apache.asterix.external.api.IAdapterFactory;
 import org.apache.asterix.external.api.IAdapterRuntimeManager;
 import org.apache.asterix.external.api.IAdapterRuntimeManager.State;
+import org.apache.asterix.external.api.IFeedAdapter;
 import org.apache.asterix.external.feed.api.IFeedManager;
+import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.api.IFeedSubscriptionManager;
 import org.apache.asterix.external.feed.api.IIntakeProgressTracker;
 import org.apache.asterix.external.feed.api.ISubscriberRuntime;
-import org.apache.asterix.external.feed.api.IFeedRuntime.FeedRuntimeType;
 import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter;
 import org.apache.asterix.external.feed.management.FeedId;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
@@ -40,7 +41,6 @@
 import org.apache.asterix.external.feed.runtime.CollectionRuntime;
 import org.apache.asterix.external.feed.runtime.IngestionRuntime;
 import org.apache.asterix.external.feed.runtime.SubscribableFeedRuntimeId;
-import org.apache.asterix.external.api.IFeedAdapter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
@@ -103,7 +103,7 @@
                 SubscribableFeedRuntimeId runtimeId = new 
SubscribableFeedRuntimeId(feedId, FeedRuntimeType.INTAKE,
                         partition);
                 ingestionRuntime = new IngestionRuntime(feedId, runtimeId, 
feedFrameWriter, recordDesc,
-                        adapterRuntimeManager);
+                        adapterRuntimeManager, ctx);
                 
feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime);
                 feedFrameWriter.open();
             } else {
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 3c4c9ad..f22d029 100644
--- 
a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -33,12 +33,15 @@
 import org.apache.asterix.external.feed.policy.FeedPolicyEnforcer;
 import org.apache.asterix.external.feed.runtime.FeedRuntime;
 import org.apache.asterix.external.feed.runtime.FeedRuntimeId;
+import org.apache.hyracks.api.comm.FrameHelper;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IActivity;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.IntSerDeUtils;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
 
 public class FeedMetaStoreNodePushable extends 
AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -87,6 +90,8 @@
     private final String operandId;
 
     private FeedRuntimeInputHandler inputSideHandler;
+
+    private byte[] message = new 
byte[MessagingFrameTupleAppender.MAX_MESSAGE_SIZE];
 
     public FeedMetaStoreNodePushable(IHyracksTaskContext ctx, 
IRecordDescriptorProvider recordDescProvider,
             int partition, int nPartitions, IOperatorDescriptor coreOperator, 
FeedConnectionId feedConnectionId,
@@ -161,6 +166,7 @@
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         try {
+            processFeedMessage(buffer);
             inputSideHandler.nextFrame(buffer);
         } catch (Exception e) {
             e.printStackTrace();
@@ -168,6 +174,16 @@
         }
     }
 
+    private void processFeedMessage(ByteBuffer buffer) {
+        // read the message and reduce the number of tuples
+        fta.reset(buffer);
+        int tc = fta.getTupleCount() - 1;
+        int offset = fta.getTupleStartOffset(tc);
+        int len = fta.getTupleLength(tc);
+        System.arraycopy(buffer.array(), offset, message, 0, len);
+        IntSerDeUtils.putInt(buffer.array(), 
FrameHelper.getTupleCountOffset(buffer.capacity()), tc);
+    }
+
     @Override
     public void fail() throws HyracksDataException {
         if (LOGGER.isLoggable(Level.WARNING)) {
diff --git 
a/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
 
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
new file mode 100644
index 0000000..7822a0d
--- /dev/null
+++ 
b/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedMessageUtils.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util;
+
+public class FeedMessageUtils {
+    public enum MessageType {
+        NULL,
+        ACK_REQUEST
+    }
+
+    public static final byte NULL_FEED_MESSAGE = 0;
+    public static final byte ACK_REQ_FEED_MESSAGE = 1;
+}
diff --git 
a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
 
b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index 5346bf2..c293038 100644
--- 
a/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ 
b/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -83,7 +83,7 @@
                         try {
                             parser.configure(configuration, outputType);
                             parser.setInputStream(in);
-                            forwarder.initialize(ctx, writer);
+                            forwarder.initialize((IHyracksTaskContext) ctx, 
writer);
                             while (true) {
                                 tb.reset();
                                 if (!parser.parse(tb.getDataOutput())) {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/605
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iaa23e9f8a909ddcafc1c3ee95181092eb04ee1ad
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to