sodonnel commented on a change in pull request #2507: URL: https://github.com/apache/ozone/pull/2507#discussion_r691135987
########## File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java ########## @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.client.io; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.CanUnbuffer; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; +import org.apache.hadoop.hdds.scm.storage.BlockInputStream; +import org.apache.hadoop.hdds.scm.storage.ByteArrayReader; +import org.apache.hadoop.hdds.scm.storage.ByteBufferReader; +import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; + +import java.io.IOException; +import 
java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * Class to read data from an EC Block Group. + */ +public class ECBlockInputStream extends InputStream + implements Seekable, CanUnbuffer, ByteBufferReadable { + + private static final int EOF = -1; + + private final ECReplicationConfig repConfig; + private final int ecChunkSize; + private final BlockInputStreamFactory streamFactory; + private final boolean verifyChecksum; + private final OmKeyLocationInfo blockInfo; + private final DatanodeDetails[] dataLocations; + private final DatanodeDetails[] parityLocations; + private final BlockInputStream[] blockStreams; + private final int maxLocations; + + private int position = 0; + private boolean closed = false; + + public ECBlockInputStream(ECReplicationConfig repConfig, int ecChunkSize, + OmKeyLocationInfo blockInfo, boolean verifyChecksum, + BlockInputStreamFactory streamFactory) { + this.repConfig = repConfig; + this.ecChunkSize = ecChunkSize; + this.verifyChecksum = verifyChecksum; + this.blockInfo = blockInfo; + this.streamFactory = streamFactory; + this.maxLocations = repConfig.getData() + repConfig.getParity(); + this.dataLocations = new DatanodeDetails[repConfig.getData()]; + this.parityLocations = new DatanodeDetails[repConfig.getParity()]; + this.blockStreams = new BlockInputStream[repConfig.getData()]; + + setBlockLocations(this.blockInfo.getPipeline()); + } + + public synchronized boolean hasSufficientLocations() { + // Until we implement "on the fly" recovery, all data locations must be + // present and we have enough locations if that is the case. + // + // The number of locations needed is a function of the EC Chunk size. If the + // block length is <= the chunk size, we should only have location 1. If it + // is greater than the chunk size but less than chunk_size * 2, then we must + // have two locations. If it is greater than chunk_size * data_num, then we + // must have all data_num locations. 
+ int expectedDataBlocks = + (int)Math.min( + Math.ceil((double)blockInfo.getLength() / ecChunkSize), + repConfig.getData()); + for (int i=0; i<expectedDataBlocks; i++) { + if (dataLocations[i] == null) { + return false; + } + } + return true; + } + + /** + * Using the current position, returns the index of the blockStream we should + * be reading from. This is the index in the internal array holding the + * stream reference. The block group index will be one greater than this. + * @return + */ + private int currentStreamIndex() { + return ((position / ecChunkSize) % repConfig.getData()); + } + + /** + * Uses the current position and ecChunkSize to determine which of the + * internal block streams the next read should come from. Also opens the + * stream if it has not been opened already. + * @return BlockInput stream to read from. + */ + private BlockInputStream getOrOpenStream() { + int ind = currentStreamIndex(); + BlockInputStream stream = blockStreams[ind]; + if (stream == null) { + // To read an EC block, we create a STANDALONE pipeline that contains the + // single location for the block index we want to read. The EC blocks are + // indexed from 1 to N, however the data locations are stored in the + // dataLocations array indexed from zero. + Pipeline pipeline = Pipeline.newBuilder() + .setReplicationConfig(new StandaloneReplicationConfig( + HddsProtos.ReplicationFactor.ONE)) + .setNodes(Arrays.asList(dataLocations[ind])) + .setId(PipelineID.randomId()) Review comment: Yes I believe so. After the containers are closed, SCM creates a "read pipeline" to let you read the data. This is created on the fly for each location request, and is really just a list of locations. The pipeline object is reused for this. This means that a random ID is created for these read pipelines. The DN does not seem to care about the pipeline ID you used, but the ID needs to be there to build the pipeline object or it complains. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
