tkhurana commented on code in PR #2144: URL: https://github.com/apache/phoenix/pull/2144#discussion_r2085626404
########## phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogManager.java: ########## @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.io.compress.Compression; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.phoenix.replication.log.LogFile; +import org.apache.phoenix.replication.log.LogFileWriter; +import org.apache.phoenix.replication.log.LogFileWriterContext; +import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages the lifecycle of replication log files on the active cluster side. It handles log + * rotation based on time and size thresholds and provides the currently active LogFileWriter. + * This class is intended to be thread-safe. + * <p> + * TODO: This class will switch between active (synchronous) and store-and-forward fallback + * writer depending on error handling/retry strategy. + */ [email protected](value = { "EI_EXPOSE_REP", "EI_EXPOSE_REP2" }, + justification = "Intentional") +public class ReplicationLogManager implements Closeable { + + /** The path on the standby HDFS where log files should be written. (The "IN" directory.) */ + public static final String REPLICATION_STANDBY_HDFS_URL_KEY = + "phoenix.replication.log.standby.hdfs.url"; + /** + * The path on the active HDFS where log files should be written when we have fallen back to + * store and forward mode. (The "OUT" directory.) + */ + public static final String REPLICATION_FALLBACK_HDFS_URL_KEY = + "phoenix.replication.log.fallback.hdfs.url"; + /** + * The number of shards (subfolders) to maintain in the "IN" directory. + * <p> + * Shard directories have the format shard-NNNNN, e.g. shard-00001. The maximum value is + * 100000. + */ + public static final String REPLICATION_NUM_SHARDS_KEY = "phoenix.replication.log.shards"; + public static final int DEFAULT_REPLICATION_NUM_SHARDS = 1000; + public static final int MAX_REPLICATION_NUM_SHARDS = 100000; + /** Replication log rotation time trigger, default is 1 minute */ + public static final String REPLICATION_LOG_ROTATION_TIME_MS_KEY = + "phoenix.replication.log.rotation.time.ms"; + public static final long DEFAULT_REPLICATION_LOG_ROTATION_TIME_MS = 60 * 1000L; + /** Replication log rotation size trigger, default is 256 MB */ + public static final String REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY = + "phoenix.replication.log.rotation.size.bytes"; + public static final long DEFAULT_REPLICATION_LOG_ROTATION_SIZE_BYTES = 256 * 1024 * 1024L; + /** Replication log rotation size trigger percentage, default is 0.95 */ + public static final String REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE_KEY = + "phoenix.replication.log.rotation.size.percentage"; + public static final double DEFAULT_REPLICATION_LOG_ROTATION_SIZE_PERCENTAGE = 0.95; + /** Replication log compression, default is "NONE" */ + public static final String REPLICATION_LOG_COMPRESSION_ALGORITHM_KEY = + "phoenix.replication.log.compression"; + public static final String DEFAULT_REPLICATION_LOG_COMPRESSION_ALGORITHM = "NONE"; + + public static final String SHARD_DIR_FORMAT = "shard-%04d"; + public static final String FILE_NAME_FORMAT = "%d-%s.plog"; + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogManager.class); + + private static volatile ReplicationLogManager instance; + + private final Configuration conf; + private final RegionServerServices rsServices; + protected FileSystem standbyFs; + protected FileSystem fallbackFs; // For store-and-forward (future use) + protected int numShards; + protected URI standbyUrl; + protected URI fallbackUrl; // For store-and-forward (future use) + private final long rotationTimeMs; + private final long rotationSizeBytes; + private final Compression.Algorithm compression; + private final ReentrantLock lock = new ReentrantLock(); + protected volatile LogFile.Writer currentWriter; // Current writer + //private volatile LogFile.Writer activeWriter; // Current active side writer (future use) + //private volatile LogFile.Writer fallbackWriter; // Current fallback writer (future use) + protected final AtomicLong lastRotationTime = new AtomicLong(); + protected final AtomicLong writerGeneration = new AtomicLong(); + private ScheduledExecutorService rotationExecutor; + private final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final ConcurrentHashMap<Path,Object> shardMap = new ConcurrentHashMap<>(); + + /** + * Gets the singleton instance of the ReplicationLogManager using the lazy initializer pattern. + * Initializes the instance if it hasn't been created yet. + * @param conf Configuration object. + * @return The singleton ReplicationLogManager instance. + * @throws IOException If initialization fails. + */ + public static ReplicationLogManager getInstance(Configuration conf, + RegionServerServices rsServices) throws IOException { Review Comment: The IndexRegionObserver coproc has the RegionCoprocessorEnvironment interface. I didn't find any way to get RegionServerServices from that interface. https://github.com/apache/phoenix/blob/master/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java#L407 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
