http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java deleted file mode 100644 index 503112d..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ /dev/null @@ -1,236 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.ozoneimpl; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.apache.hadoop.ozone.container.common.statemachine.StateContext; -import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc; -import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; -import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; -import org.apache.hadoop.ozone.container.common.volume.HddsVolume; -import org.apache.hadoop.ozone.container.common.volume.VolumeSet; - -import org.apache.hadoop.ozone.container.replication.GrpcReplicationService; -import org.apache.hadoop.ozone.container.replication - .OnDemandContainerReplicationSource; -import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; - -/** - * Ozone main class sets up the network servers and initializes the container - * layer. - */ -public class OzoneContainer { - - public static final Logger LOG = LoggerFactory.getLogger( - OzoneContainer.class); - - private final HddsDispatcher hddsDispatcher; - private final DatanodeDetails dnDetails; - private final OzoneConfiguration config; - private final VolumeSet volumeSet; - private final ContainerSet containerSet; - private final Map<ReplicationType, XceiverServerSpi> servers; - - /** - * Construct OzoneContainer object. - * @param datanodeDetails - * @param conf - * @throws DiskOutOfSpaceException - * @throws IOException - */ - public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration - conf, StateContext context) throws IOException { - this.dnDetails = datanodeDetails; - this.config = conf; - this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf); - this.containerSet = new ContainerSet(); - buildContainerSet(); - hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet, - context); - servers = new HashMap<>(); - servers.put(ReplicationType.STAND_ALONE, - new XceiverServerGrpc(datanodeDetails, config, hddsDispatcher, - createReplicationService())); - servers.put(ReplicationType.RATIS, XceiverServerRatis - .newXceiverServerRatis(datanodeDetails, config, hddsDispatcher, - context)); - } - - private GrpcReplicationService createReplicationService() { - return new GrpcReplicationService( - new OnDemandContainerReplicationSource(containerSet)); - } - - /** - * Build's container map. - */ - public void buildContainerSet() { - Iterator<HddsVolume> volumeSetIterator = volumeSet.getVolumesList() - .iterator(); - ArrayList<Thread> volumeThreads = new ArrayList<Thread>(); - - //TODO: diskchecker should be run before this, to see how disks are. - // And also handle disk failure tolerance need to be added - while (volumeSetIterator.hasNext()) { - HddsVolume volume = volumeSetIterator.next(); - File hddsVolumeRootDir = volume.getHddsRootDir(); - Thread thread = new Thread(new ContainerReader(volumeSet, volume, - containerSet, config)); - thread.start(); - volumeThreads.add(thread); - } - - try { - for (int i = 0; i < volumeThreads.size(); i++) { - volumeThreads.get(i).join(); - } - } catch (InterruptedException ex) { - LOG.info("Volume Threads Interrupted exception", ex); - } - - } - - /** - * Starts serving requests to ozone container. - * - * @throws IOException - */ - public void start() throws IOException { - LOG.info("Attempting to start container services."); - for (XceiverServerSpi serverinstance : servers.values()) { - serverinstance.start(); - } - hddsDispatcher.init(); - } - - /** - * Stop Container Service on the datanode. - */ - public void stop() { - //TODO: at end of container IO integration work. - LOG.info("Attempting to stop container services."); - for(XceiverServerSpi serverinstance: servers.values()) { - serverinstance.stop(); - } - hddsDispatcher.shutdown(); - } - - - @VisibleForTesting - public ContainerSet getContainerSet() { - return containerSet; - } - /** - * Returns container report. - * @return - container report. - * @throws IOException - */ - public StorageContainerDatanodeProtocolProtos.ContainerReportsProto - getContainerReport() throws IOException { - return this.containerSet.getContainerReport(); - } - - public PipelineReportsProto getPipelineReport() { - PipelineReportsProto.Builder pipelineReportsProto = - PipelineReportsProto.newBuilder(); - for (XceiverServerSpi serverInstance : servers.values()) { - pipelineReportsProto - .addAllPipelineReport(serverInstance.getPipelineReport()); - } - return pipelineReportsProto.build(); - } - - /** - * Submit ContainerRequest. - * @param request - * @param replicationType - * @param pipelineID - */ - public void submitContainerRequest( - ContainerProtos.ContainerCommandRequestProto request, - ReplicationType replicationType, - PipelineID pipelineID) throws IOException { - LOG.info("submitting {} request over {} server for container {}", - request.getCmdType(), replicationType, request.getContainerID()); - Preconditions.checkState(servers.containsKey(replicationType)); - servers.get(replicationType).submitRequest(request, pipelineID); - } - - private int getPortByType(ReplicationType replicationType) { - return servers.containsKey(replicationType) ? - servers.get(replicationType).getIPCPort() : INVALID_PORT; - } - - /** - * Returns the container servers IPC port. - * - * @return Container servers IPC port. - */ - public int getContainerServerPort() { - return getPortByType(ReplicationType.STAND_ALONE); - } - - /** - * Returns the Ratis container Server IPC port. - * - * @return Ratis port. - */ - public int getRatisContainerServerPort() { - return getPortByType(ReplicationType.RATIS); - } - - /** - * Returns node report of container storage usage. - */ - public StorageContainerDatanodeProtocolProtos.NodeReportProto getNodeReport() - throws IOException { - return volumeSet.getNodeReport(); - } - - @VisibleForTesting - public ContainerDispatcher getDispatcher() { - return this.hddsDispatcher; - } - - public VolumeSet getVolumeSet() { - return volumeSet; - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/package-info.java deleted file mode 100644 index c99c038..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.ozoneimpl; -/** - Ozone main that calls into the container layer -**/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java deleted file mode 100644 index 9511241..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import java.io.Closeable; -import java.nio.file.Path; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; - -/** - * Service to download container data from other datanodes. - * <p> - * The implementation of this interface should copy the raw container data in - * compressed form to working directory. - * <p> - * A smart implementation would use multiple sources to do parallel download. - */ -public interface ContainerDownloader extends Closeable { - - CompletableFuture<Path> getContainerDataFromReplicas(long containerId, - List<DatanodeDetails> sources); - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java deleted file mode 100644 index 69582f7..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicationSource.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * Contract to prepare provide the container in binary form.. - * <p> - * Prepare will be called when container is closed. An implementation could - * precache any binary representation of a container and store the pre packede - * images. - */ -public interface ContainerReplicationSource { - - /** - * Prepare for the replication. - * - * @param containerId The name of the container the package. - */ - void prepare(long containerId); - - /** - * Copy the container data to an output stream. - * - * @param containerId Container to replicate - * @param destination The destination stream to copy all the container data. - * @throws IOException - */ - void copyData(long containerId, OutputStream destination) - throws IOException; - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java deleted file mode 100644 index 827b9d6..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerReplicator.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; - -/** - * Service to do the real replication task. - * - * An implementation should download the container and im - */ -public interface ContainerReplicator { - void replicate(ReplicationTask task); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java deleted file mode 100644 index f7fd8a4..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerStreamingOutput.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.StreamingOutput; -import java.io.IOException; -import java.io.OutputStream; - -/** - * JAX-RS streaming output to return the binary container data. - */ -public class ContainerStreamingOutput implements StreamingOutput { - - private long containerId; - - private ContainerReplicationSource containerReplicationSource; - - public ContainerStreamingOutput(long containerId, - ContainerReplicationSource containerReplicationSource) { - this.containerId = containerId; - this.containerReplicationSource = containerReplicationSource; - } - - @Override - public void write(OutputStream outputStream) - throws IOException, WebApplicationException { - containerReplicationSource.copyData(containerId, outputStream); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java deleted file mode 100644 index 5ef5841..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import java.io.FileInputStream; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.List; -import java.util.concurrent.CompletableFuture; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.ozone.container.common.impl.ContainerData; -import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.apache.hadoop.ozone.container.common.interfaces.Handler; -import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; -import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Default replication implementation. - * <p> - * This class does the real job. Executes the download and import the container - * to the container set. - */ -public class DownloadAndImportReplicator implements ContainerReplicator { - - private static final Logger LOG = - LoggerFactory.getLogger(DownloadAndImportReplicator.class); - - private final ContainerSet containerSet; - - private final ContainerDispatcher containerDispatcher; - - private final ContainerDownloader downloader; - - private final TarContainerPacker packer; - - public DownloadAndImportReplicator( - ContainerSet containerSet, - ContainerDispatcher containerDispatcher, - ContainerDownloader downloader, - TarContainerPacker packer) { - this.containerSet = containerSet; - this.containerDispatcher = containerDispatcher; - this.downloader = downloader; - this.packer = packer; - } - - public void importContainer(long containerID, Path tarFilePath) { - try { - ContainerData originalContainerData; - try (FileInputStream tempContainerTarStream = new FileInputStream( - tarFilePath.toFile())) { - byte[] containerDescriptorYaml = - packer.unpackContainerDescriptor(tempContainerTarStream); - originalContainerData = ContainerDataYaml.readContainer( - containerDescriptorYaml); - } - - try (FileInputStream tempContainerTarStream = new FileInputStream( - tarFilePath.toFile())) { - - Handler handler = containerDispatcher.getHandler( - originalContainerData.getContainerType()); - - Container container = handler.importContainer(containerID, - originalContainerData.getMaxSize(), - tempContainerTarStream, - packer); - - containerSet.addContainer(container); - } - - } catch (Exception e) { - LOG.error( - "Can't import the downloaded container data id=" + containerID, - e); - try { - Files.delete(tarFilePath); - } catch (Exception ex) { - LOG.error( - "Container import is failed and the downloaded file can't be " - + "deleted: " - + tarFilePath.toAbsolutePath().toString()); - } - } - } - - @Override - public void replicate(ReplicationTask task) { - long containerID = task.getContainerId(); - - List<DatanodeDetails> sourceDatanodes = task.getSources(); - - LOG.info("Starting replication of container {} from {}", containerID, - sourceDatanodes); - - CompletableFuture<Path> tempTarFile = downloader - .getContainerDataFromReplicas(containerID, - sourceDatanodes); - - try { - //wait for the download. This thread pool is limiting the paralell - //downloads, so it's ok to block here and wait for the full download. - Path path = tempTarFile.get(); - LOG.info("Container {} is downloaded, starting to import.", - containerID); - importContainer(containerID, path); - LOG.info("Container {} is replicated successfully", containerID); - task.setStatus(Status.DONE); - } catch (Exception e) { - LOG.error("Container replication was unsuccessful .", e); - task.setStatus(Status.FAILED); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java deleted file mode 100644 index 3aafb0c..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.replication; - -import java.io.BufferedOutputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.concurrent.CompletableFuture; - -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .CopyContainerRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .CopyContainerResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto - .IntraDatanodeProtocolServiceGrpc; -import org.apache.hadoop.hdds.protocol.datanode.proto - .IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceStub; -import org.apache.hadoop.ozone.OzoneConfigKeys; - -import com.google.common.base.Preconditions; -import org.apache.ratis.shaded.io.grpc.ManagedChannel; -import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Client to read container data from Grpc. - */ -public class GrpcReplicationClient { - - private static final Logger LOG = - LoggerFactory.getLogger(GrpcReplicationClient.class); - - private final ManagedChannel channel; - - private final IntraDatanodeProtocolServiceStub client; - - private final Path workingDirectory; - - public GrpcReplicationClient(String host, - int port, Path workingDir) { - - channel = NettyChannelBuilder.forAddress(host, port) - .usePlaintext() - .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE) - .build(); - client = IntraDatanodeProtocolServiceGrpc.newStub(channel); - this.workingDirectory = workingDir; - - } - - public CompletableFuture<Path> download(long containerId) { - CopyContainerRequestProto request = - CopyContainerRequestProto.newBuilder() - .setContainerID(containerId) - .setLen(-1) - .setReadOffset(0) - .build(); - - CompletableFuture<Path> response = new CompletableFuture<>(); - - Path destinationPath = - getWorkingDirectory().resolve("container-" + containerId + ".tar.gz"); - - client.download(request, - new StreamDownloader(containerId, response, destinationPath)); - return response; - } - - private Path getWorkingDirectory() { - return workingDirectory; - } - - public void shutdown() { - channel.shutdown(); - } - - /** - * Grpc stream observer to ComletableFuture adapter. - */ - public static class StreamDownloader - implements StreamObserver<CopyContainerResponseProto> { - - private final CompletableFuture<Path> response; - - private final long containerId; - - private BufferedOutputStream stream; - - private Path outputPath; - - public StreamDownloader(long containerId, CompletableFuture<Path> response, - Path outputPath) { - this.response = response; - this.containerId = containerId; - this.outputPath = outputPath; - try { - outputPath = Preconditions.checkNotNull(outputPath); - Path parentPath = Preconditions.checkNotNull(outputPath.getParent()); - Files.createDirectories(parentPath); - stream = - new BufferedOutputStream(new FileOutputStream(outputPath.toFile())); - } catch (IOException e) { - throw new RuntimeException("OutputPath can't be used: " + outputPath, - e); - } - - } - - @Override - public void onNext(CopyContainerResponseProto chunk) { - try { - stream.write(chunk.getData().toByteArray()); - } catch (IOException e) { - response.completeExceptionally(e); - } - } - - @Override - public void onError(Throwable throwable) { - try { - stream.close(); - LOG.error("Container download was unsuccessfull", throwable); - try { - Files.delete(outputPath); - } catch (IOException ex) { - LOG.error( - "Error happened during the download but can't delete the " - + "temporary destination.", ex); - } - response.completeExceptionally(throwable); - } catch (IOException e) { - response.completeExceptionally(e); - } - } - - @Override - public void onCompleted() { - try { - stream.close(); - LOG.info("Container is downloaded to {}", outputPath); - response.complete(outputPath); - } catch (IOException e) { - response.completeExceptionally(e); - } - - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java deleted file mode 100644 index d8f696f..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.replication; - -import java.io.ByteArrayOutputStream; -import java.io.Closeable; -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .CopyContainerRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .CopyContainerResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto - .IntraDatanodeProtocolServiceGrpc; - -import org.apache.ratis.shaded.com.google.protobuf.ByteString; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Service to make containers available for replication. - */ -public class GrpcReplicationService extends - IntraDatanodeProtocolServiceGrpc.IntraDatanodeProtocolServiceImplBase { - - private static final Logger LOG = - LoggerFactory.getLogger(GrpcReplicationService.class); - - private final ContainerReplicationSource containerReplicationSource; - - public GrpcReplicationService( - ContainerReplicationSource containerReplicationSource) { - this.containerReplicationSource = containerReplicationSource; - } - - @Override - public void download(CopyContainerRequestProto request, - StreamObserver<CopyContainerResponseProto> responseObserver) { - LOG.info("Streaming container data ({}) to other datanode", - request.getContainerID()); - try { - GrpcOutputStream outputStream = - new GrpcOutputStream(responseObserver, request.getContainerID()); - containerReplicationSource - .copyData(request.getContainerID(), outputStream); - - } catch (IOException e) { - LOG.error("Can't stream the container data", e); - responseObserver.onError(e); - } - } - - private static class GrpcOutputStream extends OutputStream - implements Closeable { - - private static final int BUFFER_SIZE_IN_BYTES = 1024 * 1024; - - private final StreamObserver<CopyContainerResponseProto> responseObserver; - - private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - - private long containerId; - - private int readOffset = 0; - - private int writtenBytes; - - GrpcOutputStream( - StreamObserver<CopyContainerResponseProto> responseObserver, - long containerId) { - this.responseObserver = responseObserver; - this.containerId = containerId; - } - - @Override - public void write(int b) throws IOException { - try { - buffer.write(b); - if (buffer.size() > BUFFER_SIZE_IN_BYTES) { - flushBuffer(false); - } - } catch (Exception ex) { - responseObserver.onError(ex); - } - } - - private void flushBuffer(boolean eof) { - if (buffer.size() > 0) { - CopyContainerResponseProto response = - CopyContainerResponseProto.newBuilder() - .setContainerID(containerId) - .setData(ByteString.copyFrom(buffer.toByteArray())) - .setEof(eof) - .setReadOffset(readOffset) - .setLen(buffer.size()) - .build(); - responseObserver.onNext(response); - readOffset += buffer.size(); - writtenBytes += buffer.size(); - buffer.reset(); - } - } - - @Override - public void close() throws IOException { - flushBuffer(true); - LOG.info("{} bytes written to the rpc stream from container {}", - writtenBytes, containerId); - responseObserver.onCompleted(); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java deleted file mode 100644 index d557b54..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import java.io.IOException; -import java.io.OutputStream; - -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerPacker; -import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; - -import com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A naive implementation of the replication source which creates a tar file - * on-demand without pre-create the compressed archives. - */ -public class OnDemandContainerReplicationSource - implements ContainerReplicationSource { - - private static final Logger LOG = - LoggerFactory.getLogger(ContainerReplicationSource.class); - - private ContainerSet containerSet; - - private ContainerPacker packer = new TarContainerPacker(); - - public OnDemandContainerReplicationSource( - ContainerSet containerSet) { - this.containerSet = containerSet; - } - - @Override - public void prepare(long containerId) { - - } - - @Override - public void copyData(long containerId, OutputStream destination) - throws IOException { - - Container container = containerSet.getContainer(containerId); - - Preconditions - .checkNotNull(container, "Container is not found " + containerId); - - switch (container.getContainerType()) { - case KeyValueContainer: - packer.pack(container, - destination); - break; - default: - LOG.warn("Container type " + container.getContainerType() - + " is not replicable as no compression algorithm for that."); - } - - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java deleted file mode 100644 index 1d8d5f6..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; - -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; -import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status; - -import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Single point to schedule the downloading tasks based on priorities. - */ -public class ReplicationSupervisor { - - private static final Logger LOG = - LoggerFactory.getLogger(ReplicationSupervisor.class); - - private final Set<Worker> threadPool = new HashSet<>(); - - private final Map<Long, ReplicationTask> queue = new TreeMap(); - - private final ContainerSet containerSet; - - private final ContainerReplicator replicator; - - private final int poolSize; - - public ReplicationSupervisor( - ContainerSet containerSet, - ContainerReplicator replicator, int poolSize) { - this.containerSet = containerSet; - this.replicator = replicator; - this.poolSize = poolSize; - } - - public synchronized void addTask(ReplicationTask task) { - queue.putIfAbsent(task.getContainerId(), task); - synchronized (threadPool) { - threadPool.notify(); - } - } - - public void start() { - for (int i = 0; i < poolSize; i++) { - Worker worker = new Worker(); - Thread thread = new Thread(worker, "ContainerReplication-" + i); - thread.setDaemon(true); - thread.start(); - threadPool.add(worker); - } - } - - public synchronized ReplicationTask selectTask() { - for (ReplicationTask task : queue.values()) { - if (task.getStatus() == Status.QUEUED) { - if (containerSet.getContainer(task.getContainerId()) == null) { - task.setStatus(Status.DOWNLOADING); - return task; - } else { - LOG.debug("Container {} has already been downloaded.", - task.getContainerId()); - queue.remove(task.getContainerId()); - } - } else if (task.getStatus() == Status.FAILED) { - LOG.error( - "Container {} can't be downloaded from any of the datanodes.", - task.getContainerId()); - queue.remove(task.getContainerId()); - } else if (task.getStatus() == Status.DONE) { - queue.remove(task.getContainerId()); - LOG.info("Container {} is replicated.", task.getContainerId()); - } - } - //no available task. - return null; - } - - public void stop() { - for (Worker worker : threadPool) { - worker.stop(); - } - } - - @VisibleForTesting - public int getQueueSize() { - return queue.size(); - } - - private class Worker implements Runnable { - - private boolean running = true; - - @Override - public void run() { - try { - while (running) { - ReplicationTask task = selectTask(); - if (task == null) { - synchronized (threadPool) { - threadPool.wait(); - } - } else { - replicator.replicate(task); - } - } - } catch (Exception ex) { - LOG.error("Error on doing replication", ex); - try { - Thread.sleep(200); - } catch (InterruptedException e) { - LOG.error("Error on waiting after failed replication task", e); - } - } - } - - public void stop() { - running = false; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java deleted file mode 100644 index 9019811..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import java.time.Instant; -import java.util.List; -import java.util.Objects; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; - -/** - * The task to download a container from the sources. - */ -public class ReplicationTask { - - private volatile Status status = Status.QUEUED; - - private final long containerId; - - private List<DatanodeDetails> sources; - - private final Instant queued = Instant.now(); - - public ReplicationTask(long containerId, - List<DatanodeDetails> sources) { - this.containerId = containerId; - this.sources = sources; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ReplicationTask that = (ReplicationTask) o; - return containerId == that.containerId; - } - - @Override - public int hashCode() { - return Objects.hash(containerId); - } - - public long getContainerId() { - return containerId; - } - - public List<DatanodeDetails> getSources() { - return sources; - } - - public Status getStatus() { - return status; - } - - public void setStatus( - Status status) { - this.status = status; - } - - @Override - public String toString() { - return "ReplicationTask{" + - "status=" + status + - ", containerId=" + containerId + - ", sources=" + sources + - ", queued=" + queued + - '}'; - } - - public Instant getQueued() { - return queued; - } - - /** - * Status of the replication. - */ - public enum Status { - QUEUED, - DOWNLOADING, - FAILED, - DONE - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java deleted file mode 100644 index a461a98..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.replication; - -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name; -import org.apache.hadoop.ozone.OzoneConfigKeys; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Simple ContainerDownloaderImplementation to download the missing container - * from the first available datanode. - * <p> - * This is not the most effective implementation as it uses only one source - * for he container download. - */ -public class SimpleContainerDownloader implements ContainerDownloader { - - private static final Logger LOG = - LoggerFactory.getLogger(SimpleContainerDownloader.class); - - private final Path workingDirectory; - - private ExecutorService executor; - - public SimpleContainerDownloader(Configuration conf) { - - String workDirString = - conf.get(OzoneConfigKeys.OZONE_CONTAINER_COPY_WORKDIR); - - if (workDirString == null) { - workingDirectory = Paths.get(System.getProperty("java.io.tmpdir")) - .resolve("container-copy"); - } else { - workingDirectory = Paths.get(workDirString); - } - - ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Container downloader thread - %d").build(); - executor = Executors.newSingleThreadExecutor(build); - LOG.info("Starting container downloader service to copy " - + "containers to replicate."); - } - - @Override - public CompletableFuture<Path> getContainerDataFromReplicas(long containerId, - List<DatanodeDetails> sourceDatanodes) { - - CompletableFuture<Path> result = null; - for (DatanodeDetails datanode : sourceDatanodes) { - try { - - if (result == null) { - GrpcReplicationClient grpcReplicationClient = - new GrpcReplicationClient(datanode.getIpAddress(), - datanode.getPort(Name.STANDALONE).getValue(), - workingDirectory); - result = grpcReplicationClient.download(containerId); - } else { - result = result.thenApply(CompletableFuture::completedFuture) - .exceptionally(t -> { - LOG.error("Error on replicating container: " + containerId, t); - GrpcReplicationClient grpcReplicationClient = - new GrpcReplicationClient(datanode.getIpAddress(), - datanode.getPort(Name.STANDALONE).getValue(), - workingDirectory); - return grpcReplicationClient.download(containerId); - }).thenCompose(Function.identity()); - } - } catch (Exception ex) { - LOG.error(String.format( - "Container %s download from datanode %s was unsuccessful. " - + "Trying the next datanode", containerId, datanode), ex); - } - - } - return result; - - } - - @Override - public void close() throws IOException { - try { - executor.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOG.error("Can't stop container downloader gracefully", e); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java deleted file mode 100644 index 38a853c..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; -/** - Classes to replicate container data between datanodes. -**/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/package-info.java deleted file mode 100644 index 1a51012..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone; - -/** - * Generic ozone specific classes. - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java deleted file mode 100644 index e3b5370..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.protocol; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; - -import java.io.IOException; - -/** - * The protocol spoken between datanodes and SCM. For specifics please the - * Protoc file that defines this protocol. - */ -@InterfaceAudience.Private -public interface StorageContainerDatanodeProtocol { - /** - * Returns SCM version. - * @return Version info. - */ - SCMVersionResponseProto getVersion(SCMVersionRequestProto versionRequest) - throws IOException; - - /** - * Used by data node to send a Heartbeat. - * @param heartbeat Heartbeat - * @return - SCMHeartbeatResponseProto - * @throws IOException - */ - SCMHeartbeatResponseProto sendHeartbeat(SCMHeartbeatRequestProto heartbeat) - throws IOException; - - /** - * Register Datanode. - * @param datanodeDetails - Datanode Details. - * @param nodeReport - Node Report. - * @param containerReportsRequestProto - Container Reports. - * @return SCM Command. - */ - SCMRegisteredResponseProto register( - DatanodeDetailsProto datanodeDetails, - NodeReportProto nodeReport, - ContainerReportsProto containerReportsRequestProto, - PipelineReportsProto pipelineReports) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java deleted file mode 100644 index b3c3eb3..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.protocol; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; -import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; -import org.apache.hadoop.ozone.protocol.commands.SCMCommand; - -import java.util.List; - -/** - * The protocol spoken between datanodes and SCM. - * - * Please note that the full protocol spoken between a datanode and SCM is - * separated into 2 interfaces. One interface that deals with node state and - * another interface that deals with containers. - * - * This interface has functions that deals with the state of datanode. - */ -@InterfaceAudience.Private -public interface StorageContainerNodeProtocol { - /** - * Gets the version info from SCM. - * @param versionRequest - version Request. - * @return - returns SCM version info and other required information needed - * by datanode. - */ - VersionResponse getVersion(SCMVersionRequestProto versionRequest); - - /** - * Register the node if the node finds that it is not registered with any SCM. - * @param datanodeDetails DatanodeDetails - * @param nodeReport NodeReportProto - * @param pipelineReport PipelineReportsProto - * @return SCMHeartbeatResponseProto - */ - RegisteredCommand register(DatanodeDetails datanodeDetails, - NodeReportProto nodeReport, - PipelineReportsProto pipelineReport); - - /** - * Send heartbeat to indicate the datanode is alive and doing well. - * @param datanodeDetails - Datanode ID. - * @return SCMheartbeat response list - */ - List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails); - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java deleted file mode 100644 index 4d328d3..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/VersionResponse.java +++ /dev/null @@ -1,154 +0,0 @@ - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.protocol; - -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.KeyValue; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * Version response class. - */ -public class VersionResponse { - private final int version; - private final Map<String, String> values; - - /** - * Creates a version response class. - * @param version - * @param values - */ - public VersionResponse(int version, Map<String, String> values) { - this.version = version; - this.values = values; - } - - /** - * Creates a version Response class. - * @param version - */ - public VersionResponse(int version) { - this.version = version; - this.values = new HashMap<>(); - } - - /** - * Returns a new Builder. - * @return - Builder. - */ - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Returns this class from protobuf message. - * @param response - SCMVersionResponseProto - * @return VersionResponse - */ - public static VersionResponse getFromProtobuf(SCMVersionResponseProto - response) { - return new VersionResponse(response.getSoftwareVersion(), - response.getKeysList().stream() - .collect(Collectors.toMap(KeyValue::getKey, - KeyValue::getValue))); - } - - /** - * Adds a value to version Response. - * @param key - String - * @param value - String - */ - public void put(String key, String value) { - if (this.values.containsKey(key)) { - throw new IllegalArgumentException("Duplicate key in version response"); - } - values.put(key, value); - } - - /** - * Return a protobuf message. - * @return SCMVersionResponseProto. - */ - public SCMVersionResponseProto getProtobufMessage() { - - List<KeyValue> list = new LinkedList<>(); - for (Map.Entry<String, String> entry : values.entrySet()) { - list.add(KeyValue.newBuilder().setKey(entry.getKey()). - setValue(entry.getValue()).build()); - } - return - SCMVersionResponseProto.newBuilder() - .setSoftwareVersion(this.version) - .addAllKeys(list).build(); - } - - public String getValue(String key) { - return this.values.get(key); - } - - /** - * Builder class. - */ - public static class Builder { - private int version; - private Map<String, String> values; - - Builder() { - values = new HashMap<>(); - } - - /** - * Sets the version. - * @param ver - version - * @return Builder - */ - public Builder setVersion(int ver) { - this.version = ver; - return this; - } - - /** - * Adds a value to version Response. - * @param key - String - * @param value - String - */ - public Builder addValue(String key, String value) { - if (this.values.containsKey(key)) { - throw new IllegalArgumentException("Duplicate key in version response"); - } - values.put(key, value); - return this; - } - - /** - * Builds the version response. - * @return VersionResponse. - */ - public VersionResponse build() { - return new VersionResponse(this.version, this.values); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java deleted file mode 100644 index c2c20a4..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.protocol.commands; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.CloseContainerCommandProto; -import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; - -/** - * Asks datanode to close a container. - */ -public class CloseContainerCommand - extends SCMCommand<CloseContainerCommandProto> { - - private HddsProtos.ReplicationType replicationType; - private PipelineID pipelineID; - - public CloseContainerCommand(long containerID, - HddsProtos.ReplicationType replicationType, - PipelineID pipelineID) { - super(containerID); - this.replicationType = replicationType; - this.pipelineID = pipelineID; - } - - /** - * Returns the type of this command. - * - * @return Type - */ - @Override - public SCMCommandProto.Type getType() { - return SCMCommandProto.Type.closeContainerCommand; - } - - /** - * Gets the protobuf message of this object. - * - * @return A protobuf message. - */ - @Override - public byte[] getProtoBufMessage() { - return getProto().toByteArray(); - } - - public CloseContainerCommandProto getProto() { - return CloseContainerCommandProto.newBuilder() - .setContainerID(getId()) - .setCmdId(getId()) - .setReplicationType(replicationType) - .setPipelineID(pipelineID.getProtobuf()) - .build(); - } - - public static CloseContainerCommand getFromProtobuf( - CloseContainerCommandProto closeContainerProto) { - Preconditions.checkNotNull(closeContainerProto); - return new CloseContainerCommand(closeContainerProto.getCmdId(), - closeContainerProto.getReplicationType(), - PipelineID.getFromProtobuf(closeContainerProto.getPipelineID())); - } - - public long getContainerID() { - return getId(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java deleted file mode 100644 index 69337fb..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandForDatanode.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.protocol.commands; - -import java.util.UUID; - -import com.google.protobuf.GeneratedMessage; -import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload; - -/** - * Command for the datanode with the destination address. - */ -public class CommandForDatanode<T extends GeneratedMessage> implements - IdentifiableEventPayload { - - private final UUID datanodeId; - - private final SCMCommand<T> command; - - public CommandForDatanode(UUID datanodeId, SCMCommand<T> command) { - this.datanodeId = datanodeId; - this.command = command; - } - - public UUID getDatanodeId() { - return datanodeId; - } - - public SCMCommand<T> getCommand() { - return command; - } - - public long getId() { - return command.getId(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java deleted file mode 100644 index 4b3ce84..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CommandStatus.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.ozone.protocol.commands; - -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.CommandStatus.Status; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; - -/** - * A class that is used to communicate status of datanode commands. - */ -public class CommandStatus { - - private SCMCommandProto.Type type; - private Long cmdId; - private Status status; - private String msg; - - CommandStatus(Type type, Long cmdId, Status status, String msg) { - this.type = type; - this.cmdId = cmdId; - this.status = status; - this.msg = msg; - } - - public Type getType() { - return type; - } - - public Long getCmdId() { - return cmdId; - } - - public Status getStatus() { - return status; - } - - public String getMsg() { - return msg; - } - - /** - * To allow change of status once commandStatus is initialized. - * - * @param status - */ - public void setStatus(Status status) { - this.status = status; - } - - public void setStatus(boolean cmdExecuted) { - setStatus(cmdExecuted ? Status.EXECUTED : Status.FAILED); - } - - /** - * Returns a CommandStatus from the protocol buffers. - * - * @param cmdStatusProto - protoBuf Message - * @return CommandStatus - */ - public CommandStatus getFromProtoBuf( - StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatusProto) { - return CommandStatusBuilder.newBuilder() - .setCmdId(cmdStatusProto.getCmdId()) - .setStatus(cmdStatusProto.getStatus()) - .setType(cmdStatusProto.getType()) - .setMsg(cmdStatusProto.getMsg()) - .build(); - } - /** - * Returns a CommandStatus from the protocol buffers. - * - * @return StorageContainerDatanodeProtocolProtos.CommandStatus - */ - public StorageContainerDatanodeProtocolProtos.CommandStatus - getProtoBufMessage() { - StorageContainerDatanodeProtocolProtos.CommandStatus.Builder builder = - StorageContainerDatanodeProtocolProtos.CommandStatus.newBuilder() - .setCmdId(this.getCmdId()) - .setStatus(this.getStatus()) - .setType(this.getType()); - if (this.getMsg() != null) { - builder.setMsg(this.getMsg()); - } - return builder.build(); - } - - /** - * Builder class for CommandStatus. - */ - public static class CommandStatusBuilder { - - private SCMCommandProto.Type type; - private Long cmdId; - private StorageContainerDatanodeProtocolProtos.CommandStatus.Status status; - private String msg; - - CommandStatusBuilder() { - } - - public static CommandStatusBuilder newBuilder() { - return new CommandStatusBuilder(); - } - - public Type getType() { - return type; - } - - public Long getCmdId() { - return cmdId; - } - - public Status getStatus() { - return status; - } - - public String getMsg() { - return msg; - } - - public CommandStatusBuilder setType(Type commandType) { - this.type = commandType; - return this; - } - - public CommandStatusBuilder setCmdId(Long commandId) { - this.cmdId = commandId; - return this; - } - - public CommandStatusBuilder setStatus(Status commandStatus) { - this.status = commandStatus; - return this; - } - - public CommandStatusBuilder setMsg(String message) { - this.msg = message; - return this; - } - - public CommandStatus build() { - return new CommandStatus(type, cmdId, status, msg); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlockCommandStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlockCommandStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlockCommandStatus.java deleted file mode 100644 index 2659ab3..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlockCommandStatus.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.protocol.commands; - -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto; - -public class DeleteBlockCommandStatus extends CommandStatus { - - private ContainerBlocksDeletionACKProto blocksDeletionAck = null; - - public DeleteBlockCommandStatus(Type type, Long cmdId, - StorageContainerDatanodeProtocolProtos.CommandStatus.Status status, - String msg, ContainerBlocksDeletionACKProto blocksDeletionAck) { - super(type, cmdId, status, msg); - this.blocksDeletionAck = blocksDeletionAck; - } - - public void setBlocksDeletionAck( - ContainerBlocksDeletionACKProto deletionAck) { - blocksDeletionAck = deletionAck; - } - - @Override - public CommandStatus getFromProtoBuf( - StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatusProto) { - return DeleteBlockCommandStatusBuilder.newBuilder() - .setBlockDeletionAck(cmdStatusProto.getBlockDeletionAck()) - .setCmdId(cmdStatusProto.getCmdId()) - .setStatus(cmdStatusProto.getStatus()) - .setType(cmdStatusProto.getType()) - .setMsg(cmdStatusProto.getMsg()) - .build(); - } - - @Override - public StorageContainerDatanodeProtocolProtos.CommandStatus getProtoBufMessage() { - StorageContainerDatanodeProtocolProtos.CommandStatus.Builder builder = - StorageContainerDatanodeProtocolProtos.CommandStatus.newBuilder() - .setCmdId(this.getCmdId()) - .setStatus(this.getStatus()) - .setType(this.getType()); - if (blocksDeletionAck != null) { - builder.setBlockDeletionAck(blocksDeletionAck); - } - if (this.getMsg() != null) { - builder.setMsg(this.getMsg()); - } - return builder.build(); - } - - public static final class DeleteBlockCommandStatusBuilder - extends CommandStatusBuilder { - private ContainerBlocksDeletionACKProto blocksDeletionAck = null; - - public static DeleteBlockCommandStatusBuilder newBuilder() { - return new DeleteBlockCommandStatusBuilder(); - } - - public DeleteBlockCommandStatusBuilder setBlockDeletionAck( - ContainerBlocksDeletionACKProto deletionAck) { - this.blocksDeletionAck = deletionAck; - return this; - } - - @Override - public CommandStatus build() { - return new DeleteBlockCommandStatus(getType(), getCmdId(), getStatus(), - getMsg(), blocksDeletionAck); - } - - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/2c392da8/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java deleted file mode 100644 index 07feeff..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.protocol.commands; - -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMCommandProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.DeleteBlocksCommandProto; - -import java.util.List; - -/** - * A SCM command asks a datanode to delete a number of blocks. - */ -public class DeleteBlocksCommand extends - SCMCommand<DeleteBlocksCommandProto> { - - private List<DeletedBlocksTransaction> blocksTobeDeleted; - - - public DeleteBlocksCommand(List<DeletedBlocksTransaction> blocks) { - super(); - this.blocksTobeDeleted = blocks; - } - - // Should be called only for protobuf conversion - private DeleteBlocksCommand(List<DeletedBlocksTransaction> blocks, - long id) { - super(id); - this.blocksTobeDeleted = blocks; - } - - public List<DeletedBlocksTransaction> blocksTobeDeleted() { - return this.blocksTobeDeleted; - } - - @Override - public SCMCommandProto.Type getType() { - return SCMCommandProto.Type.deleteBlocksCommand; - } - - @Override - public byte[] getProtoBufMessage() { - return getProto().toByteArray(); - } - - public static DeleteBlocksCommand getFromProtobuf( - DeleteBlocksCommandProto deleteBlocksProto) { - return new DeleteBlocksCommand(deleteBlocksProto - .getDeletedBlocksTransactionsList(), deleteBlocksProto.getCmdId()); - } - - public DeleteBlocksCommandProto getProto() { - return DeleteBlocksCommandProto.newBuilder() - .setCmdId(getId()) - .addAllDeletedBlocksTransactions(blocksTobeDeleted).build(); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org