tkhurana commented on code in PR #2144:
URL: https://github.com/apache/phoenix/pull/2144#discussion_r2085583886


##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogWriter.java:
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.phoenix.replication.log.LogFile;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import 
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.YieldingWaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
+/**
+ * Responsible for writing replication log records using a Disruptor for high 
throughput and low
+ * latency appends. It obtains the current active {@link LogFileWriter} from 
the
+ * {@link ReplicationLogManager} and appends records to it via a dedicated 
handler thread. Handles
+ * sync failures by rolling the writer and then by retrying the in-flight 
appends with the new
+ * writer.
+ * <p>
+ * A caller will call append() one or more times to insert mutations into the 
ring buffer. It will
+ * then call sync() to ensure that all of the mutations pending in the ring 
buffer are committed to
+ * the file. This allows for potentially high throughput through batching 
while also enabling a
+ * caller to be certain their append()ed mutations, and potentially others, 
are synced to the
+ * replication log, before continuing.
+ */
+public class ReplicationLogWriter implements Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationLogWriter.class);
+
+    public static final String REPLICATION_WRITER_RINGBUFFER_SIZE_KEY =
+        "phoenix.replication.writer.ringbuffer.size";
+    public static final int DEFAULT_REPLICATION_WRITER_RINGBUFFER_SIZE = 1024 
* 32;  // Too big?
+    public static final String REPLICATION_WRITER_SYNC_TIMEOUT_KEY =
+        "phoenix.replication.writer.sync.timeout.ms";
+    public static final long DEFAULT_REPLICATION_WRITER_SYNC_TIMEOUT = 1000 * 
30;
+    // Note that the total potential time we might spend waiting for a sync is
+    // SYNC_TIMEOUT * SYNC_RETRIES + SYNC_RETRY_PAUSE * SYNC_RETRIES-1 ~= 94 
seconds.
+    public static final String REPLICATION_WRITER_SYNC_RETRIES_KEY =
+        "phoenix.replication.writer.sync.retries";
+    public static final int DEFAULT_REPLICATION_WRITER_SYNC_RETRIES = 5;
+    public static final String REPLICATION_WRITER_SYNC_RETRY_PAUSE_KEY =
+        "phoenix.replication.writer.sync.retry.pause.ms";
+    public static final long DEFAULT_REPLICATION_WRITER_SYNC_RETRY_PAUSE = 100;
+
+    private static final byte EVENT_TYPE_DATA = 0;
+    private static final byte EVENT_TYPE_SYNC = 1;
+
+    private final Configuration conf;
+    private final ReplicationLogManager logManager;
+    private final int ringBufferSize;
+    private final long syncTimeoutMs;
+    private Disruptor<LogEvent> disruptor;
+    private RingBuffer<LogEvent> ringBuffer;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    public ReplicationLogWriter(Configuration conf, ReplicationLogManager 
logManager) {
+        this.conf = conf;
+        this.logManager = logManager;
+        this.ringBufferSize = 
conf.getInt(REPLICATION_WRITER_RINGBUFFER_SIZE_KEY,
+            DEFAULT_REPLICATION_WRITER_RINGBUFFER_SIZE);
+        this.syncTimeoutMs = conf.getLong(REPLICATION_WRITER_SYNC_TIMEOUT_KEY,
+            DEFAULT_REPLICATION_WRITER_SYNC_TIMEOUT);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void init() {
+        // Initialize the Disruptor. We use ProducerType.MULTI because 
multiple handlers might
+        // call append concurrently. We use YieldingWaitStrategy for low 
latency. When the ring
+        // buffer is full (controlled by 
REPLICATION_WRITER_RINGBUFFER_SIZE_KEY), producers
+        // calling ringBuffer.next() will effectively block (by 
yielding/spinning), creating
+        // backpressure on the callers. This ensures appends don't proceed 
until there is space.
+        disruptor = new Disruptor<>(LogEvent.EVENT_FACTORY, ringBufferSize,
+            new 
ThreadFactoryBuilder().setNameFormat("ReplicationLogEventHandler-%d")
+                .setDaemon(true).build(),
+            ProducerType.MULTI, new YieldingWaitStrategy());
+        disruptor.handleEventsWith(new LogEventHandler(conf, logManager));
+        disruptor.setDefaultExceptionHandler(new LogExceptionHandler());
+        ringBuffer = disruptor.start();
+        started.set(true);
+        LOG.info("ReplicationLogWriter started with ring buffer size {}", 
ringBufferSize);
+    }
+
+    /**
+     * Append a mutation to the log. This method is non-blocking and returns 
quickly, unless the
+     * ring buffer is full. The actual write happens asynchronously. We expect 
multiple append()
+     * calls followed by a sync(). The appends will be batched by the 
Disruptor. Should the ring
+     * buffer become full, which is not expected under normal operation but 
could (and should)
+     * happen if the log file writer is unable to make progress, due to a HDFS 
level disruption.
+     * Should we enter that condition this method will block until the append 
can be inserted.
+     * @param tableName The name of the HBase table the mutation applies to.
+     * @param commitId  The commit identifier (e.g., SCN) associated with the 
mutation.

Review Comment:
   Is this basically the timestamp that the coprocessor assigns to the mutation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to