http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java deleted file mode 100644 index f7d746d..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogOrganizer.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Iterator; -import java.util.concurrent.Future; - -import com.gemstone.gemfire.cache.hdfs.HDFSStore; -import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl; -import com.gemstone.gemfire.cache.hdfs.internal.QueuedPersistentEvent; -import com.gemstone.gemfire.internal.cache.ForceReattemptException; - -/** - * Manages bucket level operations on sorted oplog files including creation, reading, serde, bloom - * buffering and compaction. Abstracts existence of multiple sorted oplog files - */ -public interface HoplogOrganizer<T extends PersistedEventImpl> extends HoplogSetReader<byte[], T>, - HoplogListener, Closeable { - - /** - * Iterates on the input buffer and persists it in a new sorted oplog. This invocation may block - * if there are too many outstanding write requests. - * - * @param bufferIter - * ordered iterator on a buffer of objects to be persisted - * @param count - * number of K,V pairs expected to be part of flush, 0 if unknown - * @throws IOException - */ - public void flush(Iterator<? extends QueuedPersistentEvent> bufferIter, int count) - throws IOException, ForceReattemptException; - - - /** - * Clear the data in HDFS. This method assumes that the - * dispatcher thread has already been paused, so there should be - * no concurrent flushes to HDFS when this method is called. - * - * @throws IOException - */ - public void clear() throws IOException; - - /** - * returns the compactor associated with this set - */ - public Compactor getCompactor(); - - /** - * Called to execute bucket maintenance activities, like purge expired files - * and create compaction task. Long running activities must be executed - * asynchronously, not on this thread, to avoid impact on other buckets - * @throws IOException - */ - public void performMaintenance() throws IOException; - - /** - * Schedules a compaction task and returns immediately. 
- * - * @param isMajor true for major compaction, false for minor compaction - * @return future for status of compaction request - */ - public Future<CompactionStatus> forceCompaction(boolean isMajor); - - /** - * Returns the timestamp of the last completed major compaction - * - * @return the timestamp or 0 if a major compaction has not taken place yet - */ - public long getLastMajorCompactionTimestamp(); - - public interface Compactor { - /** - * Requests that a compaction operation be performed on this set of sorted oplogs. - * - * @param isMajor true for major compaction - * @param isForced true if the compaction should be carried out even if there - * is only one hoplog to compact - * - * @return true if compaction was performed, false otherwise - * @throws IOException - */ - boolean compact(boolean isMajor, boolean isForced) throws IOException; - - /** - * Stops the current compaction operation midway and suspends further - * compaction operations. The current compaction data - * will be thrown away, and no more compaction will be performed - * until resume is called. - */ - void suspend(); - - /** - * Resumes compaction operations. - */ - void resume(); - - /** - * @return true if the compactor is busy or not ready - */ - boolean isBusy(boolean isMajor); - - /** - * @return the hdfsStore configuration used by this compactor - */ - public HDFSStore getHdfsStore(); - } -}
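To make the lifecycle concrete, here is a minimal caller-side sketch of this interface; the organizer instance, event batch, and helper names are assumptions for illustration, not part of the deleted code:

    // Hedged sketch: drives flush, maintenance, and a forced major compaction.
    void flushAndCompact(HoplogOrganizer<SortedHoplogPersistedEvent> organizer,
        List<QueuedPersistentEvent> orderedBatch) throws Exception {
      // Persist one ordered batch of events as a new sorted oplog.
      organizer.flush(orderedBatch.iterator(), orderedBatch.size());

      // Purge expired files and let the organizer schedule compaction tasks.
      organizer.performMaintenance();

      // Request a major compaction; the call returns immediately with a future.
      Future<CompactionStatus> status = organizer.forceCompaction(true);
      status.get(); // block until the compaction request is resolved

      long lastMajor = organizer.getLastMajorCompactionTimestamp();
      assert lastMajor > 0;
    }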
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java deleted file mode 100644 index 16939db..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetIterator.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; - -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HFileSortedOplog.HFileReader.HFileSortedIterator; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator; -import com.gemstone.gemfire.internal.cache.persistence.soplog.ByteComparator; -import com.gemstone.gemfire.internal.cache.persistence.soplog.TrackedReference; - -/** - * Provides a merged iterator on set of {@link HFileSortedOplog} - */ -public class HoplogSetIterator implements HoplogIterator<ByteBuffer, ByteBuffer> { - private final List<HFileSortedIterator> iters; - - // Number of entries remaining to be iterated by this scanner - private int entriesRemaining; - - // points at the current iterator holding the next entry - private ByteBuffer currentKey; - private ByteBuffer currentValue; - - public HoplogSetIterator(List<TrackedReference<Hoplog>> targets) throws IOException { - iters = new ArrayList<HFileSortedIterator>(); - for (TrackedReference<Hoplog> oplog : targets) { - HFileSortedIterator iter = (HFileSortedIterator) oplog.get().getReader().scan(); - if (!iter.hasNext()) { - // the oplog is empty, exclude from iterator - continue; - } - - // initialize the iterator - iter.nextBB(); - iters.add(iter); - entriesRemaining += oplog.get().getReader().getEntryCount(); - } - } - - public boolean hasNext() { - return entriesRemaining > 0; - } - - @Override - public ByteBuffer next() throws IOException { - return nextBB(); - } - public ByteBuffer nextBB() throws IOException { - if (!hasNext()) { - throw new NoSuchElementException(); - } - - seekToMinKeyIter(); - - return currentKey; - } - - private void seekToMinKeyIter() throws IOException { - HFileSortedIterator currentIter = null; - ByteBuffer minKey = null; - - // scan through all hoplog iterators to reach to the iterator with smallest - // key on the head and remove duplicate keys - for 
(Iterator<HFileSortedIterator> iterator = iters.iterator(); iterator.hasNext();) { - HFileSortedIterator iter = iterator.next(); - - ByteBuffer tmpK = iter.getKeyBB(); - ByteBuffer tmpV = iter.getValueBB(); - if (minKey == null || ByteComparator.compareBytes(tmpK.array(), tmpK.arrayOffset(), tmpK.remaining(), minKey.array(), minKey.arrayOffset(), minKey.remaining()) < 0) { - minKey = tmpK; - currentKey = tmpK; - currentValue = tmpV; - currentIter = iter; - } else { - // remove possible duplicate key entries from iterator - if (seekHigherKeyInIter(minKey, iter) == null) { - // no more keys left in this iterator - iter.close(); - iterator.remove(); - } - } - } - - //seek next key in current iter - if (currentIter != null && seekHigherKeyInIter(minKey, currentIter) == null) { - // no more keys left in this iterator - currentIter.close(); - iters.remove(currentIter); - } - } - - private ByteBuffer seekHigherKeyInIter(ByteBuffer key, HFileSortedIterator iter) throws IOException { - ByteBuffer newK = iter.getKeyBB(); - - // remove all duplicates by incrementing iterator when a key is less than - // equal to current key - while (ByteComparator.compareBytes(newK.array(), newK.arrayOffset(), newK.remaining(), key.array(), key.arrayOffset(), key.remaining()) <= 0) { - entriesRemaining--; - if (iter.hasNext()) { - newK = iter.nextBB(); - } else { - newK = null; - break; - } - } - return newK; - } - - @Override - public ByteBuffer getKey() { - return getKeyBB(); - } - public ByteBuffer getKeyBB() { - if (currentKey == null) { - throw new IllegalStateException(); - } - return currentKey; - } - - @Override - public ByteBuffer getValue() { - return getValueBB(); - } - public ByteBuffer getValueBB() { - if (currentValue == null) { - throw new IllegalStateException(); - } - return currentValue; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public void close() { - for (HoplogIterator<byte[], byte[]> iter : iters) { - iter.close(); - } - } - - public int getRemainingEntryCount() { - return entriesRemaining; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java deleted file mode 100644 index 789a616..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/HoplogSetReader.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
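The HoplogSetIterator above implements a k-way merge: it repeatedly selects the smallest head key across the per-file iterators and skips duplicate keys in the losing iterators. The same technique, sketched self-contained over plain sorted Java iterators (all names here are illustrative, not part of the deleted class):

    import java.util.*;

    class KWayMerge {
      /** Merges sorted iterators, keeping the first occurrence of each key. */
      static List<String> merge(List<Iterator<String>> sources) {
        // Heap entry: the current head value plus the iterator it came from.
        PriorityQueue<Map.Entry<String, Iterator<String>>> heap =
            new PriorityQueue<>(Map.Entry.comparingByKey());
        for (Iterator<String> it : sources) {
          if (it.hasNext()) heap.add(new AbstractMap.SimpleEntry<>(it.next(), it));
        }
        List<String> out = new ArrayList<>();
        String last = null;
        while (!heap.isEmpty()) {
          Map.Entry<String, Iterator<String>> head = heap.poll();
          if (!head.getKey().equals(last)) { // drop duplicate keys
            out.add(head.getKey());
            last = head.getKey();
          }
          if (head.getValue().hasNext()) { // refill the heap from the same source
            heap.add(new AbstractMap.SimpleEntry<>(head.getValue().next(), head.getValue()));
          }
        }
        return out;
      }
    }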
- */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Iterator; - -/** - * Reads a sorted oplog file or a merged set of sorted oplogs. - */ -public interface HoplogSetReader<K, V> { - /** - * Returns the value associated with the given key. - */ - V read(K key) throws IOException; - - /** - * Iterates over the entire contents of the sorted file. - * - * @return the sorted iterator - * @throws IOException - */ - HoplogIterator<K, V> scan() throws IOException; - - /** - * Scans the available keys and allows iteration over the interval [from, to) where the starting - * key is included and the ending key is excluded from the results. - * - * @param from - * the start key - * @param to - * the end key - * @return the sorted iterator - * @throws IOException - */ - HoplogIterator<K, V> scan(K from, K to) throws IOException; - - /** - * Scans the keys and allows iteration between the given keys. - * - * @param from - * the start key - * @param fromInclusive - * true if the start key is included in the scan - * @param to - * the end key - * @param toInclusive - * true if the end key is included in the scan - * @return the sorted iterator - * @throws IOException - */ - HoplogIterator<K, V> scan(K from, boolean fromInclusive, K to, boolean toInclusive) throws IOException; - - - /** - * Scans the available keys and allows iteration over the byte range - * specified by the parameters - * - * - * @param startOffset - * the start offset - * @param length - * bytes to read - * @return the sorted iterator - * @throws IOException - */ - HoplogIterator<K, V> scan(long startOffset, long length) throws IOException; - - /** - * Uses a cardinality estimator to provide an approximate number of entries - * - * @return the number of entries - */ - long sizeEstimate(); - - /** - * Returns true if the reader has been closed. - * @return true if closed - */ - boolean isClosed(); - - /** - * Allows sorted iteration through a set of keys and values. - */ - public interface HoplogIterator<K, V> { - K getKey(); - - V getValue(); - - /** moves to next element and returns the key object */ - K next() throws IOException; - - boolean hasNext(); - - void close(); - - void remove(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java deleted file mode 100644 index a2926ff..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/SequenceFileHoplog.java +++ /dev/null @@ -1,395 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
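A short usage sketch for the reader interface above; the reader, keys, and consumer here are assumed to exist and are not part of the deleted code:

    // Iterate the half-open key interval [from, to): 'from' included, 'to' excluded.
    HoplogIterator<byte[], byte[]> iter = reader.scan(fromKey, toKey);
    try {
      while (iter.hasNext()) {
        byte[] k = iter.next();     // advances the iterator and returns the key
        byte[] v = iter.getValue(); // value at the current position
        process(k, v);              // hypothetical consumer
      }
    } finally {
      iter.close(); // scanners hold file resources; always close them
    }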
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.gemstone.gemfire.cache.hdfs.internal.hoplog; - -import java.io.Closeable; -import java.io.EOFException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.EnumMap; - -import com.gemstone.gemfire.internal.hll.ICardinality; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.Text; - -import com.gemstone.gemfire.cache.hdfs.HDFSIOException; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator; -import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile; -import com.gemstone.gemfire.cache.hdfs.internal.org.apache.hadoop.io.SequenceFile.Reader; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; -import com.gemstone.gemfire.internal.Version; - -import org.apache.logging.log4j.Logger; - -/** - * Implements a sequence file based {@link Hoplog} - * - * - */ -public class SequenceFileHoplog extends AbstractHoplog { - - public SequenceFileHoplog(FileSystem inputFS, Path filePath, - SortedOplogStatistics stats) - throws IOException - { - super(inputFS, filePath, stats); - } - @Override - public void close() throws IOException { - // Nothing to do - } - - @Override - public HoplogReader getReader() throws IOException { - return new SequenceFileReader(); - } - - @Override - /** - * Gets the writer for the sequence file. - * - * @param keys not used by the SequenceFileHoplog class - */ - public HoplogWriter createWriter(int keys) throws IOException { - return new SequenceFileHoplogWriter(); - } - - @Override - public boolean isClosed() { - return false; - } - - @Override - public void close(boolean clearCache) throws IOException { - // Nothing to do - } - - /** - * Currently, hsync does not update the file size on the namenode. So, if the - * process died after calling hsync but before closing the file, the file is - * left with an inconsistent file size. As a workaround, this method opens the - * file stream in append mode and closes it, which fixes the file size on the namenode. 
- * - * @throws IOException - * @return true if the file size was fixed - */ - public boolean fixFileSize() throws IOException { - // Try to fix the file size - // Loop so that the expected exceptions can be ignored up to 3 - // times - if (logger.isDebugEnabled()) - logger.debug("{}Fixing size of hoplog " + path, logPrefix); - Exception e = null; - boolean exceptionThrown = false; - for (int i = 0; i < 3; i++) { - try { - FSDataOutputStream stream = fsProvider.getFS().append(path); - stream.close(); - stream = null; - } catch (IOException ie) { - exceptionThrown = true; - e = ie; - if (logger.isDebugEnabled()) - logger.debug("{}Retry run " + (i + 1) + ": Hoplog " + path + " is still a temporary " + - "hoplog because the node managing it wasn't shut down properly last time. Failed to " + - "fix the hoplog because an exception was thrown " + e, logPrefix ); - } - // As either RecoveryInProgressException was thrown or - // Already being created exception was thrown, wait for - // some time before the next retry. - if (exceptionThrown) { - try { - Thread.sleep(5000); - } catch (InterruptedException e1) { - } - exceptionThrown = false; - } else { - // no exception was thrown; the file size was fixed - return true; - } - } - logger.info(logPrefix, LocalizedMessage.create(LocalizedStrings.DEBUG, "Hoplog " + path + " is still a temporary " + - "hoplog because the node managing it wasn't shut down properly last time. Failed to " + - "fix the hoplog because an exception was thrown " + e)); - - return false; - } - - @Override - public String toString() { - return "SequenceFileHoplog[" + getFileName() + "]"; - } - - private class SequenceFileHoplogWriter implements HoplogWriter { - - private SequenceFile.Writer writer = null; - - public SequenceFileHoplogWriter() throws IOException{ - writer = AbstractHoplog.getSequenceFileWriter(path, conf, logger); - } - - @Override - public void close() throws IOException { - writer.close(); - if (logger.isDebugEnabled()) - logger.debug("{}Completed creating hoplog " + path, logPrefix); - } - - @Override - public void hsync() throws IOException { - writer.hsyncWithSizeUpdate(); - if (logger.isDebugEnabled()) - logger.debug("{}hsync'ed a batch of data to hoplog " + path, logPrefix); - } - - @Override - public void append(byte[] key, byte[] value) throws IOException { - writer.append(new BytesWritable(key), new BytesWritable(value)); - } - - @Override - public void append(ByteBuffer key, ByteBuffer value) throws IOException { - throw new UnsupportedOperationException("Not supported for Sequence files"); - } - - @Override - public void close(EnumMap<Meta, byte[]> metadata) throws IOException { - throw new UnsupportedOperationException("Not supported for Sequence files"); - } - @Override - public long getCurrentSize() throws IOException { - return writer.getLength(); - } - - }
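The writer above is append-only and ordered; a hedged sketch of its intended call pattern follows (the batch and KeyValue holder are placeholders, not part of the deleted code):

    HoplogWriter writer = hoplog.createWriter(0); // key-count hint is unused here
    try {
      for (KeyValue kv : batch) {   // hypothetical pre-sorted batch
        writer.append(kv.key(), kv.value());
      }
      writer.hsync();               // persist a durable batch boundary to HDFS
    } finally {
      writer.close();
    }

 - /** - * Sequence file reader. 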
This is currently to be used only by MapReduce jobs and - * test functions - * - */ - public class SequenceFileReader implements HoplogReader, Closeable { - @Override - public byte[] read(byte[] key) throws IOException { - throw new UnsupportedOperationException("Not supported for Sequence files"); - } - - @Override - public HoplogIterator<byte[], byte[]> scan() - throws IOException { - return new SequenceFileIterator(fsProvider.getFS(), path, 0, Long.MAX_VALUE, conf, logger); - } - - @Override - public HoplogIterator<byte[], byte[]> scan( - byte[] from, byte[] to) throws IOException { - throw new UnsupportedOperationException("Not supported for Sequence files"); - } - - @Override - public HoplogIterator<byte[], byte[]> scan( - long startOffset, long length) throws IOException { - return new SequenceFileIterator(fsProvider.getFS(), path, startOffset, length, conf, logger); - } - - @Override - public HoplogIterator<byte[], byte[]> scan( - byte[] from, boolean fromInclusive, byte[] to, boolean toInclusive) - throws IOException { - throw new UnsupportedOperationException("Not supported for Sequence files"); - } - - @Override - public boolean isClosed() { - throw new UnsupportedOperationException("Not supported for Sequence files."); - } - - @Override - public void close() throws IOException { - throw new UnsupportedOperationException("Not supported for Sequence files. Close the iterator instead."); - } - - @Override - public ByteBuffer get(byte[] key) throws IOException { - throw new UnsupportedOperationException("Not supported for Sequence files"); - } - - @Override - public BloomFilter getBloomFilter() throws IOException { - throw new UnsupportedOperationException("Not supported for Sequence files"); - } - - @Override - public long getEntryCount() { - throw new UnsupportedOperationException("Not supported for Sequence files"); - } - - @Override - public ICardinality getCardinalityEstimator() { - throw new UnsupportedOperationException("Not supported for Sequence files"); - } - - @Override - public long sizeEstimate() { - throw new UnsupportedOperationException("Not supported for Sequence files"); - } - - - } - - /** - * Sequence file iterator. This is currently to be used only by MapReduce jobs and - * test functions - * - */ - public static class SequenceFileIterator implements HoplogIterator<byte[], byte[]> { - - SequenceFile.Reader reader = null; - private BytesWritable prefetchedKey = null; - private BytesWritable prefetchedValue = null; - private byte[] currentKey; - private byte[] currentValue; - boolean hasNext = false; - Logger logger; - Path path; - private long start; - private long end; - - public SequenceFileIterator(FileSystem fs, Path path, long startOffset, - long length, Configuration conf, Logger logger) - throws IOException { - Reader.Option optPath = SequenceFile.Reader.file(path); - - // Hadoop has a configuration parameter io.serializations that is a list of serialization - // classes which can be used for obtaining serializers and deserializers. This parameter - // by default contains avro classes. When a sequence file is created, it calls - // SerializationFactory.getSerializer(keyclass). This internally creates objects using - // reflection of all the classes that were part of io.serializations. But since, there is - // no avro class available it throws an exception. - // Before creating a sequenceFile, override the io.serializations parameter and pass only the classes - // that are important to us. 
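As a standalone illustration, the override-and-restore pattern described in the comment above, combined with the sync-marker split protocol used further below, looks roughly like this against the stock org.apache.hadoop.io.SequenceFile API (a sketch under that assumption; the deleted class uses geode's internal SequenceFile copy, and conf, path, startOffset, length, and consume are caller-supplied placeholders):

    // Temporarily force WritableSerialization while opening the reader.
    String[] saved = conf.getStrings("io.serializations",
        "org.apache.hadoop.io.serializer.WritableSerialization");
    conf.setStrings("io.serializations",
        "org.apache.hadoop.io.serializer.WritableSerialization");
    SequenceFile.Reader r;
    try {
      r = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
    } finally {
      conf.setStrings("io.serializations", saved); // restore the original setting
    }

    // Read only the byte range [startOffset, startOffset + length), honoring sync marks.
    long end = startOffset + length;
    if (startOffset > r.getPosition()) {
      r.sync(startOffset); // jump to the first sync mark after the start offset
    }
    BytesWritable k = new BytesWritable();
    BytesWritable v = new BytesWritable();
    while (r.next(k, v)) {
      consume(k, v); // hypothetical consumer
      if (r.getPosition() >= end && r.syncSeen()) {
        break; // passed the first sync mark beyond the split end
      }
    }
    r.close();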
- String serializations[] = conf.getStrings("io.serializations", - new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"}); - conf.setStrings("io.serializations", - new String[]{"org.apache.hadoop.io.serializer.WritableSerialization"}); - // create reader - boolean emptyFile = false; - try { - reader = new SequenceFile.Reader(conf, optPath); - }catch (EOFException e) { - // this is ok as the file has ended. just return false that no more records available - emptyFile = true; - } - // reset the configuration to its original value - conf.setStrings("io.serializations", serializations); - this.logger = logger; - this.path = path; - - if (emptyFile) { - hasNext = false; - } else { - // The file should be read from the first sync marker after the start position and - // until the first sync marker after the end position is seen. - this.end = startOffset + length; - if (startOffset > reader.getPosition()) { - reader.sync(startOffset); // sync to start - } - this.start = reader.getPosition(); - this.hasNext = this.start < this.end; - if (hasNext) - readNext(); - } - } - - - public Version getVersion(){ - String version = reader.getMetadata().get(new Text(Meta.GEMFIRE_VERSION.name())).toString(); - return Version.fromOrdinalOrCurrent(Short.parseShort(version)); - } - @Override - public boolean hasNext() { - return hasNext; - } - - @Override - public byte[] next() { - currentKey = prefetchedKey.getBytes(); - currentValue = prefetchedValue.getBytes(); - - readNext(); - - return currentKey; - } - - private void readNext() { - try { - long pos = reader.getPosition(); - prefetchedKey = new BytesWritable(); - prefetchedValue = new BytesWritable(); - hasNext = reader.next(prefetchedKey, prefetchedValue); - // The file should be read from the first sync marker after the start position and - // until the first sync marker after the end position is seen. - if (pos >= end && reader.syncSeen()) { - hasNext = false; - } - } catch (EOFException e) { - // this is ok as the file has ended. just return false that no more records available - hasNext = false; - } - catch (IOException e) { - hasNext = false; - logger.error(LocalizedMessage.create(LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE, path), e); - throw new HDFSIOException( - LocalizedStrings.HOPLOG_FAILED_TO_READ_HDFS_FILE.toLocalizedString(path), e); - } - } - @Override - public void remove() { - throw new UnsupportedOperationException("Not supported for Sequence files"); - } - - @Override - public void close() { - IOUtils.closeStream(reader); - } - - @Override - public byte[] getKey() { - return currentKey; - } - - @Override - public byte[] getValue() { - return currentValue; - } - - /** Returns true iff the previous call to next passed a sync mark.*/ - public boolean syncSeen() { return reader.syncSeen(); } - - /** Return the current byte position in the input file. 
*/ - public synchronized long getPosition() throws IOException { - return reader.getPosition(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java deleted file mode 100644 index f5b63cc..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/AbstractGFRecordReader.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapred; - -import java.io.IOException; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.lib.CombineFileSplit; - -import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl; -import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent; -import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFKey; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HDFSSplitIterator; - -public class AbstractGFRecordReader - extends - com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.AbstractGFRecordReader - implements RecordReader<GFKey, PersistedEventImpl> { - - /** - * Initializes instance of record reader using file split and job - * configuration - * - * @param split - * @param conf - * @throws IOException - */ - public void initialize(CombineFileSplit split, JobConf conf) throws IOException { - CombineFileSplit cSplit = (CombineFileSplit) split; - Path[] path = cSplit.getPaths(); - long[] start = cSplit.getStartOffsets(); - long[] len = cSplit.getLengths(); - - FileSystem fs = cSplit.getPath(0).getFileSystem(conf); - this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, 0l, 0l); - } - - @Override - public boolean next(GFKey key, PersistedEventImpl value) throws IOException { - /* - * if there are more records in the hoplog, iterate to the next record. Set - * key object as is. 
- */ - - if (!super.hasNext()) { - key.setKey(null); - // TODO make value null; - return false; - } - - super.next(); - - key.setKey(super.getKey().getKey()); - PersistedEventImpl usersValue = super.getValue(); - value.copy(usersValue); - return true; - } - - @Override - public GFKey createKey() { - return new GFKey(); - } - - @Override - public PersistedEventImpl createValue() { - if(this.isSequential) { - return new UnsortedHoplogPersistedEvent(); - } else { - return new SortedHoplogPersistedEvent(); - } - } - - @Override - public long getPos() throws IOException { - // there is no efficient way to find the position of key in hoplog file. - return 0; - } - - @Override - public void close() throws IOException { - super.close(); - } - - @Override - public float getProgress() throws IOException { - return super.getProgressRatio(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java deleted file mode 100644 index 0e0e455..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFInputFormat.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
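For reference, the old-API reader above would be driven like this (the split, job configuration, and consumer are placeholders):

    AbstractGFRecordReader reader = new AbstractGFRecordReader();
    reader.initialize(split, jobConf); // CombineFileSplit plus JobConf
    GFKey key = reader.createKey();
    PersistedEventImpl value = reader.createValue();
    while (reader.next(key, value)) {
      // key.getKey() is the deserialized region key; value carries the event payload
      handle(key.getKey(), value); // hypothetical consumer
    }
    reader.close();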
- */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapred; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobConfigurable; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.lib.CombineFileSplit; - -import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFKey; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil.HoplogOptimizedSplitter; - -public class GFInputFormat extends - com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFInputFormat - implements InputFormat<GFKey, PersistedEventImpl>, JobConfigurable { - - @Override - public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { - this.conf = job; - - Collection<FileStatus> hoplogs = getHoplogs(); - return createSplits(job, hoplogs); - } - - /** - * Creates an input split for every block occupied by hoplogs of the input - * regions - * - * @param job - * @param hoplogs - * @return array of input splits of type file input split - * @throws IOException - */ - private InputSplit[] createSplits(JobConf job, Collection<FileStatus> hoplogs) - throws IOException { - if (hoplogs == null || hoplogs.isEmpty()) { - return new InputSplit[0]; - } - - HoplogOptimizedSplitter splitter = new HoplogOptimizedSplitter(hoplogs); - List<org.apache.hadoop.mapreduce.InputSplit> mr2Splits = splitter.getOptimizedSplits(conf); - InputSplit[] splits = new InputSplit[mr2Splits.size()]; - int i = 0; - for (org.apache.hadoop.mapreduce.InputSplit inputSplit : mr2Splits) { - org.apache.hadoop.mapreduce.lib.input.CombineFileSplit mr2Spit; - mr2Spit = (org.apache.hadoop.mapreduce.lib.input.CombineFileSplit) inputSplit; - - CombineFileSplit split = new CombineFileSplit(job, mr2Spit.getPaths(), - mr2Spit.getStartOffsets(), mr2Spit.getLengths(), - mr2Spit.getLocations()); - splits[i] = split; - i++; - } - - return splits; - } - - @Override - public RecordReader<GFKey, PersistedEventImpl> getRecordReader( - InputSplit split, JobConf job, Reporter reporter) throws IOException { - - CombineFileSplit cSplit = (CombineFileSplit) split; - AbstractGFRecordReader reader = new AbstractGFRecordReader(); - reader.initialize(cSplit, job); - return reader; - } - - @Override - public void configure(JobConf job) { - this.conf = job; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java deleted file mode 100644 index 1494e9f..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapred/GFOutputFormat.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapred; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputFormat; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.util.Progressable; - -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.client.ClientCache; - -/** - * Output format for gemfire. The records provided to writers created by this - * output format are PUT in a live gemfire cluster. - * - */ -public class GFOutputFormat extends - com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFOutputFormat - implements OutputFormat<Object, Object> { - - @Override - public RecordWriter<Object, Object> getRecordWriter( - FileSystem ignored, JobConf job, String name, Progressable progress) - throws IOException { - ClientCache cache = getClientCacheInstance(job); - return new GFRecordWriter(cache, job); - } - - @Override - public void checkOutputSpecs(FileSystem ignored, JobConf job) - throws IOException { - validateConfiguration(job); - } - - public class GFRecordWriter implements RecordWriter<Object, Object> { - private ClientCache clientCache; - private Region<Object, Object> region; - - public GFRecordWriter(ClientCache cache, Configuration conf) { - this.clientCache = cache; - region = getRegionInstance(conf, clientCache); - } - - @Override - public void write(Object key, Object value) throws IOException { - executePut(region, key, value); - } - - @Override - public void close(Reporter reporter) throws IOException { - closeClientCache(clientCache); - // TODO update reporter - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java deleted file mode 100644 index 2c71b18..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/AbstractGFRecordReader.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
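Taken together, the mapred input and output formats are wired into an old-API job roughly as follows; the region names, locator endpoint, and driver class are placeholders (the property keys come from the mapreduce classes shown later in this commit):

    JobConf job = new JobConf(MyDriver.class); // MyDriver is hypothetical
    job.setInputFormat(GFInputFormat.class);
    job.set("mapreduce.input.gfinputformat.inputregion", "/myRegion");
    job.set("mapreduce.input.gfinputformat.homedir", "gemfire"); // HDFS store home dir
    job.setOutputFormat(GFOutputFormat.class);
    job.set("mapreduce.output.gfoutputformat.outputregion", "/resultRegion");
    job.set("mapreduce.output.gfoutputformat.locatorhost", "localhost");
    job.setInt("mapreduce.output.gfoutputformat.locatorport", 10334);
    JobClient.runJob(job);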
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; - -import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl; -import com.gemstone.gemfire.cache.hdfs.internal.SortedHoplogPersistedEvent; -import com.gemstone.gemfire.cache.hdfs.internal.UnsortedHoplogPersistedEvent; -import com.gemstone.gemfire.internal.util.BlobHelper; - -public class AbstractGFRecordReader extends - RecordReader<GFKey, PersistedEventImpl> { - - // constant overhead of each KV in hfile. This is used in computing the - // progress of record reader - protected long RECORD_OVERHEAD = 8; - - // accounting for number of bytes already read from the hfile - private long bytesRead; - - protected boolean isSequential; - - protected HDFSSplitIterator splitIterator; - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - CombineFileSplit cSplit = (CombineFileSplit) split; - Path[] path = cSplit.getPaths(); - long[] start = cSplit.getStartOffsets(); - long[] len = cSplit.getLengths(); - - Configuration conf = context.getConfiguration(); - FileSystem fs = cSplit.getPath(0).getFileSystem(conf); - - this.splitIterator = HDFSSplitIterator.newInstance(fs, path, start, len, 0l, 0l); - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - return next(); - } - - protected boolean next() throws IOException { - if (!hasNext()) { - return false; - } - - splitIterator.next(); - bytesRead += (splitIterator.getKey().length + splitIterator.getValue().length); - bytesRead += RECORD_OVERHEAD; - return true; - } - - protected boolean hasNext() throws IOException { - return splitIterator.hasNext(); - } - - @Override - public GFKey getCurrentKey() throws IOException, InterruptedException { - return getKey(); - } - - protected GFKey getKey() throws IOException { - try { - GFKey key = new GFKey(); - key.setKey(BlobHelper.deserializeBlob(splitIterator.getKey())); - return key; - } catch (ClassNotFoundException e) { - // TODO resolve logging - return null; - } - } - - @Override - public PersistedEventImpl getCurrentValue() throws IOException, - InterruptedException { - return getValue(); - } - - protected PersistedEventImpl getValue() throws IOException { - try { - byte[] valueBytes = splitIterator.getValue(); - if(isSequential) { - return UnsortedHoplogPersistedEvent.fromBytes(valueBytes); - } else { - return SortedHoplogPersistedEvent.fromBytes(valueBytes); - } - } catch (ClassNotFoundException e) { - // TODO resolve logging - return null; - } - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return getProgressRatio(); - } - - protected float getProgressRatio() throws IOException { - if 
(!splitIterator.hasNext()) { - return 1.0f; - } else if (bytesRead > splitIterator.getLength()) { - // the record reader is expected to read more bytes than the split length, - // since it continues till the beginning of the next block. hence if this - // extra reading has started, return a fixed value - return 0.95f; - } else { - return Math.min(1.0f, bytesRead / (float) (splitIterator.getLength())); - } - } - - @Override - public void close() throws IOException { - splitIterator.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java deleted file mode 100644 index ff64ceb..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFInputFormat.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
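A new-API map task consuming these records might look like the sketch below; the output types and counting logic are placeholders:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class EventCountMapper
        extends Mapper<GFKey, PersistedEventImpl, Text, LongWritable> {
      private final LongWritable one = new LongWritable(1);

      @Override
      protected void map(GFKey key, PersistedEventImpl event, Context context)
          throws IOException, InterruptedException {
        // Each record is one persisted region event; count events per key.
        context.write(new Text(String.valueOf(key.getKey())), one);
      }
    }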
- */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import com.gemstone.gemfire.cache.hdfs.HDFSStore; -import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HDFSRegionDirector.HdfsRegionManager; -import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.HoplogUtil.HoplogOptimizedSplitter; - -public class GFInputFormat extends InputFormat<GFKey, PersistedEventImpl> - implements Configurable { - public static final String HOME_DIR = "mapreduce.input.gfinputformat.homedir"; - public static final String INPUT_REGION = "mapreduce.input.gfinputformat.inputregion"; - public static final String START_TIME = "mapreduce.input.gfinputformat.starttime"; - public static final String END_TIME = "mapreduce.input.gfinputformat.endtime"; - public static final String CHECKPOINT = "mapreduce.input.gfinputformat.checkpoint"; - - protected Configuration conf; - - @Override - public List<InputSplit> getSplits(JobContext job) throws IOException { - this.conf = job.getConfiguration(); - - Collection<FileStatus> hoplogs = getHoplogs(); - return createSplits(hoplogs); - } - - /** - * Identifies filters provided in the job configuration and creates a list of - * sorted hoplogs. If there are no sorted hoplogs, checks if the region has - * sequential hoplogs - * - * @return list of hoplogs - * @throws IOException - */ - protected Collection<FileStatus> getHoplogs() throws IOException { - String regionName = conf.get(INPUT_REGION); - System.out.println("GFInputFormat: Region Name is " + regionName); - if (regionName == null || regionName.trim().isEmpty()) { - // incomplete job configuration, region name must be provided - return new ArrayList<FileStatus>(); - } - - String home = conf.get(HOME_DIR, HDFSStore.DEFAULT_HOME_DIR); - regionName = HdfsRegionManager.getRegionFolder(regionName); - Path regionPath = new Path(home + "/" + regionName); - FileSystem fs = regionPath.getFileSystem(conf); - - long start = conf.getLong(START_TIME, 0l); - long end = conf.getLong(END_TIME, 0l); - boolean checkpoint = conf.getBoolean(CHECKPOINT, true); - - // if the region contains flush hoplogs then the region is of type RW. - Collection<FileStatus> hoplogs; - hoplogs = HoplogUtil.filterHoplogs(fs, regionPath, start, end, checkpoint); - return hoplogs == null ? 
new ArrayList<FileStatus>() : hoplogs; - } - - /** - * Creates an input split for every block occupied by hoplogs of the input - * regions - * - * @param hoplogs - * @return list of input splits of type file input split - * @throws IOException - */ - private List<InputSplit> createSplits(Collection<FileStatus> hoplogs) - throws IOException { - List<InputSplit> splits = new ArrayList<InputSplit>(); - if (hoplogs == null || hoplogs.isEmpty()) { - return splits; - } - - HoplogOptimizedSplitter splitter = new HoplogOptimizedSplitter(hoplogs); - return splitter.getOptimizedSplits(conf); - } - - @Override - public RecordReader<GFKey, PersistedEventImpl> createRecordReader( - InputSplit split, TaskAttemptContext context) throws IOException, - InterruptedException { - return new AbstractGFRecordReader(); - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - @Override - public Configuration getConf() { - return conf; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKey.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKey.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKey.java deleted file mode 100644 index 5bba2c7..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFKey.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
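A new-API job selects this input format and its time filters roughly as follows (values are illustrative; the effect of the CHECKPOINT flag is as described in getHoplogs above):

    Job job = Job.getInstance(new Configuration(), "geode-hdfs-scan");
    job.setInputFormatClass(GFInputFormat.class);
    Configuration conf = job.getConfiguration();
    conf.set(GFInputFormat.INPUT_REGION, "/myRegion");
    conf.set(GFInputFormat.HOME_DIR, "gemfire");   // HDFS store home directory
    conf.setLong(GFInputFormat.START_TIME, 0L);    // 0 means no lower time bound
    conf.setLong(GFInputFormat.END_TIME, 0L);      // 0 means no upper time bound
    conf.setBoolean(GFInputFormat.CHECKPOINT, true);
    job.setMapOutputKeyClass(GFKey.class);         // GFKey sorts by serialized bytes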
- */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.io.WritableComparator; - -import com.gemstone.gemfire.internal.util.BlobHelper; - -public class GFKey implements WritableComparable<GFKey> { - private Object key; - - public Object getKey() { - return key; - } - - public void setKey(Object key) { - this.key = key; - } - - @Override - public void write(DataOutput out) throws IOException { - byte[] bytes = BlobHelper.serializeToBlob(key); - out.writeInt(bytes.length); - out.write(bytes, 0, bytes.length); - } - - @Override - public void readFields(DataInput in) throws IOException { - int len = in.readInt(); - byte[] bytes = new byte[len]; - in.readFully(bytes, 0, len); - try { - key = BlobHelper.deserializeBlob(bytes); - } catch (ClassNotFoundException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - - @Override - public int compareTo(GFKey o) { - try { - byte[] b1 = BlobHelper.serializeToBlob(key); - byte[] b2 = BlobHelper.serializeToBlob(o.key); - return WritableComparator.compareBytes(b1, 0, b1.length, b2, 0, b2.length); - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - return 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java deleted file mode 100644 index 3be2ab0..0000000 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/GFOutputFormat.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.InvalidJobConfException; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; - -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.RegionExistsException; -import com.gemstone.gemfire.cache.client.ClientCache; -import com.gemstone.gemfire.cache.client.ClientCacheFactory; -import com.gemstone.gemfire.cache.client.ClientRegionFactory; -import com.gemstone.gemfire.cache.client.ClientRegionShortcut; -import com.gemstone.gemfire.cache.server.CacheServer; -import com.gemstone.gemfire.management.internal.cli.converters.ConnectionEndpointConverter; - -/** - * Output format for gemfire. The records provided to writers created by this - * output format are PUT in a live gemfire cluster. - * - */ -public class GFOutputFormat extends OutputFormat<Object, Object> { - public static final String REGION = "mapreduce.output.gfoutputformat.outputregion"; - public static final String LOCATOR_HOST = "mapreduce.output.gfoutputformat.locatorhost"; - public static final String LOCATOR_PORT = "mapreduce.output.gfoutputformat.locatorport"; - public static final String SERVER_HOST = "mapreduce.output.gfoutputformat.serverhost"; - public static final String SERVER_PORT = "mapreduce.output.gfoutputformat.serverport"; - - @Override - public RecordWriter<Object, Object> getRecordWriter(TaskAttemptContext context) - throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); - ClientCache cache = getClientCacheInstance(conf); - return new GFRecordWriter(cache, context.getConfiguration()); - } - - public ClientCache getClientCacheInstance(Configuration conf) { - // if locator host is provided create a client cache instance using - // connection to locator. 
If locator is not provided and server host is also - // not provided, connect using default locator - ClientCache cache; - String serverHost = conf.get(SERVER_HOST); - if (serverHost == null || serverHost.isEmpty()) { - cache = createGFWriterUsingLocator(conf); - } else { - cache = createGFWriterUsingServer(conf); - } - return cache; - } - - /** - * Creates instance of {@link ClientCache} by connecting to GF cluster through - * locator - */ - public ClientCache createGFWriterUsingLocator(Configuration conf) { - // if locator host is not provided assume localhost - String locator = conf.get(LOCATOR_HOST, - ConnectionEndpointConverter.DEFAULT_LOCATOR_HOST); - // if locator port is not provided assume default locator port 10334 - int port = conf.getInt(LOCATOR_PORT, - ConnectionEndpointConverter.DEFAULT_LOCATOR_PORT); - - // create gemfire client cache instance - ClientCacheFactory ccf = new ClientCacheFactory(); - ccf.addPoolLocator(locator, port); - ClientCache cache = ccf.create(); - return cache; - } - - /** - * Creates instance of {@link ClientCache} by connecting to GF cluster through - * GF server - */ - public ClientCache createGFWriterUsingServer(Configuration conf) { - String server = conf.get(SERVER_HOST); - // if server port is not provided assume default server port, 40404 - int port = conf.getInt(SERVER_PORT, CacheServer.DEFAULT_PORT); - - // create gemfire client cache instance - ClientCacheFactory ccf = new ClientCacheFactory(); - ccf.addPoolServer(server, port); - ClientCache cache = ccf.create(); - return cache; - } - - public Region<Object, Object> getRegionInstance(Configuration conf, - ClientCache cache) { - Region<Object, Object> region; - - // create gemfire region in proxy mode - String regionName = conf.get(REGION); - ClientRegionFactory<Object, Object> regionFactory = cache - .createClientRegionFactory(ClientRegionShortcut.PROXY); - try { - region = regionFactory.create(regionName); - } catch (RegionExistsException e) { - region = cache.getRegion(regionName); - } - - return region; - } - - /** - * Puts a K-V pair in region - * @param region - * @param key - * @param value - */ - public void executePut(Region<Object, Object> region, Object key, Object value) { - region.put(key, value); - } - - /** - * Closes client cache instance - * @param clientCache - */ - public void closeClientCache(ClientCache clientCache) { - if (clientCache != null && !clientCache.isClosed()) { - clientCache.close(); - } - } - - /** - * Validates correctness and completeness of job's output configuration - * - * @param conf - * @throws InvalidJobConfException - */ - protected void validateConfiguration(Configuration conf) - throws InvalidJobConfException { - // User must configure the output region name. 
-    String region = conf.get(REGION);
-    if (region == null || region.trim().isEmpty()) {
-      throw new InvalidJobConfException("Output Region name not provided.");
-    }
-
-    // TODO: validate that a client connection to the GemFire cluster can be created
-  }
-
-  @Override
-  public void checkOutputSpecs(JobContext context) throws IOException,
-      InterruptedException {
-    Configuration conf = context.getConfiguration();
-    validateConfiguration(conf);
-  }
-
-  @Override
-  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
-    return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
-        context);
-  }
-
-  public class GFRecordWriter extends RecordWriter<Object, Object> {
-    private ClientCache clientCache;
-    private Region<Object, Object> region;
-
-    public GFRecordWriter(ClientCache cache, Configuration conf) {
-      this.clientCache = cache;
-      region = getRegionInstance(conf, clientCache);
-    }
-
-    @Override
-    public void write(Object key, Object value) throws IOException,
-        InterruptedException {
-      executePut(region, key, value);
-    }
-
-    @Override
-    public void close(TaskAttemptContext context) throws IOException,
-        InterruptedException {
-      closeClientCache(clientCache);
-    }
-  }
-}
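For reference, a MapReduce driver would have wired this (now deleted) output format into a job roughly as follows. This is a minimal sketch against the constants and methods shown in the diff above; the class name, region name, and locator endpoint are placeholders.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    // Hypothetical driver; "exampleRegion" and "localhost" are placeholder values.
    public class GFOutputDriver {
      public static Job createJob() throws IOException {
        Configuration conf = new Configuration();
        // Target region name; mandatory, enforced by validateConfiguration().
        conf.set(GFOutputFormat.REGION, "exampleRegion");
        // Optional locator endpoint; the defaults apply when these keys are absent.
        conf.set(GFOutputFormat.LOCATOR_HOST, "localhost");
        conf.setInt(GFOutputFormat.LOCATOR_PORT, 10334);

        Job job = Job.getInstance(conf, "gf-output-example");
        job.setOutputFormatClass(GFOutputFormat.class);
        // Each record the job emits is then put into "exampleRegion"
        // through GFRecordWriter.write().
        return job;
      }
    }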
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java
deleted file mode 100644
index 869ad0d..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HDFSSplitIterator.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.HDFSIOException;
-import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplog;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogSetReader.HoplogIterator;
-import com.gemstone.gemfire.i18n.LogWriterI18n;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
-
-/**
- * Iterates over the records in part of a hoplog. This iterator is passed
- * from the MapReduce job into the GemFireXD LanguageConnectionContext for
- * GemFireXD to use as the iterator during the map phase.
- */
-public abstract class HDFSSplitIterator {
-  // data object holding the path, offset, and length of all the blocks this
-  // iterator needs to iterate over
-  private CombineFileSplit split;
-
-  // the following members point to the hoplog currently being iterated over
-  private int currentHopIndex = 0;
-  private AbstractHoplog hoplog;
-  protected HoplogIterator<byte[], byte[]> iterator;
-  byte[] key;
-  byte[] value;
-
-  private long bytesRead;
-  protected long RECORD_OVERHEAD = 8;
-
-  private long startTime = 0L;
-  private long endTime = 0L;
-
-  protected FileSystem fs;
-  private static final Logger logger = LogService.getLogger();
-  protected final String logPrefix = "<HDFSSplitIterator> ";
-
-  public HDFSSplitIterator(FileSystem fs, Path[] paths, long[] offsets,
-      long[] lengths, long startTime, long endTime) throws IOException {
-    this.fs = fs;
-    this.split = new CombineFileSplit(paths, offsets, lengths, null);
-    while (currentHopIndex < split.getNumPaths() && !fs.exists(split.getPath(currentHopIndex))) {
-      logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_CLEANED_UP_BY_JANITOR, split.getPath(currentHopIndex)));
-      currentHopIndex++;
-    }
-    if (currentHopIndex == split.getNumPaths()) {
-      this.hoplog = null;
-      iterator = null;
-    } else {
-      this.hoplog = getHoplog(fs, split.getPath(currentHopIndex));
-      iterator = hoplog.getReader().scan(split.getOffset(currentHopIndex), split.getLength(currentHopIndex));
-    }
-    this.startTime = startTime;
-    this.endTime = endTime;
-  }
-
-  /**
-   * Get the appropriate iterator for the file type.
-   */
-  public static HDFSSplitIterator newInstance(FileSystem fs, Path[] path,
-      long[] start, long[] len, long startTime, long endTime)
-      throws IOException {
-    String fileName = path[0].getName();
-    if (fileName.endsWith(AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION)) {
-      return new StreamSplitIterator(fs, path, start, len, startTime, endTime);
-    } else {
-      return new RWSplitIterator(fs, path, start, len, startTime, endTime);
-    }
-  }
-
-  public final boolean hasNext() throws IOException {
-    while (currentHopIndex < split.getNumPaths()) {
-      if (iterator != null) {
-        if (iterator.hasNext()) {
-          return true;
-        } else {
-          iterator.close();
-          iterator = null;
-          hoplog.close();
-          hoplog = null;
-        }
-      }
-
-      if (iterator == null) {
-        // The iterator is null if this is the first read from this iterator
-        // or all the entries from the previous iterator have been read.
-        // Create an iterator on the next hoplog.
-        currentHopIndex++;
-        while (currentHopIndex < split.getNumPaths() && !fs.exists(split.getPath(currentHopIndex))) {
-          logger.warn(LocalizedMessage.create(LocalizedStrings.HOPLOG_CLEANED_UP_BY_JANITOR, split.getPath(currentHopIndex).toString()));
-          currentHopIndex++;
-        }
-        if (currentHopIndex >= split.getNumPaths()) {
-          return false;
-        }
-        hoplog = getHoplog(fs, split.getPath(currentHopIndex));
-        iterator = hoplog.getReader().scan(split.getOffset(currentHopIndex), split.getLength(currentHopIndex));
-      }
-    }
-
-    return false;
-  }
-
-  public final boolean next() throws IOException {
-    while (hasNext()) {
-      key = iterator.next();
-      value = iterator.getValue();
-      bytesRead += (key.length + value.length);
-      bytesRead += RECORD_OVERHEAD;
-
-      // if any filter is set, check whether the event's timestamp matches
-      // the filter. The events returned by the iterator may not be time
-      // ordered, so it is important to check the filters every time.
-      if (startTime > 0 || endTime > 0) {
-        try {
-          PersistedEventImpl event = getDeserializedValue();
-          long timestamp = event.getTimstamp();
-          if (startTime > 0L && timestamp < startTime) {
-            continue;
-          }
-
-          if (endTime > 0L && timestamp > endTime) {
-            continue;
-          }
-        } catch (ClassNotFoundException e) {
-          throw new HDFSIOException("Error reading from HDFS", e);
-        }
-      }
-
-      return true;
-    }
-
-    return false;
-  }
-
-  public final long getBytesRead() {
-    return this.bytesRead;
-  }
-
-  public final byte[] getKey() {
-    return key;
-  }
-
-  public abstract PersistedEventImpl getDeserializedValue()
-      throws ClassNotFoundException, IOException;
-
-  protected abstract AbstractHoplog getHoplog(FileSystem fs, Path path)
-      throws IOException;
-
-  public final byte[] getValue() {
-    return value;
-  }
-
-  public final long getLength() {
-    return split.getLength();
-  }
-
-  public void close() throws IOException {
-    if (iterator != null) {
-      iterator.close();
-      iterator = null;
-    }
-
-    if (hoplog != null) {
-      hoplog.close();
-      hoplog = null;
-    }
-  }
-}
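Before its removal, an iterator produced by the newInstance factory above would have been drained in the next()/getKey()/getDeserializedValue() pattern the class prescribes. A minimal sketch, assuming placeholder inputs and a disabled time window (zero start and end times); the wrapper class and method names are illustrative only.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;

    // Hypothetical consumer of HDFSSplitIterator; inputs are placeholders.
    public class SplitDrainExample {
      static void drain(Path[] paths, long[] offsets, long[] lengths)
          throws IOException, ClassNotFoundException {
        FileSystem fs = FileSystem.get(new Configuration());
        // Zero start/end times disable the timestamp filter applied in next().
        HDFSSplitIterator it =
            HDFSSplitIterator.newInstance(fs, paths, offsets, lengths, 0L, 0L);
        try {
          while (it.next()) {
            byte[] key = it.getKey();
            PersistedEventImpl event = it.getDeserializedValue();
            // ... consume key/event ...
          }
        } finally {
          it.close();
        }
      }
    }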
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f0a8a55b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java
deleted file mode 100644
index c4c0d1c..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/cache/hdfs/internal/hoplog/mapreduce/HoplogUtil.java
+++ /dev/null
@@ -1,463 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.AbstractHoplogOrganizer.HoplogComparator;
-import com.gemstone.gemfire.cache.hdfs.internal.hoplog.HoplogConfig;
-
-public class HoplogUtil {
-  /**
-   * @param regionPath
-   *          HDFS path of the region
-   * @param fs
-   *          file system associated with the region
-   * @param type
-   *          type of hoplog to be fetched; flush hoplog or sequence hoplog
-   * @return All hoplog file paths belonging to the region provided
-   * @throws IOException
-   */
-  public static Collection<FileStatus> getAllRegionHoplogs(Path regionPath,
-      FileSystem fs, String type) throws IOException {
-    return getRegionHoplogs(regionPath, fs, type, 0, 0);
-  }
-
-  /**
-   * @param regionPath
-   *          Region path
-   * @param fs
-   *          file system associated with the region
-   * @param type
-   *          type of hoplog to be fetched; flush hoplog or sequence hoplog
-   * @param start
-   *          Exclude files that do not contain records mutated after start time
-   * @param end
-   *          Exclude files that do not contain records mutated before end time
-   * @return All hoplog file paths belonging to the region provided
-   * @throws IOException
-   */
-  public static Collection<FileStatus> getRegionHoplogs(Path regionPath,
-      FileSystem fs, String type, long start, long end) throws IOException {
-    Collection<Collection<FileStatus>> allBuckets = getBucketHoplogs(
-        regionPath, fs, type, start, end);
-
-    ArrayList<FileStatus> hoplogs = new ArrayList<FileStatus>();
-    for (Collection<FileStatus> bucket : allBuckets) {
-      for (FileStatus file : bucket) {
-        hoplogs.add(file);
-      }
-    }
-    return hoplogs;
-  }
-
-  public static Collection<Collection<FileStatus>> getBucketHoplogs(Path regionPath,
-      FileSystem fs, String type, long start, long end) throws IOException {
-    Collection<Collection<FileStatus>> allBuckets = new ArrayList<Collection<FileStatus>>();
-
-    // hoplog file names follow this pattern
-    String HOPLOG_NAME_REGEX = AbstractHoplogOrganizer.HOPLOG_NAME_REGEX + type;
-    String EXPIRED_HOPLOG_NAME_REGEX = HOPLOG_NAME_REGEX + AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
-    final Pattern pattern = Pattern.compile(HOPLOG_NAME_REGEX);
-    final Pattern expiredPattern = Pattern.compile(EXPIRED_HOPLOG_NAME_REGEX);
-
-    Path cleanUpIntervalPath = new Path(regionPath.getParent(), HoplogConfig.CLEAN_UP_INTERVAL_FILE_NAME);
-    long intervalDurationMillis = readCleanUpIntervalMillis(fs, cleanUpIntervalPath);
-
-    // a region directory contains directories for individual buckets. A
-    // bucket has an integer name.
-    FileStatus[] bucketDirs = fs.listStatus(regionPath);
-
-    for (FileStatus bucket : bucketDirs) {
-      if (!bucket.isDirectory()) {
-        continue;
-      }
-      try {
-        Integer.valueOf(bucket.getPath().getName());
-      } catch (NumberFormatException e) {
-        continue;
-      }
-
-      ArrayList<FileStatus> bucketHoplogs = new ArrayList<FileStatus>();
-
-      // identify all the flush hoplogs and seq hoplogs by visiting all the
-      // bucket directories
-      FileStatus[] bucketFiles = fs.listStatus(bucket.getPath());
-
-      Map<String, Long> expiredHoplogs = getExpiredHoplogs(fs, bucketFiles, expiredPattern);
-
-      FileStatus oldestHopAfterEndTS = null;
-      long oldestHopTS = Long.MAX_VALUE;
-      long currentTimeStamp = System.currentTimeMillis();
-      for (FileStatus file : bucketFiles) {
-        if (!file.isFile()) {
-          continue;
-        }
-
-        Matcher match = pattern.matcher(file.getPath().getName());
-        if (!match.matches()) {
-          continue;
-        }
-
-        long timeStamp = AbstractHoplogOrganizer.getHoplogTimestamp(match);
-        if (start > 0 && timeStamp < start) {
-          // this hoplog contains only records mutated before the start timestamp
-          continue;
-        }
-
-        if (end > 0 && timeStamp > end) {
-          // this hoplog contains records mutated after the end timestamp.
-          // Ignore this hoplog if it is not the oldest.
-          if (oldestHopTS > timeStamp) {
-            oldestHopTS = timeStamp;
-            oldestHopAfterEndTS = file;
-          }
-          continue;
-        }
-        long expiredTimeStamp = expiredTime(file, expiredHoplogs);
-        if (expiredTimeStamp > 0 && intervalDurationMillis > 0) {
-          if ((currentTimeStamp - expiredTimeStamp) > 0.8 * intervalDurationMillis) {
-            continue;
-          }
-        }
-        bucketHoplogs.add(file);
-      }
-
-      if (oldestHopAfterEndTS != null) {
-        long expiredTimeStamp = expiredTime(oldestHopAfterEndTS, expiredHoplogs);
-        if (expiredTimeStamp <= 0 || intervalDurationMillis <= 0 ||
-            (currentTimeStamp - expiredTimeStamp) <= 0.8 * intervalDurationMillis) {
-          bucketHoplogs.add(oldestHopAfterEndTS);
-        }
-      }
-
-      if (bucketHoplogs.size() > 0) {
-        allBuckets.add(bucketHoplogs);
-      }
-    }
-
-    return allBuckets;
-  }
-
-  private static Map<String, Long> getExpiredHoplogs(FileSystem fs, FileStatus[] bucketFiles,
-      Pattern expiredPattern) throws IOException {
-    Map<String, Long> expiredHoplogs = new HashMap<String, Long>();
-
-    for (FileStatus file : bucketFiles) {
-      if (!file.isFile()) {
-        continue;
-      }
-      String fileName = file.getPath().getName();
-      Matcher match = expiredPattern.matcher(fileName);
-      if (!match.matches()) {
-        continue;
-      }
-      expiredHoplogs.put(fileName, file.getModificationTime());
-    }
-    return expiredHoplogs;
-  }
-
-  private static long expiredTime(FileStatus file, Map<String, Long> expiredHoplogs) {
-    String expiredMarkerName = file.getPath().getName() +
-        AbstractHoplogOrganizer.EXPIRED_HOPLOG_EXTENSION;
-
-    long expiredTimeStamp = -1;
-    if (expiredHoplogs.containsKey(expiredMarkerName)) {
-      expiredTimeStamp = expiredHoplogs.get(expiredMarkerName);
-    }
-    return expiredTimeStamp;
-  }
-
-  public static long readCleanUpIntervalMillis(FileSystem fs, Path cleanUpIntervalPath) throws IOException {
-    if (fs.exists(cleanUpIntervalPath)) {
-      FSDataInputStream input = new FSDataInputStream(fs.open(cleanUpIntervalPath));
-      long intervalDurationMillis = input.readLong();
-      input.close();
-      return intervalDurationMillis;
-    } else {
-      return -1L;
-    }
-  }
-
-  public static void exposeCleanupIntervalMillis(FileSystem fs, Path path, long intervalDurationMillis) {
-    FSDataInputStream input = null;
-    FSDataOutputStream output = null;
-    try {
-      if (fs.exists(path)) {
-        input = new FSDataInputStream(fs.open(path));
-        if (intervalDurationMillis == input.readLong()) {
-          input.close();
-          return;
-        }
-        input.close();
-        fs.delete(path, true);
-      }
-      output = fs.create(path);
-      output.writeLong(intervalDurationMillis);
-      output.close();
-    } catch (IOException e) {
-      // best-effort write; ignore the failure
-      return;
-    } finally {
-      try {
-        if (input != null) {
-          input.close();
-        }
-        if (output != null) {
-          output.close();
-        }
-      } catch (IOException e2) {
-        // ignore errors while closing the streams
-      }
-    }
-  }
-
-  /**
-   * @param regionPath
-   * @param fs
-   * @return list of latest checkpoint files of all buckets in the region
-   * @throws IOException
-   */
-  public static Collection<FileStatus> getCheckpointFiles(Path regionPath,
-      FileSystem fs) throws IOException {
-    ArrayList<FileStatus> latestSnapshots = new ArrayList<FileStatus>();
-
-    Collection<Collection<FileStatus>> allBuckets = getBucketHoplogs(
-        regionPath, fs, AbstractHoplogOrganizer.MAJOR_HOPLOG_EXTENSION, 0, 0);
-
-    // extract the latest major compacted hoplog from each bucket
-    for (Collection<FileStatus> bucket : allBuckets) {
-      FileStatus latestSnapshot = null;
-      for (FileStatus file : bucket) {
-        if (latestSnapshot == null) {
-          latestSnapshot = file;
-        } else {
-          String name1 = latestSnapshot.getPath().getName();
-          String name2 = file.getPath().getName();
-
-          if (HoplogComparator.compareByName(name1, name2) > 0) {
-            latestSnapshot = file;
-          }
-        }
-      }
-
-      if (latestSnapshot != null) {
-        latestSnapshots.add(latestSnapshot);
-      }
-    }
-
-    return latestSnapshots;
-  }
-
-  /**
-   * Creates a mapping of hoplog to hdfs blocks on disk
-   *
-   * @param files
-   *          list of hoplog file status objects
-   * @return array of hdfs block location objects associated with a hoplog
-   * @throws IOException
-   */
-  public static Map<FileStatus, BlockLocation[]> getBlocks(Configuration config,
-      Collection<FileStatus> files) throws IOException {
-    Map<FileStatus, BlockLocation[]> blocks = new HashMap<FileStatus, BlockLocation[]>();
-    if (files == null || files.isEmpty()) {
-      return blocks;
-    }
-
-    FileSystem fs = files.iterator().next().getPath().getFileSystem(config);
-
-    for (FileStatus hoplog : files) {
-      long length = hoplog.getLen();
-      BlockLocation[] fileBlocks = fs.getFileBlockLocations(hoplog, 0, length);
-      blocks.put(hoplog, fileBlocks);
-    }
-
-    return blocks;
-  }
-
-  /**
-   * Filters out hoplogs of a region that do not match the time filters and
-   * creates a list of hoplogs that may be used by hadoop jobs.
-   *
-   * @param fs
-   *          file system instance
-   * @param path
-   *          region path
-   * @param start
-   *          start time in milliseconds
-   * @param end
-   *          end time in milliseconds
-   * @param snapshot
-   *          if true the latest snapshot hoplog will be included in the
-   *          final return list
-   * @return filtered collection of hoplogs
-   * @throws IOException
-   */
-  public static Collection<FileStatus> filterHoplogs(FileSystem fs, Path path,
-      long start, long end, boolean snapshot) throws IOException {
-    ArrayList<FileStatus> hoplogs = new ArrayList<FileStatus>();
-
-    // if the region contains flush hoplogs or major compacted files then the
-    // region is of type RW.
-    // check if the intent is to operate on major compacted files only
-    if (snapshot) {
-      hoplogs.addAll(getCheckpointFiles(path, fs));
-    } else {
-      hoplogs.addAll(getRegionHoplogs(path, fs,
-          AbstractHoplogOrganizer.FLUSH_HOPLOG_EXTENSION, start, end));
-    }
-
-    if (hoplogs.isEmpty()) {
-      // there are no sorted hoplogs. Check if sequence hoplogs are present;
-      // there is no checkpoint mode for write-only tables.
-      hoplogs.addAll(getRegionHoplogs(path, fs,
-          AbstractHoplogOrganizer.SEQ_HOPLOG_EXTENSION, start, end));
-    }
-
-    return hoplogs;
-  }
-
-  private HoplogUtil() {
-    // static methods only.
-  }
-
-  /**
-   * This class creates MR splits from hoplog files. It leverages
-   * CombineFileInputFormat to create locality-aware (node- and rack-local)
-   * splits.
-   */
-  public static class HoplogOptimizedSplitter extends CombineFileInputFormat<Long, Long> {
-    private Collection<FileStatus> hoplogs;
-
-    public HoplogOptimizedSplitter(Collection<FileStatus> hoplogs) {
-      this.hoplogs = hoplogs;
-    }
-
-    @Override
-    protected List<FileStatus> listStatus(JobContext job) throws IOException {
-      /*
-       * listStatus in super collects a FileStatus for each file again. It
-       * also tries to recursively list files in subdirectories. None of this
-       * is applicable here: the splitter has already collected a FileStatus
-       * for all files, so bypassing super's method improves performance by
-       * reducing NameNode chatter. Especially helpful if the NameNode is not
-       * colocated.
-       */
-      return new ArrayList<FileStatus>(hoplogs);
-    }
-
-    /**
-     * Creates an array of splits for the input list of hoplogs. Each split
-     * is roughly the size of an HDFS block. HDFS blocks of a hoplog may be
-     * smaller than the HDFS block size, e.g. if the hoplog is very small.
-     * The method keeps adding HDFS blocks of a hoplog to a split as long as
-     * the split stays under the HDFS block size and the block is local to
-     * the split.
-     */
-    public List<InputSplit> getOptimizedSplits(Configuration conf) throws IOException {
-
-      if (hoplogs == null || hoplogs.isEmpty()) {
-        return null;
-      }
-      Path[] paths = new Path[hoplogs.size()];
-      int i = 0;
-      for (FileStatus file : hoplogs) {
-        paths[i] = file.getPath();
-        i++;
-      }
-
-      FileStatus hoplog = hoplogs.iterator().next();
-      long blockSize = hoplog.getBlockSize();
-      setMaxSplitSize(blockSize);
-
-      Job job = Job.getInstance(conf);
-      setInputPaths(job, paths);
-      List<InputSplit> splits = super.getSplits(job);
-
-      // in some cases a split may not get populated with host location
-      // information. If such a split is created, fill in the location
-      // information of the first file in the split.
-      ArrayList<CombineFileSplit> newSplits = new ArrayList<CombineFileSplit>();
-      for (Iterator<InputSplit> iter = splits.iterator(); iter.hasNext();) {
-        CombineFileSplit split = (CombineFileSplit) iter.next();
-        if (split.getLocations() != null && split.getLocations().length > 0) {
-          continue;
-        }
-
-        paths = split.getPaths();
-        if (paths.length == 0) {
-          continue;
-        }
-        long[] starts = split.getStartOffsets();
-        long[] lengths = split.getLengths();
-
-        FileSystem fs = paths[0].getFileSystem(conf);
-        FileStatus file = fs.getFileStatus(paths[0]);
-        BlockLocation[] blks = fs.getFileBlockLocations(file, starts[0], lengths[0]);
-        if (blks != null && blks.length > 0) {
-          // hosts found. Create a new split to replace the one missing hosts.
-          iter.remove();
-          String hosts[] = blks[0].getHosts();
-          split = new CombineFileSplit(paths, starts, lengths, hosts);
-          newSplits.add(split);
-        }
-      }
-      splits.addAll(newSplits);
-
-      return splits;
-    }
-
-    @Override
-    public List<InputSplit> getSplits(JobContext job) throws IOException {
-      // a call to this method is invalid. This class is only meant to create
-      // optimized splits independent of the API type
-      throw new IllegalStateException();
-    }
-
-    @Override
-    public RecordReader<Long, Long> createRecordReader(InputSplit split,
-        TaskAttemptContext arg1) throws IOException {
-      // Record reader creation is managed by GFInputFormat. This method
-      // should not be called.
-      throw new IllegalStateException();
-    }
-  }
-}
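Taken together, the deleted utilities above composed as follows: filterHoplogs selects the candidate files for a time window, and HoplogOptimizedSplitter turns them into locality-aware splits. A hedged sketch of that flow; the wrapper class name and region path are placeholders, and note that getOptimizedSplits may return null for an empty file set.

    import java.io.IOException;
    import java.util.Collection;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.InputSplit;

    // Hypothetical split-planning helper built on the removed HoplogUtil API.
    public class SplitPlanningExample {
      static List<InputSplit> planSplits(Configuration conf) throws IOException {
        Path regionPath = new Path("/gemfire/exampleRegion"); // placeholder
        FileSystem fs = regionPath.getFileSystem(conf);
        // No time filter (0, 0) and no snapshot mode: flush hoplogs are
        // selected first, falling back to sequence hoplogs for write-only
        // regions.
        Collection<FileStatus> hoplogs =
            HoplogUtil.filterHoplogs(fs, regionPath, 0, 0, false);
        // Returns null when the region has no hoplogs at all.
        return new HoplogUtil.HoplogOptimizedSplitter(hoplogs).getOptimizedSplits(conf);
      }
    }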