tkhurana commented on code in PR #2144: URL: https://github.com/apache/phoenix/pull/2144#discussion_r2085583886
########## phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogWriter.java: ########## @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.phoenix.replication.log.LogFile; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.ExceptionHandler; +import com.lmax.disruptor.RingBuffer; +import 
com.lmax.disruptor.YieldingWaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; + +/** + * Responsible for writing replication log records using a Disruptor for high throughput and low + * latency appends. It obtains the current active {@link LogFileWriter} from the + * {@link ReplicationLogManager} and appends records to it via a dedicated handler thread. Handles + * sync failures by rolling the writer and then by retrying the in-flight appends with the new + * writer. + * <p> + * A caller will call append() one or more times to insert mutations into the ring buffer. It will + * then call sync() to ensure that all of the mutations pending in the ring buffer are committed to + * the file. This allows for potentially high throughput through batching while also enabling a + * caller to be certain their append()ed mutations, and potentially others, are synced to the + * replication log, before continuing. + */ +public class ReplicationLogWriter implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogWriter.class); + + public static final String REPLICATION_WRITER_RINGBUFFER_SIZE_KEY = + "phoenix.replication.writer.ringbuffer.size"; + public static final int DEFAULT_REPLICATION_WRITER_RINGBUFFER_SIZE = 1024 * 32; // Too big? + public static final String REPLICATION_WRITER_SYNC_TIMEOUT_KEY = + "phoenix.replication.writer.sync.timeout.ms"; + public static final long DEFAULT_REPLICATION_WRITER_SYNC_TIMEOUT = 1000 * 30; + // Note that the total potential time we might spend waiting for a sync is + // SYNC_TIMEOUT * SYNC_RETRIES + SYNC_RETRY_PAUSE * (SYNC_RETRIES - 1) ~= 150 seconds. 
+ public static final String REPLICATION_WRITER_SYNC_RETRIES_KEY = + "phoenix.replication.writer.sync.retries"; + public static final int DEFAULT_REPLICATION_WRITER_SYNC_RETRIES = 5; + public static final String REPLICATION_WRITER_SYNC_RETRY_PAUSE_KEY = + "phoenix.replication.writer.sync.retry.pause.ms"; + public static final long DEFAULT_REPLICATION_WRITER_SYNC_RETRY_PAUSE = 100; + + private static final byte EVENT_TYPE_DATA = 0; + private static final byte EVENT_TYPE_SYNC = 1; + + private final Configuration conf; + private final ReplicationLogManager logManager; + private final int ringBufferSize; + private final long syncTimeoutMs; + private Disruptor<LogEvent> disruptor; + private RingBuffer<LogEvent> ringBuffer; + private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); + + public ReplicationLogWriter(Configuration conf, ReplicationLogManager logManager) { + this.conf = conf; + this.logManager = logManager; + this.ringBufferSize = conf.getInt(REPLICATION_WRITER_RINGBUFFER_SIZE_KEY, + DEFAULT_REPLICATION_WRITER_RINGBUFFER_SIZE); + this.syncTimeoutMs = conf.getLong(REPLICATION_WRITER_SYNC_TIMEOUT_KEY, + DEFAULT_REPLICATION_WRITER_SYNC_TIMEOUT); + } + + @SuppressWarnings("unchecked") + public void init() { + // Initialize the Disruptor. We use ProducerType.MULTI because multiple handlers might + // call append concurrently. We use YieldingWaitStrategy for low latency. When the ring + // buffer is full (controlled by REPLICATION_WRITER_RINGBUFFER_SIZE_KEY), producers + // calling ringBuffer.next() will effectively block (by yielding/spinning), creating + // backpressure on the callers. This ensures appends don't proceed until there is space. 
+ disruptor = new Disruptor<>(LogEvent.EVENT_FACTORY, ringBufferSize, + new ThreadFactoryBuilder().setNameFormat("ReplicationLogEventHandler-%d") + .setDaemon(true).build(), + ProducerType.MULTI, new YieldingWaitStrategy()); + disruptor.handleEventsWith(new LogEventHandler(conf, logManager)); + disruptor.setDefaultExceptionHandler(new LogExceptionHandler()); + ringBuffer = disruptor.start(); + started.set(true); + LOG.info("ReplicationLogWriter started with ring buffer size {}", ringBufferSize); + } + + /** + * Append a mutation to the log. This method is non-blocking and returns quickly, unless the + * ring buffer is full. The actual write happens asynchronously. We expect multiple append() + * calls followed by a sync(). The appends will be batched by the Disruptor. The ring buffer + * may become full, which is not expected under normal operation but could (and should) + * happen if the log file writer is unable to make progress due to an HDFS-level disruption. + * If that happens, this method will block until the append can be inserted. + * @param tableName The name of the HBase table the mutation applies to. + * @param commitId The commit identifier (e.g., SCN) associated with the mutation. Review Comment: Is this basically the timestamp the coproc assigns to the mutation? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
