abdullah alamoudi has submitted this change and it was merged.

Change subject: [ASTERIXDB-1443][FEED] Remove Frame Distributor
......................................................................


[ASTERIXDB-1443][FEED] Remove Frame Distributor

- user model changes: no
- storage format changes: no
- interface changes: no

details:
- FrameDistributor and DistributeFeedFrameWriter are not used
  anymore.

Change-Id: I27c1ff99ce797923dd709d181387560e4f9448a5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1853
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Xikui Wang <xkk...@gmail.com>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
BAD: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
---
D 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
D 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
2 files changed, 0 insertions(+), 302 deletions(-)

Approvals:
  Xikui Wang: Looks good to me, approved
  Jenkins: Verified; No violations found; No violations found; Verified



diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
deleted file mode 100644
index ae2e0b9..0000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/DistributeFeedFrameWriter.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.active.EntityId;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-/**
- * Provides mechanism for distributing the frames, as received from an 
operator to a
- * set of registered readers. Each reader typically operates at a different 
pace. Readers
- * are isolated from each other to ensure that a slow reader does not impact 
the progress of
- * others.
- **/
-public class DistributeFeedFrameWriter implements IFrameWriter {
-
-    /** A unique identifier for the feed to which the incoming tuples belong. 
**/
-    private final EntityId feedId;
-
-    /**
-     * An instance of FrameDistributor that provides the mechanism for 
distributing a frame to multiple readers, each
-     * operating in isolation.
-     **/
-    private final FrameDistributor frameDistributor;
-
-    /** The original frame writer instantiated as part of job creation **/
-    private final IFrameWriter writer;
-
-    /** The feed operation whose output is being distributed by the 
DistributeFeedFrameWriter **/
-    private final FeedRuntimeType feedRuntimeType;
-
-    /** The value of the partition 'i' if this is the i'th instance of the 
associated operator **/
-    private final int partition;
-
-    public DistributeFeedFrameWriter(EntityId feedId, IFrameWriter writer, 
FeedRuntimeType feedRuntimeType,
-            int partition) throws IOException {
-        this.feedId = feedId;
-        this.frameDistributor = new FrameDistributor();
-        this.feedRuntimeType = feedRuntimeType;
-        this.partition = partition;
-        this.writer = writer;
-    }
-
-    /**
-     * @param fpa
-     *            Feed policy accessor
-     * @param nextOnlyWriter
-     *            the writer which will deliver the buffers
-     * @param connectionId
-     *            (Dataverse - Dataset - Feed)
-     * @return A frame collector.
-     * @throws HyracksDataException
-     */
-    public void subscribe(FeedFrameCollector collector) throws 
HyracksDataException {
-        frameDistributor.registerFrameCollector(collector);
-    }
-
-    public void unsubscribeFeed(FeedConnectionId connectionId) throws 
HyracksDataException {
-        frameDistributor.deregisterFrameCollector(connectionId);
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        try {
-            frameDistributor.close();
-        } finally {
-            writer.close();
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        writer.fail();
-    }
-
-    @Override
-    public void nextFrame(ByteBuffer frame) throws HyracksDataException {
-        frameDistributor.nextFrame(frame);
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        writer.open();
-    }
-
-    @Override
-    public String toString() {
-        return feedId.toString() + feedRuntimeType + "[" + partition + "]";
-    }
-
-    @Override
-    public void flush() throws HyracksDataException {
-        frameDistributor.flush();
-    }
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
deleted file mode 100644
index 6ca4b77..0000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameDistributor.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.feed.dataflow;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.external.feed.dataflow.FeedFrameCollector.State;
-import org.apache.asterix.external.feed.management.FeedConnectionId;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Logger;
-
-public class FrameDistributor implements IFrameWriter {
-
-    public static final Logger LOGGER = 
Logger.getLogger(FrameDistributor.class.getName());
-    /** A map storing the registered frame readers ({@code 
FeedFrameCollector}. **/
-    private final Map<FeedConnectionId, FeedFrameCollector> 
registeredCollectors;
-    private Throwable rootFailureCause = null;
-
-    public FrameDistributor() throws HyracksDataException {
-        this.registeredCollectors = new HashMap<FeedConnectionId, 
FeedFrameCollector>();
-    }
-
-    public synchronized void registerFrameCollector(FeedFrameCollector 
frameCollector) throws HyracksDataException {
-        if (rootFailureCause != null) {
-            throw new 
RuntimeDataException(ErrorCode.FEED_DATAFLOW_FRAME_DISTR_REGISTER_FAILED_DATA_PROVIDER,
-                    rootFailureCause);
-        }
-        // registering a new collector.
-        try {
-            frameCollector.open();
-        } catch (Throwable th) {
-            rootFailureCause = th;
-            try {
-                frameCollector.fail();
-            } catch (Throwable failThrowable) {
-                th.addSuppressed(failThrowable);
-            } finally {
-                try {
-                    frameCollector.close();
-                } catch (Throwable closeThrowable) {
-                    th.addSuppressed(closeThrowable);
-                }
-            }
-            throw th;
-        }
-        registeredCollectors.put(frameCollector.getConnectionId(), 
frameCollector);
-    }
-
-    public synchronized void deregisterFrameCollector(FeedFrameCollector 
frameCollector) throws HyracksDataException {
-        deregisterFrameCollector(frameCollector.getConnectionId());
-    }
-
-    public synchronized void deregisterFrameCollector(FeedConnectionId 
connectionId) throws HyracksDataException {
-        if (rootFailureCause != null) {
-            throw new 
RuntimeDataException(ErrorCode.FEED_DATAFLOW_FRAME_DISTR_REGISTER_FAILED_DATA_PROVIDER,
-                    rootFailureCause);
-        }
-        FeedFrameCollector frameCollector = removeFrameCollector(connectionId);
-        try {
-            frameCollector.close();
-        } catch (Throwable th) {
-            rootFailureCause = th;
-            throw th;
-        }
-    }
-
-    public synchronized FeedFrameCollector 
removeFrameCollector(FeedConnectionId connectionId) {
-        return registeredCollectors.remove(connectionId);
-    }
-
-    /*
-     * Fix. What should be done?:
-     * 0. mark failure so no one can subscribe or unsubscribe.
-     * 1. Throw the throwable.
-     * 2. when fail() is called, call fail on all subscribers
-     * 3. close all the subscribers.
-     * (non-Javadoc)
-     * @see 
org.apache.hyracks.api.comm.IFrameWriter#nextFrame(java.nio.ByteBuffer)
-     */
-    @Override
-    public synchronized void nextFrame(ByteBuffer frame) throws 
HyracksDataException {
-        if (rootFailureCause != null) {
-            throw new HyracksDataException(rootFailureCause);
-        }
-        for (FeedFrameCollector collector : registeredCollectors.values()) {
-            try {
-                collector.nextFrame(frame);
-            } catch (Throwable th) {
-                rootFailureCause = th;
-                throw th;
-            }
-        }
-    }
-
-    @Override
-    public void fail() throws HyracksDataException {
-        Collection<FeedFrameCollector> collectors = 
registeredCollectors.values();
-        Iterator<FeedFrameCollector> it = collectors.iterator();
-        while (it.hasNext()) {
-            FeedFrameCollector collector = it.next();
-            try {
-                collector.fail();
-            } catch (Throwable th) {
-                while (it.hasNext()) {
-                    FeedFrameCollector innerCollector = it.next();
-                    try {
-                        innerCollector.fail();
-                    } catch (Throwable innerTh) {
-                        th.addSuppressed(innerTh);
-                    }
-                }
-                throw th;
-            }
-        }
-    }
-
-    @Override
-    public void close() throws HyracksDataException {
-        Collection<FeedFrameCollector> collectors = 
registeredCollectors.values();
-        Iterator<FeedFrameCollector> it = collectors.iterator();
-        while (it.hasNext()) {
-            FeedFrameCollector collector = it.next();
-            try {
-                collector.close();
-            } catch (Throwable th) {
-                while (it.hasNext()) {
-                    FeedFrameCollector innerCollector = it.next();
-                    try {
-                        innerCollector.close();
-                    } catch (Throwable innerTh) {
-                        th.addSuppressed(innerTh);
-                    } finally {
-                        innerCollector.setState(State.FINISHED);
-                    }
-                }
-                // resume here
-                throw th;
-            } finally {
-                collector.setState(State.FINISHED);
-            }
-        }
-    }
-
-    @Override
-    public void flush() throws HyracksDataException {
-        if (rootFailureCause != null) {
-            throw new HyracksDataException(rootFailureCause);
-        }
-        for (FeedFrameCollector collector : registeredCollectors.values()) {
-            try {
-                collector.flush();
-            } catch (Throwable th) {
-                rootFailureCause = th;
-                throw th;
-            }
-        }
-    }
-
-    @Override
-    public void open() throws HyracksDataException {
-        // Nothing to do here :)
-    }
-}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1853
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I27c1ff99ce797923dd709d181387560e4f9448a5
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <bamou...@gmail.com>
Gerrit-Reviewer: Ian Maxon <ima...@apache.org>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mb...@apache.org>
Gerrit-Reviewer: Murtadha Hubail <hubail...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xkk...@gmail.com>
Gerrit-Reviewer: Yingyi Bu <buyin...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <bamou...@gmail.com>

Reply via email to