This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 03e67bc1cf8fbb63ad59e868ae0287679bde4e5d
Author: Zhong, Yanghong <nju_y...@apache.org>
AuthorDate: Wed May 20 17:15:39 2020 +0800

    KYLIN-4507 Add hack file TCPMemcachedNodeImpl.java
---
 .../memcached/protocol/TCPMemcachedNodeImpl.java   | 641 +++++++++++++++++++++
 1 file changed, 641 insertions(+)

diff --git 
a/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java 
b/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
new file mode 100644
index 0000000..22dd730
--- /dev/null
+++ b/cache/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package net.spy.memcached.protocol;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import net.spy.memcached.ConnectionFactory;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.MemcachedConnection;
+import net.spy.memcached.MemcachedNode;
+import net.spy.memcached.compat.SpyObject;
+import net.spy.memcached.ops.Operation;
+import net.spy.memcached.ops.OperationState;
+import net.spy.memcached.protocol.binary.TapAckOperationImpl;
+
+/**
+ * Represents a node within the memcached cluster, along with its buffering
+ * and operation queues.
+ */
+public abstract class TCPMemcachedNodeImpl extends SpyObject implements 
MemcachedNode {
+
+    /** Address returned by {@link #getSocketAddress()}. */
+    private final SocketAddress socketAddress;
+    /** Direct buffer exposed through {@link #getRbuf()}. */
+    private final ByteBuffer rbuf;
+    /** Direct buffer filled by {@link #fillWriteBuffer(boolean)} and flushed by {@link #writeSome()}. */
+    private final ByteBuffer wbuf;
+    /** Operations waiting to be copied into the write buffer. */
+    protected final BlockingQueue<Operation> writeQ;
+    /** Operations that have entered the writing state and are pending a read. */
+    private final BlockingQueue<Operation> readQ;
+    /** Operations accepted by {@link #addOp(Operation)} that have not yet been moved to {@link #writeQ}. */
+    private final BlockingQueue<Operation> inputQueue;
+    /** Maximum time, in milliseconds, {@link #addOp(Operation)} blocks while offering to the input queue. */
+    private final long opQueueMaxBlockTime;
+    /** Maximum time, in milliseconds, {@link #addOp(Operation)} waits on the authentication latch. */
+    private final long authWaitTime;
+    /** Consulted by {@link #addOp(Operation)} for the failure mode. */
+    private final ConnectionFactory connectionFactory;
+    /** Starts at 1; reset to 0 by {@link #connected()}, which is what {@link #isActive()} checks for. */
+    private final AtomicInteger reconnectAttempt = new AtomicInteger(1);
+    private SocketChannel channel;
+    /** Bytes placed in {@link #wbuf} that have not been written to the channel yet. */
+    private int toWrite = 0;
+    /** When non-null, takes precedence over the head of {@link #writeQ} as the current write operation. */
+    protected Operation optimizedOp = null;
+    private volatile SelectionKey sk = null;
+    /** Whether operations have to wait for authentication before being queued. */
+    private boolean shouldAuth = false;
+    /** Released by {@link #authComplete()}; created by {@link #setupForAuth()}. */
+    private CountDownLatch authLatch;
+    /** Operations drained from the input queue by {@link #setupForAuth()}, re-queued by {@link #authComplete()}. */
+    private ArrayList<Operation> reconnectBlocked;
+    /** Timeout passed to {@code Operation.isTimedOut} before an operation is written. */
+    private long defaultOpTimeout;
+    private volatile long lastReadTimestamp = System.nanoTime();
+    private MemcachedConnection connection;
+
+    // operation Future.get timeout counter
+    private final AtomicInteger continuousTimeout = new AtomicInteger(0);
+
+    /**
+     * Creates a node for the given address and channel and prepares it for
+     * authentication when requested.
+     *
+     * @param sa address of the server (asserted non-null)
+     * @param c channel to the server (asserted non-null)
+     * @param bufSize capacity, in bytes, of the read buffer and of the write buffer (asserted positive)
+     * @param rq queue of operations pending a read (asserted non-null)
+     * @param wq queue of operations pending a write (asserted non-null)
+     * @param iq queue receiving newly added operations (asserted non-null)
+     * @param opQueueMaxBlockTime maximum time, in milliseconds, to block when offering to the input queue
+     * @param waitForAuth whether operations must wait for authentication
+     * @param dt default operation timeout
+     * @param authWaitTime maximum time, in milliseconds, to wait for authentication when adding an operation
+     * @param fact connection factory consulted for the failure mode
+     */
+    public TCPMemcachedNodeImpl(SocketAddress sa, SocketChannel c, int bufSize, BlockingQueue<Operation> rq,
+            BlockingQueue<Operation> wq, BlockingQueue<Operation> iq, long opQueueMaxBlockTime, boolean waitForAuth,
+            long dt, long authWaitTime, ConnectionFactory fact) {
+        assert sa != null : "No SocketAddress";
+        assert c != null : "No SocketChannel";
+        assert bufSize > 0 : "Invalid buffer size: " + bufSize;
+        assert rq != null : "No operation read queue";
+        assert wq != null : "No operation write queue";
+        assert iq != null : "No input queue";
+
+        this.socketAddress = sa;
+        this.connectionFactory = fact;
+        this.authWaitTime = authWaitTime;
+        setChannel(c);
+
+        // These buffers are allocated rarely (only on client creation or
+        // reconfigure) and are passed to Channel.read() and Channel.write(),
+        // so direct buffers are used to avoid
+        //   http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6214569
+        this.rbuf = ByteBuffer.allocateDirect(bufSize);
+        this.wbuf = ByteBuffer.allocateDirect(bufSize);
+        this.wbuf.clear();
+
+        this.readQ = rq;
+        this.writeQ = wq;
+        this.inputQueue = iq;
+        this.opQueueMaxBlockTime = opQueueMaxBlockTime;
+        this.shouldAuth = waitForAuth;
+        this.defaultOpTimeout = dt;
+
+        // Must run last: it reads the queues and buffers assigned above.
+        setupForAuth();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Moves operations from the input queue to the write queue, taking at
+     * most as many as the write queue can still hold.
+     *
+     * @see net.spy.memcached.MemcachedNode#copyInputQueue()
+     */
+    public final void copyInputQueue() {
+        final Collection<Operation> moved = new ArrayList<Operation>();
+        // Bound the drain by the free space of the destination queue.
+        final int room = writeQ.remainingCapacity();
+        inputQueue.drainTo(moved, room);
+        writeQ.addAll(moved);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Empties the input queue and hands its former contents to the caller.
+     *
+     * @see net.spy.memcached.MemcachedNode#destroyInputQueue()
+     */
+    public Collection<Operation> destroyInputQueue() {
+        final Collection<Operation> drained = new ArrayList<Operation>();
+        inputQueue.drainTo(drained);
+        return drained;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Resets the node's transfer state: the current write operation is
+     * rewound (or cancelled when authentication is required), pending reads
+     * are cancelled, and both buffers are cleared.
+     *
+     * @see net.spy.memcached.MemcachedNode#setupResend()
+     */
+    public final void setupResend() {
+        // Step 1: deal with the operation currently being written.
+        final Operation writing = getCurrentWriteOp();
+        if (writing != null) {
+            if (shouldAuth) {
+                // Authentication has to happen first, so the op cannot be resent.
+                writing.cancel();
+            } else {
+                final ByteBuffer pendingBytes = writing.getBuffer();
+                if (pendingBytes == null) {
+                    getLogger().info("No buffer for current write op, removing");
+                    removeCurrentWriteOp();
+                } else {
+                    // Rewind to the buffer's mark so the op is written again.
+                    pendingBytes.reset();
+                }
+            }
+        }
+
+        // Step 2: cancel every pending read, except an op that is also the
+        // current write op. Might be better to requeue them.
+        while (hasReadOp()) {
+            final Operation reading = removeCurrentReadOp();
+            if (reading != getCurrentWriteOp()) {
+                getLogger().warn("Discarding partially completed op: %s", reading);
+                reading.cancel();
+            }
+        }
+
+        // Step 3: when authenticating, nothing queued for writing survives.
+        if (shouldAuth) {
+            while (hasWriteOp()) {
+                final Operation queued = removeCurrentWriteOp();
+                getLogger().warn("Discarding partially completed op: %s", queued);
+                queued.cancel();
+            }
+        }
+
+        // Step 4: start over with empty buffers.
+        getWbuf().clear();
+        getRbuf().clear();
+        toWrite = 0;
+    }
+
+    // Pulls queued input into the write queue and drops cancelled operations
+    // from its head. Returns true if a non-cancelled operation is left to
+    // write.
+    private boolean preparePending() {
+        copyInputQueue();
+
+        for (Operation head = getCurrentWriteOp(); head != null; head = getCurrentWriteOp()) {
+            if (!head.isCancelled()) {
+                return true;
+            }
+            getLogger().info("Removing cancelled operation: %s", head);
+            removeCurrentWriteOp();
+        }
+        return false;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Copies bytes of writable operations into the write buffer until the
+     * buffer is full or no operation is left. Does nothing (beyond a debug
+     * log) while earlier bytes are still unwritten or the read queue has no
+     * free capacity.
+     *
+     * @see net.spy.memcached.MemcachedNode#fillWriteBuffer(boolean)
+     */
+    public final void fillWriteBuffer(boolean shouldOptimize) {
+        if (toWrite != 0 || readQ.remainingCapacity() <= 0) {
+            getLogger().debug("Buffer is full, skipping");
+            return;
+        }
+
+        getWbuf().clear();
+        Operation current = getNextWritableOp();
+
+        while (current != null && toWrite < getWbuf().capacity()) {
+            synchronized (current) {
+                assert current.getState() == OperationState.WRITING;
+
+                final ByteBuffer source = current.getBuffer();
+                assert source != null : "Didn't get a write buffer from " + current;
+                // Copy as much of the operation as the write buffer can take.
+                final int bytesToCopy = Math.min(getWbuf().remaining(), source.remaining());
+                final byte[] chunk = new byte[bytesToCopy];
+                source.get(chunk);
+                getWbuf().put(chunk);
+                getLogger().debug("After copying stuff from %s: %s", current, getWbuf());
+                if (!current.getBuffer().hasRemaining()) {
+                    // The whole operation is in the buffer: move on to the next one.
+                    current.writeComplete();
+                    transitionWriteItem();
+
+                    preparePending();
+                    if (shouldOptimize) {
+                        optimize();
+                    }
+
+                    current = getNextWritableOp();
+                }
+                toWrite += bytesToCopy;
+            }
+        }
+
+        getWbuf().flip();
+        assert toWrite <= getWbuf().capacity() : "toWrite exceeded capacity: " + this;
+        assert toWrite == getWbuf().remaining() : "Expected " + toWrite + " remaining, got "
+                + getWbuf().remaining();
+    }
+
+    private Operation getNextWritableOp() {
+        Operation o = getCurrentWriteOp();
+        while (o != null && o.getState() == OperationState.WRITE_QUEUED) {
+            synchronized (o) {
+                if (o.isCancelled()) {
+                    getLogger().debug("Not writing cancelled op.");
+                    Operation cancelledOp = removeCurrentWriteOp();
+                    assert o == cancelledOp;
+                } else if (o.isTimedOut(defaultOpTimeout)) {
+                    getLogger().debug("Not writing timed out op.");
+                    Operation timedOutOp = removeCurrentWriteOp();
+                    assert o == timedOutOp;
+                } else {
+                    o.writing();
+                    if (!(o instanceof TapAckOperationImpl)) {
+                        readQ.add(o);
+                    }
+                    return o;
+                }
+                o = getCurrentWriteOp();
+            }
+        }
+        return o;
+    }
+
+    /* (non-Javadoc)
+     *
+     * Removes the current write operation once it has been fully written.
+     *
+     * @see net.spy.memcached.MemcachedNode#transitionWriteItem()
+     */
+    public final void transitionWriteItem() {
+        final Operation written = removeCurrentWriteOp();
+        assert written != null : "There is no write item to transition";
+        getLogger().debug("Finished writing %s", written);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Subclass hook invoked by fillWriteBuffer(boolean) after an operation
+     * has been completely copied into the write buffer, and only when the
+     * caller asked for optimization.
+     * NOTE(review): implementations presumably populate the protected
+     * optimizedOp field - confirm against the subclasses.
+     *
+     * @see net.spy.memcached.MemcachedNode#optimize()
+     */
+    protected abstract void optimize();
+
+    /*
+     * (non-Javadoc)
+     *
+     * Returns the head of the read queue without removing it, or null when
+     * the queue is empty.
+     *
+     * @see net.spy.memcached.MemcachedNode#getCurrentReadOp()
+     */
+    public final Operation getCurrentReadOp() {
+        final Operation head = readQ.peek();
+        return head;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Removes and returns the head of the read queue. Uses Queue.remove(),
+     * so an empty queue results in a NoSuchElementException.
+     *
+     * @see net.spy.memcached.MemcachedNode#removeCurrentReadOp()
+     */
+    public final Operation removeCurrentReadOp() {
+        final Operation head = readQ.remove();
+        return head;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Returns the operation that is next to be written: the optimized
+     * operation when one is set, otherwise the head of the write queue
+     * (null when that queue is empty).
+     *
+     * @see net.spy.memcached.MemcachedNode#getCurrentWriteOp()
+     */
+    public final Operation getCurrentWriteOp() {
+        if (optimizedOp != null) {
+            return optimizedOp;
+        }
+        return writeQ.peek();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Removes and returns the current write operation. A set optimized
+     * operation is consumed first; otherwise the head of the write queue is
+     * removed via Queue.remove(), which throws NoSuchElementException when
+     * the queue is empty.
+     *
+     * @see net.spy.memcached.MemcachedNode#removeCurrentWriteOp()
+     */
+    public final Operation removeCurrentWriteOp() {
+        final Operation optimized = optimizedOp;
+        if (optimized != null) {
+            optimizedOp = null;
+            return optimized;
+        }
+        return writeQ.remove();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Tells whether at least one operation is waiting in the read queue.
+     *
+     * @see net.spy.memcached.MemcachedNode#hasReadOp()
+     */
+    public final boolean hasReadOp() {
+        final boolean nothingToRead = readQ.isEmpty();
+        return !nothingToRead;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Tells whether there is anything to write: either an optimized
+     * operation is set or the write queue is not empty.
+     *
+     * @see net.spy.memcached.MemcachedNode#hasWriteOp()
+     */
+    public final boolean hasWriteOp() {
+        return optimizedOp != null || !writeQ.isEmpty();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Queues an operation on this node once authentication has completed.
+     *
+     * If the authentication latch is not released within authWaitTime
+     * milliseconds the operation is not queued: it is handed back to the
+     * connection for retry when the failure mode is Redistribute or Retry,
+     * and cancelled otherwise.
+     *
+     * Throws IllegalStateException when the input queue does not accept the
+     * operation within opQueueMaxBlockTime milliseconds, or when the calling
+     * thread is interrupted while waiting (the interrupt flag is restored and
+     * the InterruptedException is attached as the cause).
+     *
+     * @see net.spy.memcached.MemcachedNode#addOp(net.spy.memcached.ops.Operation)
+     */
+    public final void addOp(Operation op) {
+        try {
+            if (!authLatch.await(authWaitTime, TimeUnit.MILLISECONDS)) {
+                FailureMode mode = connectionFactory.getFailureMode();
+                if (mode == FailureMode.Redistribute || mode == FailureMode.Retry) {
+                    getLogger().debug("Redistributing Operation " + op + " because auth " + "latch taken longer than "
+                            + authWaitTime + " milliseconds to " + "complete on node " + getSocketAddress());
+                    connection.retryOperation(op);
+                } else {
+                    op.cancel();
+                    getLogger().warn("Operation canceled because authentication "
+                            + "or reconnection and authentication has " + "taken more than " + authWaitTime
+                            + " milliseconds to " + "complete on node " + this);
+                    getLogger().debug("Canceled operation %s", op.toString());
+                }
+                return;
+            }
+            if (!inputQueue.offer(op, opQueueMaxBlockTime, TimeUnit.MILLISECONDS)) {
+                throw new IllegalStateException(
+                        "Timed out waiting to add " + op + "(max wait=" + opQueueMaxBlockTime + "ms)");
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupted status
+            Thread.currentThread().interrupt();
+            // Keep the original exception as the cause so the stack trace of
+            // the interruption is not lost.
+            throw new IllegalStateException("Interrupted while waiting to add " + op, e);
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Puts the given operation at the front of the input queue by draining
+     * the queue behind it and re-adding everything in the new order.
+     *
+     * @see net.spy.memcached.MemcachedNode#insertOp(net.spy.memcached.ops.Operation)
+     */
+    public final void insertOp(Operation op) {
+        final int expectedSize = inputQueue.size() + 1;
+        final ArrayList<Operation> reordered = new ArrayList<Operation>(expectedSize);
+        // The new operation goes first, the previously queued ones follow.
+        reordered.add(op);
+        inputQueue.drainTo(reordered);
+        inputQueue.addAll(reordered);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Computes the selector interest set for this node: OP_CONNECT while
+     * the channel is not connected, otherwise OP_READ and/or OP_WRITE
+     * depending on what is pending.
+     *
+     * @see net.spy.memcached.MemcachedNode#getSelectionOps()
+     */
+    public final int getSelectionOps() {
+        if (!getChannel().isConnected()) {
+            return SelectionKey.OP_CONNECT;
+        }
+
+        int ops = 0;
+        if (hasReadOp()) {
+            ops |= SelectionKey.OP_READ;
+        }
+        if (toWrite > 0 || hasWriteOp()) {
+            ops |= SelectionKey.OP_WRITE;
+        }
+        return ops;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Returns this node's read buffer.
+     *
+     * @see net.spy.memcached.MemcachedNode#getRbuf()
+     */
+    public final ByteBuffer getRbuf() {
+        return this.rbuf;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Returns this node's write buffer.
+     *
+     * @see net.spy.memcached.MemcachedNode#getWbuf()
+     */
+    public final ByteBuffer getWbuf() {
+        return this.wbuf;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Returns the address this node was created with.
+     *
+     * @see net.spy.memcached.MemcachedNode#getSocketAddress()
+     */
+    public final SocketAddress getSocketAddress() {
+        return this.socketAddress;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * A node is active when no reconnect is outstanding (the reconnect
+     * counter is zero) and it has a connected channel.
+     *
+     * @see net.spy.memcached.MemcachedNode#isActive()
+     */
+    public final boolean isActive() {
+        if (reconnectAttempt.get() != 0) {
+            return false;
+        }
+        if (getChannel() == null) {
+            return false;
+        }
+        return getChannel().isConnected();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Reports whether the authentication latch has been released, i.e. its
+     * count has reached zero.
+     *
+     * @see net.spy.memcached.MemcachedNode#isAuthenticated()
+     */
+    public boolean isAuthenticated() {
+        final long outstanding = authLatch.getCount();
+        return outstanding == 0;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Records one more reconnect attempt and resets the continuous timeout
+     * counter.
+     *
+     * @see net.spy.memcached.MemcachedNode#reconnecting()
+     */
+    public final void reconnecting() {
+        reconnectAttempt.getAndIncrement();
+        continuousTimeout.set(0);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Marks the node as connected: both the reconnect counter and the
+     * continuous timeout counter go back to zero.
+     *
+     * @see net.spy.memcached.MemcachedNode#connected()
+     */
+    public final void connected() {
+        final int none = 0;
+        reconnectAttempt.set(none);
+        continuousTimeout.set(none);
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Returns the current value of the reconnect counter.
+     *
+     * @see net.spy.memcached.MemcachedNode#getReconnectCount()
+     */
+    public final int getReconnectCount() {
+        final int attempts = reconnectAttempt.get();
+        return attempts;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Renders a diagnostic summary of the node: address, queue sizes, the
+     * operations at the head of the read and write queues, the number of
+     * unwritten bytes and the selection key's interest set (0 when there is
+     * no valid key).
+     *
+     * @see net.spy.memcached.MemcachedNode#toString()
+     */
+    @Override
+    public final String toString() {
+        // The selection key is volatile and can be replaced at any time, so
+        // read it once instead of re-reading it between the null check and
+        // its use.
+        final SelectionKey key = getSk();
+        int sops = 0;
+        if (key != null && key.isValid()) {
+            sops = key.interestOps();
+        }
+        // NOTE(review): optimizedOp is what getCurrentWriteOp() returns, yet
+        // it is counted under #Rops here rather than #Wops - confirm intent.
+        int rsize = readQ.size() + (optimizedOp == null ? 0 : 1);
+        int wsize = writeQ.size();
+        int isize = inputQueue.size();
+        return "{QA sa=" + getSocketAddress() + ", #Rops=" + rsize + ", #Wops=" + wsize + ", #iq=" + isize + ", topRop="
+                + getCurrentReadOp() + ", topWop=" + getCurrentWriteOp() + ", toWrite=" + toWrite + ", interested="
+                + sops + "}";
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Associates both a channel and its selection key with this node.
+     *
+     * @see
+     * net.spy.memcached.MemcachedNode#registerChannel
+     * (java.nio.channels.SocketChannel, java.nio.channels.SelectionKey)
+     */
+    public final void registerChannel(SocketChannel ch, SelectionKey skey) {
+        setChannel(ch);
+        this.sk = skey;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Replaces this node's channel. Replacing a channel that is still open
+     * is treated as a programming error and trips an assertion.
+     *
+     * @see net.spy.memcached.MemcachedNode#setChannel(java.nio.channels.SocketChannel)
+     */
+    public final void setChannel(SocketChannel to) {
+        assert !(channel != null && channel.isOpen()) : "Attempting to overwrite channel";
+        this.channel = to;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Returns the channel currently associated with this node.
+     *
+     * @see net.spy.memcached.MemcachedNode#getChannel()
+     */
+    public final SocketChannel getChannel() {
+        return this.channel;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Stores the selection key for this node.
+     *
+     * @see net.spy.memcached.MemcachedNode#setSk(java.nio.channels.SelectionKey)
+     */
+    public final void setSk(SelectionKey to) {
+        this.sk = to;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Returns the selection key currently stored for this node (may be
+     * null).
+     *
+     * @see net.spy.memcached.MemcachedNode#getSk()
+     */
+    public final SelectionKey getSk() {
+        return this.sk;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Returns how many bytes placed in the write buffer have not been
+     * written to the channel yet.
+     *
+     * @see net.spy.memcached.MemcachedNode#getBytesRemainingToWrite()
+     */
+    public final int getBytesRemainingToWrite() {
+        return this.toWrite;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Writes as much of the write buffer as the channel accepts and lowers
+     * the count of unwritten bytes accordingly. Returns the number of bytes
+     * written by this call.
+     *
+     * @see net.spy.memcached.MemcachedNode#writeSome()
+     */
+    public final int writeSome() throws IOException {
+        final int written = channel.write(wbuf);
+        assert written >= 0 : "Wrote negative bytes?";
+        toWrite -= written;
+        assert toWrite >= 0 : "toWrite went negative after writing " + written + " bytes for " + this;
+        getLogger().debug("Wrote %d bytes", written);
+        return written;
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Updates the continuous timeout counter: a timeout on an active node
+     * increments it, anything else resets it to zero.
+     *
+     * @see net.spy.memcached.MemcachedNode#setContinuousTimeout
+     */
+    public void setContinuousTimeout(boolean timedOut) {
+        if (!timedOut || !isActive()) {
+            continuousTimeout.set(0);
+            return;
+        }
+        continuousTimeout.incrementAndGet();
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * Returns the current value of the continuous timeout counter.
+     *
+     * @see net.spy.memcached.MemcachedNode#getContinuousTimeout
+     */
+    public int getContinuousTimeout() {
+        final int timeouts = continuousTimeout.get();
+        return timeouts;
+    }
+
+    /**
+     * Applies the interest set computed by {@link #getSelectionOps()} to the
+     * node's selection key, provided the key exists and is valid.
+     */
+    public final void fixupOps() {
+        // The selection key can be swapped at any point due to node failure,
+        // so work on a single snapshot of the volatile field.
+        final SelectionKey key = sk;
+        if (key == null || !key.isValid()) {
+            getLogger().debug("Selection key is not valid.");
+            return;
+        }
+        final int iops = getSelectionOps();
+        getLogger().debug("Setting interested opts to %d", iops);
+        key.interestOps(iops);
+    }
+
+    /**
+     * Signals that authentication has finished: operations that were set
+     * aside while authentication was pending are put back on the input queue
+     * and the authentication latch is released.
+     */
+    public final void authComplete() {
+        if (reconnectBlocked != null && reconnectBlocked.size() > 0) {
+            inputQueue.addAll(reconnectBlocked);
+            // Drop the reference once the operations are re-queued. The list
+            // is only replaced when new operations get set aside, so keeping
+            // it would re-add the same operations on a later call.
+            reconnectBlocked = null;
+        }
+        authLatch.countDown();
+    }
+
+    /**
+     * Prepares the node for authentication. When authentication is required
+     * a fresh latch is armed, queued input is set aside and the transfer
+     * state is reset; otherwise an already-released latch is installed.
+     */
+    public final void setupForAuth() {
+        if (!shouldAuth) {
+            authLatch = new CountDownLatch(0);
+            return;
+        }
+
+        authLatch = new CountDownLatch(1);
+        if (inputQueue.size() > 0) {
+            // Park the queued operations until authentication completes.
+            reconnectBlocked = new ArrayList<Operation>(inputQueue.size() + 1);
+            inputQueue.drainTo(reconnectBlocked);
+        }
+        assert (inputQueue.size() == 0);
+        setupResend();
+    }
+
+    /**
+     * Time elapsed since {@link #completedRead()} was last called (or since
+     * this node was created, if it never was).
+     *
+     * @return elapsed time in milliseconds.
+     */
+    public long lastReadDelta() {
+        final long elapsedNanos = System.nanoTime() - lastReadTimestamp;
+        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
+    }
+
+    /**
+     * Records the current time as the moment of the most recent completed
+     * read.
+     */
+    public void completedRead() {
+        final long now = System.nanoTime();
+        lastReadTimestamp = now;
+    }
+
+    /**
+     * Returns the connection this node has been attached to, or null if
+     * none has been set.
+     */
+    @Override
+    public MemcachedConnection getConnection() {
+        return this.connection;
+    }
+
+    /**
+     * Attaches this node to the given connection.
+     *
+     * @param connection the connection to attach to
+     */
+    @Override
+    public void setConnection(MemcachedConnection connection) {
+        final MemcachedConnection attached = connection;
+        this.connection = attached;
+    }
+}
\ No newline at end of file

Reply via email to