http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java deleted file mode 100644 index af0c1db..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/cli/CBlockCli.java +++ /dev/null @@ -1,289 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.cblock.cli; - -import org.apache.commons.cli.BasicParser; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionBuilder; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.hadoop.cblock.client.CBlockVolumeClient; -import org.apache.hadoop.cblock.meta.VolumeInfo; -import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.PrintStream; -import java.net.InetSocketAddress; -import java.util.Arrays; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * The command line tool class. 
- */ -public class CBlockCli extends Configured implements Tool { - - private static final String CREATE_VOLUME = "createVolume"; - - private static final String DELETE_VOLUME = "deleteVolume"; - - private static final String INFO_VOLUME = "infoVolume"; - - private static final String LIST_VOLUME = "listVolume"; - - private static final String SERVER_ADDR = "serverAddr"; - - private static final String HELP = "help"; - - private static final Logger LOG = - LoggerFactory.getLogger(CBlockCli.class); - private OzoneConfiguration conf; - - private PrintStream printStream; - - private Options options; - - private BasicParser parser; - - private CBlockVolumeClient localProxy; - - public CBlockCli(OzoneConfiguration conf, PrintStream printStream) - throws IOException { - this.printStream = printStream; - this.conf = conf; - this.options = getOptions(); - this.parser = new BasicParser(); - } - - public CBlockCli(OzoneConfiguration conf) throws IOException{ - this(conf, System.out); - } - - private CommandLine parseArgs(String[] argv) - throws ParseException { - return parser.parse(options, argv); - } - - private static Options getOptions() { - Options options = new Options(); - Option serverAddress = OptionBuilder - .withArgName("serverAddress>:<serverPort") - .withLongOpt(SERVER_ADDR) - .withValueSeparator(':') - .hasArgs(2) - .withDescription("specify server address:port") - .create("s"); - options.addOption(serverAddress); - - // taking 4 args: userName, volumeName, volumeSize, blockSize - Option createVolume = OptionBuilder - .withArgName("user> <volume> <volumeSize in [GB/TB]> <blockSize") - .withLongOpt(CREATE_VOLUME) - .withValueSeparator(' ') - .hasArgs(4) - .withDescription("create a fresh new volume") - .create("c"); - options.addOption(createVolume); - - // taking 2 args: userName, volumeName - Option deleteVolume = OptionBuilder - .withArgName("user> <volume") - .withLongOpt(DELETE_VOLUME) - .hasArgs(2) - .withDescription("delete a volume") - .create("d"); - 
options.addOption(deleteVolume); - - // taking 2 args: userName, volumeName - Option infoVolume = OptionBuilder - .withArgName("user> <volume") - .withLongOpt(INFO_VOLUME) - .hasArgs(2) - .withDescription("info a volume") - .create("i"); - options.addOption(infoVolume); - - // taking 1 arg: userName - Option listVolume = OptionBuilder - .withArgName("user") - .withLongOpt(LIST_VOLUME) - .hasOptionalArgs(1) - .withDescription("list all volumes") - .create("l"); - options.addOption(listVolume); - - Option help = OptionBuilder - .withLongOpt(HELP) - .withDescription("help") - .create("h"); - options.addOption(help); - - return options; - } - - @Override - public int run(String[] args) throws ParseException, IOException { - CommandLine commandLine = parseArgs(args); - if (commandLine.hasOption("s")) { - String[] serverAddrArgs = commandLine.getOptionValues("s"); - LOG.info("server address" + Arrays.toString(serverAddrArgs)); - String serverHost = serverAddrArgs[0]; - int serverPort = Integer.parseInt(serverAddrArgs[1]); - InetSocketAddress serverAddress = - new InetSocketAddress(serverHost, serverPort); - this.localProxy = new CBlockVolumeClient(conf, serverAddress); - } else { - this.localProxy = new CBlockVolumeClient(conf); - } - - if (commandLine.hasOption("h")) { - LOG.info("help"); - help(); - } - - if (commandLine.hasOption("c")) { - String[] createArgs = commandLine.getOptionValues("c"); - LOG.info("create volume:" + Arrays.toString(createArgs)); - createVolume(createArgs); - } - - if (commandLine.hasOption("d")) { - String[] deleteArgs = commandLine.getOptionValues("d"); - LOG.info("delete args:" + Arrays.toString(deleteArgs)); - deleteVolume(deleteArgs); - } - - if (commandLine.hasOption("l")) { - String[] listArg = commandLine.getOptionValues("l"); - LOG.info("list args:" + Arrays.toString(listArg)); - listVolume(listArg); - } - - if (commandLine.hasOption("i")) { - String[] infoArgs = commandLine.getOptionValues("i"); - LOG.info("info args:" + 
Arrays.toString(infoArgs)); - infoVolume(infoArgs); - } - return 0; - } - - public static void main(String[] argv) throws Exception { - OzoneConfiguration cblockConf = new OzoneConfiguration(); - RPC.setProtocolEngine(cblockConf, CBlockServiceProtocolPB.class, - ProtobufRpcEngine.class); - int res = 0; - Tool shell = new CBlockCli(cblockConf, System.out); - try { - ToolRunner.run(shell, argv); - } catch (Exception ex) { - LOG.error(ex.toString()); - res = 1; - } - System.exit(res); - } - - public static long parseSize(String volumeSizeArgs) throws IOException { - long multiplier = 1; - - Pattern p = Pattern.compile("([0-9]+)([a-zA-Z]+)"); - Matcher m = p.matcher(volumeSizeArgs); - - if (!m.find()) { - throw new IOException("Invalid volume size args " + volumeSizeArgs); - } - - int size = Integer.parseInt(m.group(1)); - String s = m.group(2); - - if (s.equalsIgnoreCase("MB") || - s.equalsIgnoreCase("Mi")) { - multiplier = 1024L * 1024; - } else if (s.equalsIgnoreCase("GB") || - s.equalsIgnoreCase("Gi")) { - multiplier = 1024L * 1024 * 1024; - } else if (s.equalsIgnoreCase("TB") || - s.equalsIgnoreCase("Ti")) { - multiplier = 1024L * 1024 * 1024 * 1024; - } else { - throw new IOException("Invalid volume size args " + volumeSizeArgs); - } - return size * multiplier; - } - - private void createVolume(String[] createArgs) throws IOException { - String userName = createArgs[0]; - String volumeName = createArgs[1]; - long volumeSize = parseSize(createArgs[2]); - int blockSize = Integer.parseInt(createArgs[3])*1024; - localProxy.createVolume(userName, volumeName, volumeSize, blockSize); - } - - private void deleteVolume(String[] deleteArgs) throws IOException { - String userName = deleteArgs[0]; - String volumeName = deleteArgs[1]; - boolean force = false; - if (deleteArgs.length > 2) { - force = Boolean.parseBoolean(deleteArgs[2]); - } - localProxy.deleteVolume(userName, volumeName, force); - } - - private void infoVolume(String[] infoArgs) throws IOException { - String 
userName = infoArgs[0]; - String volumeName = infoArgs[1]; - VolumeInfo volumeInfo = localProxy.infoVolume(userName, volumeName); - printStream.println(volumeInfo.toString()); - } - - private void listVolume(String[] listArgs) throws IOException { - StringBuilder stringBuilder = new StringBuilder(); - List<VolumeInfo> volumeResponse; - if (listArgs == null) { - volumeResponse = localProxy.listVolume(null); - } else { - volumeResponse = localProxy.listVolume(listArgs[0]); - } - for (int i = 0; i<volumeResponse.size(); i++) { - stringBuilder.append( - String.format("%s:%s\t%d\t%d", volumeResponse.get(i).getUserName(), - volumeResponse.get(i).getVolumeName(), - volumeResponse.get(i).getVolumeSize(), - volumeResponse.get(i).getBlockSize())); - if (i < volumeResponse.size() - 1) { - stringBuilder.append("\n"); - } - } - printStream.println(stringBuilder); - } - - private void help() { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(100, "cblock", "", options, ""); - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/cli/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/cli/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/cli/package-info.java deleted file mode 100644 index b8b1889..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/cli/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.cblock.cli; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java deleted file mode 100644 index ec38c9c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockServiceProtocolClientSideTranslatorPB.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.hadoop.cblock.client;

import com.google.protobuf.ServiceException;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.proto.CBlockServiceProtocol;
import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * The client side implementation of CBlockServiceProtocol.
 *
 * <p>Each call builds the protobuf request, invokes the RPC proxy and
 * converts a {@link ServiceException} into the IOException returned by
 * {@link ProtobufHelper#getRemoteException(ServiceException)}.
 */
@InterfaceAudience.Private
public final class CBlockServiceProtocolClientSideTranslatorPB
    implements CBlockServiceProtocol, ProtocolTranslator, Closeable {

  private final CBlockServiceProtocolPB rpcProxy;

  /**
   * @param rpcProxy the RPC proxy every call is forwarded to.
   */
  public CBlockServiceProtocolClientSideTranslatorPB(
      CBlockServiceProtocolPB rpcProxy) {
    this.rpcProxy = rpcProxy;
  }

  /** Stops the underlying RPC proxy. */
  @Override
  public void close() throws IOException {
    RPC.stopProxy(rpcProxy);
  }

  /**
   * Sends a create-volume request.
   *
   * @param userName owner of the volume.
   * @param volumeName name of the volume.
   * @param volumeSize volume size as sent in the request.
   * @param blockSize block size as sent in the request.
   * @throws IOException if the remote call fails.
   */
  @Override
  public void createVolume(String userName, String volumeName,
      long volumeSize, int blockSize) throws IOException {
    CBlockServiceProtocolProtos.CreateVolumeRequestProto.Builder req =
        CBlockServiceProtocolProtos.CreateVolumeRequestProto.newBuilder();
    req.setUserName(userName);
    req.setVolumeName(volumeName);
    req.setVolumeSize(volumeSize);
    req.setBlockSize(blockSize);
    try {
      rpcProxy.createVolume(null, req.build());
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }

  /**
   * Sends a delete-volume request.
   *
   * @param userName owner of the volume.
   * @param volumeName name of the volume.
   * @param force value of the request's force flag.
   * @throws IOException if the remote call fails.
   */
  @Override
  public void deleteVolume(String userName, String volumeName, boolean force)
      throws IOException {
    CBlockServiceProtocolProtos.DeleteVolumeRequestProto.Builder req =
        CBlockServiceProtocolProtos.DeleteVolumeRequestProto.newBuilder();
    req.setUserName(userName);
    req.setVolumeName(volumeName);
    req.setForce(force);
    try {
      rpcProxy.deleteVolume(null, req.build());
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }

  @Override
  public Object getUnderlyingProxyObject() {
    return rpcProxy;
  }

  /**
   * Fetches the description of one volume, including its usage.
   *
   * @param userName owner of the volume.
   * @param volumeName name of the volume.
   * @return the volume description built from the response.
   * @throws IOException if the remote call fails.
   */
  @Override
  public VolumeInfo infoVolume(String userName, String volumeName)
      throws IOException {
    CBlockServiceProtocolProtos.InfoVolumeRequestProto.Builder req =
        CBlockServiceProtocolProtos.InfoVolumeRequestProto.newBuilder();
    req.setUserName(userName);
    req.setVolumeName(volumeName);
    try {
      CBlockServiceProtocolProtos.InfoVolumeResponseProto resp =
          rpcProxy.infoVolume(null, req.build());
      return new VolumeInfo(resp.getVolumeInfo().getUserName(),
          resp.getVolumeInfo().getVolumeName(),
          resp.getVolumeInfo().getVolumeSize(),
          resp.getVolumeInfo().getBlockSize(),
          resp.getVolumeInfo().getUsage());
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }
  }

  /**
   * Lists volumes. The user name is only set on the request when it is
   * non-null.
   *
   * @param userName user to list volumes for, or null.
   * @return one entry per volume in the response; usage is not populated.
   * @throws IOException if the remote call or the response conversion
   *     fails.
   */
  @Override
  public List<VolumeInfo> listVolume(String userName) throws IOException {
    CBlockServiceProtocolProtos.ListVolumeRequestProto.Builder req =
        CBlockServiceProtocolProtos.ListVolumeRequestProto.newBuilder();
    if (userName != null) {
      req.setUserName(userName);
    }
    try {
      CBlockServiceProtocolProtos.ListVolumeResponseProto resp =
          rpcProxy.listVolume(null, req.build());
      List<VolumeInfo> respList = new ArrayList<>();
      for (CBlockServiceProtocolProtos.VolumeInfoProto entry :
          resp.getVolumeEntryList()) {
        VolumeInfo volumeInfo = new VolumeInfo(
            entry.getUserName(), entry.getVolumeName(), entry.getVolumeSize(),
            entry.getBlockSize());
        respList.add(volumeInfo);
      }
      return respList;
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    } catch (Exception e) {
      // Keep the original exception as the cause so its stack trace
      // survives the translation to IOException.
      throw new IOException("got" + e.getCause() + " " + e.getMessage(), e);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java deleted file mode 100644 index 9ed54e3..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.hadoop.cblock.client;

import org.apache.hadoop.cblock.CBlockConfigKeys;
import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.conf.OzoneConfiguration;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Volume client used by the CBlock command line tool. It owns an RPC
 * translator and forwards every volume operation to it unchanged.
 */
public class CBlockVolumeClient {
  private final CBlockServiceProtocolClientSideTranslatorPB cblockClient;

  /**
   * Connects to the CBlock service address resolved from the configuration.
   *
   * @param conf configuration used for the address, timeout and RPC setup.
   * @throws IOException if the RPC proxy cannot be created.
   */
  public CBlockVolumeClient(OzoneConfiguration conf) throws IOException {
    this(conf, null);
  }

  /**
   * Connects to the given server, or to the configured CBlock service
   * address when {@code serverAddress} is null.
   *
   * @param conf configuration used for the timeout and RPC setup.
   * @param serverAddress explicit server address, may be null.
   * @throws IOException if the RPC proxy cannot be created.
   */
  public CBlockVolumeClient(OzoneConfiguration conf,
      InetSocketAddress serverAddress) throws IOException {
    final InetSocketAddress address;
    if (serverAddress == null) {
      address = OzoneClientUtils.getCblockServiceRpcAddr(conf);
    } else {
      address = serverAddress;
    }

    final long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);

    // The timeout is read in milliseconds and must fit in an int.
    final long timeoutMillis = conf.getTimeDuration(
        CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT,
        CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_DEFAULT,
        TimeUnit.MILLISECONDS);
    final int rpcTimeout = Math.toIntExact(timeoutMillis);

    final CBlockServiceProtocolPB proxy = RPC.getProtocolProxy(
        CBlockServiceProtocolPB.class, version, address,
        UserGroupInformation.getCurrentUser(), conf,
        NetUtils.getDefaultSocketFactory(conf), rpcTimeout,
        RetryPolicies.retryUpToMaximumCountWithFixedSleep(
            300, 1, TimeUnit.SECONDS))
        .getProxy();
    cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(proxy);
  }

  /** Forwards a create-volume request to the RPC translator. */
  public void createVolume(String userName, String volumeName,
      long volumeSize, int blockSize) throws IOException {
    cblockClient.createVolume(userName, volumeName, volumeSize, blockSize);
  }

  /** Forwards a delete-volume request to the RPC translator. */
  public void deleteVolume(String userName, String volumeName, boolean force)
      throws IOException {
    cblockClient.deleteVolume(userName, volumeName, force);
  }

  /** Forwards an info-volume request and returns the translator's result. */
  public VolumeInfo infoVolume(String userName, String volumeName)
      throws IOException {
    return cblockClient.infoVolume(userName, volumeName);
  }

  /** Forwards a list-volume request and returns the translator's result. */
  public List<VolumeInfo> listVolume(String userName)
      throws IOException {
    return cblockClient.listVolume(userName);
  }
}
 http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/package-info.java deleted file mode 100644 index 761b71e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.client; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java deleted file mode 100644 index 8f6b82b..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/exception/CBlockException.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.hadoop.cblock.exception;

import java.io.IOException;

/**
 * CBlock's own exception type. It is an {@link IOException} that carries
 * only a message.
 */
public class CBlockException extends IOException {

  /**
   * @param message description of the failure, returned by
   *     {@link #getMessage()}.
   */
  public CBlockException(final String message) {
    super(message);
  }
}
 http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/exception/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/exception/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/exception/package-info.java deleted file mode 100644 index 268b8cb..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/exception/package-info.java +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.exception; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java deleted file mode 100644 index 04fe3a4..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.hadoop.cblock.jscsiHelper;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock;
import org.apache.hadoop.cblock.jscsiHelper.cache.impl.AsyncBlockWriter;
import org.apache.hadoop.scm.XceiverClientSpi;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.LevelDBStore;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;

/**
 * The blockWriter task: reads one block from the local cache DB and writes
 * it to its container. On failure the block ID is recorded in a retry log
 * file that is handed back to the flusher.
 */
public class BlockWriterTask implements Runnable {
  private final LogicalBlock block;
  private int tryCount;
  private final ContainerCacheFlusher flusher;
  private final String dbPath;
  private final String fileName;
  private final int maxRetryCount;

  /**
   * Constructs a BlockWriterTask.
   *
   * @param block - Block Information.
   * @param flusher - ContainerCacheFlusher.
   * @param dbPath - path used to look up the cache DB; retry log files are
   *     written under the same path.
   * @param tryCount - number of attempts already made for this block.
   * @param fileName - name reported to the flusher's finish counter once
   *     the task ends.
   * @param maxRetryCount - attempt count at which the max-retry metric is
   *     incremented.
   */
  public BlockWriterTask(LogicalBlock block, ContainerCacheFlusher flusher,
      String dbPath, int tryCount, String fileName, int maxRetryCount) {
    this.block = block;
    this.flusher = flusher;
    this.dbPath = dbPath;
    this.tryCount = tryCount;
    this.fileName = fileName;
    this.maxRetryCount = maxRetryCount;
  }

  /**
   * Writes the block to its container and updates the write metrics.
   *
   * <p>Any exception is logged, the block is queued for retry through a
   * retry log file and the matching failure metric is incremented. The
   * finish counter is always incremented, and the cache DB and client are
   * released if they were acquired.
   *
   * @see Thread#run()
   */
  @Override
  public void run() {
    String containerName = null;
    XceiverClientSpi client = null;
    LevelDBStore levelDBStore = null;
    String traceID = flusher.getTraceID(new File(dbPath), block.getBlockID());
    flusher.getLOG().debug(
        "Writing block to remote. block ID: {}", block.getBlockID());
    try {
      incTryCount();
      Pipeline pipeline = flusher.getPipeline(this.dbPath, block.getBlockID());
      client = flusher.getXceiverClientManager().acquireClient(pipeline);
      containerName = pipeline.getContainerName();
      byte[] keybuf = Longs.toByteArray(block.getBlockID());
      levelDBStore = flusher.getCacheDB(this.dbPath);
      byte[] data = levelDBStore.get(keybuf);
      Preconditions.checkNotNull(data);
      Preconditions.checkState(data.length > 0, "Block data is zero length");

      // Only the remote write is timed; the cache read is not reported.
      long startTime = Time.monotonicNow();
      ContainerProtocolCalls.writeSmallFile(client, containerName,
          Long.toString(block.getBlockID()), data, traceID);
      long endTime = Time.monotonicNow();
      flusher.getTargetMetrics().updateContainerWriteLatency(
          endTime - startTime);
      flusher.getLOG().debug("Time taken for Write Small File : {} ms",
          endTime - startTime);

      flusher.incrementRemoteIO();

    } catch (Exception ex) {
      flusher.getLOG().error("Writing of block:{} failed, We have attempted " +
          "to write this block {} times to the container {}.Trace ID:{}",
          block.getBlockID(), this.getTryCount(), containerName, traceID, ex);
      writeRetryBlock(block);
      if (ex instanceof IOException) {
        flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks();
      } else {
        flusher.getTargetMetrics().incNumWriteGenericExceptionRetryBlocks();
      }
      if (this.getTryCount() >= maxRetryCount) {
        flusher.getTargetMetrics().incNumWriteMaxRetryBlocks();
      }
    } finally {
      flusher.incFinishCount(fileName);
      if (levelDBStore != null) {
        flusher.releaseCacheDB(dbPath);
      }
      if(client != null) {
        flusher.getXceiverClientManager().releaseClient(client);
      }
    }
  }

  /**
   * Records the block ID in a new retry log file under {@code dbPath} and
   * asks the flusher to process that file. Failures are logged and counted
   * but not propagated.
   *
   * @param currentBlock the block whose write failed.
   */
  private void writeRetryBlock(LogicalBlock currentBlock) {
    String retryFileName =
        String.format("%s.%d.%s.%s", AsyncBlockWriter.RETRY_LOG_PREFIX,
            currentBlock.getBlockID(), Time.monotonicNow(), tryCount);
    File logDir = new File(this.dbPath);
    if (!logDir.exists() && !logDir.mkdirs()) {
      flusher.getLOG().error(
          "Unable to create the log directory, Critical error cannot continue");
      return;
    }
    String log = Paths.get(this.dbPath, retryFileName).toString();
    ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE);
    buffer.putLong(currentBlock.getBlockID());
    buffer.flip();
    try {
      // try-with-resources closes the stream and channel even when the
      // write fails, so the file handle is not leaked on the error path.
      try (FileOutputStream out = new FileOutputStream(log, false);
           FileChannel channel = out.getChannel()) {
        // A single write() is not guaranteed to drain the buffer.
        while (buffer.hasRemaining()) {
          channel.write(buffer);
        }
      }
      flusher.processDirtyBlocks(this.dbPath, retryFileName);
    } catch (IOException e) {
      flusher.getTargetMetrics().incNumFailedRetryLogFileWrites();
      flusher.getLOG().error("Unable to write the retry block. Block ID: {}",
          currentBlock.getBlockID(), e);
    }
  }

  /**
   * Increments the try count. This is done each time we try this block
   * write to the container.
   */
  private void incTryCount() {
    tryCount++;
  }

  /**
   * Get the retry count.
   *
   * @return int
   */
  public int getTryCount() {
    return tryCount;
  }
}
 http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java deleted file mode 100644 index 84b68e3..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockClientProtocolClientSideTranslatorPB.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.cblock.jscsiHelper; - -import com.google.common.primitives.Longs; -import com.google.protobuf.ServiceException; -import org.apache.hadoop.cblock.exception.CBlockException; -import org.apache.hadoop.cblock.meta.VolumeInfo; -import org.apache.hadoop.cblock.proto.CBlockClientProtocol; -import org.apache.hadoop.cblock.proto.MountVolumeResponse; -import org.apache.hadoop.cblock.protocol.proto - .CBlockClientServerProtocolProtos.ContainerIDProto; -import org.apache.hadoop.cblock.protocol.proto - .CBlockClientServerProtocolProtos.ListVolumesRequestProto; -import org.apache.hadoop.cblock.protocol.proto - .CBlockClientServerProtocolProtos.ListVolumesResponseProto; -import org.apache.hadoop.cblock.protocol.proto - .CBlockClientServerProtocolProtos.MountVolumeRequestProto; -import org.apache.hadoop.cblock.protocol.proto - .CBlockClientServerProtocolProtos.MountVolumeResponseProto; -import org.apache.hadoop.cblock.protocol.proto.CBlockServiceProtocolProtos - .VolumeInfoProto; -import org.apache.hadoop.cblock.protocolPB.CBlockClientServerProtocolPB; -import org.apache.hadoop.ipc.ProtobufHelper; -import org.apache.hadoop.ipc.ProtocolTranslator; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -/** - * The client side of CBlockClientProtocol. - * - * CBlockClientProtocol is the protocol used between cblock client side - * and cblock manager (cblock client side is just the node where jscsi daemon - * process runs. a machines talks to jscsi daemon for mounting a volume). - * - * Right now, the only communication carried by this protocol is for client side - * to request mounting a volume. 
- */ -public class CBlockClientProtocolClientSideTranslatorPB - implements CBlockClientProtocol, ProtocolTranslator, Closeable { - - private final CBlockClientServerProtocolPB rpcProxy; - - public CBlockClientProtocolClientSideTranslatorPB( - CBlockClientServerProtocolPB rpcProxy) { - this.rpcProxy = rpcProxy; - } - - - @Override - public void close() throws IOException { - RPC.stopProxy(rpcProxy); - } - - @Override - public Object getUnderlyingProxyObject() { - return rpcProxy; - } - - @Override - public MountVolumeResponse mountVolume( - String userName, String volumeName) throws IOException { - MountVolumeRequestProto.Builder - request - = MountVolumeRequestProto - .newBuilder(); - request.setUserName(userName); - request.setVolumeName(volumeName); - try { - MountVolumeResponseProto resp - = rpcProxy.mountVolume(null, request.build()); - if (!resp.getIsValid()) { - throw new CBlockException( - "Not a valid volume:" + userName + ":" + volumeName); - } - List<Pipeline> containerIDs = new ArrayList<>(); - HashMap<String, Pipeline> containerPipelines = new HashMap<>(); - if (resp.getAllContainerIDsList().size() == 0) { - throw new CBlockException("Mount volume request returned no container"); - } - for (ContainerIDProto containerID : - resp.getAllContainerIDsList()) { - if (containerID.hasPipeline()) { - // it should always have a pipeline only except for tests. 
- Pipeline p = Pipeline.getFromProtoBuf(containerID.getPipeline()); - p.setData(Longs.toByteArray(containerID.getIndex())); - containerIDs.add(p); - containerPipelines.put(containerID.getContainerID(), p); - } else { - throw new CBlockException("ContainerID does not have pipeline!"); - } - } - return new MountVolumeResponse( - resp.getIsValid(), - resp.getUserName(), - resp.getVolumeName(), - resp.getVolumeSize(), - resp.getBlockSize(), - containerIDs, - containerPipelines); - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } - - @Override - public List<VolumeInfo> listVolumes() throws IOException { - try { - List<VolumeInfo> result = new ArrayList<>(); - ListVolumesResponseProto - listVolumesResponseProto = this.rpcProxy.listVolumes(null, - ListVolumesRequestProto.newBuilder() - .build()); - for (VolumeInfoProto volumeInfoProto : - listVolumesResponseProto - .getVolumeEntryList()) { - result.add(new VolumeInfo(volumeInfoProto.getUserName(), - volumeInfoProto.getVolumeName(), volumeInfoProto.getVolumeSize(), - volumeInfoProto.getBlockSize())); - } - return result; - } catch (ServiceException e) { - throw ProtobufHelper.getRemoteException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java deleted file mode 100644 index 2f35668..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java +++ /dev/null @@ -1,440 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper; - -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.cblock.jscsiHelper.cache.CacheModule; -import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; -import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.scm.XceiverClientManager; -import org.apache.hadoop.scm.container.common.helpers.Pipeline; -import org.jscsi.target.storage.IStorageModule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_TRACE_IO_DEFAULT; - -/** - * The SCSI Target class for CBlockSCSIServer. 
- */ -final public class CBlockIStorageImpl implements IStorageModule { - private static final Logger LOGGER = - LoggerFactory.getLogger(CBlockIStorageImpl.class); - private static final Logger TRACER = - LoggerFactory.getLogger("TraceIO"); - - private CacheModule cache; - private final long volumeSize; - private final int blockSize; - private final String userName; - private final String volumeName; - private final boolean traceEnabled; - private final Configuration conf; - private final ContainerCacheFlusher flusher; - private List<Pipeline> fullContainerList; - - /** - * private: constructs a SCSI Target. - * - * @param config - config - * @param userName - Username - * @param volumeName - Name of the volume - * @param volumeSize - Size of the volume - * @param blockSize - Size of the block - * @param fullContainerList - Ordered list of containers that make up this - * volume. - * @param flusher - flusher which is used to flush data from - * level db cache to containers - * @throws IOException - Throws IOException. - */ - private CBlockIStorageImpl(Configuration config, String userName, - String volumeName, long volumeSize, int blockSize, - List<Pipeline> fullContainerList, ContainerCacheFlusher flusher) { - this.conf = config; - this.userName = userName; - this.volumeName = volumeName; - this.volumeSize = volumeSize; - this.blockSize = blockSize; - this.fullContainerList = new ArrayList<>(fullContainerList); - this.flusher = flusher; - this.traceEnabled = conf.getBoolean(DFS_CBLOCK_TRACE_IO, - DFS_CBLOCK_TRACE_IO_DEFAULT); - } - - /** - * private: initialize the cache. - * - * @param xceiverClientManager - client manager that is used for creating new - * connections to containers. - * @param metrics - target metrics to maintain metrics for target server - * @throws IOException - Throws IOException. 
- */ - private void initCache(XceiverClientManager xceiverClientManager, - CBlockTargetMetrics metrics) throws IOException { - this.cache = CBlockLocalCache.newBuilder() - .setConfiguration(conf) - .setVolumeName(this.volumeName) - .setUserName(this.userName) - .setPipelines(this.fullContainerList) - .setClientManager(xceiverClientManager) - .setBlockSize(blockSize) - .setVolumeSize(volumeSize) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - this.cache.start(); - } - - /** - * Gets a new builder for CBlockStorageImpl. - * - * @return builder - */ - public static Builder newBuilder() { - return new Builder(); - } - - /** - * Get Cache. - * - * @return - Cache - */ - public CacheModule getCache() { - return cache; - } - - /** - * Returns block size of this volume. - * - * @return int size of block for this volume. - */ - @Override - public int getBlockSize() { - return blockSize; - } - - /** - * Checks the index boundary of a block address. - * - * @param logicalBlockAddress the index of the first block of data to be read - * or written - * @param transferLengthInBlocks the total number of consecutive blocks about - * to be read or written - * @return 0 == Success, 1 indicates the LBA address is out of bounds and 2 - * indicates that LBA + transfer size is out of bounds. - */ - @Override - public int checkBounds(long logicalBlockAddress, int transferLengthInBlocks) { - long sizeInBlocks = volumeSize / blockSize; - int res = 0; - if (logicalBlockAddress < 0 || logicalBlockAddress >= sizeInBlocks) { - res = 1; - } - - if (transferLengthInBlocks < 0 || - logicalBlockAddress + transferLengthInBlocks > sizeInBlocks) { - if (res == 0) { - res = 2; - } - } - return res; - } - - /** - * Number of blocks that make up this volume. - * - * @return long - count of blocks. 
- */ - @Override - public long getSizeInBlocks() { - return volumeSize / blockSize; - } - - /** - * Reads the number of bytes that can be read into the bytes buffer from the - * location indicated. - * - * @param bytes the array into which the data will be copied will be filled - * with data from storage - * @param storageIndex the position of the first byte to be copied - * @throws IOException - */ - @Override - public void read(byte[] bytes, long storageIndex) throws IOException { - int startingIdxInBlock = (int) storageIndex % blockSize; - int idxInBytes = 0; - if (this.traceEnabled) { - TRACER.info("Task=ReadStart,length={},location={}", - bytes.length, storageIndex); - } - while (idxInBytes < bytes.length - 1) { - long blockId = (storageIndex + idxInBytes) / blockSize; - byte[] dataBytes; - - try { - LogicalBlock block = this.cache.get(blockId); - dataBytes = block.getData().array(); - - if (this.traceEnabled) { - TRACER.info("Task=ReadBlock,BlockID={},length={},SHA={}", - blockId, - dataBytes.length, - dataBytes.length > 0 ? DigestUtils.sha256Hex(dataBytes) : null); - } - } catch (IOException e) { - // For an non-existing block cache.get will return a block with zero - // bytes filled. So any error here is a real error. 
- LOGGER.error("getting errors when reading data:" + e); - throw e; - } - - int length = blockSize - startingIdxInBlock; - if (length > bytes.length - idxInBytes) { - length = bytes.length - idxInBytes; - } - if (dataBytes.length >= length) { - System.arraycopy(dataBytes, startingIdxInBlock, bytes, idxInBytes, - length); - } - startingIdxInBlock = 0; - idxInBytes += length; - } - if (this.traceEnabled) { - TRACER.info("Task=ReadEnd,length={},location={},SHA={}", - bytes.length, storageIndex, DigestUtils.sha256Hex(bytes)); - } - } - - @Override - public void write(byte[] bytes, long storageIndex) throws IOException { - int startingIdxInBlock = (int) storageIndex % blockSize; - int idxInBytes = 0; - if (this.traceEnabled) { - TRACER.info("Task=WriteStart,length={},location={},SHA={}", - bytes.length, storageIndex, - bytes.length > 0 ? DigestUtils.sha256Hex(bytes) : null); - } - - ByteBuffer dataByte = ByteBuffer.allocate(blockSize); - while (idxInBytes < bytes.length - 1) { - long blockId = (storageIndex + idxInBytes) / blockSize; - int length = blockSize - startingIdxInBlock; - if (length > bytes.length - idxInBytes) { - length = bytes.length - idxInBytes; - } - System.arraycopy(bytes, idxInBytes, dataByte.array(), startingIdxInBlock, - length); - this.cache.put(blockId, dataByte.array()); - - if (this.traceEnabled) { - TRACER.info("Task=WriteBlock,BlockID={},length={},SHA={}", - blockId, dataByte.array().length, - dataByte.array().length > 0 ? - DigestUtils.sha256Hex(dataByte.array()) : null); - } - dataByte.clear(); - startingIdxInBlock = 0; - idxInBytes += length; - } - - if (this.traceEnabled) { - TRACER.info("Task=WriteEnd,length={},location={} ", - bytes.length, storageIndex); - } - } - - @Override - public void close() throws IOException { - try { - cache.close(); - } catch (IllegalStateException ise) { - LOGGER.error("Can not close the storage {}", ise); - throw ise; - } - } - - /** - * Builder class for CBlocklocalCache. 
- */ - public static class Builder { - private String userName; - private String volumeName; - private long volumeSize; - private int blockSize; - private List<Pipeline> containerList; - private Configuration conf; - private XceiverClientManager clientManager; - private ContainerCacheFlusher flusher; - private CBlockTargetMetrics metrics; - - /** - * Constructs a builder. - */ - Builder() { - - } - - public Builder setFlusher(ContainerCacheFlusher cacheFlusher) { - this.flusher = cacheFlusher; - return this; - } - - /** - * set config. - * - * @param config - config - * @return Builder - */ - public Builder setConf(Configuration config) { - this.conf = config; - return this; - } - - /** - * set user name. - * - * @param cblockUserName - user name - * @return Builder - */ - public Builder setUserName(String cblockUserName) { - this.userName = cblockUserName; - return this; - } - - /** - * set volume name. - * - * @param cblockVolumeName -- volume name - * @return Builder - */ - public Builder setVolumeName(String cblockVolumeName) { - this.volumeName = cblockVolumeName; - return this; - } - - /** - * set volume size. - * - * @param cblockVolumeSize -- set volume size. - * @return Builder - */ - public Builder setVolumeSize(long cblockVolumeSize) { - this.volumeSize = cblockVolumeSize; - return this; - } - - /** - * set block size. - * - * @param cblockBlockSize -- block size - * @return Builder - */ - public Builder setBlockSize(int cblockBlockSize) { - this.blockSize = cblockBlockSize; - return this; - } - - /** - * Set contianer list. - * - * @param cblockContainerList - set the pipeline list - * @return Builder - */ - public Builder setContainerList(List<Pipeline> cblockContainerList) { - this.containerList = cblockContainerList; - return this; - } - - /** - * Set client manager. - * - * @param xceiverClientManager -- sets the client manager. 
- * @return Builder - */ - public Builder setClientManager(XceiverClientManager xceiverClientManager) { - this.clientManager = xceiverClientManager; - return this; - } - - /** - * Set Cblock Target Metrics. - * - * @param targetMetrics -- sets the cblock target metrics - * @return Builder - */ - public Builder setCBlockTargetMetrics(CBlockTargetMetrics targetMetrics) { - this.metrics = targetMetrics; - return this; - } - - /** - * Builds the CBlockStorageImpl. - * - * @return builds the CBlock Scsi Target. - */ - public CBlockIStorageImpl build() throws IOException { - if (StringUtils.isBlank(userName)) { - throw new IllegalArgumentException("User name cannot be null or empty" + - "."); - } - if (StringUtils.isBlank(volumeName)) { - throw new IllegalArgumentException("Volume name cannot be null or " + - "empty"); - } - - if (volumeSize < 1) { - throw new IllegalArgumentException("Volume size cannot be negative or" + - " zero."); - } - - if (blockSize < 1) { - throw new IllegalArgumentException("Block size cannot be negative or " + - "zero."); - } - - if (containerList == null || containerList.size() == 0) { - throw new IllegalArgumentException("Container list cannot be null or " + - "empty"); - } - if (clientManager == null) { - throw new IllegalArgumentException("Client manager cannot be null"); - } - if (conf == null) { - throw new IllegalArgumentException("Configuration cannot be null"); - } - - if (flusher == null) { - throw new IllegalArgumentException("Flusher Cannot be null."); - } - CBlockIStorageImpl impl = new CBlockIStorageImpl(this.conf, this.userName, - this.volumeName, this.volumeSize, this.blockSize, this.containerList, - this.flusher); - impl.initCache(this.clientManager, this.metrics); - return impl; - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java ---------------------------------------------------------------------- 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java deleted file mode 100644 index 6367c61..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockManagerHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper; - -import org.apache.hadoop.cblock.meta.VolumeInfo; -import org.apache.hadoop.cblock.proto.MountVolumeResponse; - -import java.io.IOException; -import java.util.List; - -/** - * This class is the handler of CBlockManager used by target server - * to communicate with CBlockManager. 
- * - * More specifically, this class will expose local methods to target - * server, and make RPC calls to CBlockManager accordingly - */ -public class CBlockManagerHandler { - - private final CBlockClientProtocolClientSideTranslatorPB handler; - - public CBlockManagerHandler( - CBlockClientProtocolClientSideTranslatorPB handler) { - this.handler = handler; - } - - public MountVolumeResponse mountVolume( - String userName, String volumeName) throws IOException { - return handler.mountVolume(userName, volumeName); - } - - public List<VolumeInfo> listVolumes() throws IOException { - return handler.listVolumes(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java deleted file mode 100644 index e7df0cf..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java +++ /dev/null @@ -1,334 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.cblock.jscsiHelper; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.metrics2.MetricsSystem; -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; -import org.apache.hadoop.metrics2.lib.MutableRate; - -/** - * This class is for maintaining the various Cblock Target statistics - * and publishing them through the metrics interfaces. - * This also registers the JMX MBean for RPC. - * - * This class maintains stats like cache hit and miss ratio - * as well as the latency time of read and write ops. - */ -public class CBlockTargetMetrics { - // IOPS based Metrics - @Metric private MutableCounterLong numReadOps; - @Metric private MutableCounterLong numWriteOps; - @Metric private MutableCounterLong numReadCacheHits; - @Metric private MutableCounterLong numReadCacheMiss; - @Metric private MutableCounterLong numDirectBlockWrites; - - // Cblock internal Metrics - @Metric private MutableCounterLong numDirtyLogBlockRead; - @Metric private MutableCounterLong numBytesDirtyLogRead; - @Metric private MutableCounterLong numBytesDirtyLogWritten; - @Metric private MutableCounterLong numBlockBufferFlushCompleted; - @Metric private MutableCounterLong numBlockBufferFlushTriggered; - @Metric private MutableCounterLong numBlockBufferUpdates; - @Metric private MutableCounterLong numRetryLogBlockRead; - @Metric private MutableCounterLong numBytesRetryLogRead; - - // Failure Metrics - @Metric private MutableCounterLong numReadLostBlocks; - @Metric private MutableCounterLong numFailedReadBlocks; - @Metric private MutableCounterLong numWriteIOExceptionRetryBlocks; - @Metric private MutableCounterLong numWriteGenericExceptionRetryBlocks; - @Metric private MutableCounterLong numFailedDirectBlockWrites; - 
@Metric private MutableCounterLong numIllegalDirtyLogFiles; - @Metric private MutableCounterLong numFailedDirtyLogFileDeletes; - @Metric private MutableCounterLong numFailedBlockBufferFlushes; - @Metric private MutableCounterLong numInterruptedBufferWaits; - @Metric private MutableCounterLong numFailedRetryLogFileWrites; - @Metric private MutableCounterLong numWriteMaxRetryBlocks; - @Metric private MutableCounterLong numFailedReleaseLevelDB; - - // Latency based Metrics - @Metric private MutableRate dbReadLatency; - @Metric private MutableRate containerReadLatency; - @Metric private MutableRate dbWriteLatency; - @Metric private MutableRate containerWriteLatency; - @Metric private MutableRate blockBufferFlushLatency; - @Metric private MutableRate directBlockWriteLatency; - - public CBlockTargetMetrics() { - } - - public static CBlockTargetMetrics create() { - MetricsSystem ms = DefaultMetricsSystem.instance(); - return ms.register("CBlockTargetMetrics", - "CBlock Target Metrics", - new CBlockTargetMetrics()); - } - - public void incNumReadOps() { - numReadOps.incr(); - } - - public void incNumWriteOps() { - numWriteOps.incr(); - } - - public void incNumReadCacheHits() { - numReadCacheHits.incr(); - } - - public void incNumReadCacheMiss() { - numReadCacheMiss.incr(); - } - - public void incNumReadLostBlocks() { - numReadLostBlocks.incr(); - } - - public void incNumDirectBlockWrites() { - numDirectBlockWrites.incr(); - } - - public void incNumWriteIOExceptionRetryBlocks() { - numWriteIOExceptionRetryBlocks.incr(); - } - - public void incNumWriteGenericExceptionRetryBlocks() { - numWriteGenericExceptionRetryBlocks.incr(); - } - - public void incNumFailedDirectBlockWrites() { - numFailedDirectBlockWrites.incr(); - } - - public void incNumFailedReadBlocks() { - numFailedReadBlocks.incr(); - } - - public void incNumBlockBufferFlushCompleted() { - numBlockBufferFlushCompleted.incr(); - } - - public void incNumBlockBufferFlushTriggered() { - 
numBlockBufferFlushTriggered.incr(); - } - - public void incNumDirtyLogBlockRead() { - numDirtyLogBlockRead.incr(); - } - - public void incNumBytesDirtyLogRead(int bytes) { - numBytesDirtyLogRead.incr(bytes); - } - - public void incNumBlockBufferUpdates() { - numBlockBufferUpdates.incr(); - } - - public void incNumRetryLogBlockRead() { - numRetryLogBlockRead.incr(); - } - - public void incNumBytesRetryLogRead(int bytes) { - numBytesRetryLogRead.incr(bytes); - } - - public void incNumBytesDirtyLogWritten(int bytes) { - numBytesDirtyLogWritten.incr(bytes); - } - - public void incNumFailedBlockBufferFlushes() { - numFailedBlockBufferFlushes.incr(); - } - - public void incNumInterruptedBufferWaits() { - numInterruptedBufferWaits.incr(); - } - - public void incNumIllegalDirtyLogFiles() { - numIllegalDirtyLogFiles.incr(); - } - - public void incNumFailedDirtyLogFileDeletes() { - numFailedDirtyLogFileDeletes.incr(); - } - - public void incNumFailedRetryLogFileWrites() { - numFailedRetryLogFileWrites.incr(); - } - - public void incNumWriteMaxRetryBlocks() { - numWriteMaxRetryBlocks.incr(); - } - - public void incNumFailedReleaseLevelDB() { - numFailedReleaseLevelDB.incr(); - } - - public void updateDBReadLatency(long latency) { - dbReadLatency.add(latency); - } - - public void updateContainerReadLatency(long latency) { - containerReadLatency.add(latency); - } - - public void updateDBWriteLatency(long latency) { - dbWriteLatency.add(latency); - } - - public void updateContainerWriteLatency(long latency) { - containerWriteLatency.add(latency); - } - - public void updateDirectBlockWriteLatency(long latency) { - directBlockWriteLatency.add(latency); - } - - public void updateBlockBufferFlushLatency(long latency) { - blockBufferFlushLatency.add(latency); - } - - @VisibleForTesting - public long getNumReadOps() { - return numReadOps.value(); - } - - @VisibleForTesting - public long getNumWriteOps() { - return numWriteOps.value(); - } - - @VisibleForTesting - public long 
getNumReadCacheHits() { - return numReadCacheHits.value(); - } - - @VisibleForTesting - public long getNumReadCacheMiss() { - return numReadCacheMiss.value(); - } - - @VisibleForTesting - public long getNumReadLostBlocks() { - return numReadLostBlocks.value(); - } - - @VisibleForTesting - public long getNumDirectBlockWrites() { - return numDirectBlockWrites.value(); - } - - @VisibleForTesting - public long getNumFailedDirectBlockWrites() { - return numFailedDirectBlockWrites.value(); - } - - @VisibleForTesting - public long getNumFailedReadBlocks() { - return numFailedReadBlocks.value(); - } - - @VisibleForTesting - public long getNumWriteIOExceptionRetryBlocks() { - return numWriteIOExceptionRetryBlocks.value(); - } - - @VisibleForTesting - public long getNumWriteGenericExceptionRetryBlocks() { - return numWriteGenericExceptionRetryBlocks.value(); - } - - @VisibleForTesting - public long getNumBlockBufferFlushCompleted() { - return numBlockBufferFlushCompleted.value(); - } - - @VisibleForTesting - public long getNumBlockBufferFlushTriggered() { - return numBlockBufferFlushTriggered.value(); - } - - @VisibleForTesting - public long getNumDirtyLogBlockRead() { - return numDirtyLogBlockRead.value(); - } - - @VisibleForTesting - public long getNumBytesDirtyLogReads() { - return numBytesDirtyLogRead.value(); - } - - @VisibleForTesting - public long getNumBlockBufferUpdates() { - return numBlockBufferUpdates.value(); - } - - @VisibleForTesting - public long getNumRetryLogBlockRead() { - return numRetryLogBlockRead.value(); - } - - @VisibleForTesting - public long getNumBytesRetryLogReads() { - return numBytesRetryLogRead.value(); - } - - @VisibleForTesting - public long getNumBytesDirtyLogWritten() { - return numBytesDirtyLogWritten.value(); - } - - @VisibleForTesting - public long getNumFailedBlockBufferFlushes() { - return numFailedBlockBufferFlushes.value(); - } - - @VisibleForTesting - public long getNumInterruptedBufferWaits() { - return 
numInterruptedBufferWaits.value(); - } - - @VisibleForTesting - public long getNumIllegalDirtyLogFiles() { - return numIllegalDirtyLogFiles.value(); - } - - @VisibleForTesting - public long getNumFailedDirtyLogFileDeletes() { - return numFailedDirtyLogFileDeletes.value(); - } - - @VisibleForTesting - public long getNumFailedRetryLogFileWrites() { - return numFailedRetryLogFileWrites.value(); - } - - @VisibleForTesting - public long getNumWriteMaxRetryBlocks() { - return numWriteMaxRetryBlocks.value(); - } - - @VisibleForTesting - public long getNumFailedReleaseLevelDB() { - return numFailedReleaseLevelDB.value(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java deleted file mode 100644 index 6c5c564..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetServer.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.cblock.jscsiHelper; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.cblock.proto.MountVolumeResponse; -import org.apache.hadoop.cblock.util.KeyUtil; -import org.apache.hadoop.conf.OzoneConfiguration; -import org.apache.hadoop.scm.XceiverClientManager; -import org.jscsi.target.Configuration; -import org.jscsi.target.Target; -import org.jscsi.target.TargetServer; - -import java.io.IOException; -import java.util.HashMap; - -/** - * This class extends JSCSI target server, which is a ISCSI target that can be - * recognized by a remote machine with ISCSI installed. 
- */ -public final class CBlockTargetServer extends TargetServer { - private final OzoneConfiguration conf; - private final CBlockManagerHandler cBlockManagerHandler; - private final XceiverClientManager xceiverClientManager; - private final ContainerCacheFlusher containerCacheFlusher; - private final CBlockTargetMetrics metrics; - - public CBlockTargetServer(OzoneConfiguration ozoneConfig, - Configuration jscsiConf, - CBlockManagerHandler cBlockManagerHandler, - CBlockTargetMetrics metrics) - throws IOException { - super(jscsiConf); - this.cBlockManagerHandler = cBlockManagerHandler; - this.xceiverClientManager = new XceiverClientManager(ozoneConfig); - this.conf = ozoneConfig; - this.containerCacheFlusher = new ContainerCacheFlusher(this.conf, - xceiverClientManager, metrics); - this.metrics = metrics; - LOGGER.info("Starting flusher thread."); - Thread flushListenerThread = new Thread(containerCacheFlusher); - flushListenerThread.setDaemon(true); - flushListenerThread.start(); - } - - public static void main(String[] args) throws Exception { - } - - @Override - public boolean isValidTargetName(String checkTargetName) { - if (!KeyUtil.isValidVolumeKey(checkTargetName)) { - return false; - } - String userName = KeyUtil.getUserNameFromVolumeKey(checkTargetName); - String volumeName = KeyUtil.getVolumeFromVolumeKey(checkTargetName); - if (userName == null || volumeName == null) { - return false; - } - try { - MountVolumeResponse result = - cBlockManagerHandler.mountVolume(userName, volumeName); - if (!result.getIsValid()) { - LOGGER.error("Not a valid volume:" + checkTargetName); - return false; - } - String volumeKey = KeyUtil.getVolumeKey(result.getUserName(), - result.getVolumeName()); - if (!targets.containsKey(volumeKey)) { - LOGGER.info("Mounting Volume. 
username: {} volume:{}", - userName, volumeName); - CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder() - .setUserName(userName) - .setVolumeName(volumeName) - .setVolumeSize(result.getVolumeSize()) - .setBlockSize(result.getBlockSize()) - .setContainerList(result.getContainerList()) - .setClientManager(xceiverClientManager) - .setConf(this.conf) - .setFlusher(containerCacheFlusher) - .setCBlockTargetMetrics(metrics) - .build(); - Target target = new Target(volumeKey, volumeKey, ozoneStore); - targets.put(volumeKey, target); - } - } catch (IOException e) { - LOGGER.error("Can not connect to server when validating target!" - + e.getMessage()); - } - return targets.containsKey(checkTargetName); - } - - @Override - public String[] getTargetNames() { - try { - if (cBlockManagerHandler != null) { - return cBlockManagerHandler.listVolumes(). - stream().map( - volumeInfo -> volumeInfo.getUserName() + ":" + volumeInfo - .getVolumeName()).toArray(String[]::new); - } else { - return new String[0]; - } - } catch (IOException e) { - LOGGER.error("Can't list existing volumes", e); - return new String[0]; - } - } - - @VisibleForTesting - public HashMap<String, Target> getTargets() { - return targets; - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org