abdullah alamoudi has submitted this change and it was merged.

Change subject: Fixed Feed Connect Statement
......................................................................
Fixed Feed Connect Statement This change includes two fixes: 1. Feed connect doesn't return until the connection is complete. 2. When using wait for completion, it waits until all the jobs complete. Change-Id: I416bf4917b1f5cea687d1202c435f7183136cf1f Reviewed-on: https://asterix-gerrit.ics.uci.edu/726 Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java M asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java M asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java M asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java R asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.query.aql D asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.sleep.aql M asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.query.aql M asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.query.aql R asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.ddl.aql D asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.query.aql D asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.7.ddl.aql M asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql R asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.ddl.aql D 
asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql D asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql R asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.server.aql D asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql R asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.sleep.aql R asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.update.aql R asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.query.aql R asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.server.aql R asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.ddl.aql C asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.query.aql D asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.sleep.aql M asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.4.query.aql C asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.ddl.aql M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java A asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java A asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java M asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java M 
asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java 31 files changed, 277 insertions(+), 291 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Jenkins: Verified diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java index d729680..04f20fb 100644 --- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java +++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedJobNotificationHandler.java @@ -32,15 +32,15 @@ import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.commons.lang3.StringUtils; -import org.apache.asterix.app.external.FeedLifecycleListener.Message; +import org.apache.asterix.app.external.FeedLifecycleListener.FeedEvent; import org.apache.asterix.app.external.FeedWorkCollection.SubscribeFeedWork; import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.external.feed.api.FeedOperationCounter; import org.apache.asterix.external.feed.api.IFeedJoint; -import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber; -import org.apache.asterix.external.feed.api.IIntakeProgressTracker; import org.apache.asterix.external.feed.api.IFeedJoint.State; +import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber; import org.apache.asterix.external.feed.api.IFeedLifecycleEventSubscriber.FeedLifecycleEvent; +import org.apache.asterix.external.feed.api.IIntakeProgressTracker; import org.apache.asterix.external.feed.management.FeedConnectionId; import org.apache.asterix.external.feed.management.FeedConnectionRequest; import org.apache.asterix.external.feed.management.FeedId; @@ -59,6 +59,7 @@ import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor; import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies; import 
org.apache.asterix.om.util.AsterixAppContextInfo; +import org.apache.commons.lang3.StringUtils; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor; @@ -78,37 +79,43 @@ private static final Logger LOGGER = Logger.getLogger(FeedJobNotificationHandler.class.getName()); - private final LinkedBlockingQueue<Message> inbox; + private final LinkedBlockingQueue<FeedEvent> inbox; private final Map<FeedConnectionId, List<IFeedLifecycleEventSubscriber>> eventSubscribers; private final Map<JobId, FeedJobInfo> jobInfos; private final Map<FeedId, FeedIntakeInfo> intakeJobInfos; private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos; - private final Map<FeedId, List<IFeedJoint>> feedPipeline; + private final Map<FeedId, Pair<FeedOperationCounter, List<IFeedJoint>>> feedPipeline; private final Map<FeedConnectionId, Pair<IIntakeProgressTracker, Long>> feedIntakeProgressTrackers; - public FeedJobNotificationHandler(LinkedBlockingQueue<Message> inbox) { + public FeedJobNotificationHandler(LinkedBlockingQueue<FeedEvent> inbox) { this.inbox = inbox; this.jobInfos = new HashMap<JobId, FeedJobInfo>(); this.intakeJobInfos = new HashMap<FeedId, FeedIntakeInfo>(); this.connectJobInfos = new HashMap<FeedConnectionId, FeedConnectJobInfo>(); - this.feedPipeline = new HashMap<FeedId, List<IFeedJoint>>(); + this.feedPipeline = new HashMap<FeedId, Pair<FeedOperationCounter, List<IFeedJoint>>>(); this.eventSubscribers = new HashMap<FeedConnectionId, List<IFeedLifecycleEventSubscriber>>(); this.feedIntakeProgressTrackers = new HashMap<FeedConnectionId, Pair<IIntakeProgressTracker, Long>>(); } @Override public void run() { - Message mesg; + FeedEvent event; while (true) { try { - mesg = inbox.take(); - switch (mesg.messageKind) { + event = inbox.take(); + switch (event.eventKind) { case JOB_START: - 
handleJobStartMessage(mesg); + handleJobStartEvent(event); break; case JOB_FINISH: - handleJobFinishMessage(mesg); + handleJobFinishEvent(event); + break; + case PROVIDER_READY: + handleProviderReady(event); + break; + default: + LOGGER.log(Level.WARNING, "Unknown Feed Event"); break; } } catch (Exception e) { @@ -121,11 +128,11 @@ public void registerFeedIntakeProgressTracker(FeedConnectionId connectionId, IIntakeProgressTracker feedIntakeProgressTracker) { if (feedIntakeProgressTrackers.get(connectionId) == null) { - this.feedIntakeProgressTrackers.put(connectionId, new Pair<IIntakeProgressTracker, Long>( - feedIntakeProgressTracker, 0L)); + this.feedIntakeProgressTrackers.put(connectionId, + new Pair<IIntakeProgressTracker, Long>(feedIntakeProgressTracker, 0L)); } else { - throw new IllegalStateException(" Progress tracker for connection " + connectionId - + " is alreader registered"); + throw new IllegalStateException( + " Progress tracker for connection " + connectionId + " is alreader registered"); } } @@ -149,29 +156,35 @@ return connectJobInfos.values(); } - public void registerFeedJoint(IFeedJoint feedJoint) { - List<IFeedJoint> feedJointsOnPipeline = feedPipeline.get(feedJoint.getOwnerFeedId()); + public synchronized void registerFeedJoint(IFeedJoint feedJoint, int numOfPrividers) { + Pair<FeedOperationCounter, List<IFeedJoint>> feedJointsOnPipeline = feedPipeline + .get(feedJoint.getOwnerFeedId()); + if (feedJointsOnPipeline == null) { - feedJointsOnPipeline = new ArrayList<IFeedJoint>(); + feedJointsOnPipeline = new Pair<FeedOperationCounter, List<IFeedJoint>>( + new FeedOperationCounter(numOfPrividers, 1), new ArrayList<IFeedJoint>()); feedPipeline.put(feedJoint.getOwnerFeedId(), feedJointsOnPipeline); - feedJointsOnPipeline.add(feedJoint); + feedJointsOnPipeline.second.add(feedJoint); } else { - if (!feedJointsOnPipeline.contains(feedJoint)) { - feedJointsOnPipeline.add(feedJoint); + if (!feedJointsOnPipeline.second.contains(feedJoint)) { + 
feedJointsOnPipeline.first.setJobsCount(feedJointsOnPipeline.first.getJobsCount() + 1); + feedJointsOnPipeline.second.add(feedJoint); } else { throw new IllegalArgumentException("Feed joint " + feedJoint + " already registered"); } } } - public void registerFeedIntakeJob(FeedId feedId, JobId jobId, JobSpecification jobSpec) throws HyracksDataException { + public void registerFeedIntakeJob(FeedId feedId, JobId jobId, JobSpecification jobSpec) + throws HyracksDataException { if (jobInfos.get(jobId) != null) { throw new IllegalStateException("Feed job already registered"); } - List<IFeedJoint> joints = feedPipeline.get(feedId); + Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.containsKey(feedId) ? feedPipeline.get(feedId) + : null; IFeedJoint intakeJoint = null; - for (IFeedJoint joint : joints) { + for (IFeedJoint joint : pair.second) { if (joint.getType().equals(IFeedJoint.FeedJointType.INTAKE)) { intakeJoint = joint; break; @@ -181,6 +194,7 @@ if (intakeJoint != null) { FeedIntakeInfo intakeJobInfo = new FeedIntakeInfo(jobId, FeedJobState.CREATED, FeedJobInfo.JobType.INTAKE, feedId, intakeJoint, jobSpec); + pair.first.setFeedJobInfo(intakeJobInfo); intakeJobInfos.put(feedId, intakeJobInfo); jobInfos.put(jobId, intakeJobInfo); @@ -188,8 +202,8 @@ LOGGER.info("Registered feed intake [" + jobId + "]" + " for feed " + feedId); } } else { - throw new HyracksDataException("Could not register feed intake job [" + jobId + "]" + " for feed " - + feedId); + throw new HyracksDataException( + "Could not register feed intake job [" + jobId + "]" + " for feed " + feedId); } } @@ -199,7 +213,7 @@ throw new IllegalStateException("Feed job already registered"); } - List<IFeedJoint> feedJoints = feedPipeline.get(sourceFeedId); + List<IFeedJoint> feedJoints = feedPipeline.get(sourceFeedId).second; FeedConnectionId cid = null; IFeedJoint sourceFeedJoint = null; for (IFeedJoint joint : feedJoints) { @@ -238,7 +252,7 @@ intakeJobInfos.remove(info.getFeedId()); if 
(!info.getState().equals(FeedJobState.UNDER_RECOVERY)) { - List<IFeedJoint> joints = feedPipeline.get(info.getFeedId()); + List<IFeedJoint> joints = feedPipeline.get(info.getFeedId()).second; joints.remove(info.getIntakeFeedJoint()); if (LOGGER.isLoggable(Level.INFO)) { @@ -252,7 +266,7 @@ } - private void handleJobStartMessage(Message message) throws Exception { + private void handleJobStartEvent(FeedEvent message) throws Exception { FeedJobInfo jobInfo = jobInfos.get(message.jobId); switch (jobInfo.getJobType()) { case INTAKE: @@ -265,7 +279,7 @@ } - private void handleJobFinishMessage(Message message) throws Exception { + private void handleJobFinishEvent(FeedEvent message) throws Exception { FeedJobInfo jobInfo = jobInfos.get(message.jobId); switch (jobInfo.getJobType()) { case INTAKE: @@ -276,12 +290,22 @@ break; case FEED_CONNECT: if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Collect Job finished for " + (FeedConnectJobInfo) jobInfo); + LOGGER.info("Collect Job finished for " + jobInfo); } handleFeedCollectJobFinishMessage((FeedConnectJobInfo) jobInfo); break; } + } + private void handleProviderReady(FeedEvent message) { + FeedIntakeInfo jobInfo = (FeedIntakeInfo) jobInfos.get(message.jobId); + Pair<FeedOperationCounter, List<IFeedJoint>> feedCounter = feedPipeline.get(message.feedId); + feedCounter.first.setProvidersCount(feedCounter.first.getProvidersCount() - 1);; + if (feedCounter.first.getProvidersCount() == 0) { + jobInfo.getIntakeFeedJoint().setState(State.ACTIVE); + jobInfo.setState(FeedJobState.ACTIVE); + notifyFeedEventSubscribers(jobInfo, FeedLifecycleEvent.FEED_INTAKE_STARTED); + } } private synchronized void handleIntakeJobStartMessage(FeedIntakeInfo intakeJobInfo) throws Exception { @@ -306,19 +330,16 @@ } // intakeLocations is an ordered list; element at position i corresponds to location of i'th instance of operator intakeJobInfo.setIntakeLocation(intakeLocations); - intakeJobInfo.getIntakeFeedJoint().setState(State.ACTIVE); - 
intakeJobInfo.setState(FeedJobState.ACTIVE); - - // notify event listeners - notifyFeedEventSubscribers(intakeJobInfo, FeedLifecycleEvent.FEED_INTAKE_STARTED); } private void handleCollectJobStartMessage(FeedConnectJobInfo cInfo) throws RemoteException, ACIDException { // set locations of feed sub-operations (intake, compute, store) setLocations(cInfo); + Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(cInfo.getConnectionId().getFeedId()); + pair.first.setJobsCount(pair.first.getJobsCount() + 1); // activate joints - List<IFeedJoint> joints = feedPipeline.get(cInfo.getConnectionId().getFeedId()); + List<IFeedJoint> joints = pair.second; for (IFeedJoint joint : joints) { if (joint.getProvider().equals(cInfo.getConnectionId())) { joint.setState(State.ACTIVE); @@ -328,7 +349,6 @@ } } cInfo.setState(FeedJobState.ACTIVE); - // register activity in metadata registerFeedActivity(cInfo); // notify event listeners @@ -413,20 +433,25 @@ return connectJobInfos.get(connectionId).getState(); } - private void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, Message message) throws Exception { + private void handleFeedIntakeJobFinishMessage(FeedIntakeInfo intakeInfo, FeedEvent message) throws Exception { IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc(); JobInfo info = hcc.getJobInfo(message.jobId); JobStatus status = info.getStatus(); - FeedLifecycleEvent event; - event = status.equals(JobStatus.FAILURE) ? 
FeedLifecycleEvent.FEED_INTAKE_FAILURE - : FeedLifecycleEvent.FEED_ENDED; - + FeedId feedId = intakeInfo.getFeedId(); + Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline.get(feedId); + pair.first.setJobsCount(pair.first.getJobsCount() - 1); + if (status.equals(JobStatus.FAILURE)) { + pair.first.setFailedIngestion(true); + } // remove feed joints deregisterFeedIntakeJob(message.jobId); // notify event listeners - notifyFeedEventSubscribers(intakeInfo, event); - + if (pair.first.getJobsCount() == 0) { + feedPipeline.remove(feedId); + notifyFeedEventSubscribers(intakeInfo, pair.first.isFailedIngestion() + ? FeedLifecycleEvent.FEED_INTAKE_FAILURE : FeedLifecycleEvent.FEED_ENDED); + } } private void handleFeedCollectJobFinishMessage(FeedConnectJobInfo cInfo) throws Exception { @@ -446,7 +471,8 @@ IFeedJoint feedJoint = cInfo.getSourceFeedJoint(); feedJoint.removeReceiver(connectionId); if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription"); + LOGGER.info( + "Subscription " + cInfo.getConnectionId() + " completed successfully. Removed subscription"); } removeFeedJointsPostPipelineTermination(cInfo.getConnectionId()); } @@ -457,6 +483,15 @@ feedIntakeProgressTrackers.remove(cInfo.getConnectionId()); } deregisterFeedActivity(cInfo); + + Pair<FeedOperationCounter, List<IFeedJoint>> pair = feedPipeline + .get(cInfo.getSourceFeedJoint().getFeedJointKey().getFeedId()); + pair.first.setJobsCount(pair.first.getJobsCount() - 1); + if (pair.first.getJobsCount() == 0) { + notifyFeedEventSubscribers(pair.first.getFeedJobInfo(), pair.first.isFailedIngestion() + ? FeedLifecycleEvent.FEED_INTAKE_FAILURE : FeedLifecycleEvent.FEED_ENDED); + feedPipeline.remove(cInfo.getSourceFeedJoint().getFeedJointKey().getFeedId()); + } // notify event listeners FeedLifecycleEvent event = failure ? 
FeedLifecycleEvent.FEED_COLLECT_FAILURE : FeedLifecycleEvent.FEED_ENDED; @@ -486,11 +521,11 @@ feedActivityDetails.put(FeedActivity.FeedActivityDetails.FEED_CONNECT_TIMESTAMP, (new Date()).toString()); try { - FeedActivity feedActivity = new FeedActivity(cInfo.getConnectionId().getFeedId().getDataverse(), cInfo - .getConnectionId().getFeedId().getFeedName(), cInfo.getConnectionId().getDatasetName(), + FeedActivity feedActivity = new FeedActivity(cInfo.getConnectionId().getFeedId().getDataverse(), + cInfo.getConnectionId().getFeedId().getFeedName(), cInfo.getConnectionId().getDatasetName(), feedActivityDetails); - CentralFeedManager.getInstance().getFeedLoadManager() - .reportFeedActivity(cInfo.getConnectionId(), feedActivity); + CentralFeedManager.getInstance().getFeedLoadManager().reportFeedActivity(cInfo.getConnectionId(), + feedActivity); } catch (Exception e) { e.printStackTrace(); @@ -514,7 +549,7 @@ public void removeFeedJointsPostPipelineTermination(FeedConnectionId connectionId) { FeedConnectJobInfo cInfo = connectJobInfos.get(connectionId); - List<IFeedJoint> feedJoints = feedPipeline.get(connectionId.getFeedId()); + List<IFeedJoint> feedJoints = feedPipeline.get(connectionId.getFeedId()).second; IFeedJoint sourceJoint = cInfo.getSourceFeedJoint(); List<FeedConnectionId> all = sourceJoint.getReceivers(); @@ -534,7 +569,7 @@ } public List<String> getFeedComputeLocations(FeedId feedId) { - List<IFeedJoint> feedJoints = feedPipeline.get(feedId); + List<IFeedJoint> feedJoints = feedPipeline.get(feedId).second; for (IFeedJoint joint : feedJoints) { if (joint.getFeedJointKey().getFeedId().equals(feedId)) { return connectJobInfos.get(joint.getProvider()).getComputeLocations(); @@ -578,7 +613,8 @@ //============================ public boolean isFeedPointAvailable(FeedJointKey feedJointKey) { - List<IFeedJoint> joints = feedPipeline.get(feedJointKey.getFeedId()); + List<IFeedJoint> joints = feedPipeline.containsKey(feedJointKey.getFeedId()) + ? 
feedPipeline.get(feedJointKey.getFeedId()).second : null; if (joints != null && !joints.isEmpty()) { for (IFeedJoint joint : joints) { if (joint.getFeedJointKey().equals(feedJointKey)) { @@ -598,7 +634,8 @@ } public IFeedJoint getFeedJoint(FeedJointKey feedPointKey) { - List<IFeedJoint> joints = feedPipeline.get(feedPointKey.getFeedId()); + List<IFeedJoint> joints = feedPipeline.containsKey(feedPointKey.getFeedId()) + ? feedPipeline.get(feedPointKey.getFeedId()).second : null; if (joints != null && !joints.isEmpty()) { for (IFeedJoint joint : joints) { if (joint.getFeedJointKey().equals(feedPointKey)) { @@ -615,7 +652,8 @@ return feedJoint; } else { String jointKeyString = feedJointKey.getStringRep(); - List<IFeedJoint> jointsOnPipeline = feedPipeline.get(feedJointKey.getFeedId()); + List<IFeedJoint> jointsOnPipeline = feedPipeline.containsKey(feedJointKey.getFeedId()) + ? feedPipeline.get(feedJointKey.getFeedId()).second : null; IFeedJoint candidateJoint = null; if (jointsOnPipeline != null) { for (IFeedJoint joint : jointsOnPipeline) { @@ -638,7 +676,7 @@ } public IFeedJoint getFeedPoint(FeedId sourceFeedId, IFeedJoint.FeedJointType type) { - List<IFeedJoint> joints = feedPipeline.get(sourceFeedId); + List<IFeedJoint> joints = feedPipeline.get(sourceFeedId).second; for (IFeedJoint joint : joints) { if (joint.getType().equals(type)) { return joint; diff --git a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java index 8e44af4..d7129b8 100644 --- a/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java +++ b/asterix-app/src/main/java/org/apache/asterix/app/external/FeedLifecycleListener.java @@ -88,7 +88,7 @@ public static FeedLifecycleListener INSTANCE = new FeedLifecycleListener(); private static final ILangCompilationProvider compilationProvider = new AqlCompilationProvider(); - private final 
LinkedBlockingQueue<Message> jobEventInbox; + private final LinkedBlockingQueue<FeedEvent> jobEventInbox; private final LinkedBlockingQueue<IClusterManagementWorkResponse> responseInbox; private final Map<FeedCollectInfo, List<String>> dependentFeeds = new HashMap<FeedCollectInfo, List<String>>(); private final Map<FeedConnectionId, LinkedBlockingQueue<String>> feedReportQueue; @@ -99,7 +99,7 @@ private ClusterState state; private FeedLifecycleListener() { - this.jobEventInbox = new LinkedBlockingQueue<Message>(); + this.jobEventInbox = new LinkedBlockingQueue<FeedEvent>(); this.feedJobNotificationHandler = new FeedJobNotificationHandler(jobEventInbox); this.responseInbox = new LinkedBlockingQueue<IClusterManagementWorkResponse>(); this.feedWorkRequestResponseHandler = new FeedWorkRequestResponseHandler(responseInbox); @@ -114,14 +114,14 @@ @Override public void notifyJobStart(JobId jobId) throws HyracksException { if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) { - jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_START)); + jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.JOB_START)); } } @Override public void notifyJobFinish(JobId jobId) throws HyracksException { if (feedJobNotificationHandler.isRegisteredFeedJob(jobId)) { - jobEventInbox.add(new Message(jobId, Message.MessageKind.JOB_FINISH)); + jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.JOB_FINISH)); } else { if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("NO NEED TO NOTIFY JOB FINISH!"); @@ -178,19 +178,26 @@ return feedJobNotificationHandler.getFeedJobState(connectionId); } - public static class Message { + public static class FeedEvent { public JobId jobId; + public FeedId feedId; - public enum MessageKind { + public enum EventKind { JOB_START, - JOB_FINISH + JOB_FINISH, + PROVIDER_READY } - public MessageKind messageKind; + public EventKind eventKind; - public Message(JobId jobId, MessageKind msgKind) { + public FeedEvent(JobId jobId, EventKind eventKind) { 
+ this(jobId, eventKind, null); + } + + public FeedEvent(JobId jobId, EventKind eventKind, FeedId feedId) { this.jobId = jobId; - this.messageKind = msgKind; + this.eventKind = eventKind; + this.feedId = feedId; } } @@ -469,8 +476,8 @@ return feedJobNotificationHandler.isFeedPointAvailable(feedJointKey); } - public void registerFeedJoint(IFeedJoint feedJoint) { - feedJobNotificationHandler.registerFeedJoint(feedJoint); + public void registerFeedJoint(IFeedJoint feedJoint, int numOfPrividers) { + feedJobNotificationHandler.registerFeedJoint(feedJoint, numOfPrividers); } public IFeedJoint getFeedJoint(FeedJointKey feedJointKey) { @@ -496,4 +503,8 @@ return feedJobNotificationHandler.getFeedCollectJobId(connectionId); } + public void notifyProviderReady(FeedId feedId, JobId jobId) { + jobEventInbox.add(new FeedEvent(jobId, FeedEvent.EventKind.PROVIDER_READY, feedId)); + } + } diff --git a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java index 213b090..67bf3bb 100644 --- a/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java +++ b/asterix-app/src/main/java/org/apache/asterix/aql/translator/QueryTranslator.java @@ -26,7 +26,6 @@ import java.io.InputStreamReader; import java.rmi.RemoteException; import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Iterator; @@ -2205,8 +2204,9 @@ metadataProvider, policyAccessor); //adapter configuration are valid at this stage //register the feed joints (these are auto-de-registered) + int numOfPrividers = pair.second.getPartitionConstraint().getLocations().length; for (IFeedJoint fj : triple.third) { - FeedLifecycleListener.INSTANCE.registerFeedJoint(fj); + FeedLifecycleListener.INSTANCE.registerFeedJoint(fj, numOfPrividers); } JobUtils.runJob(hcc, pair.first, false); /* @@ -2220,7 +2220,7 @@ 
eventSubscriber.assertEvent(FeedLifecycleEvent.FEED_INTAKE_STARTED); } else { for (IFeedJoint fj : triple.third) { - FeedLifecycleListener.INSTANCE.registerFeedJoint(fj); + FeedLifecycleListener.INSTANCE.registerFeedJoint(fj, 0); } } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java index 258bc35..2378032 100644 --- a/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java +++ b/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java @@ -25,9 +25,10 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.asterix.app.external.FeedLifecycleListener; import org.apache.asterix.common.messaging.AbstractApplicationMessage; -import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage; import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage; +import org.apache.asterix.common.messaging.PreparePartitionsFailbackResponseMessage; import org.apache.asterix.common.messaging.ReportMaxResourceIdMessage; import org.apache.asterix.common.messaging.ReportMaxResourceIdRequestMessage; import org.apache.asterix.common.messaging.ResourceIdRequestMessage; @@ -36,6 +37,7 @@ import org.apache.asterix.common.messaging.TakeoverPartitionsResponseMessage; import org.apache.asterix.common.messaging.api.IApplicationMessage; import org.apache.asterix.common.messaging.api.ICCMessageBroker; +import org.apache.asterix.external.feed.message.FeedProviderReadyMessage; import org.apache.asterix.om.util.AsterixClusterProperties; import org.apache.hyracks.api.messages.IMessage; import org.apache.hyracks.api.util.JavaSerializationUtils; @@ -79,12 +81,20 @@ case COMPLETE_FAILBACK_RESPONSE: handleCompleteFailbcakResponse(message); break; + case FEED_PROVIDER_READY: + handleFeedProviderReady(message); + break; default: 
LOGGER.warning("Unknown message: " + absMessage.getMessageType()); break; } } + private void handleFeedProviderReady(IMessage message) { + FeedProviderReadyMessage msg = (FeedProviderReadyMessage) message; + FeedLifecycleListener.INSTANCE.notifyProviderReady(msg.getFeedId(), msg.getJobId()); + } + private synchronized void handleResourceIdRequest(IMessage message, String nodeId) throws Exception { ResourceIdRequestMessage msg = (ResourceIdRequestMessage) message; ResourceIdRequestResponseMessage reponse = new ResourceIdRequestResponseMessage(); diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.query.aql similarity index 92% rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql rename to asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.query.aql index 864ce01..b696974 100644 --- a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.query.aql @@ -23,7 +23,7 @@ */ use dataverse KeyVerse; -for $d in dataset KVStore -order by meta().id -limit 5 -return meta().id; +count( + for $d in dataset KVStore + return $d +); diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.sleep.aql deleted file mode 100644 index db6954e..0000000 --- 
a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.3.sleep.aql +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Description : Create a change feed and test ingestion of records - * Expected Res : Success - * Date : 24th Feb 2016 - */ -4000 \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.query.aql index b696974..28e9a15 100644 --- a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.query.aql +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.4.query.aql @@ -24,6 +24,7 @@ use dataverse KeyVerse; count( - for $d in dataset KVStore - return $d +for $d in dataset KVStore +distinct by meta()."key" +return 1 ); diff --git 
a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.query.aql index 28e9a15..7423399 100644 --- a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.query.aql +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.5.query.aql @@ -23,8 +23,12 @@ */ use dataverse KeyVerse; -count( + for $d in dataset KVStore -distinct by meta()."key" -return 1 -); +group by $vb := meta().vbucket with $d +order by $vb +limit 5 +return { + "vbucket": $vb, + "tuple_count": count($d) +}; diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.6.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.ddl.aql similarity index 100% rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.6.ddl.aql rename to asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.ddl.aql diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.query.aql deleted file mode 100644 index 7423399..0000000 --- a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.6.query.aql +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Description : Create a change feed and test ingestion of records - * Expected Res : Success - * Date : 24th Feb 2016 - */ -use dataverse KeyVerse; - - -for $d in dataset KVStore -group by $vb := meta().vbucket with $d -order by $vb -limit 5 -return { - "vbucket": $vb, - "tuple_count": count($d) -}; diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.7.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.7.ddl.aql deleted file mode 100644 index d282e66..0000000 --- a/asterix-app/src/test/resources/runtimets/queries/feeds/change-feed-with-meta-pk-in-meta/change-feed-with-meta-pk-in-meta.7.ddl.aql +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Description : Create a change feed and test ingestion of records - * Expected Res : Success - * Date : 24th Feb 2016 - */ -drop dataverse KeyVerse; \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql index 4d2f9c4..4c1635f 100644 --- a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.2.update.aql @@ -26,3 +26,4 @@ set wait-for-completion-feed "false"; connect feed TweetFeed to dataset Tweets; +disconnect feed TweetFeed from dataset Tweets; \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.ddl.aql similarity index 100% rename from asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.5.ddl.aql rename to 
asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.ddl.aql diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql deleted file mode 100644 index e70df33..0000000 --- a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.3.sleep.aql +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -/* - * Description : Drop a dataverse with disconnected feed - * Expected Res : Success - * Date : 24th Feb 2016 - */ -3000 \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql deleted file mode 100644 index 34d6285..0000000 --- a/asterix-app/src/test/resources/runtimets/queries/feeds/drop-dataverse-with-disconnected-feed/drop-dataverse-with-disconnected-feed.4.update.aql +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -/* - * Description : Drop a dataverse with disconnected feed - * Expected Res : Success - * Date : 24th Feb 2016 - */ - -use dataverse experiments; -disconnect feed TweetFeed from dataset Tweets; \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.server.aql similarity index 100% rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.server.aql rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.server.aql diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql deleted file mode 100644 index eb18795..0000000 --- a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.3.sleep.aql +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -/* - * Description : Create a socket feed with a client that pushes - * 10 records. The feed is connected to a dataset that is then - * queried for the data. - * Expected Res : Success - * Date : 24th Feb 2016 - */ -3000 \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.sleep.aql similarity index 100% rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.sleep.aql rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.4.sleep.aql diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.update.aql similarity index 100% rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.update.aql rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.5.update.aql diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.query.aql similarity index 100% rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.query.aql rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.6.query.aql diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.server.aql similarity index 100% rename from 
asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.server.aql rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.7.server.aql diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.ddl.aql similarity index 100% rename from asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.9.ddl.aql rename to asterix-app/src/test/resources/runtimets/queries/feeds/feed-push-socket/feed-push-socket.8.ddl.aql diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.query.aql similarity index 92% copy from asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql copy to asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.query.aql index 864ce01..b696974 100644 --- a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.query.aql +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.query.aql @@ -23,7 +23,7 @@ */ use dataverse KeyVerse; -for $d in dataset KVStore -order by meta().id -limit 5 -return meta().id; +count( + for $d in dataset KVStore + return $d +); diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.sleep.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.sleep.aql deleted file mode 100644 index db6954e..0000000 --- 
a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.3.sleep.aql +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Description : Create a change feed and test ingestion of records - * Expected Res : Success - * Date : 24th Feb 2016 - */ -4000 \ No newline at end of file diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.4.query.aql b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.4.query.aql index b696974..864ce01 100644 --- a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.4.query.aql +++ b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.4.query.aql @@ -23,7 +23,7 @@ */ use dataverse KeyVerse; -count( - for $d in dataset KVStore - return $d -); +for $d in dataset KVStore +order by meta().id +limit 5 +return meta().id; diff --git a/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.6.ddl.aql 
b/asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.ddl.aql similarity index 100% copy from asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.6.ddl.aql copy to asterix-app/src/test/resources/runtimets/queries/feeds/feed-with-meta-pk-in-meta/feed-with-meta-pk-in-meta.5.ddl.aql diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java index 5d2e263..fba74e8 100644 --- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java +++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java @@ -35,7 +35,8 @@ PREPARE_PARTITIONS_FAILBACK_RESPONSE, COMPLETE_FAILBACK_REQUEST, COMPLETE_FAILBACK_RESPONSE, - REPLICA_EVENT + REPLICA_EVENT, + FEED_PROVIDER_READY } public abstract ApplicationMessageType getMessageType(); diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java new file mode 100644 index 0000000..7b74ef9 --- /dev/null +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/api/FeedOperationCounter.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.feed.api; + +import org.apache.asterix.external.feed.watch.FeedJobInfo; + +public class FeedOperationCounter { + private FeedJobInfo feedJobInfo; + private int providersCount; + private int jobsCount; + private boolean failedIngestion = false; + + public FeedOperationCounter(int providersCount, int jobsCount) { + this.providersCount = providersCount; + this.jobsCount = jobsCount; + } + + public int getProvidersCount() { + return providersCount; + } + + public void setProvidersCount(int providersCount) { + this.providersCount = providersCount; + } + + public int getJobsCount() { + return jobsCount; + } + + public void setJobsCount(int jobsCount) { + this.jobsCount = jobsCount; + } + + public boolean isFailedIngestion() { + return failedIngestion; + } + + public void setFailedIngestion(boolean failedIngestion) { + this.failedIngestion = failedIngestion; + } + + public FeedJobInfo getFeedJobInfo() { + return feedJobInfo; + } + + public void setFeedJobInfo(FeedJobInfo feedJobInfo) { + this.feedJobInfo = feedJobInfo; + } +} diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java new file mode 100644 index 0000000..4c81c5b --- /dev/null +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedProviderReadyMessage.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.feed.message; + +import org.apache.asterix.common.messaging.AbstractApplicationMessage; +import org.apache.asterix.external.feed.management.FeedId; +import org.apache.hyracks.api.job.JobId; + +public class FeedProviderReadyMessage extends AbstractApplicationMessage { + + private static final long serialVersionUID = 1L; + private final FeedId feedId; + private final JobId jobId; + + public FeedProviderReadyMessage(FeedId feedId, JobId jobId) { + this.feedId = feedId; + this.jobId = jobId; + } + + @Override + public ApplicationMessageType getMessageType() { + return ApplicationMessageType.FEED_PROVIDER_READY; + } + + public FeedId getFeedId() { + return feedId; + } + + public JobId getJobId() { + return jobId; + } +} diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java index d0348c2..8475e45 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedCollectOperatorDescriptor.java @@ 
-147,22 +147,7 @@ } private IngestionRuntime getIntakeRuntime(SubscribableFeedRuntimeId subscribableRuntimeId) { - int waitCycleCount = 0; - ISubscribableRuntime ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId); - while (ingestionRuntime == null && waitCycleCount < 1000) { - try { - Thread.sleep(3000); - waitCycleCount++; - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("waiting to obtain ingestion runtime for subscription " + subscribableRuntimeId); - } - } catch (InterruptedException e) { - e.printStackTrace(); - break; - } - ingestionRuntime = subscriptionManager.getSubscribableRuntime(subscribableRuntimeId); - } - return (IngestionRuntime) ingestionRuntime; + return (IngestionRuntime) subscriptionManager.getSubscribableRuntime(subscribableRuntimeId); } public ConnectionLocation getSubscriptionLocation() { diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java index c1748d9..6771010 100644 --- a/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java +++ b/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java @@ -36,6 +36,7 @@ import org.apache.asterix.external.feed.api.ISubscriberRuntime; import org.apache.asterix.external.feed.dataflow.DistributeFeedFrameWriter; import org.apache.asterix.external.feed.management.FeedId; +import org.apache.asterix.external.feed.message.FeedProviderReadyMessage; import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager; import org.apache.asterix.external.feed.runtime.CollectionRuntime; @@ -115,6 +116,9 @@ ingestionRuntime = new IngestionRuntime(feedId, runtimeId, feedFrameWriter, recordDesc, adapterRuntimeManager, ctx); 
feedSubscriptionManager.registerFeedSubscribableRuntime(ingestionRuntime); + // Notify FeedJobNotificationHandler that this provider is ready to receive subscription requests. + ctx.sendApplicationMessageToCC(new FeedProviderReadyMessage(feedId, ctx.getJobletContext().getJobId()), + null); feedFrameWriter.open(); } else { if (ingestionRuntime.getAdapterRuntimeManager().getState().equals(State.INACTIVE_INGESTION)) { -- To view, visit https://asterix-gerrit.ics.uci.edu/726 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I416bf4917b1f5cea687d1202c435f7183136cf1f Gerrit-PatchSet: 5 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
