abdullah alamoudi has submitted this change and it was merged.

Change subject: [ASTERIXDB-1443][FEED] Remove Frame Distributor
......................................................................
[ASTERIXDB-1443][FEED] Remove Frame Distributor - user model changes: no - storage format changes: no - interface changes: no details: - FrameDistributor and DistributeFeedFrameWriter are not used anymore. Change-Id: I27c1ff99ce797923dd709d181387560e4f9448a5 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1853 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Xikui Wang <xkk...@gmail.com> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> BAD: Jenkins <jenk...@fulliautomatix.ics.uci.edu> --- D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java D asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java 2 files changed, 0 insertions(+), 302 deletions(-) Approvals: Xikui Wang: Looks good to me, approved Jenkins: Verified; No violations found; No violations found; Verified diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java deleted file mode 100644 index ae2e0b9..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.feed.dataflow; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.asterix.active.EntityId; -import org.apache.asterix.external.feed.management.FeedConnectionId; -import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -/** - * Provides mechanism for distributing the frames, as received from an operator to a - * set of registered readers. Each reader typically operates at a different pace. Readers - * are isolated from each other to ensure that a slow reader does not impact the progress of - * others. - **/ -public class DistributeFeedFrameWriter implements IFrameWriter { - - /** A unique identifier for the feed to which the incoming tuples belong. **/ - private final EntityId feedId; - - /** - * An instance of FrameDistributor that provides the mechanism for distributing a frame to multiple readers, each - * operating in isolation. 
- **/ - private final FrameDistributor frameDistributor; - - /** The original frame writer instantiated as part of job creation **/ - private final IFrameWriter writer; - - /** The feed operation whose output is being distributed by the DistributeFeedFrameWriter **/ - private final FeedRuntimeType feedRuntimeType; - - /** The value of the partition 'i' if this is the i'th instance of the associated operator **/ - private final int partition; - - public DistributeFeedFrameWriter(EntityId feedId, IFrameWriter writer, FeedRuntimeType feedRuntimeType, - int partition) throws IOException { - this.feedId = feedId; - this.frameDistributor = new FrameDistributor(); - this.feedRuntimeType = feedRuntimeType; - this.partition = partition; - this.writer = writer; - } - - /** - * @param fpa - * Feed policy accessor - * @param nextOnlyWriter - * the writer which will deliver the buffers - * @param connectionId - * (Dataverse - Dataset - Feed) - * @return A frame collector. - * @throws HyracksDataException - */ - public void subscribe(FeedFrameCollector collector) throws HyracksDataException { - frameDistributor.registerFrameCollector(collector); - } - - public void unsubscribeFeed(FeedConnectionId connectionId) throws HyracksDataException { - frameDistributor.deregisterFrameCollector(connectionId); - } - - @Override - public void close() throws HyracksDataException { - try { - frameDistributor.close(); - } finally { - writer.close(); - } - } - - @Override - public void fail() throws HyracksDataException { - writer.fail(); - } - - @Override - public void nextFrame(ByteBuffer frame) throws HyracksDataException { - frameDistributor.nextFrame(frame); - } - - @Override - public void open() throws HyracksDataException { - writer.open(); - } - - @Override - public String toString() { - return feedId.toString() + feedRuntimeType + "[" + partition + "]"; - } - - @Override - public void flush() throws HyracksDataException { - frameDistributor.flush(); - } -} diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java deleted file mode 100644 index 6ca4b77..0000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.external.feed.dataflow; - -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State; -import org.apache.asterix.external.feed.management.FeedConnectionId; -import org.apache.hyracks.api.comm.IFrameWriter; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.log4j.Logger; - -public class FrameDistributor implements IFrameWriter { - - public static final Logger LOGGER = Logger.getLogger(FrameDistributor.class.getName()); - /** A map storing the registered frame readers ({@code FeedFrameCollector}. **/ - private final Map<FeedConnectionId, FeedFrameCollector> registeredCollectors; - private Throwable rootFailureCause = null; - - public FrameDistributor() throws HyracksDataException { - this.registeredCollectors = new HashMap<FeedConnectionId, FeedFrameCollector>(); - } - - public synchronized void registerFrameCollector(FeedFrameCollector frameCollector) throws HyracksDataException { - if (rootFailureCause != null) { - throw new RuntimeDataException(ErrorCode.FEED_DATAFLOW_FRAME_DISTR_REGISTER_FAILED_DATA_PROVIDER, - rootFailureCause); - } - // registering a new collector. 
- try { - frameCollector.open(); - } catch (Throwable th) { - rootFailureCause = th; - try { - frameCollector.fail(); - } catch (Throwable failThrowable) { - th.addSuppressed(failThrowable); - } finally { - try { - frameCollector.close(); - } catch (Throwable closeThrowable) { - th.addSuppressed(closeThrowable); - } - } - throw th; - } - registeredCollectors.put(frameCollector.getConnectionId(), frameCollector); - } - - public synchronized void deregisterFrameCollector(FeedFrameCollector frameCollector) throws HyracksDataException { - deregisterFrameCollector(frameCollector.getConnectionId()); - } - - public synchronized void deregisterFrameCollector(FeedConnectionId connectionId) throws HyracksDataException { - if (rootFailureCause != null) { - throw new RuntimeDataException(ErrorCode.FEED_DATAFLOW_FRAME_DISTR_REGISTER_FAILED_DATA_PROVIDER, - rootFailureCause); - } - FeedFrameCollector frameCollector = removeFrameCollector(connectionId); - try { - frameCollector.close(); - } catch (Throwable th) { - rootFailureCause = th; - throw th; - } - } - - public synchronized FeedFrameCollector removeFrameCollector(FeedConnectionId connectionId) { - return registeredCollectors.remove(connectionId); - } - - /* - * Fix. What should be done?: - * 0. mark failure so no one can subscribe or unsubscribe. - * 1. Throw the throwable. - * 2. when fail() is called, call fail on all subscribers - * 3. close all the subscribers. 
- * (non-Javadoc) - * @see org.apache.hyracks.api.comm.IFrameWriter#nextFrame(java.nio.ByteBuffer) - */ - @Override - public synchronized void nextFrame(ByteBuffer frame) throws HyracksDataException { - if (rootFailureCause != null) { - throw new HyracksDataException(rootFailureCause); - } - for (FeedFrameCollector collector : registeredCollectors.values()) { - try { - collector.nextFrame(frame); - } catch (Throwable th) { - rootFailureCause = th; - throw th; - } - } - } - - @Override - public void fail() throws HyracksDataException { - Collection<FeedFrameCollector> collectors = registeredCollectors.values(); - Iterator<FeedFrameCollector> it = collectors.iterator(); - while (it.hasNext()) { - FeedFrameCollector collector = it.next(); - try { - collector.fail(); - } catch (Throwable th) { - while (it.hasNext()) { - FeedFrameCollector innerCollector = it.next(); - try { - innerCollector.fail(); - } catch (Throwable innerTh) { - th.addSuppressed(innerTh); - } - } - throw th; - } - } - } - - @Override - public void close() throws HyracksDataException { - Collection<FeedFrameCollector> collectors = registeredCollectors.values(); - Iterator<FeedFrameCollector> it = collectors.iterator(); - while (it.hasNext()) { - FeedFrameCollector collector = it.next(); - try { - collector.close(); - } catch (Throwable th) { - while (it.hasNext()) { - FeedFrameCollector innerCollector = it.next(); - try { - innerCollector.close(); - } catch (Throwable innerTh) { - th.addSuppressed(innerTh); - } finally { - innerCollector.setState(State.FINISHED); - } - } - // resume here - throw th; - } finally { - collector.setState(State.FINISHED); - } - } - } - - @Override - public void flush() throws HyracksDataException { - if (rootFailureCause != null) { - throw new HyracksDataException(rootFailureCause); - } - for (FeedFrameCollector collector : registeredCollectors.values()) { - try { - collector.flush(); - } catch (Throwable th) { - rootFailureCause = th; - throw th; - } - } - } - - 
@Override - public void open() throws HyracksDataException { - // Nothing to do here :) - } -} -- To view, visit https://asterix-gerrit.ics.uci.edu/1853 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I27c1ff99ce797923dd709d181387560e4f9448a5 Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com> Gerrit-Reviewer: Ian Maxon <ima...@apache.org> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Michael Blow <mb...@apache.org> Gerrit-Reviewer: Murtadha Hubail <hubail...@gmail.com> Gerrit-Reviewer: Till Westmann <ti...@apache.org> Gerrit-Reviewer: Xikui Wang <xkk...@gmail.com> Gerrit-Reviewer: Yingyi Bu <buyin...@gmail.com> Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>