tkhurana commented on code in PR #2144:
URL: https://github.com/apache/phoenix/pull/2144#discussion_r2085787077


##########
phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogWriter.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.phoenix.replication.log.LogFile;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
+import 
org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.YieldingWaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
+/**
+ * Responsible for writing replication log records using a Disruptor for high throughput and low
+ * latency appends. It obtains the current active {@link LogFileWriter} from the
+ * ReplicationLogManager and appends records to it via a dedicated handler thread. Handles sync
+ * failures by rolling the writer and then by retrying the in-flight appends with the new writer.
+ * <p>
+ * A caller will call append() one or more times to insert mutations into the ring buffer. It will
+ * then call sync() to ensure that all of the mutations pending in the ring buffer are committed to
+ * the file. This allows for potentially high throughput through batching while also enabling a
+ * caller to be certain their append()ed mutations, and potentially others, are synced to the
+ * replication log, before continuing.
+ */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = { "EI_EXPOSE_REP", "EI_EXPOSE_REP2" },
+    justification = "Intentional")
+public class ReplicationLogWriter implements Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationLogWriter.class);
+
+    public static final String REPLICATION_WRITER_RINGBUFFER_SIZE_KEY =
+        "phoenix.replication.writer.ringbuffer.size";
+    public static final int DEFAULT_REPLICATION_WRITER_RINGBUFFER_SIZE = 1024 
* 32;  // Too big?
+    public static final String REPLICATION_WRITER_SYNC_TIMEOUT_KEY =
+        "phoenix.replication.writer.sync.timeout.ms";
+    public static final long DEFAULT_REPLICATION_WRITER_SYNC_TIMEOUT = 1000 * 
30;
+    // Note that the total potential time we might spend waiting for a sync is
+    // SYNC_TIMEOUT * SYNC_RETRIES + SYNC_RETRY_PAUSE * SYNC_RETRIES-1 ~= 94 
seconds.
+    public static final String REPLICATION_WRITER_SYNC_RETRIES_KEY =
+        "phoenix.replication.writer.sync.retries";
+    public static final int DEFAULT_REPLICATION_WRITER_SYNC_RETRIES = 5;
+    public static final String REPLICATION_WRITER_SYNC_RETRY_PAUSE_KEY =
+        "phoenix.replication.writer.sync.retry.pause.ms";
+    public static final long DEFAULT_REPLICATION_WRITER_SYNC_RETRY_PAUSE = 100;
+
+    private static final byte EVENT_TYPE_DATA = 0;
+    private static final byte EVENT_TYPE_SYNC = 1;
+
+    private final Configuration conf;
+    private final ReplicationLogManager logManager;
+    private final int ringBufferSize;
+    private final long syncTimeoutMs;
+    private Disruptor<LogEvent> disruptor;
+    private RingBuffer<LogEvent> ringBuffer;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    public ReplicationLogWriter(Configuration conf, ReplicationLogManager 
logManager) {

Review Comment:
   Is it expected that each RPC thread will instantiate its own ReplicationLogWriter object?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to