http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/test/function/TestOutputStream.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/function/TestOutputStream.cpp b/depends/libhdfs3/test/function/TestOutputStream.cpp new file mode 100644 index 0000000..da0162f --- /dev/null +++ b/depends/libhdfs3/test/function/TestOutputStream.cpp @@ -0,0 +1,774 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "client/FileSystem.h" +#include "client/InputStream.h" +#include "client/OutputStream.h" +#include "DateTime.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "gtest/gtest.h" +#include "Memory.h" +#include "TestUtil.h" +#include "Thread.h" +#include "XmlConfig.h" + +#include <iostream> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#ifndef TEST_HDFS_PREFIX +#define TEST_HDFS_PREFIX "./" +#endif + +#define BASE_DIR TEST_HDFS_PREFIX"/testOutputStream/" + +using namespace Hdfs; +using namespace Hdfs::Internal; + +class TestOutputStream: public ::testing::Test { +public: + TestOutputStream() : + conf("function-test.xml") { + conf.set("output.default.packetsize", 1024); + fs = new FileSystem(conf); + fs->connect(); + superfs = new FileSystem(conf); + superfs->connect(conf.getString("dfs.default.uri"), HDFS_SUPERUSER, NULL); + superfs->setWorkingDirectory(fs->getWorkingDirectory().c_str()); + + try { + superfs->deletePath(BASE_DIR, true); + } catch (...) { + } + + superfs->mkdirs(BASE_DIR, 0755); + superfs->setOwner(TEST_HDFS_PREFIX, USER, NULL); + superfs->setOwner(BASE_DIR, USER, NULL); + } + + ~TestOutputStream() { + try { + superfs->deletePath(BASE_DIR, true); + } catch (...) { + } + + fs->disconnect(); + delete fs; + superfs->disconnect(); + delete superfs; + } + + /** + * size the size will be the size of a chunk or the size of a packet or the size of a block, the default blocksize is 2048 + */ + void CheckWrite(size_t size, int flag) { + char buffer[size], buffer2[size - 100], buffer3[size + 100], readBuffer[2560]; + //check write a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.close()); + ASSERT_NO_THROW(ins.open(*fs, BASE_DIR"testWrite", true)); + ASSERT_NO_THROW(ins.readFully(readBuffer, sizeof(buffer))); + ASSERT_EQ(CheckBuffer(readBuffer, sizeof(buffer), 0), true); + ASSERT_NO_THROW(ins.close()); + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + //check write less than a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer2, sizeof(buffer2), 0); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + ASSERT_NO_THROW(ous.close()); + ASSERT_NO_THROW(ins.open(*fs, BASE_DIR"testWrite", false)); + ASSERT_NO_THROW(ins.readFully(readBuffer, sizeof(buffer2))); + ASSERT_EQ(CheckBuffer(readBuffer, sizeof(buffer2), 0), true); + ASSERT_NO_THROW(ins.close()); + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + //check write greater than a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer3, sizeof(buffer3), 0); + ASSERT_NO_THROW(ous.append(buffer3, sizeof(buffer3))); + ASSERT_NO_THROW(ous.close()); + ASSERT_NO_THROW(ins.open(*fs, BASE_DIR"testWrite", true)); + ASSERT_NO_THROW(ins.readFully(readBuffer, sizeof(buffer3))); + ASSERT_EQ(CheckBuffer(readBuffer, sizeof(buffer3), 0), true); + ASSERT_NO_THROW(ins.close()); + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + } + + void CheckOverWrite(size_t size, int flag) { + char buffer[size], buffer2[size - 100], buffer3[size + 100], readBuffer[2560]; + + //check overwrite a chunk|packet|block + if (flag == Overwrite) { + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Create, 0644, false, 0, 2048)); + ASSERT_NO_THROW(ous.close()); + } + + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.close()); + ASSERT_NO_THROW(ins.open(*fs, BASE_DIR"testWrite", true)); + ASSERT_NO_THROW(ins.readFully(readBuffer, sizeof(buffer))); + ASSERT_EQ(CheckBuffer(readBuffer, sizeof(buffer), 0), true); + ASSERT_NO_THROW(ins.close()); + //check overwrite less than a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer2, sizeof(buffer2), 0); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + ASSERT_NO_THROW(ous.close()); + ASSERT_NO_THROW(ins.open(*fs, BASE_DIR"testWrite", false)); + ASSERT_NO_THROW(ins.readFully(readBuffer, sizeof(buffer2))); + ASSERT_EQ(CheckBuffer(readBuffer, sizeof(buffer2), 0), true); + ASSERT_NO_THROW(ins.close()); + //check overwrite greater than a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer3, sizeof(buffer3), 0); + ASSERT_NO_THROW(ous.append(buffer3, sizeof(buffer3))); + ASSERT_NO_THROW(ous.close()); + ASSERT_NO_THROW(ins.open(*fs, BASE_DIR"testWrite", true)); + ASSERT_NO_THROW(ins.readFully(readBuffer, sizeof(buffer3))); + ASSERT_EQ(CheckBuffer(readBuffer, sizeof(buffer3), 0), true); + ASSERT_NO_THROW(ins.close()); + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + } + + void TestWrite(size_t size, int flag) { + char buffer[size], buffer2[size - 100], buffer3[size + 100]; + FillBuffer(buffer, sizeof(buffer), 0); + //when outputstream is not opened, test the append function + ASSERT_THROW(ous.append(buffer, sizeof(buffer)), HdfsIOException); + //Test write a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + //Test write less than a chunk|packet|block + FillBuffer(buffer2, sizeof(buffer2), 0); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + //Test write greater than a chunk|packet|block + FillBuffer(buffer3, sizeof(buffer3), 0); + ASSERT_NO_THROW(ous.append(buffer3, sizeof(buffer3))); + ASSERT_NO_THROW(ous.close()); + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + //Test write a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.close()); + + //Test write less than a chunk|packet|block + if (flag == Create) { + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + } + + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer2, sizeof(buffer2), 0); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + ASSERT_NO_THROW(ous.close()); + + //Test write greater than a chunk|packet|block + if (flag == Create) { + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + } + + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer3, sizeof(buffer3), 0); + ASSERT_NO_THROW(ous.append(buffer3, sizeof(buffer3))); + ASSERT_NO_THROW(ous.close()); + } + + void TestAppend(size_t size, int flag) { + char buffer[size], buffer2[size - 100], buffer3[size + 100], readBuffer[2560]; + //Test Append a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + //Test write less than a chunk|packet|block + FillBuffer(buffer2, sizeof(buffer2), 1); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + //Test write greater than a chunk|packet|block + FillBuffer(buffer3, sizeof(buffer3), 2); + ASSERT_NO_THROW(ous.append(buffer3, sizeof(buffer3))); + ASSERT_NO_THROW(ous.close()); + ins.open(*fs, BASE_DIR"testWrite", true); + ins.readFully(readBuffer, sizeof(buffer)); + ASSERT_EQ(CheckBuffer(readBuffer, sizeof(buffer), 0), true); + ins.seek(sizeof(buffer)); + ins.readFully(readBuffer, sizeof(buffer2)); + ASSERT_EQ(CheckBuffer(readBuffer, sizeof(buffer2), 1), true); + ins.seek(sizeof(buffer2) + sizeof(buffer)); + ins.readFully(readBuffer, sizeof(buffer3)); + ASSERT_EQ(CheckBuffer(readBuffer, sizeof(buffer3), 2), true); + //Test write a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.close()); + //Test write less than a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer2, sizeof(buffer2), 1); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + ASSERT_NO_THROW(ous.close()); + //Test write greater than a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer3, sizeof(buffer3), 2); + ASSERT_NO_THROW(ous.append(buffer3, sizeof(buffer3))); + ASSERT_NO_THROW(ous.close()); + ins.seek(sizeof(buffer) + sizeof(buffer2) + sizeof(buffer3)); + ins.readFully(readBuffer, sizeof(buffer)); + ASSERT_EQ(CheckBuffer(readBuffer, sizeof(buffer), 0), true); + ins.seek(2 * sizeof(buffer) + sizeof(buffer2) + sizeof(buffer3)); + ins.readFully(readBuffer, sizeof(buffer2)); + ASSERT_EQ(CheckBuffer(readBuffer, sizeof(buffer2), 1), true); + ins.seek(2 * sizeof(buffer) + 2 * sizeof(buffer2) + sizeof(buffer3)); + ins.readFully(readBuffer, sizeof(buffer3)); + ASSERT_EQ(CheckBuffer(readBuffer, sizeof(buffer3), 2), true); + ins.close(); + } + + void TestAppendSyncBlock(size_t size, int flag) { + char buffer[size], buffer2[size - 100], buffer3[size + 100]; + //Test Append a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.sync()); + //Test write less than a chunk|packet|block + FillBuffer(buffer2, sizeof(buffer2), 0); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + ASSERT_NO_THROW(ous.sync()); + //Test write greater than a chunk|packet|block + FillBuffer(buffer3, sizeof(buffer3), 0); + ASSERT_NO_THROW(ous.append(buffer3, sizeof(buffer3))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); + //Test write a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); + //Test write less than a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer2, sizeof(buffer2), 0); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); + //Test write greater than a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer3, sizeof(buffer3), 0); + ASSERT_NO_THROW(ous.append(buffer3, sizeof(buffer3))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); + } + + void TestCreateSyncBlock(size_t size, int flag) { + char buffer[size], buffer2[size - 100], buffer3[size + 100]; + //Test Append a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.sync()); + //Test write less than a chunk|packet|block + FillBuffer(buffer2, sizeof(buffer2), 0); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + ASSERT_NO_THROW(ous.sync()); + //Test write greater than a chunk|packet|block + FillBuffer(buffer3, sizeof(buffer3), 0); + ASSERT_NO_THROW(ous.append(buffer3, sizeof(buffer3))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); + //Test write a chunk|packet|block + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); + //Test write less than a chunk|packet|block + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer2, sizeof(buffer2), 0); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); + //Test write greater than a chunk|packet|block + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer3, sizeof(buffer3), 0); + ASSERT_NO_THROW(ous.append(buffer3, sizeof(buffer3))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + } + + void TestOverwriteSyncBlock(size_t size, int flag) { + char buffer[size], buffer2[size - 100], buffer3[size + 100]; + //Test Append a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.sync()); + //Test write less than a chunk|packet|block + FillBuffer(buffer2, sizeof(buffer2), 0); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + ASSERT_NO_THROW(ous.sync()); + //Test write greater than a chunk|packet|block + FillBuffer(buffer3, sizeof(buffer3), 0); + ASSERT_NO_THROW(ous.append(buffer3, sizeof(buffer3))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); + //Test write a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); + //Test write less than a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer2, sizeof(buffer2), 0); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); + //Test write greater than a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer3, sizeof(buffer3), 0); + ASSERT_NO_THROW(ous.append(buffer3, sizeof(buffer3))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); + } + + void TestFlush(size_t size, int flag) { + char buffer[size], buffer2[size - 100], buffer3[size + 100]; + //Test Append a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.flush()); + //Test write less than a chunk|packet|block + FillBuffer(buffer2, sizeof(buffer2), 0); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + ASSERT_NO_THROW(ous.flush()); + //Test write greater than a chunk|packet|block + FillBuffer(buffer3, sizeof(buffer3), 0); + ASSERT_NO_THROW(ous.append(buffer3, sizeof(buffer3))); + ASSERT_NO_THROW(ous.flush()); + ASSERT_NO_THROW(ous.close()); + //Test write a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.flush()); + ASSERT_NO_THROW(ous.close()); + //Test write less than a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer2, sizeof(buffer2), 0); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + ASSERT_NO_THROW(ous.flush()); + ASSERT_NO_THROW(ous.close()); + //Test write greater than a chunk|packet|block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", flag, 0644, false, 0, 2048)); + FillBuffer(buffer3, sizeof(buffer3), 0); + ASSERT_NO_THROW(ous.append(buffer3, sizeof(buffer3))); + ASSERT_NO_THROW(ous.flush()); + ASSERT_NO_THROW(ous.close()); + } + +protected: + Config conf; + FileSystem * fs; + FileSystem * superfs; + InputStream ins; + OutputStream ous; +}; + +TEST_F(TestOutputStream, TestOpenFile_OpenFailed) { + { + //invalid path + OutputStream os; + EXPECT_THROW(os.open(*fs, "", Create), InvalidParameter); + } + { + //invalid path + OutputStream os; + EXPECT_THROW(os.open(*fs, NULL, Create), InvalidParameter); + } + { + //unconnect filesystem + FileSystem fs(conf); + OutputStream os; + EXPECT_THROW(os.open(fs, BASE_DIR"a", Create), HdfsIOException); + } + { + //path already exist as directory. + OutputStream os; + EXPECT_THROW(os.open(*fs, BASE_DIR, Create), FileAlreadyExistsException); + } + { + //invalid flag + OutputStream os; + EXPECT_THROW(os.open(*fs, BASE_DIR"a", Append | Overwrite), + InvalidParameter); + } + { + //invalid flag + OutputStream os; + EXPECT_THROW(os.open(*fs, BASE_DIR"a", Create | Append | Overwrite), + InvalidParameter); + } + { + //invalid flag + OutputStream os; + EXPECT_THROW(os.open(*fs, BASE_DIR"a", 0100000000), InvalidParameter); + } + { + //invalid flag + OutputStream os; + EXPECT_THROW(os.open(*fs, BASE_DIR"a", -1), InvalidParameter); + } + { + //invalid flag + OutputStream os; + EXPECT_THROW(os.open(*fs, BASE_DIR"a", 0), InvalidParameter); + } + { + //invalid permission. + OutputStream os; + EXPECT_THROW(os.open(*fs, BASE_DIR"a", Create, (1u << 10)), + InvalidParameter); + } + { + //invalid replica number. + OutputStream os; + EXPECT_THROW(os.open(*fs, BASE_DIR"a", Create, 0644, false, -1), + InvalidParameter); + } + { + //invalid block size. + OutputStream os; + EXPECT_THROW(os.open(*fs, BASE_DIR"a", Create, 0644, false, 0, -1), + InvalidParameter); + } + { + //invalid block size. + OutputStream os; + EXPECT_THROW(os.open(*fs, BASE_DIR"a", Create, 0644, false, 0, 1234), + InvalidParameter); + } + { + //Overwrite non-exist file + OutputStream os; + EXPECT_THROW(os.open(*fs, BASE_DIR"a", Overwrite), + FileNotFoundException); + } +} + +TEST_F(TestOutputStream, TestOpenFileForWrite) { + ASSERT_NO_THROW( + ous.open(*fs, BASE_DIR"//////b/c/d/././e/../../../../a", Create)); + //open an opened file + OutputStream other; + EXPECT_THROW(other.open(*fs, BASE_DIR"a", Create), + AlreadyBeingCreatedException); + ous.close(); + //create an exist file + ASSERT_THROW(ous.open(*fs, BASE_DIR"a", Create), FileAlreadyExistsException); + //open for append + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"a", Append)); + ous.close(); + //overwrite an exist file + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"a", Overwrite)); + ous.close(); + //create or append an exist file + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"a", Create | Append)); + ous.close(); + //create or append a non-exist file + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"b", Create | Append)); + ous.close(); + //create new file with SyncBlock flag + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"c", Create | SyncBlock)); + ous.close(); + //append file with SyncBlock flag + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"c", Append | SyncBlock)); + ous.close(); + //overwrite file with SyncBlock flag + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"c", Overwrite | SyncBlock)); + ous.close(); +} + + + +TEST_F(TestOutputStream, TestWriteChunkPacket) { + //test create a file and write a block + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Create, 0644, false, 0, 2048)); + char buffer[512], buffer2[1024]; + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.close()); + //test append a packet to a file + ASSERT_THROW(ous.open(*fs, BASE_DIR"testWriteNotExist", Append, 0644, false, 0, 2048), FileNotFoundException); + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Append, 0644, false, 0, 2048)); + FillBuffer(buffer2, sizeof(buffer2), 0); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + ASSERT_NO_THROW(ous.append(buffer2, sizeof(buffer2))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); + //test overwrite a file + ASSERT_THROW(ous.open(*fs, BASE_DIR"testWriteNotExist", Overwrite, 0644, false, 0, 2048), FileNotFoundException); + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Overwrite, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.close()); + //test create|Append + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Create | Append, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.close()); + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), 1); + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Create | Append, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.close()); + //test create|Overwrite + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Create | Overwrite, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.close()); + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), 1); + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Create | Overwrite, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.close()); + //test create|SyncBlock + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), 1); + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Create | SyncBlock, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); + //test Append|SyncBlock + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Append | SyncBlock, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); + //test Overwrite|SyncBlock + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Overwrite | SyncBlock, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); +} + +TEST_F(TestOutputStream, CheckWriteChunk) { + CheckWrite(512, Create | Append); + CheckWrite(512, Create); +} + +TEST_F(TestOutputStream, CheckWritePacket) { + CheckWrite(1024, Create | Append); + CheckWrite(1024, Create); +} + +TEST_F(TestOutputStream, CheckWriteBlock) { + CheckWrite(2048, Create | Append); + CheckWrite(2048, Create); +} + +TEST_F(TestOutputStream, CheckOverwrite) { + CheckOverWrite(512, Overwrite); + CheckOverWrite(1024, Overwrite); + CheckOverWrite(2048, Overwrite); + CheckOverWrite(512, Create | Overwrite); + CheckOverWrite(1024, Create | Overwrite); + CheckOverWrite(2048, Create | Overwrite); +} + + + +TEST_F(TestOutputStream, TestWrite) { + TestWrite(512, Create); + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + TestWrite(1024, Create); + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + TestWrite(2048, Create); + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + TestWrite(512, Create | Append); + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + TestWrite(1024, Create | Append); + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); + TestWrite(2048, Create | Append); + ASSERT_EQ(fs->deletePath(BASE_DIR"testWrite", false), true); +} + +TEST_F(TestOutputStream, TestAppend) { + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Create, 0644, false, 0, 2048)); + ous.close(); + TestAppend(512, Append); + fs->deletePath(BASE_DIR"testWrite", false); + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Create, 0644, false, 0, 2048)); + ous.close(); + TestAppend(1024, Append); + fs->deletePath(BASE_DIR"testWrite", false); + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Create, 0644, false, 0, 2048)); + ous.close(); + TestAppend(2048, Append); + fs->deletePath(BASE_DIR"testWrite", false); +} + +TEST_F(TestOutputStream, TestAppendSyncBlock) { + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Create, 0644, false, 0, 2048)); + ous.close(); + TestAppendSyncBlock(512, Append | SyncBlock); + TestAppendSyncBlock(1024, Append | SyncBlock); + TestAppendSyncBlock(2048, Append | SyncBlock); + TestAppendSyncBlock(512, Overwrite | SyncBlock); + TestAppendSyncBlock(1024, Overwrite | SyncBlock); + TestAppendSyncBlock(2048, Overwrite | SyncBlock); +} + +TEST_F(TestOutputStream, TestCreateSyncBlock) { + TestCreateSyncBlock(512, Create | SyncBlock); + TestCreateSyncBlock(1024, Create | SyncBlock); + TestCreateSyncBlock(2048, Create | SyncBlock); +} + +TEST_F(TestOutputStream, TestOverwriteSyncBlock) { + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Create, 0644, false, 0, 2048)); + ous.close(); + TestOverwriteSyncBlock(512, Overwrite | SyncBlock); + TestOverwriteSyncBlock(1024, Overwrite | SyncBlock); + TestOverwriteSyncBlock(2048, Overwrite | SyncBlock); +} + +TEST_F(TestOutputStream, TestFlush) { + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Create, 0644, false, 0, 2048)); + ous.close(); + TestFlush(512, Append); + TestFlush(1024, Append); + TestFlush(2048, Append); +} + +TEST_F(TestOutputStream, TestTell) { + char buffer[512]; + ASSERT_NO_THROW(ous.open(*fs, BASE_DIR"testWrite", Create, 0644, false, 0, 2048)); + FillBuffer(buffer, sizeof(buffer), 0); + ASSERT_NO_THROW(ous.append(buffer, sizeof(buffer))); + ASSERT_EQ(ous.tell(), sizeof(buffer)); + ous.close(); +} + +static void CheckFileContent(FileSystem * fs, std::string path, int64_t len, size_t offset) { + InputStream in; + EXPECT_NO_THROW(in.open(*fs, path.c_str(), true)); + std::vector<char> buff(20 * 1024); + int rc, todo = len, batch; + + while (todo > 0) { + batch = todo < static_cast<int>(buff.size()) ? todo : buff.size(); + batch = in.read(&buff[0], batch); + EXPECT_TRUE(batch > 0); + todo = todo - batch; + rc = Hdfs::CheckBuffer(&buff[0], batch, offset); + offset += batch; + EXPECT_TRUE(rc); + } + + EXPECT_NO_THROW(in.close()); +} + +static void WriteSameTime(FileSystem * fs, std::string path, int flag, int64_t writeSize) { + std::vector<char> buffer(64 * 1024); + int64_t todo, batch; + size_t offset = 0; + todo = writeSize; + OutputStream ousA; + EXPECT_NO_THROW(DebugException(ousA.open(*fs, path.c_str(), flag, 0644, false, 0, 1024 * 1024))); + + while (todo > 0) { + batch = todo < static_cast<int>(buffer.size()) ? todo : buffer.size(); + Hdfs::FillBuffer(&buffer[0], batch, offset); + EXPECT_NO_THROW(DebugException(ousA.append(&buffer[0], batch))); + todo -= batch; + offset += batch; + } + + ASSERT_NO_THROW(DebugException(ousA.close())); + CheckFileContent(fs, path, writeSize, 0); +} + +static void NothrowTestWriteSameTime(FileSystem * fs, std::string path, + int flag, int64_t writeSize) { + EXPECT_NO_THROW(WriteSameTime(fs, path, flag, writeSize)); +} + +TEST_F(TestOutputStream, TestWriteSameTime) { + int flag = Create | Overwrite; + int64_t writeSize = 20 * 1024 * 1024 + 234; + std::vector<shared_ptr<thread> > threads; + const char * filename = BASE_DIR"testWriteSameTime"; + + for (int i = 0; i <= 50; ++i) { + std::stringstream buffer; + buffer.imbue(std::locale::classic()); + buffer << filename << i; + threads.push_back( + shared_ptr<thread>( + new thread(NothrowTestWriteSameTime, fs, buffer.str(), flag, writeSize))); + } + + for (size_t i = 0; i < threads.size(); ++i) { + threads[i]->join(); + } +} + + + + + + + + + + + + + + + + + + + + + + + + +
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/test/secure/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/secure/CMakeLists.txt b/depends/libhdfs3/test/secure/CMakeLists.txt new file mode 100644 index 0000000..1452adc --- /dev/null +++ b/depends/libhdfs3/test/secure/CMakeLists.txt @@ -0,0 +1,69 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) + +AUTO_SOURCES(secure_SOURCES "*.cpp" "RECURSE" "${CMAKE_CURRENT_SOURCE_DIR}") + +INCLUDE_DIRECTORIES(${gmock_INCLUDE_DIR} ${gtest_INCLUDE_DIR} ${libhdfs3_ROOT_SOURCES_DIR}) + +IF(NEED_BOOST) + INCLUDE_DIRECTORIES(${Boost_INCLUDE_DIR}) +ENDIF(NEED_BOOST) + +INCLUDE_DIRECTORIES(${libhdfs3_ROOT_SOURCES_DIR}) +INCLUDE_DIRECTORIES(${libhdfs3_COMMON_SOURCES_DIR}) +INCLUDE_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR}) +INCLUDE_DIRECTORIES(${PROTOBUF_INCLUDE_DIRS}) +INCLUDE_DIRECTORIES(${libhdfs3_PLATFORM_HEADER_DIR}) +INCLUDE_DIRECTORIES(${LIBXML2_INCLUDE_DIR}) +INCLUDE_DIRECTORIES(${KERBEROS_INCLUDE_DIRS}) +INCLUDE_DIRECTORIES(${GSASL_INCLUDE_DIR}) +INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/mock) + +PROTOBUF_GENERATE_CPP(libhdfs3_PROTO_SOURCES libhdfs3_PROTO_HEADERS ${libhdfs3_PROTO_FILES}) + +IF(ENABLE_DEBUG STREQUAL ON) + SET(libhdfs3_SOURCES ${libhdfs3_SOURCES} ${libhdfs3_MOCK_SOURCES}) +ENDIF(ENABLE_DEBUG STREQUAL ON) + +IF(NOT HDFS_SUPERUSER) + SET(HDFS_SUPERUSER $ENV{USER}) +ENDIF(NOT HDFS_SUPERUSER) + +ADD_DEFINITIONS(-DHDFS_SUPERUSER="${HDFS_SUPERUSER}") +ADD_DEFINITIONS(-DUSER="$ENV{USER}") + +ADD_EXECUTABLE(secure EXCLUDE_FROM_ALL + ${gtest_SOURCES} + ${gmock_SOURCES} + ${libhdfs3_SOURCES} + ${libhdfs3_PROTO_SOURCES} + ${libhdfs3_PROTO_HEADERS} + ${secure_SOURCES} +) + +TARGET_LINK_LIBRARIES(secure pthread) + +IF(NEED_BOOST) + INCLUDE_DIRECTORIES(${Boost_INCLUDE_DIR}) + SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -L${Boost_LIBRARY_DIRS}") + TARGET_LINK_LIBRARIES(secure boost_thread) + TARGET_LINK_LIBRARIES(secure boost_chrono) + TARGET_LINK_LIBRARIES(secure boost_system) + TARGET_LINK_LIBRARIES(secure boost_atomic) + TARGET_LINK_LIBRARIES(secure boost_iostreams) +ENDIF(NEED_BOOST) + +IF(NEED_GCCEH) + TARGET_LINK_LIBRARIES(secure gcc_eh) +ENDIF(NEED_GCCEH) + +IF(OS_LINUX) + TARGET_LINK_LIBRARIES(secure uuid) +ENDIF(OS_LINUX) + +TARGET_LINK_LIBRARIES(secure ${PROTOBUF_LIBRARIES}) +TARGET_LINK_LIBRARIES(secure ${LIBXML2_LIBRARIES}) +TARGET_LINK_LIBRARIES(secure ${KERBEROS_LIBRARIES}) +TARGET_LINK_LIBRARIES(secure ${GSASL_LIBRARIES}) + +SET(secure_SOURCES ${secure_SOURCES} PARENT_SCOPE) + http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/test/secure/FunctionTestSecureMain.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/secure/FunctionTestSecureMain.cpp b/depends/libhdfs3/test/secure/FunctionTestSecureMain.cpp new file mode 100644 index 0000000..b8299de --- /dev/null +++ b/depends/libhdfs3/test/secure/FunctionTestSecureMain.cpp @@ -0,0 +1,38 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "gtest/gtest.h" + +int main(int argc, char ** argv) { + ::testing::InitGoogleTest(&argc, argv); +#ifdef DATA_DIR + if (0 != chdir(DATA_DIR)) { + abort(); + } +#endif + return RUN_ALL_TESTS(); +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/test/secure/SecureFunctionTest.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/secure/SecureFunctionTest.cpp b/depends/libhdfs3/test/secure/SecureFunctionTest.cpp new file mode 100644 index 0000000..e74bfd8 --- /dev/null +++ b/depends/libhdfs3/test/secure/SecureFunctionTest.cpp @@ -0,0 +1,402 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "client/FileSystem.h" +#include "client/FileSystemInter.h" +#include "client/FileSystemKey.h" +#include "client/InputStream.h" +#include "client/OutputStream.h" +#include "client/OutputStream.h" +#include "client/Permission.h" +#include "DateTime.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "gtest/gtest.h" +#include "Logger.h" +#include "Memory.h" +#include "server/NamenodeInfo.h" +#include "TestUtil.h" +#include "XmlConfig.h" + +#include <ctime> + +#ifndef TEST_HDFS_PREFIX +#define TEST_HDFS_PREFIX "./" +#endif + +#define BASE_DIR TEST_HDFS_PREFIX"/testSecureHA/" + +using namespace Hdfs; +using namespace Hdfs::Internal; + +class TestKerberosConnect : public ::testing::Test { +public: + TestKerberosConnect() : conf("function-secure.xml") { + std::stringstream ss; + ss.imbue(std::locale::classic()); + ss << "/tmp/krb5cc_"; + ss << getuid(); + const char * userCCpath = GetEnv("LIBHDFS3_TEST_USER_CCPATH", + ss.str().c_str()); + conf.set("hadoop.security.kerberos.ticket.cache.path", userCCpath); + } + + ~TestKerberosConnect() { + } + +protected: + Config conf; +}; + +TEST_F(TestKerberosConnect, connect) { + FileSystem fs(conf); + ASSERT_NO_THROW(DebugException(fs.connect())); +} + +TEST_F(TestKerberosConnect, GetDelegationToken_Failure) { + FileSystem fs(conf); + ASSERT_NO_THROW(DebugException(fs.connect())); + EXPECT_THROW(fs.getDelegationToken(NULL), InvalidParameter); + EXPECT_THROW(fs.getDelegationToken(""), InvalidParameter); + fs.disconnect(); +} + +TEST_F(TestKerberosConnect, DelegationToken_Failure) { + std::string token; + FileSystem fs(conf); + ASSERT_NO_THROW(DebugException(fs.connect())); + EXPECT_THROW(DebugException(fs.renewDelegationToken(token)), HdfsInvalidBlockToken); + ASSERT_NO_THROW(token = fs.getDelegationToken("Unknown")); + EXPECT_THROW(fs.renewDelegationToken(token), AccessControlException); + ASSERT_NO_THROW(token = fs.getDelegationToken()); + ASSERT_NO_THROW(fs.cancelDelegationToken(token)); + EXPECT_THROW(DebugException(fs.renewDelegationToken(token)), HdfsInvalidBlockToken); + EXPECT_THROW(DebugException(fs.cancelDelegationToken(token)), HdfsInvalidBlockToken); + fs.disconnect(); +} + +TEST_F(TestKerberosConnect, DelegationToken) { + FileSystem fs(conf); + ASSERT_NO_THROW(DebugException(fs.connect())); + ASSERT_NO_THROW({ + std::string token = fs.getDelegationToken(); + fs.renewDelegationToken(token); + fs.cancelDelegationToken(token); + }); + fs.disconnect(); +} + +TEST(TestDeletationToken, ToFromString) { + Token t1, t2; + std::string str; + ASSERT_NO_THROW(str = t1.toString()); + ASSERT_NO_THROW(t2.fromString(str)); + EXPECT_TRUE(t1 == t2); + t1.setIdentifier("test"); + ASSERT_NO_THROW(str = t1.toString()); + ASSERT_NO_THROW(t2.fromString(str)); + EXPECT_TRUE(t1 == t2); + t1.setPassword("test"); + ASSERT_NO_THROW(str = t1.toString()); + ASSERT_NO_THROW(t2.fromString(str)); + EXPECT_TRUE(t1 == t2); + t1.setKind("test"); + ASSERT_NO_THROW(str = t1.toString()); + ASSERT_NO_THROW(t2.fromString(str)); + EXPECT_TRUE(t1 == t2); + t1.setService("test"); + ASSERT_NO_THROW(str = t1.toString()); + ASSERT_NO_THROW(t2.fromString(str)); + EXPECT_TRUE(t1 == t2); +} + +class TestToken: public ::testing::Test { +public: + TestToken() { + Config conf("function-secure.xml"); + std::stringstream ss; + ss.imbue(std::locale::classic()); + ss << "/tmp/krb5cc_"; + ss << getuid(); + const char * userCCpath = GetEnv("LIBHDFS3_TEST_USER_CCPATH", ss.str().c_str()); + conf.set("hadoop.security.kerberos.ticket.cache.path", userCCpath); + fs = shared_ptr<FileSystem> (new FileSystem(conf)); + fs->connect(); + + try { + fs->deletePath(BASE_DIR, true); + } catch (...) { + } + + fs->mkdirs(BASE_DIR, 0755); + relengBin = GetEnv("RELENG_BIN", "./"); + } + + ~TestToken() { + try { + fs->disconnect(); + } catch (...) { + } + } + +protected: + shared_ptr<FileSystem> fs; + std::string relengBin; +}; + +class TestNamenodeHA: public TestToken { +public: + void switchToStandbyNamenode() { + std::string cmd = relengBin + "/ha_failover.sh"; + int rc = std::system(cmd.c_str()); + + if (rc) { + LOG(WARNING, "failed to invoke ha_failover.sh, return code is %d", rc); + } + } + + void CheckFileContent(const std::string & path, int64_t len, size_t offset, bool switchNamenode = false) { + InputStream in; + EXPECT_NO_THROW(in.open(*fs, path.c_str(), true)); + std::vector<char> buff(20 * 1024); + int rc, todo = len, batch; + + while (todo > 0) { + if (switchNamenode && todo < len / 2) { + switchToStandbyNamenode(); + switchNamenode = false; + } + + batch = todo < static_cast<int>(buff.size()) ? todo : buff.size(); + batch = in.read(&buff[0], batch); + EXPECT_TRUE(batch > 0); + todo = todo - batch; + rc = Hdfs::CheckBuffer(&buff[0], batch, offset); + offset += batch; + EXPECT_TRUE(rc); + } + + EXPECT_NO_THROW(in.close()); + } +}; + +static void TestInputOutputStream(FileSystem & fs) { + OutputStream out; + ASSERT_NO_THROW(DebugException(out.open(fs, BASE_DIR "file", Create, 0644, true, 0, 1024))); + std::vector<char> buffer(1024 * 3 + 1); + FillBuffer(&buffer[0], buffer.size(), 0); + ASSERT_NO_THROW(DebugException(out.append(&buffer[0], buffer.size()))); + ASSERT_NO_THROW(out.sync()); + InputStream in; + ASSERT_NO_THROW(DebugException(in.open(fs, BASE_DIR "file"))); + memset(&buffer[0], 0, buffer.size()); + ASSERT_NO_THROW(DebugException(in.readFully(&buffer[0], buffer.size()))); + EXPECT_TRUE(CheckBuffer(&buffer[0], buffer.size(), 0)); + ASSERT_NO_THROW(in.close()); + ASSERT_NO_THROW(out.close()); +} + +TEST_F(TestToken, BlockToken) { + TestInputOutputStream(*fs); +} + +static void VerifyToken(const std::string & token, const std::string & host, + const std::string & port) { + std::stringstream ss; + ss.imbue(std::locale::classic()); + ss << "curl -o /tmp/VerifyToken.out --silent --no-buffer -i \"http://" + << host << ":" << port + << "/webhdfs/v1/?op=GETHOMEDIRECTORY&delegation=" << token << "\"" + << std::endl; + std::cerr << "cmd: " << ss.str(); + ASSERT_EQ(0, std::system(ss.str().c_str())); + ASSERT_EQ(0, access("/tmp/VerifyToken.out", R_OK)); + ASSERT_EQ(0, std::system("grep -E '200 OK|StandbyException' /tmp/VerifyToken.out")); +} + +static void ExtractHostPort(const std::string & uri, std::string & host, + std::string & port) { + std::stringstream ss; + ss.imbue(std::locale::classic()); + + if (NULL == strstr(uri.c_str(), "://")) { + ss << "hdfs://" << uri; + } else { + ss << uri; + } + + FileSystemKey key(ss.str(), NULL); + host = key.getHost(); + port = key.getPort(); +} + +TEST_F(TestToken, DelegatationToken) { + std::string token; + Hdfs::Internal::Token t; + std::string host, port; + Config conf("function-secure.xml"); + std::string namenode = conf.getString("dfs.default.uri"); + std::vector<NamenodeInfo> namenodes; + int count = 1000; + + for (int i = 0; i < count; ++i) { + ExtractHostPort(namenode, host, port); + ASSERT_NO_THROW(token = fs->getDelegationToken()); + ASSERT_NO_THROW(DebugException(t.fromString(token))); + + try { + namenodes = NamenodeInfo::GetHANamenodeInfo(host, conf); + + for (size_t i = 0; i < namenodes.size(); ++i) { + ExtractHostPort(namenodes[i].getHttpAddr(), host, port); + VerifyToken(token, host, port); + } + } catch (HdfsConfigNotFound & e) { + VerifyToken(token, host, "50070"); + } + + ASSERT_NO_THROW(fs->cancelDelegationToken(token)); + } +} + +TEST_F(TestToken, DelegatationTokenAccess) { + std::string token; + ASSERT_NO_THROW(token = fs->getDelegationToken()); + Config conf("function-secure.xml"); + FileSystem tempfs(conf); + ASSERT_NO_THROW(tempfs.connect(conf.getString("dfs.default.uri"), NULL, token.c_str())); + TestInputOutputStream(tempfs); + ASSERT_NO_THROW(fs->cancelDelegationToken(token)); +} + +TEST_F(TestNamenodeHA, Throughput) { + const char * filename = BASE_DIR"TestNamenodeHA"; + std::vector<char> buffer(64 * 1024); + int64_t fileLength = 5 * 1024 * 1024 * 1024ll; + int64_t todo = fileLength, batch; + size_t offset = 0; + OutputStream ous; + bool switchNamenode = true; + EXPECT_NO_THROW(DebugException(ous.open(*fs, filename, Create | Overwrite /*| SyncBlock*/))); + + while (todo > 0) { + if (switchNamenode && todo < fileLength / 2) { + switchToStandbyNamenode(); + switchNamenode = false; + } + + batch = todo < static_cast<int>(buffer.size()) ? todo : buffer.size(); + Hdfs::FillBuffer(&buffer[0], batch, offset); + ASSERT_NO_THROW(DebugException(ous.append(&buffer[0], batch))); + todo -= batch; + offset += batch; + } + + ASSERT_NO_THROW(DebugException(ous.close())); + CheckFileContent(filename, fileLength, 0, true); +} + +static void CreateFile(FileSystem & fs, const std::string & path) { + OutputStream ous; + ASSERT_NO_THROW(DebugException(ous.open(fs, path.c_str()))); + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); +} + +static void AppendFile(FileSystem & fs, const std::string & path, int64_t len) { + OutputStream ous; + ous.open(fs, path.c_str(), Append | SyncBlock, 0755, true); + size_t offset = 0; + int64_t todo = len, batch; + std::vector<char> buffer(32 * 1024); + + while (todo > 0) { + batch = static_cast<int64_t>(buffer.size()) < todo ? buffer.size() : todo; + Hdfs::FillBuffer(&buffer[0], batch, offset); + ASSERT_NO_THROW(DebugException(ous.append(&buffer[0], batch))); + todo -= batch; + offset += batch; + } + + ASSERT_NO_THROW(ous.sync()); + ASSERT_NO_THROW(ous.close()); +} + +TEST_F(TestNamenodeHA, SmallFiles) { + int count = 100; + int64_t filesize = 32 * 1024 * 1024ll; + const char * filename = BASE_DIR"smallfile"; + std::vector<std::string> paths; + bool switchNamenode = true; + + for (int i = 0; i < count; ++i) { + std::stringstream ss; + ss.imbue(std::locale::classic()); + ss << filename << i; + paths.push_back(ss.str()); + } + + /* + * test create + */ + for (int i = 0; i < count; ++i) { + if (switchNamenode && i > count / 2) { + switchToStandbyNamenode(); + switchNamenode = false; + } + + CreateFile(*fs, paths[i]); + } + + /* + * test append + */ + switchNamenode = true; + + for (int i = 0; i < count; ++i) { + if (switchNamenode && i > count / 2) { + switchToStandbyNamenode(); + switchNamenode = false; + } + + AppendFile(*fs, paths[i], filesize); + } + + /* + * test read + */ + switchNamenode = true; + + for (int i = 0; i < count; ++i) { + if (switchNamenode && i > count / 2) { + switchToStandbyNamenode(); + switchNamenode = false; + } + + CheckFileContent(paths[i], filesize, 0); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/test/secure/TestSecureCInterface.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/secure/TestSecureCInterface.cpp b/depends/libhdfs3/test/secure/TestSecureCInterface.cpp new file mode 100644 index 0000000..e5f9e4c --- /dev/null +++ b/depends/libhdfs3/test/secure/TestSecureCInterface.cpp @@ -0,0 +1,183 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "gtest/gtest.h" + +#include "client/hdfs.h" +#include "client/KerberosName.h" +#include "Exception.h" +#include "ExceptionInternal.h" +#include "TestUtil.h" + +#include <krb5/krb5.h> + +#ifndef TEST_HDFS_PREFIX +#define TEST_HDFS_PREFIX "./" +#endif + +#define BASE_DIR TEST_HDFS_PREFIX"/testSecureHACInterface/" + +using namespace Hdfs; +using namespace Hdfs::Internal; + +static std::string ExtractPrincipalFromTicketCache( + const std::string & cachePath) { + krb5_context cxt = NULL; + krb5_ccache ccache = NULL; + krb5_principal principal = NULL; + krb5_error_code ec = 0; + std::string errmsg, retval; + char * priName = NULL; + + if (!cachePath.empty()) { + if (0 != setenv("KRB5CCNAME", cachePath.c_str(), 1)) { + THROW(HdfsIOException, "Cannot set env parameter \"KRB5CCNAME\""); + } + } + + do { + if (0 != (ec = krb5_init_context(&cxt))) { + break; + } + + if (0 != (ec = krb5_cc_default(cxt, &ccache))) { + break; + } + + if (0 != (ec = krb5_cc_get_principal(cxt, ccache, &principal))) { + break; + } + + if (0 != (ec = krb5_unparse_name(cxt, principal, &priName))) { + break; + } + } while (0); + + if (!ec) { + retval = priName; + } else { + if (cxt) { + errmsg = krb5_get_error_message(cxt, ec); + } else { + errmsg = "Cannot initialize kerberos context"; + } + } + + if (priName != NULL) { + krb5_free_unparsed_name(cxt, priName); + } + + if (principal != NULL) { + krb5_free_principal(cxt, principal); + } + + if (ccache != NULL) { + krb5_cc_close(cxt, ccache); + } + + if (cxt != NULL) { + krb5_free_context(cxt); + } + + if (!errmsg.empty()) { + THROW(HdfsIOException, + "FileSystem: Filed to extract principal from ticket cache: %s", + errmsg.c_str()); + } + + return retval; +} + +class TestKerberosConnectC: public ::testing::Test { +public: + TestKerberosConnectC() { + setenv("LIBHDFS3_CONF", "function-secure.xml", 1); + struct hdfsBuilder * bld = hdfsNewBuilder(); + assert(bld != NULL); + hdfsBuilderSetNameNode(bld, "default"); + hdfsBuilderSetForceNewInstance(bld); + std::stringstream ss; + ss.imbue(std::locale::classic()); + ss << "/tmp/krb5cc_"; + ss << getuid(); + const char * userCCpath = GetEnv("LIBHDFS3_TEST_USER_CCPATH", ss.str().c_str()); + hdfsBuilderSetKerbTicketCachePath(bld, userCCpath); + fs = hdfsBuilderConnect(bld); + hdfsFreeBuilder(bld); + + if (fs == NULL) { + throw std::runtime_error("cannot connect hdfs"); + } + + if (0 != hdfsCreateDirectory(fs, BASE_DIR)) { + throw std::runtime_error("cannot create test directory"); + } + + name = KerberosName(ExtractPrincipalFromTicketCache(userCCpath)); + } + + ~TestKerberosConnectC() { + hdfsDelete(fs, BASE_DIR, true); + hdfsDisconnect(fs); + } + +protected: + hdfsFS fs; + KerberosName name; +}; + +TEST_F(TestKerberosConnectC, GetDelegationToken_Failure) { + ASSERT_TRUE(NULL == hdfsGetDelegationToken(NULL, NULL)); + ASSERT_EQ(EINVAL, errno); + ASSERT_TRUE(NULL == hdfsGetDelegationToken(fs, NULL)); + ASSERT_EQ(EINVAL, errno); + ASSERT_TRUE(NULL == hdfsGetDelegationToken(fs, "")); + ASSERT_EQ(EINVAL, errno); +} + +TEST_F(TestKerberosConnectC, DelegationToken) { + char * token = NULL; + hdfsFreeDelegationToken(NULL); + ASSERT_EQ(-1, hdfsRenewDelegationToken(NULL, NULL)); + ASSERT_EQ(EINVAL, errno); + ASSERT_EQ(-1, hdfsRenewDelegationToken(fs, NULL)); + ASSERT_EQ(EINVAL, errno); + ASSERT_TRUE(NULL != (token = hdfsGetDelegationToken(fs, "Unknown"))); + ASSERT_EQ(-1, hdfsRenewDelegationToken(fs, token)); + ASSERT_EQ(EACCES, errno); + hdfsCancelDelegationToken(fs, token); + hdfsFreeDelegationToken(token); + ASSERT_TRUE(NULL != (token = hdfsGetDelegationToken(fs, name.getName().c_str()))); + ASSERT_GT(hdfsRenewDelegationToken(fs, token), 0); + ASSERT_EQ(0, hdfsCancelDelegationToken(fs, token)); + ASSERT_EQ(-1, hdfsRenewDelegationToken(fs, token)); + ASSERT_EQ(EPERM, errno); + ASSERT_EQ(-1, hdfsCancelDelegationToken(fs, token)); + ASSERT_EQ(EPERM, errno); + hdfsFreeDelegationToken(token); +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/test/unit/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/unit/CMakeLists.txt b/depends/libhdfs3/test/unit/CMakeLists.txt new file mode 100644 index 0000000..80066c7 --- /dev/null +++ b/depends/libhdfs3/test/unit/CMakeLists.txt @@ -0,0 +1,64 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) + +AUTO_SOURCES(unit_SOURCES "*.cpp" "RECURSE" ${CMAKE_CURRENT_SOURCE_DIR}) + +INCLUDE_DIRECTORIES(${gmock_INCLUDE_DIR} ${gtest_INCLUDE_DIR} ${libhdfs3_ROOT_SOURCES_DIR}) + +IF(NEED_BOOST) + INCLUDE_DIRECTORIES(${Boost_INCLUDE_DIR}) +ENDIF(NEED_BOOST) + +INCLUDE_DIRECTORIES(${libhdfs3_ROOT_SOURCES_DIR}) +INCLUDE_DIRECTORIES(${libhdfs3_COMMON_SOURCES_DIR}) +INCLUDE_DIRECTORIES(${PROTOBUF_INCLUDE_DIRS}) +INCLUDE_DIRECTORIES(${CMAKE_CURRENT_BINARY_DIR}) +INCLUDE_DIRECTORIES(${libhdfs3_PLATFORM_HEADER_DIR}) +INCLUDE_DIRECTORIES(${LIBXML2_INCLUDE_DIR}) +INCLUDE_DIRECTORIES(${KERBEROS_INCLUDE_DIRS}) +INCLUDE_DIRECTORIES(${GSASL_INCLUDE_DIR}) +INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/mock) + +ADD_DEFINITIONS(-DMOCK) + +PROTOBUF_GENERATE_CPP(libhdfs3_PROTO_SOURCES libhdfs3_PROTO_HEADERS ${libhdfs3_PROTO_FILES}) + +SET(libhdfs3_SOURCES ${libhdfs3_SOURCES} ${libhdfs3_MOCK_SOURCES}) + +ADD_EXECUTABLE(unit EXCLUDE_FROM_ALL + ${gtest_SOURCES} + ${gmock_SOURCES} + ${libhdfs3_SOURCES} + ${libhdfs3_PROTO_SOURCES} + ${libhdfs3_PROTO_HEADERS} + ${unit_SOURCES} +) + +TARGET_LINK_LIBRARIES(unit pthread) + +IF(NEED_BOOST) + INCLUDE_DIRECTORIES(${Boost_INCLUDE_DIR}) + SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -L${Boost_LIBRARY_DIRS}") + TARGET_LINK_LIBRARIES(unit boost_thread) + TARGET_LINK_LIBRARIES(unit boost_chrono) + TARGET_LINK_LIBRARIES(unit boost_system) + TARGET_LINK_LIBRARIES(unit boost_atomic) + TARGET_LINK_LIBRARIES(unit boost_iostreams) +ENDIF(NEED_BOOST) + +IF(NEED_GCCEH) + TARGET_LINK_LIBRARIES(unit gcc_eh) +ENDIF(NEED_GCCEH) + +IF(OS_LINUX) + TARGET_LINK_LIBRARIES(unit ${LIBUUID_LIBRARIES}) + INCLUDE_DIRECTORIES(${LIBUUID_INCLUDE_DIRS}) +ENDIF(OS_LINUX) + +TARGET_LINK_LIBRARIES(unit ${PROTOBUF_LIBRARIES}) +TARGET_LINK_LIBRARIES(unit ${LIBXML2_LIBRARIES}) +TARGET_LINK_LIBRARIES(unit ${KERBEROS_LIBRARIES}) +TARGET_LINK_LIBRARIES(unit ${GSASL_LIBRARIES}) + +SET(unit_SOURCES ${unit_SOURCES} PARENT_SCOPE) + http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/test/unit/TestBufferedSocketReader.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/unit/TestBufferedSocketReader.cpp b/depends/libhdfs3/test/unit/TestBufferedSocketReader.cpp new file mode 100644 index 0000000..f4aa16a --- /dev/null +++ b/depends/libhdfs3/test/unit/TestBufferedSocketReader.cpp @@ -0,0 +1,180 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "Exception.h" +#include "ExceptionInternal.h" +#include "gtest/gtest.h" +#include "MockSocket.h" +#include "network/BufferedSocketReader.h" +#include "TestUtil.h" +#include "Thread.h" + +using namespace Hdfs; +using namespace Hdfs::Internal; +using namespace testing; + +class TestBufferedSocketReader: public ::testing::Test { +public: + TestBufferedSocketReader() : + reader(sock) { + } + +public: + MockSocket sock; + BufferedSocketReaderImpl reader; +}; + +static int32_t ReadAction(char * buffer, int32_t size, const char * target, + int32_t tsize) { + int32_t todo = size < tsize ? size : tsize; + memcpy(buffer, target, todo); + return todo; +} + +static void ReadFullyAction(char * buffer, int32_t size, int timeout, + const char * target, int32_t tsize) { + assert(size == tsize); + memcpy(buffer, target, size); +} + +TEST_F(TestBufferedSocketReader, TestRead) { + ASSERT_EQ(0, reader.size); + ASSERT_EQ(0, reader.cursor); + char readData[64]; + int rsize = sizeof(readData); + EXPECT_CALL(sock, read(_, _)).Times(1).WillOnce( + Invoke(bind(&ReadAction, _1, _2, readData, rsize))); + /* + * fill the buffer, read from buffer. + */ + int bufferSize = reader.buffer.size(); + ASSERT_GT(bufferSize, 0); + FillBuffer(&reader.buffer[0], bufferSize, 0); + reader.size = bufferSize; + std::vector<char> target(bufferSize); + ASSERT_EQ(1, reader.read(&target[0], 1)); + ASSERT_TRUE(CheckBuffer(&target[0], 1, 0)); + ASSERT_EQ(bufferSize - 1, reader.read(&target[1], bufferSize - 1)); + ASSERT_TRUE(CheckBuffer(&target[0], bufferSize, 0)); + /* + * Refill the buffer, read cross the buffer. + */ + reader.cursor = 0; + reader.size = bufferSize; + target.resize(bufferSize + sizeof(readData)); + FillBuffer(readData, sizeof(readData), bufferSize); + int32_t todo = target.size(); + + while (todo > 0) { + todo -= reader.read(&target[0] + (target.size() - todo), todo); + } + + ASSERT_TRUE(CheckBuffer(&target[0], target.size(), 0)); + ASSERT_EQ(0, reader.size); + ASSERT_EQ(0, reader.cursor); +} + +TEST_F(TestBufferedSocketReader, TestReadFully) { + ASSERT_EQ(0, reader.size); + ASSERT_EQ(0, reader.cursor); + char readData[64]; + int rsize = sizeof(readData); + EXPECT_CALL(sock, readFully(_, _, 500)).Times(1).WillOnce( + Invoke(bind(&ReadFullyAction, _1, _2, _3, readData, rsize))); + /* + * fill the buffer, read from buffer. + */ + int bufferSize = reader.buffer.size(); + ASSERT_GT(bufferSize, 0); + FillBuffer(&reader.buffer[0], bufferSize, 0); + reader.size = bufferSize; + std::vector<char> target(bufferSize); + reader.readFully(&target[0], 1, 500); + ASSERT_TRUE(CheckBuffer(&target[0], 1, 0)); + reader.readFully(&target[1], bufferSize - 1, 500); + ASSERT_TRUE(CheckBuffer(&target[0], bufferSize, 0)); + /* + * Refill the buffer, read cross the buffer. + */ + reader.cursor = 0; + reader.size = bufferSize; + target.resize(bufferSize + sizeof(readData)); + FillBuffer(readData, sizeof(readData), bufferSize); + reader.readFully(&target[0], target.size(), 500); + ASSERT_TRUE(CheckBuffer(&target[0], target.size(), 0)); + ASSERT_EQ(0, reader.size); + ASSERT_EQ(0, reader.cursor); +} + +TEST_F(TestBufferedSocketReader, TestBigEndianInt32) { + ASSERT_EQ(0, reader.size); + ASSERT_EQ(0, reader.cursor); + ASSERT_GE(reader.buffer.size(), sizeof(int32_t)); + reader.buffer[0] = '1'; + reader.buffer[1] = '2'; + reader.buffer[2] = '3'; + reader.buffer[3] = '4'; + reader.size = 4; + char target[4] = { '4', '3', '2', '1' }; + EXPECT_EQ(*reinterpret_cast<int32_t *>(target), + reader.readBigEndianInt32(500)); +} + +TEST_F(TestBufferedSocketReader, TestReadVarInt32) { + ASSERT_EQ(0, reader.size); + ASSERT_EQ(0, reader.cursor); + ASSERT_GE(reader.buffer.size(), sizeof(int32_t)); + reader.buffer[0] = 0; + reader.buffer[1] = 1; + reader.buffer[2] = 2; + reader.size = 3; + reader.cursor = 1; + char target[2]; + target[0] = 1; + target[1] = 2; + EXPECT_CALL(sock, read(_, _)).Times(1).WillOnce( + Invoke(bind(&ReadAction, _1, _2, target, sizeof(target)))); + EXPECT_CALL(sock, poll(true, false, _)).WillOnce(Return(true)); + EXPECT_EQ(1, reader.readVarint32(500)); + EXPECT_EQ(1, reader.size - reader.cursor); + EXPECT_EQ(2, reader.readVarint32(500)); + EXPECT_EQ(0, reader.size - reader.cursor); + EXPECT_EQ(1, reader.readVarint32(500)); + EXPECT_EQ(2, reader.readVarint32(500)); + EXPECT_EQ(0, reader.size - reader.cursor); +} + +TEST_F(TestBufferedSocketReader, TestReadVarInt32_Failure) { + EXPECT_CALL(sock, poll(true, false, _)).Times(AnyNumber()).WillOnce( + Return(false)).WillRepeatedly(Return(true)); + char target[1]; + target[0] = -1; + EXPECT_CALL(sock, read(_, _)).Times(AnyNumber()).WillRepeatedly( + Invoke(bind(&ReadAction, _1, _2, target, sizeof(target)))); + EXPECT_THROW(reader.readVarint32(0), HdfsTimeoutException); + EXPECT_THROW(reader.readVarint32(1000), HdfsNetworkException); +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/test/unit/TestChecksum.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/unit/TestChecksum.cpp b/depends/libhdfs3/test/unit/TestChecksum.cpp new file mode 100644 index 0000000..704fef3 --- /dev/null +++ b/depends/libhdfs3/test/unit/TestChecksum.cpp @@ -0,0 +1,141 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "gtest/gtest.h" + +#include "HWCrc32c.h" +#include "SWCrc32c.h" + +#include <fstream> +#include <string> +#include <iostream> +#include <sstream> + +#ifndef DATA_DIR +#define DATA_DIR "" +#endif + +using namespace Hdfs::Internal; +using namespace testing; + +class TestChecksum: public ::testing::Test { + virtual void SetUp() { + in.open(DATA_DIR"checksum1.in"); + ASSERT_TRUE(!!in); + uint32_t value; + std::string str; + + while (getline(in, str)) { + std::stringstream ss(str); + ss.imbue(std::locale::classic()); + ss >> value >> str; + cases.push_back(std::make_pair(value, str)); + } + + in.close(); + in.open(DATA_DIR"checksum2.in"); + ASSERT_TRUE(!!in); + in >> result; + + while (getline(in, str)) { + strs.push_back(str); + str.clear(); + } + + in.close(); + } + + virtual void TearDown() { + cases.clear(); + } + +protected: + std::ifstream in; + uint32_t result; + std::vector<std::string> strs; + std::vector<std::pair<uint32_t, std::string> > cases; +}; + +TEST_F(TestChecksum, HWCrc32c) { + HWCrc32c cs; + + if (cs.available()) { + EXPECT_EQ(0u, cs.getValue()); + + for (size_t i = 0; i < cases.size(); ++i) { + size_t len = cases[i].second.length(); + const char * data = cases[i].second.c_str(); + std::vector<char> buffer(sizeof(uint64_t) + len); + + for (size_t j = 0; j < sizeof(uint64_t); ++j) { + memcpy(&buffer[j], data, len); + cs.reset(); + cs.update(&buffer[j], len); + EXPECT_EQ(cases[i].first, cs.getValue()); + } + } + + cs.reset(); + EXPECT_EQ(0u, cs.getValue()); + + for (size_t i = 0; i < strs.size(); ++i) { + cs.update(&strs[i][0], strs[i].length()); + } + + EXPECT_EQ(result, cs.getValue()); + } else { + std::cout << "skip HWCrc32c checksum test on unsupported paltform." << std::endl; + } +} + +TEST_F(TestChecksum, SWCrc32c) { + SWCrc32c cs; + EXPECT_EQ(0u, cs.getValue()); + + for (size_t i = 0; i < cases.size(); ++i) { + size_t len = cases[i].second.length(); + const char * data = cases[i].second.c_str(); + std::vector<char> buffer(sizeof(uint64_t) + len); + + for (size_t j = 0; j < sizeof(uint64_t); ++j) { + memcpy(&buffer[j], data, len); + cs.reset(); + cs.update(&buffer[j], len); + EXPECT_EQ(cases[i].first, cs.getValue()); + } + } + + cs.reset(); + EXPECT_EQ(0u, cs.getValue()); + + for (size_t i = 0; i < strs.size(); ++i) { + cs.update(&strs[i][0], strs[i].length()); + } + + EXPECT_EQ(result, cs.getValue()); +} + http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/test/unit/TestException.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/unit/TestException.cpp b/depends/libhdfs3/test/unit/TestException.cpp new file mode 100644 index 0000000..19751b1 --- /dev/null +++ b/depends/libhdfs3/test/unit/TestException.cpp @@ -0,0 +1,122 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "gtest/gtest.h" + +#include "Exception.h" +#include "ExceptionInternal.h" + +#include <iostream> + +using namespace Hdfs; +using namespace Hdfs::Internal; + +TEST(TestException, ThrowAndCatch) { + try { + THROW(HdfsException, "hello world %d %lf", 123, 123.123); + } catch (const HdfsException & e) { + return; + } + + ASSERT_TRUE(false); +} + +TEST(TestException, ThrowAndCatchInheritedException) { + try { + THROW(HdfsException, "hello world %d %lf", 123, 123.123); + } catch (const HdfsException & e) { + return; + } + + ASSERT_TRUE(false); +} + +TEST(TestException, ThrowAndRethrow) { + try { + try { + THROW(HdfsException, "hello world %d %lf", 123, 123.123); + } catch (const HdfsException & e) { + exception_ptr p = current_exception(); + rethrow_exception(p); + } + + ASSERT_TRUE(false); + } catch (const HdfsException & e) { + return; + } + + ASSERT_TRUE(false); +} + +TEST(TestException, ThrowNestedException) { + try { + try { + try { + THROW(HdfsNetworkException, "hello world %d %lf", 123, 123.123); + } catch (const HdfsException & e) { + NESTED_THROW(HdfsNetworkException, "nested throw: "); + } + + ASSERT_TRUE(false); + } catch (const HdfsException & e) { + Hdfs::rethrow_if_nested(e); + } + + ASSERT_TRUE(false); + } catch (const HdfsException & e) { + std::cerr << e.what() << std::endl; + std::cerr << e.msg() << std::endl; + return; + } + + ASSERT_TRUE(false); +} + +TEST(TestException, NestedErrorDetails) { + try { + try { + try { + try { + try { + throw std::runtime_error("runtime_error"); + } catch (...) { + NESTED_THROW(HdfsException, "nested HdfsException"); + } + } catch (...) { + Hdfs::throw_with_nested(std::runtime_error("runtime_error")); + } + } catch (...) { + Hdfs::throw_with_nested(std::runtime_error("runtime_error")); + } + } catch (...) { + NESTED_THROW(HdfsException, "nested HdfsException"); + } + } catch (const HdfsException & e) { + std::string buffer; + std::cout << GetExceptionDetail(e, buffer) << std::endl; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/test/unit/TestGetHANamenodes.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/unit/TestGetHANamenodes.cpp b/depends/libhdfs3/test/unit/TestGetHANamenodes.cpp new file mode 100644 index 0000000..7aa5079 --- /dev/null +++ b/depends/libhdfs3/test/unit/TestGetHANamenodes.cpp @@ -0,0 +1,60 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "gtest/gtest.h" + +#include "client/hdfs.h" + +TEST(TestGetHAANamenodes, TestInvalidInput) { + int size; + EXPECT_TRUE(NULL == hdfsGetHANamenodesWithConfig(NULL, "phdcluster", &size)); + EXPECT_TRUE(errno == EINVAL); + EXPECT_TRUE(NULL == hdfsGetHANamenodesWithConfig("", "phdcluster", &size)); + EXPECT_TRUE(errno == EINVAL); + EXPECT_TRUE(NULL == hdfsGetHANamenodesWithConfig("invalidha.xml", "phdcluster", &size)); + EXPECT_TRUE(errno == EINVAL); + EXPECT_TRUE(NULL == hdfsGetHANamenodesWithConfig("notExist", "phdcluster", &size)); + EXPECT_TRUE(errno == EINVAL); + EXPECT_TRUE(NULL == hdfsGetHANamenodesWithConfig("validha.xml", NULL, &size)); + EXPECT_TRUE(errno == EINVAL); + EXPECT_TRUE(NULL == hdfsGetHANamenodesWithConfig("validha.xml", "", &size)); + EXPECT_TRUE(errno == EINVAL); + EXPECT_TRUE(NULL == hdfsGetHANamenodesWithConfig("validha.xml", "phdcluster", NULL)); + EXPECT_TRUE(errno == EINVAL); +} + +TEST(TestGetHAANamenodes, GetHANamenodes) { + int size; + Namenode * namenodes = NULL; + ASSERT_TRUE(NULL != (namenodes = hdfsGetHANamenodesWithConfig("validha.xml", "phdcluster", &size))); + ASSERT_EQ(2, size); + EXPECT_STREQ("mdw:9000", namenodes[0].rpc_addr); + EXPECT_STREQ("mdw:50070", namenodes[0].http_addr); + EXPECT_STREQ("smdw:9000", namenodes[1].rpc_addr); + EXPECT_EQ((char *)NULL, namenodes[1].http_addr); + EXPECT_NO_THROW(hdfsFreeNamenodeInformation(namenodes, size)); +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/test/unit/TestLeaseRenewer.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/unit/TestLeaseRenewer.cpp b/depends/libhdfs3/test/unit/TestLeaseRenewer.cpp new file mode 100644 index 0000000..2e1ad70 --- /dev/null +++ b/depends/libhdfs3/test/unit/TestLeaseRenewer.cpp @@ -0,0 +1,48 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "gtest/gtest.h" + +#include "client/LeaseRenewer.h" +#include "DateTime.h" +#include "MockFileSystemInter.h" + +using namespace Hdfs::Internal; +using namespace testing; + +TEST(TestRenewer, Renew) { + shared_ptr<MockFileSystemInter> filesystem(new MockFileSystemInter()); + LeaseRenewerImpl renewer; + renewer.setInterval(1000); + EXPECT_CALL(*filesystem, getClientName()).Times(2).WillRepeatedly(Return("MockFS")); + EXPECT_CALL(*filesystem, registerOpenedOutputStream()).Times(1); + EXPECT_CALL(*filesystem, unregisterOpenedOutputStream()).Times(1).WillOnce(Return(true)); + EXPECT_CALL(*filesystem, renewLease()).Times(AtLeast(1)).WillRepeatedly(Return(true)); + renewer.StartRenew(filesystem); + sleep_for(seconds(2)); + renewer.StopRenew(filesystem); +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/test/unit/TestLocatedBlocks.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/unit/TestLocatedBlocks.cpp b/depends/libhdfs3/test/unit/TestLocatedBlocks.cpp new file mode 100644 index 0000000..dc480c8 --- /dev/null +++ b/depends/libhdfs3/test/unit/TestLocatedBlocks.cpp @@ -0,0 +1,93 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "gtest/gtest.h" +#include "server/LocatedBlock.h" +#include "server/LocatedBlocks.h" + +using namespace Hdfs::Internal; + +TEST(TestLocatedBlocks, TestFindBlock){ + // repro GPSQL-3051 + LocatedBlocksImpl *lbs = new LocatedBlocksImpl(); + + // one element in blocks + lbs->setFileLength(10*1024+500); + LocatedBlock *blk = new LocatedBlock(10240); + blk->setNumBytes(500); + lbs->getBlocks().push_back(*blk); + const LocatedBlock *lb = lbs->findBlock(10239); + EXPECT_TRUE(lb == NULL); + lb = lbs->findBlock(10240); + EXPECT_TRUE(lb->getOffset() == blk->getOffset()); + EXPECT_TRUE(lb->getNumBytes() == blk->getNumBytes()); + + lbs->getBlocks().clear(); + // 3 elements in blocks + lbs->setFileLength(1024*2+100); + LocatedBlock *blk1 = new LocatedBlock(0); + blk1->setNumBytes(1024); + LocatedBlock *blk2 = new LocatedBlock(1024); + blk2->setNumBytes(1024); + LocatedBlock *blk3 = new LocatedBlock(2048); + blk3->setNumBytes(100); + lbs->getBlocks().push_back(*blk1); + lbs->getBlocks().push_back(*blk2); + lbs->getBlocks().push_back(*blk3); + + lb = lbs->findBlock(-100); + EXPECT_TRUE(lb == NULL); + lb = lbs->findBlock(0); + EXPECT_TRUE(lb->getOffset() == 0); + lb = lbs->findBlock(100); + EXPECT_TRUE(lb->getOffset() == 0); + lb = lbs->findBlock(1023); + EXPECT_TRUE(lb->getOffset() == 0); + lb = lbs->findBlock(1024); + EXPECT_TRUE(lb->getOffset() == 1024); + lb = lbs->findBlock(1024+100); + EXPECT_TRUE(lb->getOffset() == 1024); + lb = lbs->findBlock(2047); + EXPECT_TRUE(lb->getOffset() == 1024); + lb = lbs->findBlock(2047); + EXPECT_TRUE(lb->getOffset() == 1024); + lb = lbs->findBlock(2048); + EXPECT_TRUE(lb->getOffset() == 2048); + lb = lbs->findBlock(2048+100-1); + EXPECT_TRUE(lb->getOffset() == 2048); + + lb = lbs->findBlock(2048+100); + EXPECT_TRUE(lb == NULL); + lb = lbs->findBlock(2048+1000); + EXPECT_TRUE(lb == NULL); + + delete blk; + delete blk1; + delete blk2; + delete blk3; + delete lbs; +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/bc0904ab/depends/libhdfs3/test/unit/TestLruMap.cpp ---------------------------------------------------------------------- diff --git a/depends/libhdfs3/test/unit/TestLruMap.cpp b/depends/libhdfs3/test/unit/TestLruMap.cpp new file mode 100644 index 0000000..e171b95 --- /dev/null +++ b/depends/libhdfs3/test/unit/TestLruMap.cpp @@ -0,0 +1,64 @@ +/******************************************************************** + * Copyright (c) 2013 - 2014, Pivotal Inc. + * All rights reserved. + * + * Author: Zhanwei Wang + ********************************************************************/ +/******************************************************************** + * 2014 - + * open source under Apache License Version 2.0 + ********************************************************************/ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "gtest/gtest.h" +#include "LruMap.h" + +using namespace Hdfs::Internal; + +TEST(TestLruMap, TestInsertAndFind) { + LruMap<int, int> map(3); + map.insert(1, 1); + map.insert(2, 2); + map.insert(3, 3); + map.insert(4, 4); + int value = 0; + EXPECT_TRUE(map.find(2, &value)); + EXPECT_TRUE(value == 2); + EXPECT_TRUE(map.find(3, &value)); + EXPECT_TRUE(value == 3); + EXPECT_TRUE(map.find(4, &value)); + EXPECT_TRUE(value == 4); + EXPECT_FALSE(map.find(1, &value)); + EXPECT_TRUE(map.find(2, &value)); + EXPECT_TRUE(value == 2); + map.insert(5, 5); + EXPECT_FALSE(map.find(3, &value)); +} + +TEST(TestLruMap, TestFindAndErase) { + LruMap<int, int> map(3); + map.insert(1, 1); + map.insert(2, 2); + map.insert(3, 3); + map.insert(4, 4); + int value = 0; + EXPECT_EQ(3u, map.size()); + EXPECT_TRUE(map.findAndErase(2, &value)); + EXPECT_TRUE(value == 2); + EXPECT_EQ(2u, map.size()); +}