Improved TextScanOperator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4f8fdbe8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4f8fdbe8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4f8fdbe8 Branch: refs/heads/adaptive-bloom-filters Commit: 4f8fdbe8451aed1ad1c07a8badb5be85bee1ff57 Parents: eebb464 Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Thu Jun 9 03:18:37 2016 -0500 Committer: Zuyu Zhang <zu...@apache.org> Committed: Thu Jun 9 10:52:40 2016 -0700 ---------------------------------------------------------------------- query_optimizer/ExecutionGenerator.cpp | 1 - relational_operators/CMakeLists.txt | 23 +- relational_operators/TextScanOperator.cpp | 818 ++++++------------- relational_operators/TextScanOperator.hpp | 286 +++---- relational_operators/WorkOrder.proto | 15 +- relational_operators/WorkOrderFactory.cpp | 72 +- .../tests/TextScanOperator_unittest.cpp | 1 - relational_operators/tests/text_scan_input.txt | 8 +- 8 files changed, 384 insertions(+), 840 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 99c2a21..f9fd742 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -945,7 +945,6 @@ void ExecutionGenerator::convertCopyFrom( physical_plan->file_name(), physical_plan->column_delimiter(), physical_plan->escape_strings(), - FLAGS_parallelize_load, *output_relation, insert_destination_index)); insert_destination_proto->set_relational_op_index(scan_operator_index); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/CMakeLists.txt 
---------------------------------------------------------------------- diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt index d2693eb..eb73c07 100644 --- a/relational_operators/CMakeLists.txt +++ b/relational_operators/CMakeLists.txt @@ -1,5 +1,7 @@ # Copyright 2011-2015 Quickstep Technologies LLC. # Copyright 2015-2016 Pivotal Software, Inc. +# Copyright 2016, Quickstep Research Group, Computer Sciences Department, +# University of Wisconsin–Madison. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,9 +18,6 @@ QS_PROTOBUF_GENERATE_CPP(relationaloperators_SortMergeRunOperator_proto_srcs relationaloperators_SortMergeRunOperator_proto_hdrs SortMergeRunOperator.proto) -QS_PROTOBUF_GENERATE_CPP(relationaloperators_TextScanOperator_proto_srcs - relationaloperators_TextScanOperator_proto_hdrs - TextScanOperator.proto) QS_PROTOBUF_GENERATE_CPP(relationaloperators_WorkOrder_proto_srcs relationaloperators_WorkOrder_proto_hdrs WorkOrder.proto) @@ -61,9 +60,6 @@ add_library(quickstep_relationaloperators_SortRunGenerationOperator SortRunGener SortRunGenerationOperator.hpp) add_library(quickstep_relationaloperators_TableGeneratorOperator TableGeneratorOperator.cpp TableGeneratorOperator.hpp) add_library(quickstep_relationaloperators_TextScanOperator TextScanOperator.cpp TextScanOperator.hpp) -add_library(quickstep_relationaloperators_TextScanOperator_proto - ${relationaloperators_TextScanOperator_proto_srcs} - ${relationaloperators_TextScanOperator_proto_hdrs}) add_library(quickstep_relationaloperators_UpdateOperator UpdateOperator.cpp UpdateOperator.hpp) add_library(quickstep_relationaloperators_WorkOrder ../empty_src.cpp WorkOrder.hpp) add_library(quickstep_relationaloperators_WorkOrderFactory WorkOrderFactory.cpp WorkOrderFactory.hpp) @@ -360,27 +356,19 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator glog
quickstep_catalog_CatalogAttribute quickstep_catalog_CatalogRelation - quickstep_catalog_CatalogRelationSchema quickstep_catalog_CatalogTypedefs quickstep_queryexecution_QueryContext - quickstep_queryexecution_QueryExecutionMessages_proto - quickstep_queryexecution_QueryExecutionTypedefs - quickstep_queryexecution_QueryExecutionUtil quickstep_queryexecution_WorkOrdersContainer quickstep_relationaloperators_RelationalOperator - quickstep_relationaloperators_TextScanOperator_proto quickstep_relationaloperators_WorkOrder quickstep_storage_InsertDestination - quickstep_storage_StorageBlob - quickstep_storage_StorageBlockInfo - quickstep_storage_StorageManager - quickstep_threading_ThreadIDBasedMap quickstep_types_Type quickstep_types_TypedValue + quickstep_types_containers_ColumnVector + quickstep_types_containers_ColumnVectorsValueAccessor quickstep_types_containers_Tuple quickstep_utility_Glob quickstep_utility_Macros - quickstep_utility_ThreadSafeQueue tmb) target_link_libraries(quickstep_relationaloperators_UpdateOperator glog @@ -430,7 +418,6 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory quickstep_relationaloperators_SortRunGenerationOperator quickstep_relationaloperators_TableGeneratorOperator quickstep_relationaloperators_TextScanOperator - quickstep_relationaloperators_TextScanOperator_proto quickstep_relationaloperators_UpdateOperator quickstep_relationaloperators_WorkOrder_proto quickstep_storage_StorageBlockInfo @@ -438,7 +425,6 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory tmb) target_link_libraries(quickstep_relationaloperators_WorkOrder_proto quickstep_relationaloperators_SortMergeRunOperator_proto - quickstep_relationaloperators_TextScanOperator_proto ${PROTOBUF_LIBRARY}) # Module all-in-one library: @@ -466,7 +452,6 @@ target_link_libraries(quickstep_relationaloperators quickstep_relationaloperators_SortRunGenerationOperator quickstep_relationaloperators_TableGeneratorOperator 
quickstep_relationaloperators_TextScanOperator - quickstep_relationaloperators_TextScanOperator_proto quickstep_relationaloperators_UpdateOperator quickstep_relationaloperators_WorkOrder quickstep_relationaloperators_WorkOrderFactory http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/TextScanOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp index 5acecbf..d2fd0cd 100644 --- a/relational_operators/TextScanOperator.cpp +++ b/relational_operators/TextScanOperator.cpp @@ -1,6 +1,8 @@ /** * Copyright 2011-2015 Quickstep Technologies LLC. * Copyright 2015-2016 Pivotal Software, Inc. + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin–Madison. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.
@@ -20,124 +22,30 @@ #include <algorithm> #include <cctype> #include <cstddef> -#include <cstdint> #include <cstdio> #include <cstdlib> -#include <cstring> +#include <memory> #include <string> #include <utility> #include <vector> #include "catalog/CatalogAttribute.hpp" -#include "catalog/CatalogRelationSchema.hpp" #include "query_execution/QueryContext.hpp" -#include "query_execution/QueryExecutionMessages.pb.h" -#include "query_execution/QueryExecutionUtil.hpp" #include "query_execution/WorkOrdersContainer.hpp" -#include "relational_operators/TextScanOperator.pb.h" #include "storage/InsertDestination.hpp" -#include "storage/StorageBlob.hpp" -#include "storage/StorageBlockInfo.hpp" -#include "storage/StorageManager.hpp" -#include "threading/ThreadIDBasedMap.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" #include "types/containers/Tuple.hpp" +#include "types/containers/ColumnVector.hpp" +#include "types/containers/ColumnVectorsValueAccessor.hpp" #include "utility/Glob.hpp" -#include "gflags/gflags.h" #include "glog/logging.h" #include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" -#include "tmb/tagged_message.h" - -using std::isxdigit; -using std::size_t; -using std::sscanf; -using std::string; namespace quickstep { -DEFINE_uint64(textscan_split_blob_size, 2, - "Size of blobs in number of slots the input text files " - "are split into in the TextScanOperator."); - -// Check if blob size is positive. -static bool ValidateTextScanSplitBlobSize(const char *flagname, - std::uint64_t blob_size) { - if (blob_size == 0) { - LOG(ERROR) << "--" << flagname << " must be greater than 0"; - return false; - } - - return true; -} - -static const volatile bool text_scan_split_blob_size_dummy = gflags::RegisterFlagValidator( - &FLAGS_textscan_split_blob_size, &ValidateTextScanSplitBlobSize); - -namespace { - -// Detect whether '*search_string' contains a row-terminator (either line-feed -// or carriage-return + line-feed) immediately before 'end_pos'. 
If -// 'process_escape_sequences' is true, this function will also eliminate -// false-positives from an escaped row-terminator. Returns the number of -// characters in the row-terminator, or 0 if no terminator is detected. -inline unsigned DetectRowTerminator(const char *search_string, - std::size_t end_pos, - const bool process_escape_sequences) { - if (end_pos == 0) { - // Empty string. - return 0; - } - - if (search_string[end_pos - 1] != '\n') { - // String doesn't end in newline. - return 0; - } - - if (end_pos == 1) { - // String is the single newline character. - return 1; - } - - const bool have_carriage_return = (search_string[end_pos - 2] == '\r'); - if (have_carriage_return && (end_pos == 2)) { - // String is CR-LF and nothing else. - return 2; - } - - std::size_t backslashes = 0; - // Count consecutive backslashes preceding the terminator. If there is an odd - // number of backslashes, then the terminator is escaped and doesn't count as - // a real terminator. If there is an even number of backslashes, then each - // pair is an escaped backslash literal and the terminator still counts. - if (process_escape_sequences) { - end_pos = end_pos - 2 - have_carriage_return; - while (end_pos != 0) { - if (search_string[end_pos] == '\\') { - ++backslashes; - --end_pos; - if ((end_pos == 0) && (search_string[0] == '\\')) { - // Don't forget to count a backslash at the very beginning of a string. - ++backslashes; - } - } else { - break; - } - } - } - - if (backslashes & 0x1) { - return 0; - } else { - return 1 + have_carriage_return; - } -} - -} // namespace - bool TextScanOperator::getAllWorkOrders( WorkOrdersContainer *container, QueryContext *query_context, @@ -155,116 +63,50 @@ bool TextScanOperator::getAllWorkOrders( InsertDestination *output_destination = query_context->getInsertDestination(output_destination_index_); - if (parallelize_load_) { - // Parallel implementation: Split work orders are generated for each file - // being bulk-loaded. 
(More than one file can be loaded, because we support - // glob() semantics in file name.) These work orders read the input file, - // and split them in the blobs that can be parsed independently. - if (blocking_dependencies_met_) { - if (!work_generated_) { - // First, generate text-split work orders. - for (const auto &file : files) { - container->addNormalWorkOrder( - new TextSplitWorkOrder(query_id_, - file, - process_escape_sequences_, - storage_manager, - op_index_, - scheduler_client_id, - bus), - op_index_); - ++num_split_work_orders_; - } - work_generated_ = true; - return false; - } else { - // Check if there are blobs to parse. - while (!text_blob_queue_.empty()) { - const TextBlob blob_work = text_blob_queue_.popOne(); - container->addNormalWorkOrder( - new TextScanWorkOrder(query_id_, - blob_work.blob_id, - blob_work.size, - field_terminator_, - process_escape_sequences_, - output_destination, - storage_manager), - op_index_); - } - // Done if all split work orders are completed, and no blobs are left to - // process. - return num_done_split_work_orders_.load(std::memory_order_acquire) == num_split_work_orders_ && - text_blob_queue_.empty(); - } - } - return false; - } else { - // Serial implementation. - if (blocking_dependencies_met_ && !work_generated_) { - for (const auto &file : files) { + // Text segment size set to 256KB. + constexpr std::size_t kTextSegmentSize = 0x40000u; + + if (blocking_dependencies_met_ && !work_generated_) { + for (const std::string &file : files) { + // Use standard C libary to retrieve the file size. 
+ FILE *fp = std::fopen(file.c_str(), "rb"); + std::fseek(fp, 0, SEEK_END); + const std::size_t file_size = std::ftell(fp); + std::fclose(fp); + + std::size_t text_offset = 0; + while (text_offset < file_size) { container->addNormalWorkOrder( new TextScanWorkOrder(query_id_, file, + text_offset, + std::min(kTextSegmentSize, file_size - text_offset), field_terminator_, process_escape_sequences_, output_destination, storage_manager), op_index_); + text_offset += kTextSegmentSize; } - work_generated_ = true; } - return work_generated_; - } -} - -void TextScanOperator::receiveFeedbackMessage(const WorkOrder::FeedbackMessage &msg) { - switch (msg.type()) { - case kSplitWorkOrderCompletionMessage: { - num_done_split_work_orders_.fetch_add(1, std::memory_order_release); - break; - } - case kNewTextBlobMessage: { - serialization::TextBlob proto; - CHECK(proto.ParseFromArray(msg.payload(), msg.payload_size())); - text_blob_queue_.push(TextBlob(proto.blob_id(), proto.size())); - break; - } - default: - LOG(ERROR) << "Unknown feedback message type for TextScanOperator"; + work_generated_ = true; } + return work_generated_; } TextScanWorkOrder::TextScanWorkOrder(const std::size_t query_id, const std::string &filename, + const std::size_t text_offset, + const std::size_t text_segment_size, const char field_terminator, const bool process_escape_sequences, InsertDestination *output_destination, StorageManager *storage_manager) : WorkOrder(query_id), - is_file_(true), filename_(filename), + text_offset_(text_offset), + text_segment_size_(text_segment_size), field_terminator_(field_terminator), - text_blob_(0), - text_size_(0), - process_escape_sequences_(process_escape_sequences), - output_destination_(output_destination), - storage_manager_(storage_manager) { - DCHECK(output_destination_ != nullptr); - DCHECK(storage_manager_ != nullptr); -} - -TextScanWorkOrder::TextScanWorkOrder(const std::size_t query_id, - const block_id text_blob, - const std::size_t text_size, - const char 
field_terminator, - const bool process_escape_sequences, - InsertDestination *output_destination, - StorageManager *storage_manager) - : WorkOrder(query_id), - is_file_(false), - field_terminator_(field_terminator), - text_blob_(text_blob), - text_size_(text_size), process_escape_sequences_(process_escape_sequences), output_destination_(output_destination), storage_manager_(storage_manager) { @@ -274,439 +116,293 @@ TextScanWorkOrder::TextScanWorkOrder(const std::size_t query_id, void TextScanWorkOrder::execute() { const CatalogRelationSchema &relation = output_destination_->getRelation(); + std::vector<Tuple> tuples; - string current_row_string; - if (is_file_) { - FILE *file = std::fopen(filename_.c_str(), "r"); - if (file == nullptr) { - throw TextScanReadError(filename_); - } + constexpr std::size_t kSmallBufferSize = 0x4000; + char *buffer = reinterpret_cast<char *>(malloc(std::max(text_segment_size_, kSmallBufferSize))); - bool have_row = false; - do { - current_row_string.clear(); - have_row = readRowFromFile(file, &current_row_string); - if (have_row) { - Tuple tuple = parseRow(current_row_string, relation); - output_destination_->insertTupleInBatch(tuple); - } - } while (have_row); - - std::fclose(file); - } else { - BlobReference blob = storage_manager_->getBlob(text_blob_); - const char *blob_pos = static_cast<const char*>(blob->getMemory()); - const char *blob_end = blob_pos + text_size_; - bool have_row = false; - do { - current_row_string.clear(); - have_row = readRowFromBlob(&blob_pos, blob_end, &current_row_string); - if (have_row) { - Tuple tuple = parseRow(current_row_string, relation); - output_destination_->insertTupleInBatch(tuple); - } - } while (have_row); - - // Drop the consumed blob produced by TextSplitWorkOrder. - blob.release(); - storage_manager_->deleteBlockOrBlobFile(text_blob_); + // Read text segment into buffer.
+ FILE *file = std::fopen(filename_.c_str(), "rb"); + std::fseek(file, text_offset_, SEEK_SET); + std::size_t bytes_read = std::fread(buffer, 1, text_segment_size_, file); + if (bytes_read != text_segment_size_) { + throw TextScanReadError(filename_); } -} -char TextScanWorkOrder::ParseOctalLiteral(const std::string &row_string, - std::size_t *start_pos) { - const std::size_t stop_pos = std::min(row_string.length(), *start_pos + 3); - - int value = 0; - for (; *start_pos < stop_pos; ++*start_pos) { - int char_value = row_string[*start_pos] - '0'; - if ((char_value >= 0) && (char_value < 8)) { - value = value * 8 + char_value; - } else { - return value; + // Locate the first newline character. + const char *buffer_end = buffer + text_segment_size_; + const char *row_ptr = buffer; + if (text_offset_ != 0) { + while (row_ptr < buffer_end && *row_ptr != '\n') { + ++row_ptr; } + } else { + --row_ptr; } - return value; -} - -char TextScanWorkOrder::ParseHexLiteral(const std::string &row_string, - std::size_t *start_pos) { - const std::size_t stop_pos = std::min(row_string.length(), *start_pos + 2); + if (row_ptr >= buffer_end) { + // This block does not even contain a newline character. + return; + } - int value = 0; - for (; *start_pos < stop_pos; ++*start_pos) { - if (!std::isxdigit(row_string[*start_pos])) { - break; - } + // Locate the last newline character. + const char *end_ptr = buffer_end - 1; + while (end_ptr > row_ptr && *end_ptr != '\n') { + --end_ptr; + } - int char_value; - if (std::isdigit(row_string[*start_pos])) { - char_value = row_string[*start_pos] - '0'; - } else if (std::islower(row_string[*start_pos])) { - char_value = row_string[*start_pos] - 'a' + 10; + // Advance both row_ptr and end_ptr by 1. 
+ ++row_ptr; + ++end_ptr; + // Now row_ptr is pointing to the first character RIGHT AFTER the FIRST newline + // character in this text segment, and end_ptr is pointing to the first character + // RIGHT AFTER the LAST newline character in this text segment. + + // Process the tuples which are between the first newline character and the + // last newline character. + while (row_ptr < end_ptr) { + if (*row_ptr == '\r' || *row_ptr == '\n') { + // Skip empty lines. + ++row_ptr; } else { - char_value = row_string[*start_pos] - 'A' + 10; + tuples.emplace_back(parseRow(&row_ptr, relation)); } - - value = value * 16 + char_value; } - return value; -} + // Process the tuple that is right after the last newline character. + // NOTE(jianqiao): dynamic_read_size is trying to balance between the cases + // that the last tuple is very small / very large. + std::size_t dynamic_read_size = 1024; + std::string row_string; + std::fseek(file, text_offset_ + (end_ptr - buffer), SEEK_SET); + bool has_reached_end = false; + do { + bytes_read = std::fread(buffer, 1, dynamic_read_size, file); + std::size_t bytes_to_copy = bytes_read; -bool TextScanWorkOrder::readRowFromFile(FILE *file, std::string *row_string) const { - // Read up to 1023 chars + null-terminator at a time. - static constexpr std::size_t kRowBufferSize = 1024; - char row_buffer[kRowBufferSize]; - for (;;) { - char *read_string = std::fgets(row_buffer, sizeof(row_buffer), file); - if (read_string == nullptr) { - if (std::feof(file)) { - if (row_string->empty()) { - return false; - } else { - throw TextScanFormatError("File ended without delimiter"); - } - } else { - throw TextScanReadError(filename_); + for (std::size_t i = 0; i < bytes_read; ++i) { + if (buffer[i] == '\n') { + bytes_to_copy = i + 1; + has_reached_end = true; + break; } } - - // Append the contents of the buffer to '*row_string', and see if we've - // reached a genuine row-terminator yet. 
- row_string->append(row_buffer); - if (removeRowTerminator(row_string)) { - row_string->push_back(field_terminator_); - return true; + if (!has_reached_end && bytes_read != dynamic_read_size) { + has_reached_end = true; } - } -} -bool TextScanWorkOrder::readRowFromBlob(const char **start_pos, - const char *end_pos, - std::string *row_string) const { - while (*start_pos != end_pos) { - const char *next_newline = static_cast<const char*>(std::memchr( - *start_pos, - '\n', - end_pos - *start_pos)); - - if (next_newline == nullptr) { - throw TextScanFormatError("File ended without delimiter"); - } + row_string.append(buffer, bytes_to_copy); + dynamic_read_size = std::min(dynamic_read_size * 2, kSmallBufferSize); + } while (!has_reached_end); - // Append the blob's contents through the next newline to '*row_string', - // and see if we've reached a genuine row-terminator yet. - row_string->append(*start_pos, next_newline - *start_pos + 1); - *start_pos = next_newline + 1; - if (removeRowTerminator(row_string)) { - row_string->push_back(field_terminator_); - return true; + if (!row_string.empty()) { + if (row_string.back() != '\n') { + row_string.push_back('\n'); } + row_ptr = row_string.c_str(); + tuples.emplace_back(parseRow(&row_ptr, relation)); } - if (row_string->empty()) { - return false; - } else { - throw TextScanFormatError("File ended without delimiter"); - } -} - -bool TextScanWorkOrder::removeRowTerminator(std::string *row_string) const { - unsigned row_term_chars = DetectRowTerminator(row_string->c_str(), - row_string->length(), - process_escape_sequences_); - if (row_term_chars == 0) { - return false; - } else { - row_string->resize(row_string->length() - row_term_chars); - return true; - } -} - -bool TextScanWorkOrder::extractFieldString(const std::string &row_string, - std::size_t *start_pos, - std::string *field_string) const { - // Check for NULL literal string. 
- if (process_escape_sequences_ - && (row_string.length() - *start_pos >= 3) - && (row_string[*start_pos] == '\\') - && (row_string[*start_pos + 1] == 'N') - && (row_string[*start_pos + 2] == field_terminator_)) { - *start_pos += 3; - return false; - } - - // Scan up until terminator, expanding backslashed escape sequences as we go. - std::size_t terminator_pos = row_string.find(field_terminator_, *start_pos); - std::size_t scan_pos = *start_pos; - - if (process_escape_sequences_) { - for (;;) { - std::size_t backslash_pos = row_string.find('\\', scan_pos); - if ((backslash_pos == std::string::npos) || (backslash_pos >= terminator_pos)) { - // No more backslashes, or the next backslash is beyond the field - // terminator. - break; - } - - // Copy up to the backslash. - field_string->append(row_string, scan_pos, backslash_pos - scan_pos); - - if (backslash_pos + 1 == terminator_pos) { - // The terminator we found was escaped by a backslash, so append the - // literal terminator and re-scan for the next terminator character. - field_string->push_back(field_terminator_); - scan_pos = terminator_pos + 1; - terminator_pos = row_string.find(field_terminator_, scan_pos); - continue; + std::fclose(file); + free(buffer); + + // Store the tuples in a ColumnVectorsValueAccessor for bulk insert. + ColumnVectorsValueAccessor column_vectors; + std::size_t attr_id = 0; + for (const auto &attribute : relation) { + const Type &attr_type = attribute.getType(); + if (attr_type.isVariableLength()) { + std::unique_ptr<IndirectColumnVector> column( + new IndirectColumnVector(attr_type, tuples.size())); + for (const auto &tuple : tuples) { + column->appendTypedValue(tuple.getAttributeValue(attr_id)); } - - // Expand escape sequence. - switch (row_string[backslash_pos + 1]) { - case '0': // Fallthrough for octal digits. - case '1': - case '2': - case '3': - case '4': - case '5': - case '6': - case '7': - // Octal char literal. 
- scan_pos = backslash_pos + 1; - field_string->push_back(ParseOctalLiteral(row_string, &scan_pos)); - break; - case 'N': { - // Null literal after some other column data. - throw TextScanFormatError( - "Null indicator '\\N' encountered in text scan mixed in with " - "other column data."); - } - case '\\': - // Backslash. - field_string->push_back('\\'); - scan_pos = backslash_pos + 2; - break; - case 'b': - // Backspace. - field_string->push_back('\b'); - scan_pos = backslash_pos + 2; - break; - case 'f': - // Form-feed. - field_string->push_back('\f'); - scan_pos = backslash_pos + 2; - break; - case 'n': - // Newline. - field_string->push_back('\n'); - scan_pos = backslash_pos + 2; - break; - case 'r': - // Carriage return. - field_string->push_back('\r'); - scan_pos = backslash_pos + 2; - break; - case 't': - // Tab. - field_string->push_back('\t'); - scan_pos = backslash_pos + 2; - break; - case 'v': - // Vertical tab. - field_string->push_back('\v'); - scan_pos = backslash_pos + 2; - break; - case 'x': - if ((backslash_pos + 2 < row_string.length()) && std::isxdigit(row_string[backslash_pos + 2])) { - // Hexidecimal char literal. - scan_pos = backslash_pos + 2; - field_string->push_back(ParseHexLiteral(row_string, &scan_pos)); - } else { - // Just an escaped 'x' with no hex digits. - field_string->push_back('x'); - scan_pos = backslash_pos + 2; - } - break; - default: - // Append escaped character as-is. 
- field_string->push_back(row_string[backslash_pos + 1]); - scan_pos = backslash_pos + 2; - break; + column_vectors.addColumn(column.release()); + } else { + std::unique_ptr<NativeColumnVector> column( + new NativeColumnVector(attr_type, tuples.size())); + for (const auto &tuple : tuples) { + column->appendTypedValue(tuple.getAttributeValue(attr_id)); } + column_vectors.addColumn(column.release()); } + ++attr_id; } - DCHECK_NE(terminator_pos, std::string::npos); - field_string->append(row_string, scan_pos, terminator_pos - scan_pos); - *start_pos = terminator_pos + 1; - return true; + // Bulk insert the tuples. + output_destination_->bulkInsertTuples(&column_vectors); } -Tuple TextScanWorkOrder::parseRow(const std::string &row_string, const CatalogRelationSchema &relation) const { +Tuple TextScanWorkOrder::parseRow(const char **row_ptr, + const CatalogRelationSchema &relation) const { std::vector<TypedValue> attribute_values; - std::size_t pos = 0; + bool is_null_literal; + bool has_reached_end_of_line = false; std::string value_str; - CatalogRelationSchema::const_iterator attr_it = relation.begin(); - while (pos < row_string.length()) { - if (attr_it == relation.end()) { - throw TextScanFormatError("Row has too many fields"); + for (const auto &attr : relation) { + if (has_reached_end_of_line) { + throw TextScanFormatError("Row has too few fields"); } value_str.clear(); - if (extractFieldString(row_string, &pos, &value_str)) { - attribute_values.emplace_back(); - if (!attr_it->getType().parseValueFromString(value_str, &(attribute_values.back()))) { - throw TextScanFormatError("Failed to parse value"); - } - } else { + extractFieldString(row_ptr, + &is_null_literal, + &has_reached_end_of_line, + &value_str); + + if (is_null_literal) { // NULL literal. 
- if (!attr_it->getType().isNullable()) { + if (!attr.getType().isNullable()) { throw TextScanFormatError( "NULL literal '\\N' was specified for a column with a " "non-nullable Type"); } - - attribute_values.emplace_back(attr_it->getType().makeNullValue()); + attribute_values.emplace_back(attr.getType().makeNullValue()); + } else { + attribute_values.emplace_back(); + if (!attr.getType().parseValueFromString(value_str, &(attribute_values.back()))) { + throw TextScanFormatError("Failed to parse value"); + } } - - ++attr_it; } - if (attr_it != relation.end()) { - throw TextScanFormatError("Row has too few fields"); + if (!has_reached_end_of_line) { + throw TextScanFormatError("Row has too many fields"); } return Tuple(std::move(attribute_values)); } -void TextSplitWorkOrder::execute() { - std::FILE *file = std::fopen(filename_.c_str(), "r"); - if (!file) { - throw TextScanReadError(filename_); - } - - bool eof = false; - do { - // Allocate new blob, if current is empty. - if (0 == remainingBlobBytes()) { - allocateBlob(); - } - - // Read the into the unwritten part of blob. - std::size_t bytes = - std::fread(writeableBlobAddress(), 1, remainingBlobBytes(), file); - eof = bytes < remainingBlobBytes(); - written_ += bytes; - - // Write the current blob to queue for processing. - sendBlobInfoToOperator(!eof /* write_row_aligned */); - } while (!eof); - - std::fclose(file); +void TextScanWorkOrder::extractFieldString(const char **field_ptr, + bool *is_null_literal, + bool *has_reached_end_of_line, + std::string *field_string) const { + const char *cur_ptr = *field_ptr; + *is_null_literal = false; - // Notify the operator about the completion of this Work Order. - FeedbackMessage msg(TextScanOperator::kSplitWorkOrderCompletionMessage, - operator_index_, - nullptr /* payload */, - 0 /* payload_size */, - false /* ownership */); - SendFeedbackMessage(bus_, ClientIDMap::Instance()->getValue(), scheduler_client_id_, msg); -} + // Check for NULL literal string. 
+ if (process_escape_sequences_ && cur_ptr[0] == '\\' && cur_ptr[1] == 'N') { + cur_ptr += 2; -// Allocate new blob. -void TextSplitWorkOrder::allocateBlob() { - text_blob_id_ = storage_manager_->createBlob(FLAGS_textscan_split_blob_size); - text_blob_ = storage_manager_->getBlobMutable(text_blob_id_); - blob_size_ = text_blob_->size(); - written_ = 0; -} + // Skip '\r' + if (*cur_ptr == '\r') { + ++cur_ptr; + } -// Find the last row terminator in the blob. -std::size_t TextSplitWorkOrder::findLastRowTerminator() { - std::size_t found = 0; - const char *blob = static_cast<const char *>(text_blob_->getMemory()); - - for (std::size_t index = written_; - index != 0; - --index) { - if (DetectRowTerminator(blob, index, process_escape_sequences_)) { - found = index; - break; + const char c = *cur_ptr; + if (c == field_terminator_ || c == '\n') { + *is_null_literal = true; + *has_reached_end_of_line = (c == '\n'); + *field_ptr = cur_ptr + 1; + return; } } - // TODO(quickstep-team): Design a way to handle long rows that are larger than - // the configured blob size. - CHECK_NE(0u, found) << "No row terminator found in " << FLAGS_textscan_split_blob_size - << "-slot chunk of " << filename_; - return found; -} + // Not a NULL literal string, rewind cur_ptr to the start position for parsing. + cur_ptr = *field_ptr; -void TextSplitWorkOrder::sendBlobInfoToOperator(const bool write_row_aligned) { - std::size_t text_len = written_; - std::string residue; - if (write_row_aligned) { - // Find last row terminator in current blob. - text_len = findLastRowTerminator(); - - // Copy the residual bytes after the last row terminator. - residue = std::string( - static_cast<char *>(text_blob_->getMemoryMutable()) + text_len, - written_ - text_len); - } + if (!process_escape_sequences_) { + // Simply copy until field_terminator or '\n'. 
+ for (;; ++cur_ptr) { + const char c = *cur_ptr; + if (c == field_terminator_) { + *has_reached_end_of_line = false; + break; + } else if (c == '\n') { + *has_reached_end_of_line = true; + break; + } - // Notify the operator for the split-up blob. - serialization::TextBlob proto; - proto.set_blob_id(text_blob_id_); - proto.set_size(text_len); - - const std::size_t payload_size = proto.ByteSize(); - // NOTE(zuyu): 'payload' gets released by FeedbackMessage's destructor. - char *payload = static_cast<char *>(std::malloc(payload_size)); - CHECK(proto.SerializeToArray(payload, payload_size)); - - const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue(); - FeedbackMessage feedback_msg(TextScanOperator::kNewTextBlobMessage, - operator_index_, - payload, - payload_size); - SendFeedbackMessage(bus_, worker_thread_client_id, scheduler_client_id_, feedback_msg); - - // Notify Foreman for the avaiable work order on the blob. - serialization::WorkOrdersAvailableMessage message_proto; - message_proto.set_operator_index(operator_index_); - - // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string. - const size_t message_proto_length = message_proto.ByteSize(); - char *message_proto_bytes = static_cast<char*>(std::malloc(message_proto_length)); - CHECK(message_proto.SerializeToArray(message_proto_bytes, message_proto_length)); - - tmb::TaggedMessage tagged_message(static_cast<const void *>(message_proto_bytes), - message_proto_length, - kWorkOrdersAvailableMessage); - std::free(message_proto_bytes); - - // Send new work order available message to Foreman. 
- const tmb::MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage( - bus_, - worker_thread_client_id, - scheduler_client_id_, - std::move(tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not " - "be sent from thread with TMB client ID " - << worker_thread_client_id << " to Foreman with TMB client " - "ID " << scheduler_client_id_; - - if (residue.size()) { - // Allocate new blob, and copy residual bytes from last blob. - allocateBlob(); - std::memcpy(writeableBlobAddress(), residue.data(), residue.size()); - written_ += residue.size(); + // Ignore '\r' + if (c != '\r') { + field_string->push_back(c); + } + } + } else { + for (;; ++cur_ptr) { + const char c = *cur_ptr; + if (c == '\\') { + ++cur_ptr; + const char first_escaped_character = *cur_ptr; + switch (first_escaped_character) { + case '0': // Fallthrough for octal digits. + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + field_string->push_back(ParseOctalLiteral(&cur_ptr)); + break; + case 'N': { + // Null literal after some other column data. + throw TextScanFormatError( + "Null indicator '\\N' encountered in text scan mixed in with " + "other column data."); + } + case '\\': + // Backslash. + field_string->push_back('\\'); + break; + case 'b': + // Backspace. + field_string->push_back('\b'); + break; + case 'f': + // Form-feed. + field_string->push_back('\f'); + break; + case 'n': + // Newline. + field_string->push_back('\n'); + break; + case 'r': + // Carriage return. + field_string->push_back('\r'); + break; + case 't': + // Tab. + field_string->push_back('\t'); + break; + case 'v': + // Vertical tab. + field_string->push_back('\v'); + break; + case 'x': + if (std::isxdigit(cur_ptr[1])) { + // Hexidecimal char literal. + ++cur_ptr; + field_string->push_back(ParseHexLiteral(&cur_ptr)); + } else { + // Just an escaped 'x' with no hex digits. 
field_string->push_back('x'); + } + break; + case '\n': + throw TextScanFormatError( + "Backslash line splicing is not supported."); + default: + // Append escaped character as-is. + field_string->push_back(first_escaped_character); + break; + } + } else if (c == field_terminator_) { + *has_reached_end_of_line = false; + break; + } else if (c == '\n') { + *has_reached_end_of_line = true; + break; + } else { + if (c != '\r') { + // Ignore '\r' + field_string->push_back(c); + } + } + } } + *field_ptr = cur_ptr + 1; } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/TextScanOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp index 3cda65b..d73e7dd 100644 --- a/relational_operators/TextScanOperator.hpp +++ b/relational_operators/TextScanOperator.hpp @@ -1,6 +1,8 @@ /** * Copyright 2011-2015 Quickstep Technologies LLC. * Copyright 2015-2016 Pivotal Software, Inc. + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of Wisconsin–Madison. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.
@@ -18,26 +20,18 @@ #ifndef QUICKSTEP_RELATIONAL_OPERATORS_TEXT_SCAN_OPERATOR_HPP_ #define QUICKSTEP_RELATIONAL_OPERATORS_TEXT_SCAN_OPERATOR_HPP_ -#include <atomic> +#include <cctype> #include <cstddef> -#include <cstdint> -#include <cstdio> #include <exception> #include <string> #include "catalog/CatalogRelation.hpp" #include "catalog/CatalogTypedefs.hpp" #include "query_execution/QueryContext.hpp" -#include "query_execution/QueryExecutionTypedefs.hpp" #include "relational_operators/RelationalOperator.hpp" #include "relational_operators/WorkOrder.hpp" -#include "storage/StorageBlob.hpp" -#include "storage/StorageBlockInfo.hpp" #include "types/containers/Tuple.hpp" #include "utility/Macros.hpp" -#include "utility/ThreadSafeQueue.hpp" - -#include "glog/logging.h" #include "tmb/id_typedefs.h" @@ -98,26 +92,11 @@ class TextScanFormatError : public std::exception { }; /** - * @brief A structure for text data blobs. - */ -struct TextBlob { - TextBlob(const block_id text_blob_id, const std::size_t text_size) - : blob_id(text_blob_id), size(text_size) {} - block_id blob_id; - std::size_t size; -}; - -/** * @brief An operator which reads tuples from a text file and inserts them into * a relation. **/ class TextScanOperator : public RelationalOperator { public: - enum FeedbackMessageType : WorkOrder::FeedbackMessageType { - kNewTextBlobMessage, - kSplitWorkOrderCompletionMessage, - }; - /** * @brief Constructor * @@ -130,29 +109,22 @@ class TextScanOperator : public RelationalOperator { * the text file. * @param process_escape_sequences Whether to decode escape sequences in the * text file. - * @param parallelize_load Parallelize the load process by th spliting file - * into blobs, and generating separate work-orders for each of them. * @param output_relation The output relation. * @param output_destination_index The index of the InsertDestination in the * QueryContext to insert tuples. 
**/ - TextScanOperator( - const std::size_t query_id, - const std::string &file_pattern, - const char field_terminator, - const bool process_escape_sequences, - const bool parallelize_load, - const CatalogRelation &output_relation, - const QueryContext::insert_destination_id output_destination_index) + TextScanOperator(const std::size_t query_id, + const std::string &file_pattern, + const char field_terminator, + const bool process_escape_sequences, + const CatalogRelation &output_relation, + const QueryContext::insert_destination_id output_destination_index) : RelationalOperator(query_id), file_pattern_(file_pattern), field_terminator_(field_terminator), process_escape_sequences_(process_escape_sequences), - parallelize_load_(parallelize_load), output_relation_(output_relation), output_destination_index_(output_destination_index), - num_done_split_work_orders_(0), - num_split_work_orders_(0), work_generated_(false) {} ~TextScanOperator() override {} @@ -171,23 +143,14 @@ class TextScanOperator : public RelationalOperator { return output_relation_.getID(); } - void receiveFeedbackMessage(const WorkOrder::FeedbackMessage &msg) override; - private: const std::string file_pattern_; const char field_terminator_; const bool process_escape_sequences_; - const bool parallelize_load_; const CatalogRelation &output_relation_; const QueryContext::insert_destination_id output_destination_index_; - ThreadSafeQueue<TextBlob> text_blob_queue_; - std::atomic<std::uint32_t> num_done_split_work_orders_; - std::uint32_t num_split_work_orders_; - - // Indicates if work order to load file is generated for non-parallel load, and - // if work order to split file to blobs is generated for parallel load. bool work_generated_; DISALLOW_COPY_AND_ASSIGN(TextScanOperator); @@ -203,7 +166,9 @@ class TextScanWorkOrder : public WorkOrder { * * @param query_id The ID of the query to which this WorkOrder belongs. * @param filename The name of the text file to bulk insert. 
- * @param field_terminator The string which separates attribute values in + * @param text_offset The start position in the text file to start text scan. + * @param text_segment_size The size of text segment to be scanned. + * @param field_terminator The character which separates attribute values in * the text file. * @param process_escape_sequences Whether to decode escape sequences in the * text file. @@ -213,28 +178,8 @@ class TextScanWorkOrder : public WorkOrder { TextScanWorkOrder( const std::size_t query_id, const std::string &filename, - const char field_terminator, - const bool process_escape_sequences, - InsertDestination *output_destination, - StorageManager *storage_manager); - - /** - * @brief Constructor. - * - * @param query_id The ID of the query to which this WorkOrder belongs. - * @param text_blob Blob ID containing the data to be scanned. - * @param text_size Size of the data in the blob. - * @param field_terminator The character which separates attribute values in - * the text file. - * @param process_escape_sequences Whether to decode escape sequences in the - * text file. - * @param output_destination The InsertDestination to write the read tuples. - * @param storage_manager The StorageManager to use. - */ - TextScanWorkOrder( - const std::size_t query_id, - const block_id text_blob, - const std::size_t text_size, + const std::size_t text_offset, + const std::size_t text_segment_size, const char field_terminator, const bool process_escape_sequences, InsertDestination *output_destination, @@ -255,141 +200,106 @@ class TextScanWorkOrder : public WorkOrder { void execute() override; private: - // Parse up to three octal digits (0-7) starting at '*start_pos' in - // 'row_string' as a char literal. '*start_pos' will be modified to - // the first position AFTER the parsed octal digits. 
- static char ParseOctalLiteral(const std::string &row_string, - std::size_t *start_pos); - - // Parse up to two hexadecimal digits (0-F, case insensitive) starting at - // '*start_pos' in 'row_string' as a char literal. '*start_pos' will be - // modified to the first position AFTER the parsed hexadecimal digits. - static char ParseHexLiteral(const std::string &row_string, - std::size_t *start_pos); - - // Read the next text row from the open FILE stream '*file' into - // '*row_string'. Returns false if end-of-file is reached and there are no - // more rows, true if a row string was successfully read. For ease of - // parsing, '*row_string' has the trailing row-terminator removed and - // replaced with a field-terminator. - bool readRowFromFile(FILE *file, std::string *row_string) const; - - // Read the next text from blob memory starting at '**start_pos' and ending - // at '*end_pos' into '*row_string'. Returns false if the end of the blob is - // reached and there are no more rows, true if a row was successfully read. - // For ease of parsing, '*row_string' has the trailing row-terminator removed - // and replaced with a field-terminator. After call '*start_pos' points to - // first character AFTER the read row in the blob. - bool readRowFromBlob(const char **start_pos, - const char *end_pos, - std::string *row_string) const; - - // Trim a row-terminator (newline or carriage-return + newline) off the end - // of '*row_string'. Returns true if the row-terminator was successfully - // removed, false if '*row_string' did not end in a row-terminator. - bool removeRowTerminator(std::string *row_string) const; - - // Extract a field string starting at '*start_pos' in 'row_string' into - // '*field_string'. This method also expands escape sequences if - // 'process_escape_sequences_' is true. Returns true if a field string was - // successfully extracted, false in the special case where the NULL-literal - // string "\N" was found. 
Throws TextScanFormatError if text was malformed. - bool extractFieldString(const std::string &row_string, - std::size_t *start_pos, - std::string *field_string) const; - - // Make a tuple by parsing all of the individual fields specified in - // 'row_string'. - Tuple parseRow(const std::string &row_string, const CatalogRelationSchema &relation) const; - - const bool is_file_; - const std::string filename_; - const char field_terminator_; - const block_id text_blob_; - const std::size_t text_size_; - const bool process_escape_sequences_; - - InsertDestination *output_destination_; - StorageManager *storage_manager_; - - DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder); -}; - -/** - * @brief A WorkOrder to split the file into blobs of text that can be processed - * separately. - **/ -class TextSplitWorkOrder : public WorkOrder { - public: /** - * @brief Constructor. + * @brief Extract a field string starting at \p *field_ptr. This method also + * expands escape sequences if \p process_escape_sequences_ is true. + * Throws TextScanFormatError if text was malformed. * - * @param query_id The ID of the query to which this WorkOrder belongs. - * @param filename File to split into row-aligned blobs. - * @param process_escape_sequences Whether to decode escape sequences in the - * text file. - * @param storage_manager The StorageManager to use. - * @param operator_index Operator index of the current operator. This is used - * to send new-work available message to Foreman. - * @param scheduler_client_id The TMB client ID of the scheduler thread. - * @param bus A pointer to the TMB. + * @param field_ptr \p *field_ptr points to the current position of the input + * char stream for parsing. The overall char stream must end with a + * newline character. After the call, \p *field_ptr will be modified to + * the start position of the NEXT field string. + * @param is_null_literal OUTPUT parameter. Set to true if the NULL-literal + * string "\N" was found. 
+ * @param has_reached_end_of_line OUTPUT parameter. Set to true if the newline + * character was encountered. + * @param field_string OUTPUT parameter. Set to the extracted field string. */ - TextSplitWorkOrder(const std::size_t query_id, - const std::string &filename, - const bool process_escape_sequences, - StorageManager *storage_manager, - const std::size_t operator_index, - const tmb::client_id scheduler_client_id, - MessageBus *bus) - : WorkOrder(query_id), - filename_(filename), - process_escape_sequences_(process_escape_sequences), - storage_manager_(DCHECK_NOTNULL(storage_manager)), - operator_index_(operator_index), - scheduler_client_id_(scheduler_client_id), - bus_(DCHECK_NOTNULL(bus)) {} + void extractFieldString(const char **field_ptr, + bool *is_null_literal, + bool *has_reached_end_of_line, + std::string *field_string) const; /** - * @exception TextScanReadError The text file could not be opened for - * reading. + * @brief Make a tuple by parsing all of the individual fields from a char stream. + * + * @param row_ptr \p *row_ptr points to the current position of the input char stream + * for parsing. The overall char stream must end with a newline character. + * After the call, \p *row_ptr will be modified to the start position of + * the NEXT text row. + * @param relation The relation schema for the tuple. + * @return The tuple parsed from the char stream. */ - void execute() override; - - private: - // Allocate a new blob. - void allocateBlob(); - - // Find the last row terminator in current blob. - std::size_t findLastRowTerminator(); + Tuple parseRow(const char **row_ptr, + const CatalogRelationSchema &relation) const; - - // Send the blob info to its operator via TMB. - void sendBlobInfoToOperator(const bool write_row_aligned); - // Get the writeable address (unwritten chunk) in current blob.
inline char* writeableBlobAddress() { - return static_cast<char*>(text_blob_->getMemoryMutable()) + written_; + /** + * @brief Parse up to three octal digits (0-7) starting at \p *literal_ptr as + * a char literal. \p *literal_ptr will be modified to the last position + * of the parsed octal digits. + * + * @param literal_ptr \p *literal_ptr points to the current position of the + * input char stream for parsing. The overall char stream must end with + * a newline character. + * @return The char literal from the parsed octal digits. + */ + inline static char ParseOctalLiteral(const char **literal_ptr) { + int value = 0; + const char *ptr = *literal_ptr; + for (int i = 0; i < 3; ++i, ++ptr) { + const int char_value = *ptr - '0'; + if ((char_value >= 0) && (char_value < 8)) { + value = value * 8 + char_value; + } else { + break; + } + } + *literal_ptr = ptr - 1; + return value; } - // Number of bytes remaining to be written. - inline std::size_t remainingBlobBytes() const { - return blob_size_ - written_; + /** + * @brief Parse up to two hexadecimal digits (0-F, case insensitive) starting + * at \p *literal_ptr as a char literal. \p *literal_ptr will be modified + * to the last position of the parsed hexadecimal digits. + * + * @param literal_ptr \p *literal_ptr points to the current position of the + * input char stream for parsing. The overall char stream must end with + * a newline character. + * @return The char literal from the parsed hexadecimal digits.
+ */ + inline static char ParseHexLiteral(const char **literal_ptr) { + int value = 0; + const char *ptr = *literal_ptr; + for (int i = 0; i < 2; ++i, ++ptr) { + const char c = *ptr; + int char_value; + if (std::isdigit(c)) { + char_value = c - '0'; + } else if (c >= 'a' && c <= 'f') { + char_value = c - 'a' + 10; + } else if (c >= 'A' && c <= 'F') { + char_value = c - 'A' + 10; + } else { + break; + } + value = value * 16 + char_value; + } + *literal_ptr = ptr - 1; + return value; } - const std::string filename_; // File to split. + const std::string filename_; + const std::size_t text_offset_; + const std::size_t text_segment_size_; + const char field_terminator_; const bool process_escape_sequences_; + InsertDestination *output_destination_; StorageManager *storage_manager_; - const std::size_t operator_index_; // Opeartor index. - const tmb::client_id scheduler_client_id_; // The scheduler's TMB client ID. - MessageBus *bus_; - - MutableBlobReference text_blob_; // Mutable reference to current blob. - block_id text_blob_id_; // Current blob ID. - std::size_t written_ = 0; // Bytes written in current blob. - std::size_t blob_size_ = 0; // Size of the current blob. - - DISALLOW_COPY_AND_ASSIGN(TextSplitWorkOrder); + DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder); }; /** @} */ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/WorkOrder.proto ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto index fd731f7..60d4c8f 100644 --- a/relational_operators/WorkOrder.proto +++ b/relational_operators/WorkOrder.proto @@ -1,5 +1,7 @@ // Copyright 2011-2015 Quickstep Technologies LLC. // Copyright 2015-2016 Pivotal Software, Inc. +// Copyright 2016, Quickstep Research Group, Computer Sciences Department, +// University of Wisconsin–Madison.
// // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -18,7 +20,6 @@ syntax = "proto2"; package quickstep.serialization; import "relational_operators/SortMergeRunOperator.proto"; -import "relational_operators/TextScanOperator.proto"; enum WorkOrderType { AGGREGATION = 1; @@ -39,8 +40,7 @@ enum WorkOrderType { SORT_RUN_GENERATION = 16; TABLE_GENERATOR = 17; TEXT_SCAN = 18; - TEXT_SPLIT = 19; - UPDATE = 20; + UPDATE = 19; } message WorkOrder { @@ -223,15 +223,12 @@ message TableGeneratorWorkOrder { message TextScanWorkOrder { extend WorkOrder { // All required. + optional string filename = 301; + optional uint64 text_offset = 302; + optional uint64 text_segment_size = 303; optional uint32 field_terminator = 304; // For one-byte char. optional bool process_escape_sequences = 305; optional int32 insert_destination_index = 306; - - // Either - optional string filename = 307; - - // Or - optional TextBlob text_blob = 308; } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/WorkOrderFactory.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp index 489b666..da42b4d 100644 --- a/relational_operators/WorkOrderFactory.cpp +++ b/relational_operators/WorkOrderFactory.cpp @@ -42,7 +42,6 @@ #include "relational_operators/SortRunGenerationOperator.hpp" #include "relational_operators/TableGeneratorOperator.hpp" #include "relational_operators/TextScanOperator.hpp" -#include "relational_operators/TextScanOperator.pb.h" #include "relational_operators/UpdateOperator.hpp" #include "relational_operators/WorkOrder.pb.h" #include "storage/StorageBlockInfo.hpp" @@ -389,40 +388,16 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder } case serialization::TEXT_SCAN: { LOG(INFO) << "Creating TextScanWorkOrder"; - 
if (proto.HasExtension(serialization::TextScanWorkOrder::filename)) { - return new TextScanWorkOrder( - proto.query_id(), - proto.GetExtension(serialization::TextScanWorkOrder::filename), - proto.GetExtension(serialization::TextScanWorkOrder::field_terminator), - proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences), - query_context->getInsertDestination( - proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)), - storage_manager); - } else { - const serialization::TextBlob &text_blob_proto = - proto.GetExtension(serialization::TextScanWorkOrder::text_blob); - - return new TextScanWorkOrder( - proto.query_id(), - text_blob_proto.blob_id(), - text_blob_proto.size(), - proto.GetExtension(serialization::TextScanWorkOrder::field_terminator), - proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences), - query_context->getInsertDestination( - proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)), - storage_manager); - } - } - case serialization::TEXT_SPLIT: { - LOG(INFO) << "Creating TextSplitWorkOrder"; - return new TextSplitWorkOrder( + return new TextScanWorkOrder( proto.query_id(), - proto.GetExtension(serialization::TextSplitWorkOrder::filename), - proto.GetExtension(serialization::TextSplitWorkOrder::process_escape_sequences), - storage_manager, - proto.GetExtension(serialization::TextSplitWorkOrder::operator_index), - shiftboss_client_id, - bus); + proto.GetExtension(serialization::TextScanWorkOrder::filename), + proto.GetExtension(serialization::TextScanWorkOrder::text_offset), + proto.GetExtension(serialization::TextScanWorkOrder::text_segment_size), + proto.GetExtension(serialization::TextScanWorkOrder::field_terminator), + proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences), + query_context->getInsertDestination( + proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)), + storage_manager); } case 
serialization::UPDATE: { LOG(INFO) << "Creating UpdateWorkOrder"; @@ -691,27 +666,14 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto, proto.GetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index)); } case serialization::TEXT_SCAN: { - if (!proto.HasExtension(serialization::TextScanWorkOrder::field_terminator) || - !proto.HasExtension(serialization::TextScanWorkOrder::process_escape_sequences) || - !proto.HasExtension(serialization::TextScanWorkOrder::insert_destination_index) || - !query_context.isValidInsertDestinationId( - proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index))) { - return false; - } - - // Two fields are exclusive. - if (proto.HasExtension(serialization::TextScanWorkOrder::filename) == - proto.HasExtension(serialization::TextScanWorkOrder::text_blob)) { - return false; - } - - return proto.HasExtension(serialization::TextScanWorkOrder::filename) || - proto.GetExtension(serialization::TextScanWorkOrder::text_blob).IsInitialized(); - } - case serialization::TEXT_SPLIT: { - return proto.HasExtension(serialization::TextSplitWorkOrder::filename) && - proto.HasExtension(serialization::TextSplitWorkOrder::process_escape_sequences) && - proto.HasExtension(serialization::TextSplitWorkOrder::operator_index); + return proto.HasExtension(serialization::TextScanWorkOrder::filename) && + proto.HasExtension(serialization::TextScanWorkOrder::text_offset) && + proto.HasExtension(serialization::TextScanWorkOrder::text_segment_size) && + proto.HasExtension(serialization::TextScanWorkOrder::field_terminator) && + proto.HasExtension(serialization::TextScanWorkOrder::process_escape_sequences) && + proto.HasExtension(serialization::TextScanWorkOrder::insert_destination_index) && + query_context.isValidInsertDestinationId( + proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)); } case serialization::UPDATE: { return 
proto.HasExtension(serialization::UpdateWorkOrder::relation_id) && http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/tests/TextScanOperator_unittest.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp index ef6fc2d..5860745 100644 --- a/relational_operators/tests/TextScanOperator_unittest.cpp +++ b/relational_operators/tests/TextScanOperator_unittest.cpp @@ -193,7 +193,6 @@ TEST_F(TextScanOperatorTest, ScanTest) { input_filename, '\t', true, - false, *relation_, output_destination_index)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/tests/text_scan_input.txt ---------------------------------------------------------------------- diff --git a/relational_operators/tests/text_scan_input.txt b/relational_operators/tests/text_scan_input.txt index bcb76bf..51015bd 100644 --- a/relational_operators/tests/text_scan_input.txt +++ b/relational_operators/tests/text_scan_input.txt @@ -2,9 +2,5 @@ -1234567890 -1.2e-200 A twenty char string 1969-07-21 02:56:00 00:00:01.001 Another twenty chars \N \N \N \N \N \N \N \N \\N \N \N \\N -\x34\062 \55\064\x32\56\65 \x7B\ -\t\ \\\e\s\c\a\p\e\d\x\b\n\x7d 1988-07-16\T00:00\:00\x2E0\x30\60\06001 00:00:00 'good\' \"bye"\r\n\ -\r\n\v\n\ - -0 0.0 \\\\\ -\\\\\n 1970-01-01 0 s \\\\ +\x34\062 \55\064\x32\56\65 \x7B\n\t\ \\\e\s\c\a\p\e\d\x\b\n\x7d 1988-07-16\T00:00\:00\x2E0\x30\60\06001 00:00:00 'good\' \"bye"\r\n\n\r\n\v\n\n +0 0.0 \\\\\n\\\\\n 1970-01-01 0 s \\\\