This is an automated email from the ASF dual-hosted git repository. martinzink pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 9b6fb59117cf8739f07ad95be8f9c93d7c91d54f Author: Gabor Gyimesi <gamezb...@gmail.com> AuthorDate: Mon Jul 31 11:30:26 2023 +0200 MINIFICPP-2172 Remove TensorFlow extension Closes #1619 Signed-off-by: Martin Zink <martinz...@apache.org> --- .github/workflows/ci.yml | 2 +- README.md | 6 +- Windows.md | 3 +- arch.sh | 2 - bootstrap.sh | 4 - bstrp_functions.sh | 2 - cmake/FindTensorFlow.cmake | 96 ----- cmake/MiNiFiOptions.cmake | 1 - extensions/tensorflow/BUILDING.md | 59 --- extensions/tensorflow/CMakeLists.txt | 46 --- extensions/tensorflow/TFApplyGraph.cpp | 196 ---------- extensions/tensorflow/TFApplyGraph.h | 135 ------- extensions/tensorflow/TFConvertImageToTensor.cpp | 262 -------------- extensions/tensorflow/TFConvertImageToTensor.h | 154 -------- extensions/tensorflow/TFExtractTopLabels.cpp | 161 --------- extensions/tensorflow/TFExtractTopLabels.h | 93 ----- libminifi/test/TestBase.cpp | 2 +- libminifi/test/TestBase.h | 7 +- libminifi/test/tensorflow-tests/CMakeLists.txt | 38 -- .../test/tensorflow-tests/TensorFlowTests.cpp | 399 --------------------- run_clang_tidy.sh | 2 +- win_build_vs.bat | 3 +- 22 files changed, 9 insertions(+), 1664 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f4625a377..23ec4ca3a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -218,7 +218,7 @@ jobs: export LDFLAGS="${LDFLAGS} -stdlib=libc++" cmake -DUSE_SHARED_LIBS=ON -DCMAKE_BUILD_TYPE=RelWithDebInfo -DCI_BUILD=ON -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON -DENABLE_AWS=ON -DENABLE_AZURE=ON -DENABLE_BUSTACHE=ON -DENABLE_COAP=ON \ -DENABLE_ENCRYPT_CONFIG=ON -DENABLE_GPS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_NANOFI=ON -DENABLE_OPC=ON -DENABLE_OPENCV=ON \ - -DENABLE_OPENWSMAN=ON -DENABLE_OPS=ON -DENABLE_PCAP=ON -DENABLE_SENSORS=ON -DENABLE_SFTP=ON -DENABLE_SQL=ON -DENABLE_SYSTEMD=ON -DENABLE_TENSORFLOW=OFF \ + -DENABLE_OPENWSMAN=ON -DENABLE_OPS=ON -DENABLE_PCAP=ON -DENABLE_SENSORS=ON -DENABLE_SFTP=ON -DENABLE_SQL=ON -DENABLE_SYSTEMD=ON \ -DENABLE_USB_CAMERA=ON -DENABLE_PYTHON_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_GCP=ON -DENABLE_PROCFS=ON -DENABLE_PROMETHEUS=ON -DENABLE_ELASTICSEARCH=ON \ -DCMAKE_EXPORT_COMPILE_COMMANDS=ON .. cmake --build . --parallel $(nproc) diff --git a/README.md b/README.md index a0518995c..1866219c2 100644 --- a/README.md +++ b/README.md @@ -87,8 +87,8 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension | JNI | **NiFi Processors** [...] | Lua Scripting | [ExecuteScript](PROCESSORS.md#executescript) [...] | MQTT | [ConsumeMQTT](PROCESSORS.md#consumemqtt)<br/>[PublishMQTT](PROCESSORS.md#publishmqtt) [...] -| OPC | [FetchOPCProcessor](PROCESSORS.md#fetchopcprocessor)<br/>[PutOPCProcessor](PROCESSORS.md#putopcprocessor) [...] -| OpenCV | [CaptureRTSPFrame](PROCESSORS.md#capturertspframe)<br/>[MotionDetector](PROCESSORS.md#motiondetector) [...] +| OPC | [FetchOPCProcessor](PROCESSORS.md#fetchopcprocessor)<br/>[PutOPCProcessor](PROCESSORS.md#putopcprocessor) [...] +| OpenCV | [CaptureRTSPFrame](PROCESSORS.md#capturertspframe)<br/>[MotionDetector](PROCESSORS.md#motiondetector) [...] | OpenWSMAN | SourceInitiatedSubscriptionListener [...] | PCAP | [CapturePacket](PROCESSORS.md#capturepacket) [...] | PDH (Windows only) | [PerformanceDataMonitor](PROCESSORS.md#performancedatamonitor) [...] @@ -99,7 +99,6 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension | SQL | [ExecuteSQL](PROCESSORS.md#executesql)<br/>[PutSQL](PROCESSORS.md#putsql)<br/>[QueryDatabaseTable](PROCESSORS.md#querydatabasetable)<br/> [...] | Splunk | [PutSplunkHTTP](PROCESSORS.md#putsplunkhttp)<br/>[QuerySplunkIndexingStatus](PROCESSORS.md#querysplunkindexingstatus) [...] | Systemd | [ConsumeJournald](PROCESSORS.md#consumejournald) [...] -| Tensorflow | TFApplyGraph<br/>TFConvertImageToTensor<br/>TFExtractTopLabels<br/> [...] | USB Camera | [GetUSBCamera](PROCESSORS.md#getusbcamera) [...] | Windows Event Log (Windows only) | [CollectorInitiatedSubscription](PROCESSORS.md#collectorinitiatedsubscription)<br/>[ConsumeWindowsEventLog](PROCESSORS.md#consumewindowseventlog)<br/>[TailEventLog](PROCESSORS.md#taileventlog) [...] @@ -339,7 +338,6 @@ sudo brew install libpcap G. PCAP support ................Disabled H. USB Camera support ..........Disabled I. GPS support .................Disabled - J. TensorFlow Support ..........Disabled K. Bustache Support ............Disabled L. Lua Scripting Support .......Disabled M. MQTT Support ................Disabled diff --git a/Windows.md b/Windows.md index f215cc083..412b8daa3 100644 --- a/Windows.md +++ b/Windows.md @@ -75,7 +75,6 @@ After the build directory it will take optional parameters modifying the CMake c | /LUA_SCRIPTING | Enables Lua scripting extension | | /PYTHON_SCRIPTING | Enables Python scripting extension | | /SENSORS | Enables the Sensors package | -| /TENSORFLOW | Enables Tensorflow extension | | /USB_CAMERA | Enables USB camera support | | /L | Enables Linter | | /O | Enables OpenCV | @@ -113,7 +112,7 @@ A basic working CMake configuration can be inferred from the `win_build_vs.bat`. ``` mkdir build cd build -cmake -G "Visual Studio 16 2019" -A x64 -DINSTALLER_MERGE_MODULES=OFF -DTEST_CUSTOM_WEL_PROVIDER=OFF -DENABLE_SQL=OFF -DUSE_REAL_ODBC_TEST_DRIVER=OFF -DCMAKE_BUILD_TYPE_INIT=Release -DCMAKE_BUILD_TYPE=Release -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=OFF -DENABLE_JNI=OFF -DOPENSSL_OFF=OFF -DENABLE_COAP=OFF -DENABLE_AWS=OFF -DENABLE_PDH= -DENABLE_AZURE=OFF -DENABLE_SFTP=OFF -DENABLE_SPLUNK= -DENABLE_GCP= -DENABLE_NANOFI=OFF -DENABLE_OPENCV=OFF -DENABLE_PROMETHEUS=OFF -DENABLE_ELASTICSEARCH= -DUSE [...] +cmake -G "Visual Studio 16 2019" -A x64 -DINSTALLER_MERGE_MODULES=OFF -DTEST_CUSTOM_WEL_PROVIDER=OFF -DENABLE_SQL=OFF -DUSE_REAL_ODBC_TEST_DRIVER=OFF -DCMAKE_BUILD_TYPE_INIT=Release -DCMAKE_BUILD_TYPE=Release -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=OFF -DENABLE_JNI=OFF -DOPENSSL_OFF=OFF -DENABLE_COAP=OFF -DENABLE_AWS=OFF -DENABLE_PDH= -DENABLE_AZURE=OFF -DENABLE_SFTP=OFF -DENABLE_SPLUNK= -DENABLE_GCP= -DENABLE_NANOFI=OFF -DENABLE_OPENCV=OFF -DENABLE_PROMETHEUS=OFF -DENABLE_ELASTICSEARCH= -DUSE [...] msbuild /m nifi-minifi-cpp.sln /property:Configuration=Release /property:Platform=x64 copy minifi_main\Release\minifi.exe minifi_main\ cpack diff --git a/arch.sh b/arch.sh index e593ccc25..635fb251b 100644 --- a/arch.sh +++ b/arch.sh @@ -76,8 +76,6 @@ build_deps(){ INSTALLED+=("gpsd") elif [ "$FOUND_VALUE" = "libarchive" ]; then INSTALLED+=("libarchive") - elif [ "$FOUND_VALUE" = "tensorflow" ]; then - INSTALLED+=("tensorflow") elif [ "$FOUND_VALUE" = "boost" ]; then INSTALLED+=("boost") elif [ "$FOUND_VALUE" = "opensslbuild" ]; then diff --git a/bootstrap.sh b/bootstrap.sh index 5d04ef878..48f2fb76c 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -316,10 +316,6 @@ add_option OPENWSMAN_ENABLED ${FALSE} "ENABLE_OPENWSMAN" add_option BUSTACHE_ENABLED ${FALSE} "ENABLE_BUSTACHE" "2.6" ${TRUE} add_dependency BUSTACHE_ENABLED "boost" -## currently need to limit on certain platforms -add_option TENSORFLOW_ENABLED ${FALSE} "ENABLE_TENSORFLOW" "2.6" ${TRUE} -add_dependency TENSORFLOW_ENABLED "tensorflow" - add_option OPC_ENABLED ${FALSE} "ENABLE_OPC" add_dependency OPC_ENABLED "mbedtls" diff --git a/bstrp_functions.sh b/bstrp_functions.sh index cb87e5eff..de8c69da7 100755 --- a/bstrp_functions.sh +++ b/bstrp_functions.sh @@ -368,7 +368,6 @@ show_supported_features() { echo "G. PCAP support ................$(print_feature_status PCAP_ENABLED)" echo "H. USB Camera support ..........$(print_feature_status USB_ENABLED)" echo "I. GPS support .................$(print_feature_status GPS_ENABLED)" - echo "J. TensorFlow Support ..........$(print_feature_status TENSORFLOW_ENABLED)" echo "K. Bustache Support ............$(print_feature_status BUSTACHE_ENABLED)" echo "L. Lua Scripting Support .......$(print_feature_status LUA_SCRIPTING_ENABLED)" echo "M. MQTT Support ................$(print_feature_status MQTT_ENABLED)" @@ -426,7 +425,6 @@ read_feature_options(){ g) ToggleFeature PCAP_ENABLED ;; h) ToggleFeature USB_ENABLED ;; i) ToggleFeature GPS_ENABLED ;; - j) ToggleFeature TENSORFLOW_ENABLED ;; k) ToggleFeature BUSTACHE_ENABLED ;; l) ToggleFeature LUA_SCRIPTING_ENABLED ;; m) ToggleFeature MQTT_ENABLED ;; diff --git a/cmake/FindTensorFlow.cmake b/cmake/FindTensorFlow.cmake deleted file mode 100644 index 0832734d5..000000000 --- a/cmake/FindTensorFlow.cmake +++ /dev/null @@ -1,96 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -include(FindPackageHandleStandardArgs) -unset(TENSORFLOW_FOUND) - -if (TENSORFLOW_INCLUDE_PATH) - message("-- Checking for TensorFlow includes in provided TENSORFLOW_INCLUDE_PATH: ${TENSORFLOW_INCLUDE_PATH}") -endif() - -if (TENSORFLOW_LIB_PATH) - message("-- Checking for TensorFlow libs in provided TENSORFLOW_LIB_PATH: ${TENSORFLOW_LIB_PATH}") -endif() - -if (GOOGLE_PROTOBUF_INCLUDE_PATH) - message("-- Checking for Google Protobuf includes in provided GOOGLE_PROTOBUF_INCLUDE_PATH: ${GOOGLE_PROTOBUF_INCLUDE_PATH}") -endif() - -if (GOOGLE_PROTOBUF_LIB_PATH) - message("-- Checking for Google Protobuf libs in provided GOOGLE_PROTOBUF_LIB_PATH: ${GOOGLE_PROTOBUF_LIB_PATH}") -endif() - -find_path(TENSORFLOW_INCLUDE_DIR - NAMES - tensorflow/core - tensorflow/cc - third_party - HINTS - ${TENSORFLOW_INCLUDE_PATH} - /usr/include/tensorflow - /usr/local/include/google/tensorflow - /usr/local/include/tensorflow - /usr/local/include/tensorflow/bazel-bin/tensorflow/include - /usr/include/google/tensorflow) - -find_library(TENSORFLOW_CC_LIBRARY NAMES tensorflow_cc - HINTS - ${TENSORFLOW_LIB_PATH} - ${TENSORFLOW_INCLUDE_PATH}/bazel-bin/tensorflow - /usr/lib - /usr/local/lib - /usr/local/lib/tensorflow_cc) - -find_path(GOOGLE_PROTOBUF_INCLUDE NAMES google/protobuf - HINTS - ${GOOGLE_PROTOBUF_INCLUDE_PATH} - ${TENSORFLOW_INCLUDE_PATH}/src - /usr/include/tensorflow/src - /usr/local/include/google/tensorflow/src - /usr/local/include/tensorflow/src - /usr/local/include/tensorflow/bazel-bin/tensorflow/include/src - /usr/include/google/tensorflow/src) - -find_library(GOOGLE_PROTOBUF_LIBRARY NAMES protobuf - HINTS - ${GOOGLE_PROTOBUF_LIB_PATH} - /usr/lib - /usr/local/lib - /usr/lib/x86_64-linux-gnu) - -find_package_handle_standard_args(TENSORFLOW DEFAULT_MSG TENSORFLOW_INCLUDE_DIR TENSORFLOW_CC_LIBRARY GOOGLE_PROTOBUF_INCLUDE GOOGLE_PROTOBUF_LIBRARY) - -if(TENSORFLOW_FOUND) - message("-- Found TensorFlow includes: ${TENSORFLOW_INCLUDE_DIR}") - message("-- Found TensorFlow libs: ${TENSORFLOW_CC_LIBRARY}") - message("-- Found Google Protobuf includes: ${GOOGLE_PROTOBUF_INCLUDE}") - message("-- Found Google Protobuf libs: ${GOOGLE_PROTOBUF_LIBRARY}") - set(TENSORFLOW_LIBRARIES ${TENSORFLOW_CC_LIBRARY} ${GOOGLE_PROTOBUF_LIBRARY}) - set(TENSORFLOW_INCLUDE_DIRS - ${TENSORFLOW_INCLUDE_DIR} - ${TENSORFLOW_INCLUDE_DIR}/bazel-genfiles - ${TENSORFLOW_INCLUDE_DIR}/tensorflow/contrib/makefile/downloads - ${TENSORFLOW_INCLUDE_DIR}/tensorflow/contrib/makefile/downloads/eigen - ${TENSORFLOW_INCLUDE_DIR}/tensorflow/contrib/makefile/downloads/gemmlowp - ${TENSORFLOW_INCLUDE_DIR}/tensorflow/contrib/makefile/downloads/nsync/public - ${TENSORFLOW_INCLUDE_DIR}/tensorflow/contrib/makefile/gen/protobuf-host/include - ${GOOGLE_PROTOBUF_INCLUDE}) -else() - message(FATAL_ERROR "TensorFlow or Google Protobuf dependency was not found. Check or set TENSORFLOW_INCLUDE_PATH, TENSORFLOW_LIB_PATH, GOOGLE_PROTOBUF_INCLUDE, GOOGLE_PROTOBUF_LIBRARY to TensorFlow build, Install libtensorflow_cc.so, libprotobuf.so and headers into the system, or disable the TensorFlow extension.") -endif() - -mark_as_advanced(TENSORFLOW_INCLUDE_DIR TENSORFLOW_CC_LIBRARY GOOGLE_PROTOBUF_INCLUDE GOOGLE_PROTOBUF_LIBRARY) diff --git a/cmake/MiNiFiOptions.cmake b/cmake/MiNiFiOptions.cmake index c7d5fdd11..21c35435c 100644 --- a/cmake/MiNiFiOptions.cmake +++ b/cmake/MiNiFiOptions.cmake @@ -107,7 +107,6 @@ add_minifi_option(ENABLE_LUA_SCRIPTING "Enables lua scripting" ON) add_minifi_option(ENABLE_PYTHON_SCRIPTING "Enables python scripting" ON) add_minifi_option(ENABLE_SENSORS "Enables the Sensors package." OFF) add_minifi_option(ENABLE_USB_CAMERA "Enables USB camera support." OFF) -add_minifi_option(ENABLE_TENSORFLOW "Enables the TensorFlow extensions." OFF) ## Disabled by default because TF can be complex/environment-specific to build add_minifi_option(ENABLE_AWS "Enables AWS support." ON) add_minifi_option(ENABLE_OPENCV "Enables the OpenCV extensions." OFF) add_minifi_option(ENABLE_BUSTACHE "Enables Bustache (ApplyTemplate) support." OFF) diff --git a/extensions/tensorflow/BUILDING.md b/extensions/tensorflow/BUILDING.md deleted file mode 100644 index 0a24d5507..000000000 --- a/extensions/tensorflow/BUILDING.md +++ /dev/null @@ -1,59 +0,0 @@ -# Building the MiNiFi - C++ TensorFlow Extension - -The TensorFlow extension depends on the libtensorflow_cc.so (C++) library. -In order to build the extension, CMake must be able to locate the TensorFlow -headers as well as the built libtensorflow_cc.so library. Additionally, the -system must be able to locate the library in its runtime library search path -in order for MiNiFi to run. - -## CentOS 7 - -This extension is known to work on CentOS 7 with cuDNN 6, CUDA 8, and TensorFlow 1.4. - -1. If using CUDA, first install NVIDIA drivers for the system's hardware -2. If using CUDA, install CUDA 8 using the official NVIDIA repositories - -```bash -sudo yum install cuda-repo-rhel7-9.0.176-1.x86_64.rpm -sudo yum install cuda-{core,command-line-tools,curand-dev,cufft-dev,cublas-dev,cusolver-dev}-8-0 -``` - -3. If using CUDA, install cuDNN to /usr/local/cuda-8.0 - -```bash -tar xvf cudnn-8.0-linux-x64-v6.0.tgz -cd cuda -sudo cp lib64/libcudnn.so.6.0.21 /usr/local/cuda-8.0/lib64/ -sudo ln -s /usr/local/cuda-8.0/lib64/libcudnn.so{.6.0.21,.6} -sudo ln -s /usr/local/cuda-8.0/lib64/libcudnn.so{.6.0.21,} -sudo ldconfig -sudo cp include/cudnn.h /usr/local/cuda-8.0/include/ -``` - -4. Install Bazel - -```bash -wget https://copr.fedorainfracloud.org/coprs/vbatts/bazel/repo/epel-7/vbatts-bazel-epel-7.repo -sudo cp vbatts-bazel-epel-7.repo /etc/yum.repos.d/ -sudo yum install bazel -``` - -5. Build and install TensorFlow libtensorflow_cc.so - -There are many ways to build and install TensorFlow libtensorflow_cc.so, but we have found -the following to currently be the simplest: - -```bash -# Help tensorflow_cc find CUDA (only required if using CUDA) -sudo ln -s /usr/local/cuda-8.0 /opt/cuda - -# Clone/build tensorflow_cc -git clone https://github.com/FloopCZ/tensorflow_cc.git -cd tensorflow_cc/tensorflow_cc/ -mkdir build && cd build -cmake3 -DTENSORFLOW_STATIC=OFF -DTENSORFLOW_SHARED=ON .. -make -sudo make install -``` - -6. Build MiNiFi - C++ with TensorFlow extension enabled \ No newline at end of file diff --git a/extensions/tensorflow/CMakeLists.txt b/extensions/tensorflow/CMakeLists.txt deleted file mode 100644 index 975b44d33..000000000 --- a/extensions/tensorflow/CMakeLists.txt +++ /dev/null @@ -1,46 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -# The TensorFlow extension requires the libtensorflow_cc.so library to be installed on the system -# Due to this reason ENABLE_ALL does not include the TensorFlow extension -if (NOT ENABLE_TENSORFLOW) - return() -endif() - -include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt) - -set(CMAKE_CXX_STANDARD 14) -set(CMAKE_CXX_STANDARD_REQUIRED ON) - -find_package(TensorFlow REQUIRED) - -message("-- Found TensorFlow: ${TENSORFLOW_INCLUDE_DIRS}") - -file(GLOB SOURCES "*.cpp") - -add_library(minifi-tensorflow-extensions SHARED ${SOURCES}) - -target_include_directories(minifi-tensorflow-extensions SYSTEM PRIVATE ${TENSORFLOW_INCLUDE_DIRS}) -target_link_libraries(minifi-tensorflow-extensions ${LIBMINIFI} Threads::Threads) -target_link_libraries(minifi-tensorflow-extensions ${TENSORFLOW_LIBRARIES}) - -SET (TENSORFLOW-EXTENSIONS minifi-tensorflow-extensions PARENT_SCOPE) - -register_extension(minifi-tensorflow-extensions "TENSORFLOW EXTENSIONS" TENSORFLOW-EXTENSIONS "This enables TensorFlow support" "${TEST_DIR}/tensorflow-tests") - diff --git a/extensions/tensorflow/TFApplyGraph.cpp b/extensions/tensorflow/TFApplyGraph.cpp deleted file mode 100644 index 72d867270..000000000 --- a/extensions/tensorflow/TFApplyGraph.cpp +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "TFApplyGraph.h" -#include <tensorflow/cc/ops/standard_ops.h> - -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" -#include "utils/gsl.h" - -namespace org::apache::nifi::minifi::processors { - -void TFApplyGraph::initialize() { - setSupportedProperties(Properties); - setSupportedRelationships(Relationships); -} - -void TFApplyGraph::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory* /*sessionFactory*/) { - context->getProperty(InputNode, input_node_); - - if (input_node_.empty()) { - logger_->log_error("Invalid input node"); - } - - context->getProperty(OutputNode, output_node_); - - if (output_node_.empty()) { - logger_->log_error("Invalid output node"); - } -} - -void TFApplyGraph::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, - const std::shared_ptr<core::ProcessSession>& session) { - auto flow_file = session->get(); - - if (!flow_file) { - return; - } - - try { - // Read graph - std::string tf_type; - flow_file->getAttribute("tf.type", tf_type); - - std::shared_ptr<tensorflow::GraphDef> graph_def; - uint32_t graph_version; - - { - std::lock_guard<std::mutex> guard(graph_def_mtx_); - - if ("graph" == tf_type) { - logger_->log_info("Reading new graph def"); - graph_def_ = std::make_shared<tensorflow::GraphDef>(); - GraphReadCallback graph_cb(graph_def_); - session->read(flow_file, &graph_cb); - graph_version_++; - logger_->log_info("Read graph version: %i", graph_version_); - session->remove(flow_file); - return; - } - - graph_version = graph_version_; - graph_def = graph_def_; - } - - if (!graph_def) { - logger_->log_error("Cannot process input because no graph has been defined"); - session->transfer(flow_file, Retry); - return; - } - - // Use an existing context, if one is available - std::shared_ptr<TFContext> ctx; - - if (tf_context_q_.try_dequeue(ctx)) { - logger_->log_debug("Using available TensorFlow context"); - - if (ctx->graph_version != graph_version) { - logger_->log_info("Allowing session with stale graph to expire"); - ctx = nullptr; - } - } - - if (!ctx) { - logger_->log_info("Creating new TensorFlow context"); - tensorflow::SessionOptions options; - ctx = std::make_shared<TFContext>(); - ctx->tf_session.reset(tensorflow::NewSession(options)); - ctx->graph_version = graph_version; - auto status = ctx->tf_session->Create(*graph_def); - - if (!status.ok()) { - std::string msg = "Failed to create TensorFlow session: "; - msg.append(status.ToString()); - throw std::runtime_error(msg); - } - } - - // Apply graph - // Read input tensor from flow file - auto input_tensor_proto = std::make_shared<tensorflow::TensorProto>(); - TensorReadCallback tensor_cb(input_tensor_proto); - session->read(flow_file, &tensor_cb); - tensorflow::Tensor input; - if (!input.FromProto(*input_tensor_proto)) { - // failure deliberately ignored at this time - // added to avoid warn_unused_result build errors - } - std::vector<tensorflow::Tensor> outputs; - auto status = ctx->tf_session->Run({{input_node_, input}}, {output_node_}, {}, &outputs); - - if (!status.ok()) { - std::string msg = "Failed to apply TensorFlow graph: "; - msg.append(status.ToString()); - throw std::runtime_error(msg); - } - - // Create output flow file for each output tensor - for (const auto &output : outputs) { - auto tensor_proto = std::make_shared<tensorflow::TensorProto>(); - output.AsProtoTensorContent(tensor_proto.get()); - logger_->log_info("Writing output tensor flow file"); - TensorWriteCallback write_cb(tensor_proto); - session->write(flow_file, &write_cb); - session->transfer(flow_file, Success); - } - - // Make context available for use again - if (tf_context_q_.size_approx() < getMaxConcurrentTasks()) { - logger_->log_debug("Releasing TensorFlow context"); - tf_context_q_.enqueue(ctx); - } else { - logger_->log_info("Destroying TensorFlow context because it is no longer needed"); - } - } catch (std::exception &exception) { - logger_->log_error("Caught Exception %s", exception.what()); - session->transfer(flow_file, Failure); - this->yield(); - } catch (...) { - logger_->log_error("Caught Exception"); - session->transfer(flow_file, Failure); - this->yield(); - } -} - -int64_t TFApplyGraph::GraphReadCallback::process(const std::shared_ptr<io::InputStream>& stream) { - std::string graph_proto_buf; - graph_proto_buf.resize(stream->size()); - const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&graph_proto_buf[0]), stream->size()); - if (num_read != stream->size()) { - throw std::runtime_error("GraphReadCallback failed to fully read flow file input stream"); - } - graph_def_->ParseFromString(graph_proto_buf); - return gsl::narrow<int64_t>(num_read); -} - -int64_t TFApplyGraph::TensorReadCallback::process(const std::shared_ptr<io::InputStream>& stream) { - std::string tensor_proto_buf; - tensor_proto_buf.resize(stream->size()); - const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]), stream->size()); - if (num_read != stream->size()) { - throw std::runtime_error("TensorReadCallback failed to fully read flow file input stream"); - } - tensor_proto_->ParseFromString(tensor_proto_buf); - return gsl::narrow<int64_t>(num_read); -} - -int64_t TFApplyGraph::TensorWriteCallback::process(const std::shared_ptr<io::OutputStream>& stream) { - auto tensor_proto_buf = tensor_proto_->SerializeAsString(); - auto num_wrote = stream->write(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]), - static_cast<int>(tensor_proto_buf.size())); - - if (num_wrote != tensor_proto_buf.size()) { - throw std::runtime_error("TensorWriteCallback failed to fully write flow file output stream"); - } - - return num_wrote; -} - -REGISTER_RESOURCE(TFApplyGraph, Processor); - -} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/tensorflow/TFApplyGraph.h b/extensions/tensorflow/TFApplyGraph.h deleted file mode 100644 index 6f916dee6..000000000 --- a/extensions/tensorflow/TFApplyGraph.h +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <atomic> - -#include "core/Resource.h" -#include "core/Processor.h" -#include "core/logging/LoggerFactory.h" -#include "core/PropertyDefinition.h" -#include "core/PropertyDefinitionBuilder.h" -#include "core/RelationshipDefinition.h" -#include "tensorflow/core/public/session.h" -#include "concurrentqueue.h" -#include "io/InputStream.h" -#include "io/OutputStream.h" - -namespace org::apache::nifi::minifi::processors { - -class TFApplyGraph : public core::Processor { - public: - explicit TFApplyGraph(const std::string &name, const utils::Identifier &uuid = {}) - : Processor(name, uuid), - logger_(core::logging::LoggerFactory<TFApplyGraph>::getLogger(uuid_)) { - } - - EXTENSIONAPI static constexpr const char* Description = "Applies a TensorFlow graph to the tensor protobuf supplied as input. The tensor is fed into the node specified by the Input Node property. " - "The output FlowFile is a tensor protobuf extracted from the node specified by the Output Node property. TensorFlow graphs are read dynamically by feeding a graph " - "protobuf to the processor with the tf.type property set to graph."; - - EXTENSIONAPI static constexpr auto InputNode = core::PropertyDefinitionBuilder<>::createProperty("Input Node") - .withDescription("The node of the TensorFlow graph to feed tensor inputs to") - .withDefaultValue("") - .build(); - EXTENSIONAPI static constexpr auto OutputNode = core::PropertyDefinitionBuilder<>::createProperty("Output Node") - .withDescription("The node of the TensorFlow graph to read tensor outputs from") - .withDefaultValue("") - .build(); - EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 2>{ - InputNode, - OutputNode - }; - - - EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Successful graph application outputs"}; - EXTENSIONAPI static constexpr auto Retry = core::RelationshipDefinition{"retry", "Inputs which fail graph application but may work if sent again"}; - EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "Failures which will not work if retried"}; - EXTENSIONAPI static constexpr auto Relationships = std::array{ - Success, - Retry, - Failure - }; - - EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; - EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; - EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED; - EXTENSIONAPI static constexpr bool IsSingleThreaded = false; - - ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - - void initialize() override; - void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override; - void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) override { - logger_->log_error("onTrigger invocation with raw pointers is not implemented"); - } - void onTrigger(const std::shared_ptr<core::ProcessContext> &context, - const std::shared_ptr<core::ProcessSession> &session) override; - - struct TFContext { - std::shared_ptr<tensorflow::Session> tf_session; - uint32_t graph_version; - }; - - class GraphReadCallback : public InputStreamCallback { - public: - explicit GraphReadCallback(std::shared_ptr<tensorflow::GraphDef> graph_def) - : graph_def_(std::move(graph_def)) { - } - ~GraphReadCallback() override = default; - int64_t process(const std::shared_ptr<io::InputStream>& stream) override; - - private: - std::shared_ptr<tensorflow::GraphDef> graph_def_; - }; - - class TensorReadCallback : public InputStreamCallback { - public: - explicit TensorReadCallback(std::shared_ptr<tensorflow::TensorProto> tensor_proto) - : tensor_proto_(std::move(tensor_proto)) { - } - ~TensorReadCallback() override = default; - int64_t process(const std::shared_ptr<io::InputStream>& stream) override; - - private: - std::shared_ptr<tensorflow::TensorProto> tensor_proto_; - }; - - class TensorWriteCallback : public OutputStreamCallback { - public: - explicit TensorWriteCallback(std::shared_ptr<tensorflow::TensorProto> tensor_proto) - : tensor_proto_(std::move(tensor_proto)) { - } - ~TensorWriteCallback() override = default; - int64_t process(const std::shared_ptr<io::OutputStream>& stream) override; - - private: - std::shared_ptr<tensorflow::TensorProto> tensor_proto_; - }; - - private: - std::shared_ptr<core::logging::Logger> logger_; - std::string input_node_; - std::string output_node_; - std::shared_ptr<tensorflow::GraphDef> graph_def_; - std::mutex graph_def_mtx_; - uint32_t graph_version_ = 0; - moodycamel::ConcurrentQueue<std::shared_ptr<TFContext>> tf_context_q_; -}; - -} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/tensorflow/TFConvertImageToTensor.cpp b/extensions/tensorflow/TFConvertImageToTensor.cpp deleted file mode 100644 index 1ed4adb55..000000000 --- a/extensions/tensorflow/TFConvertImageToTensor.cpp +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "TFConvertImageToTensor.h" - -#include "core/ProcessContext.h" -#include "core/ProcessSession.h" -#include "tensorflow/cc/ops/standard_ops.h" - -namespace org::apache::nifi::minifi::processors { - -void TFConvertImageToTensor::initialize() { - setSupportedProperties(Properties); - setSupportedRelationships(Relationships); -} - -void TFConvertImageToTensor::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /*sessionFactory*/) { - context->getProperty(ImageFormat.getName(), input_format_); - - if (input_format_.empty()) { - logger_->log_error("Invalid image format"); - } - - std::string val; - - if (context->getProperty(InputWidth.getName(), val)) { - core::Property::StringToInt(val, input_width_); - } else { - logger_->log_error("Invalid Input Width"); - } - - if (context->getProperty(InputHeight.getName(), val)) { - core::Property::StringToInt(val, input_height_); - } else { - logger_->log_error("Invalid Input Height"); - } - - if (context->getProperty(OutputWidth.getName(), val)) { - core::Property::StringToInt(val, output_width_); - } else { - logger_->log_error("Invalid Output Width"); - } - - if (context->getProperty(OutputHeight.getName(), val)) { - core::Property::StringToInt(val, output_height_); - } else { - logger_->log_error("Invalid output height"); - } - - if (context->getProperty(NumChannels.getName(), val)) { - core::Property::StringToInt(val, num_channels_); - } else { - logger_->log_error("Invalid channel count"); - } - - do_crop_ = true; - - if (context->getProperty(CropOffsetX.getName(), val)) { - core::Property::StringToInt(val, crop_offset_x_); - } else { - do_crop_ = false; - } - - if (context->getProperty(CropOffsetY.getName(), val)) { - core::Property::StringToInt(val, crop_offset_y_); - } else { - do_crop_ = false; - } - - if (context->getProperty(CropSizeX.getName(), val)) { - core::Property::StringToInt(val, crop_size_x_); - } else { - do_crop_ = false; - } - - if (context->getProperty(CropSizeY.getName(), val)) { - core::Property::StringToInt(val, crop_size_y_); - } else { - do_crop_ = false; - } - - if (do_crop_) { - logger_->log_info("Input images will be cropped at %d, %d by %d, %d pixels", - crop_offset_x_, - crop_offset_y_, - crop_size_x_, - crop_size_y_); - } -} - -void TFConvertImageToTensor::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, - const std::shared_ptr<core::ProcessSession>& session) { - auto flow_file = session->get(); - - if (!flow_file) { - return; - } - - try { - // Use an existing context, if one is available - std::shared_ptr<TFContext> ctx; - - if (tf_context_q_.try_dequeue(ctx)) { - logger_->log_debug("Using available TensorFlow context"); - } - - std::string input_tensor_name = "input"; - std::string output_tensor_name = "output"; - - if (!ctx) { - logger_->log_info("Creating new TensorFlow context"); - tensorflow::SessionOptions options; - ctx = std::make_shared<TFContext>(); - ctx->tf_session.reset(tensorflow::NewSession(options)); - - auto root = tensorflow::Scope::NewRootScope(); - auto input = tensorflow::ops::Placeholder(root.WithOpName(input_tensor_name), tensorflow::DT_UINT8); - - // Cast pixel values to floats - auto float_caster = tensorflow::ops::Cast(root.WithOpName("float_caster"), input, tensorflow::DT_FLOAT); - - int crop_offset_x = 0; - int crop_offset_y = 0; - int crop_size_x = input_width_; - int crop_size_y = input_height_; - - if (do_crop_) { - crop_offset_x = crop_offset_x_; - crop_offset_y = crop_offset_y_; - crop_size_x = crop_size_x_; - crop_size_y = crop_size_y_; - } - - tensorflow::ops::Slice cropped = tensorflow::ops::Slice(root.WithOpName("crop"), - float_caster, - {crop_offset_y, - crop_offset_x, - 0}, - {crop_size_y, - crop_size_x, - num_channels_}); - - // Expand into batches (of size 1) - auto dims_expander = tensorflow::ops::ExpandDims(root, cropped, 0); - - // Resize tensor to output dimensions - auto resize = tensorflow::ops::ResizeBilinear( - root, dims_expander, - tensorflow::ops::Const(root.WithOpName("resize"), {output_height_, output_width_})); - - // Normalize tensor from 0-255 pixel values to 0.0-1.0 values - auto output = tensorflow::ops::Div(root.WithOpName(output_tensor_name), - tensorflow::ops::Sub(root, resize, {0.0f}), - {255.0f}); - tensorflow::GraphDef graph_def; - { - auto status = root.ToGraphDef(&graph_def); - - if (!status.ok()) { - std::string msg = "Failed to create TensorFlow graph: "; - msg.append(status.ToString()); - throw std::runtime_error(msg); - } - } - - { - auto status = ctx->tf_session->Create(graph_def); - - if (!status.ok()) { - std::string msg = "Failed to create TensorFlow session: "; - msg.append(status.ToString()); - throw std::runtime_error(msg); - } - } - } - - // Apply graph - // Read input tensor from flow file - tensorflow::Tensor img_tensor(tensorflow::DT_UINT8, {input_height_, input_width_, num_channels_}); - ImageReadCallback tensor_cb(&img_tensor); - session->read(flow_file, &tensor_cb); - std::vector<tensorflow::Tensor> outputs; - auto status = ctx->tf_session->Run({{input_tensor_name, img_tensor}}, {output_tensor_name + ":0"}, {}, &outputs); - - if (!status.ok()) { - std::string msg = "Failed to apply TensorFlow graph: "; - msg.append(status.ToString()); - throw std::runtime_error(msg); - } - - // Create output flow file for each output tensor - for (const auto &output : outputs) { - auto tensor_proto = std::make_shared<tensorflow::TensorProto>(); - output.AsProtoTensorContent(tensor_proto.get()); - logger_->log_info("Writing output tensor flow file"); - TensorWriteCallback write_cb(tensor_proto); - session->write(flow_file, &write_cb); - session->transfer(flow_file, Success); - } - - // Make context available for use again - if (tf_context_q_.size_approx() < getMaxConcurrentTasks()) { - logger_->log_debug("Releasing TensorFlow context"); - tf_context_q_.enqueue(ctx); - } else { - logger_->log_info("Destroying TensorFlow context because it is no longer needed"); - } - } catch (std::exception &exception) { - logger_->log_error("Caught Exception %s", exception.what()); - session->transfer(flow_file, Failure); - this->yield(); - } catch (...) { - logger_->log_error("Caught Exception"); - session->transfer(flow_file, Failure); - this->yield(); - } -} - -int64_t TFConvertImageToTensor::ImageReadCallback::process(const std::shared_ptr<io::InputStream>& stream) { - if (tensor_->AllocatedBytes() < stream->size()) { - throw std::runtime_error("Tensor is not big enough to hold FlowFile bytes"); - } - const auto num_read = stream->read(tensor_->flat<unsigned char>().data(), stream->size()); - if (num_read != stream->size()) { - throw std::runtime_error("TensorReadCallback failed to fully read flow file input stream"); - } - return num_read; -} - -int64_t TFConvertImageToTensor::TensorWriteCallback::process(const std::shared_ptr<io::OutputStream>& stream) { - auto tensor_proto_buf = tensor_proto_->SerializeAsString(); - auto num_wrote = stream->write(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]), - static_cast<int>(tensor_proto_buf.size())); - - if (num_wrote != tensor_proto_buf.size()) { - std::string msg = "TensorWriteCallback failed to fully write flow file output stream; Expected "; - msg.append(std::to_string(tensor_proto_buf.size())); - msg.append(" and wrote "); - msg.append(std::to_string(num_wrote)); - throw std::runtime_error(msg); - } - - return num_wrote; -} - -REGISTER_RESOURCE(TFConvertImageToTensor, Processor); - -} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/tensorflow/TFConvertImageToTensor.h b/extensions/tensorflow/TFConvertImageToTensor.h deleted file mode 100644 index d537528d6..000000000 --- a/extensions/tensorflow/TFConvertImageToTensor.h +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <atomic> - -#include "core/logging/LoggerFactory.h" -#include "core/PropertyDefinition.h" -#include "core/PropertyDefinitionBuilder.h" -#include "core/Resource.h" -#include "core/Processor.h" -#include "tensorflow/core/public/session.h" -#include "concurrentqueue.h" -#include "io/InputStream.h" -#include "io/OutputStream.h" - -namespace org::apache::nifi::minifi::processors { - -class TFConvertImageToTensor : public core::Processor { - public: - explicit TFConvertImageToTensor(const std::string &name, const utils::Identifier &uuid = {}) - : Processor(name, uuid), - logger_(core::logging::LoggerFactory<TFConvertImageToTensor>::getLogger(uuid_)) { - } - - EXTENSIONAPI static constexpr const char* Description = "Converts the input image file into a tensor protobuf. The image will be resized to the given output tensor dimensions."; - - EXTENSIONAPI static constexpr auto ImageFormat = core::PropertyDefinitionBuilder<>::createProperty("Input Format") - .withDescription("The format of the input image (PNG or RAW). RAW is RGB24.") - .build(); - EXTENSIONAPI static constexpr auto NumChannels = core::PropertyDefinitionBuilder<>::createProperty("Channels") - .withDescription("The number of channels (e.g. 3 for RGB, 4 for RGBA) in the input image") - .withDefaultValue("3") - .build(); - EXTENSIONAPI static constexpr auto InputWidth = core::PropertyDefinitionBuilder<>::createProperty("Input Width") - .withDescription("The width, in pixels, of the input image.") - .build(); - EXTENSIONAPI static constexpr auto InputHeight = core::PropertyDefinitionBuilder<>::createProperty("Input Height") - .withDescription("The height, in pixels, of the input image.") - .build(); - EXTENSIONAPI static constexpr auto OutputWidth = core::PropertyDefinitionBuilder<>::createProperty("Output Width") - .withDescription("The width, in pixels, of the output image.") - .build(); - EXTENSIONAPI static constexpr auto OutputHeight = core::PropertyDefinitionBuilder<>::createProperty("Output Height") - .withDescription("The height, in pixels, of the output image.") - .build(); - EXTENSIONAPI static constexpr auto CropOffsetX = core::PropertyDefinitionBuilder<>::createProperty("Crop Offset X") - .withDescription("The X (horizontal) offset, in pixels, to crop the input image (relative to top-left corner).") - .build(); - EXTENSIONAPI static constexpr auto CropOffsetY = core::PropertyDefinitionBuilder<>::createProperty("Crop Offset Y") - .withDescription("The Y (vertical) offset, in pixels, to crop the input image (relative to top-left corner).") - .build(); - EXTENSIONAPI static constexpr auto CropSizeX = core::PropertyDefinitionBuilder<>::createProperty("Crop Size X") - .withDescription("The X (horizontal) size, in pixels, to crop the input image.") - .build(); - EXTENSIONAPI static constexpr auto CropSizeY = core::PropertyDefinitionBuilder<>::createProperty("Crop Size Y") - .withDescription("The Y (vertical) size, in pixels, to crop the input image.") - .build(); - EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 10>{ - ImageFormat, - NumChannels, - InputWidth, - InputHeight, - OutputWidth, - OutputHeight, - CropOffsetX, - CropOffsetY, - CropSizeX, - CropSizeY - }; - - - EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Successful graph application outputs"}; - EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "Failures which will not work if retried"}; - EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure}; - - EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; - EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; - EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED; - EXTENSIONAPI static constexpr bool IsSingleThreaded = false; - - ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - - void initialize() override; - void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override; - void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) override { - logger_->log_error("onTrigger invocation with raw pointers is not implemented"); - } - void onTrigger(const std::shared_ptr<core::ProcessContext> &context, - const std::shared_ptr<core::ProcessSession> &session) override; - - struct TFContext { - std::shared_ptr<tensorflow::Session> tf_session; - }; - - class ImageReadCallback : public InputStreamCallback { - public: - explicit ImageReadCallback(tensorflow::Tensor *tensor) - : tensor_(tensor) { - } - ~ImageReadCallback() override = default; - int64_t process(const std::shared_ptr<io::InputStream>& stream) override; - - private: - tensorflow::Tensor *tensor_; - }; - - class TensorWriteCallback : public OutputStreamCallback { - public: - explicit TensorWriteCallback(std::shared_ptr<tensorflow::TensorProto> tensor_proto) - : tensor_proto_(std::move(tensor_proto)) { - } - ~TensorWriteCallback() override = default; - int64_t process(const std::shared_ptr<io::OutputStream>& stream) override; - - private: - std::shared_ptr<tensorflow::TensorProto> tensor_proto_; - }; - - private: - std::shared_ptr<core::logging::Logger> logger_; - - std::string input_format_; - int input_width_; - int input_height_; - int output_width_; - int output_height_; - int num_channels_; - bool do_crop_ = false; - int crop_offset_x_ = 0; - int crop_offset_y_ = 0; - int crop_size_x_ = 0; - int crop_size_y_ = 0; - - std::shared_ptr<tensorflow::GraphDef> graph_def_; - moodycamel::ConcurrentQueue<std::shared_ptr<TFContext>> tf_context_q_; -}; - -} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/tensorflow/TFExtractTopLabels.cpp b/extensions/tensorflow/TFExtractTopLabels.cpp deleted file mode 100644 index 95dd25fa0..000000000 --- a/extensions/tensorflow/TFExtractTopLabels.cpp +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "TFExtractTopLabels.h" - -#include "tensorflow/cc/ops/standard_ops.h" - -#include "utils/gsl.h" - -namespace org::apache::nifi::minifi::processors { - -const core::Relationship TFExtractTopLabels::Success( - "success", - "Successful FlowFiles are sent here with labels as attributes"); -const core::Relationship TFExtractTopLabels::Retry( - "retry", - "Failures which might work if retried"); -const core::Relationship TFExtractTopLabels::Failure( - "failure", - "Failures which will not work if retried"); - -void TFExtractTopLabels::initialize() { - setSupportedProperties(Properties); - setSupportedRelationships(Relationships); -} - -void TFExtractTopLabels::onSchedule(core::ProcessContext* /*context*/, core::ProcessSessionFactory* /*sessionFactory*/) { -} - -void TFExtractTopLabels::onTrigger(const std::shared_ptr<core::ProcessContext>& /*context*/, - const std::shared_ptr<core::ProcessSession> &session) { - auto flow_file = session->get(); - - if (!flow_file) { - return; - } - - try { - // Read labels - std::string tf_type; - flow_file->getAttribute("tf.type", tf_type); - std::shared_ptr<std::vector<std::string>> labels; - - { - std::lock_guard<std::mutex> guard(labels_mtx_); - - if (tf_type == "labels") { - logger_->log_info("Reading new labels..."); - auto new_labels = std::make_shared<std::vector<std::string>>(); - LabelsReadCallback cb(new_labels); - session->read(flow_file, &cb); - labels_ = new_labels; - logger_->log_info("Read %d new labels", labels_->size()); - session->remove(flow_file); - return; - } - - labels = labels_; - } - - // Read input tensor from flow file - auto input_tensor_proto = std::make_shared<tensorflow::TensorProto>(); - TensorReadCallback tensor_cb(input_tensor_proto); - session->read(flow_file, &tensor_cb); - - tensorflow::Tensor input; - if (!input.FromProto(*input_tensor_proto)) { - // failure deliberately ignored at this time - // added to avoid warn_unused_result build errors - } - auto input_flat = input.flat<float>(); - - std::vector<std::pair<uint64_t, float>> scores; - - for (int i = 0; i < input_flat.size(); i++) { - scores.emplace_back(std::make_pair(i, input_flat(i))); - } - - std::sort(scores.begin(), scores.end(), [](const std::pair<uint64_t, float> &a, - const std::pair<uint64_t, float> &b) { - return a.second > b.second; - }); - - for (std::size_t i = 0; i < 5 && i < scores.size(); i++) { - if (!labels || scores[i].first > labels->size()) { - logger_->log_error("Label index is out of range (are the correct labels loaded?); routing to retry..."); - session->transfer(flow_file, Retry); - return; - } - flow_file->addAttribute("tf.top_label_" + std::to_string(i), labels->at(scores[i].first)); - } - - session->transfer(flow_file, Success); - } catch (std::exception &exception) { - logger_->log_error("Caught Exception %s", exception.what()); - session->transfer(flow_file, Failure); - this->yield(); - } catch (...) { - logger_->log_error("Caught Exception"); - session->transfer(flow_file, Failure); - this->yield(); - } -} - -int64_t TFExtractTopLabels::LabelsReadCallback::process(const std::shared_ptr<io::InputStream>& stream) { - size_t total_read = 0; - std::string label; - uint64_t max_label_len = 65536; - label.resize(max_label_len); - std::string buf; - uint64_t label_size = 0; - uint64_t buf_size = 8096; - buf.resize(buf_size); - - while (total_read < stream->size()) { - const auto read = stream->read(reinterpret_cast<uint8_t *>(&buf[0]), buf_size); - if (io::isError(read)) break; - for (size_t i = 0; i < read; i++) { - if (buf[i] == '\n' || total_read + i == stream->size()) { - labels_->emplace_back(label.substr(0, label_size)); - label_size = 0; - } else { - label[label_size] = buf[i]; - label_size++; - } - } - - total_read += read; - } - - return gsl::narrow<int64_t>(total_read); -} - -int64_t TFExtractTopLabels::TensorReadCallback::process(const std::shared_ptr<io::OutputStream>& stream) { - std::string tensor_proto_buf; - tensor_proto_buf.resize(stream->size()); - const auto num_read = stream->read(reinterpret_cast<uint8_t *>(&tensor_proto_buf[0]), stream->size()); - if (num_read != stream->size()) { - throw std::runtime_error("TensorReadCallback failed to fully read flow file input stream"); - } - tensor_proto_->ParseFromString(tensor_proto_buf); - return gsl::narrow<int64_t>(num_read); -} - -REGISTER_RESOURCE(TFExtractTopLabels, Processor); - -} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/tensorflow/TFExtractTopLabels.h b/extensions/tensorflow/TFExtractTopLabels.h deleted file mode 100644 index 5b93df55a..000000000 --- a/extensions/tensorflow/TFExtractTopLabels.h +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include <atomic> - -#include <core/Resource.h> -#include <core/Processor.h> -#include <tensorflow/core/public/session.h> -#include <concurrentqueue.h> -#include "io/InputStream.h" -#include "io/OutputStream.h" - -namespace org::apache::nifi::minifi::processors { - -class TFExtractTopLabels : public core::Processor { - public: - explicit TFExtractTopLabels(const std::string &name, const utils::Identifier &uuid = {}) - : Processor(name, uuid), - logger_(logging::LoggerFactory<TFExtractTopLabels>::getLogger(uuid_)) { - } - - EXTENSIONAPI static constexpr const char* Description = "Extracts the top 5 labels for categorical inference models"; - - EXTENSIONAPI static constexpr auto Properties = std::array<core::PropertyReference, 0>{}; - - EXTENSIONAPI static const core::Relationship Success; - EXTENSIONAPI static const core::Relationship Retry; - EXTENSIONAPI static const core::Relationship Failure; - EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Retry, Failure}; - - EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; - EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; - EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED; - EXTENSIONAPI static constexpr bool IsSingleThreaded = false; - - ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - - void initialize() override; - void onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) override; - void onTrigger(core::ProcessContext* /*context*/, core::ProcessSession* /*session*/) override { - logger_->log_error("onTrigger invocation with raw pointers is not implemented"); - } - void onTrigger(const std::shared_ptr<core::ProcessContext> &context, - const std::shared_ptr<core::ProcessSession> &session) override; - - class LabelsReadCallback : public InputStreamCallback { - public: - explicit LabelsReadCallback(std::shared_ptr<std::vector<std::string>> labels) - : labels_(std::move(labels)) { - } - ~LabelsReadCallback() override = default; - int64_t process(const std::shared_ptr<io::InputStream>& stream) override; - - private: - std::shared_ptr<std::vector<std::string>> labels_; - }; - - class TensorReadCallback : public InputStreamCallback { - public: - explicit TensorReadCallback(std::shared_ptr<tensorflow::TensorProto> tensor_proto) - : tensor_proto_(std::move(tensor_proto)) { - } - ~TensorReadCallback() override = default; - int64_t process(const std::shared_ptr<io::OutputStream>& stream) override; - - private: - std::shared_ptr<tensorflow::TensorProto> tensor_proto_; - }; - - private: - std::shared_ptr<logging::Logger> logger_; - - std::shared_ptr<std::vector<std::string>> labels_; - std::mutex labels_mtx_; -}; - -} // namespace org::apache::nifi::minifi::processors diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp index 70afe5ffb..c264cfa26 100644 --- a/libminifi/test/TestBase.cpp +++ b/libminifi/test/TestBase.cpp @@ -542,7 +542,7 @@ bool TestPlan::runNextProcessor(const PreTriggerVerifier& verify) { return runProcessor(location, verify); } -bool TestPlan::runCurrentProcessor(const PreTriggerVerifier& /*verify*/) { +bool TestPlan::runCurrentProcessor() { std::lock_guard<std::recursive_mutex> guard(mutex); return runProcessor(location); } diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index ab785aa16..33acac229 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -246,12 +246,10 @@ class TestPlan { void scheduleProcessor(const std::shared_ptr<minifi::core::Processor>& processor); void scheduleProcessors(); - // Note: all this verify logic is only used in TensorFlow tests as a replacement for UpdateAttribute - // It should probably not be the part of the standard way of running processors bool runProcessor(const std::shared_ptr<minifi::core::Processor>& processor, const PreTriggerVerifier& verify = nullptr); bool runProcessor(size_t target_location, const PreTriggerVerifier& verify = nullptr); bool runNextProcessor(const PreTriggerVerifier& verify = nullptr); - bool runCurrentProcessor(const PreTriggerVerifier& verify = nullptr); + bool runCurrentProcessor(); bool runCurrentProcessorUntilFlowfileIsProduced(std::chrono::milliseconds wait_duration); std::set<std::shared_ptr<minifi::provenance::ProvenanceEventRecord>> getProvenanceRecords(); @@ -359,8 +357,7 @@ class TestController { static void runSession(const std::shared_ptr<TestPlan> &plan, bool runToCompletion = true, - const std::function<void(const std::shared_ptr<minifi::core::ProcessContext>&, - const std::shared_ptr<minifi::core::ProcessSession>&)>& verify = nullptr) { + const TestPlan::PreTriggerVerifier& verify = nullptr) { while (plan->runNextProcessor(verify) && runToCompletion) { } } diff --git a/libminifi/test/tensorflow-tests/CMakeLists.txt b/libminifi/test/tensorflow-tests/CMakeLists.txt deleted file mode 100644 index 7056acbd0..000000000 --- a/libminifi/test/tensorflow-tests/CMakeLists.txt +++ /dev/null @@ -1,38 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -find_package(TensorFlow REQUIRED) - -file(GLOB TENSORFLOW_INTEGRATION_TESTS "*.cpp") - -SET(EXTENSIONS_TEST_COUNT 0) -FOREACH(testfile ${TENSORFLOW_INTEGRATION_TESTS}) - get_filename_component(testfilename "${testfile}" NAME_WE) - add_executable("${testfilename}" "${testfile}" "${TEST_DIR}/TestBase.cpp") - target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/tensorflow") - target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors") - target_include_directories(${testfilename} SYSTEM PRIVATE BEFORE ${TENSORFLOW_INCLUDE_DIRS}) - createTests("${testfilename}") - target_link_libraries(${testfilename} minifi-tensorflow-extensions) - target_link_libraries(${testfilename} minifi-standard-processors) - target_link_libraries(${testfilename} ${CATCH_MAIN_LIB}) - MATH(EXPR EXTENSIONS_TEST_COUNT "${EXTENSIONS_TEST_COUNT}+1") - add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR}) -ENDFOREACH() -message("-- Finished building ${TENSORFLOW-EXTENSIONS_TEST_COUNT} TensorFlow related test file(s)...") diff --git a/libminifi/test/tensorflow-tests/TensorFlowTests.cpp b/libminifi/test/tensorflow-tests/TensorFlowTests.cpp deleted file mode 100644 index ede69b238..000000000 --- a/libminifi/test/tensorflow-tests/TensorFlowTests.cpp +++ /dev/null @@ -1,399 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <memory> -#include <string> - -#include "processors/GetFile.h" -#include "processors/LogAttribute.h" -#include "processors/PutFile.h" -#include "tensorflow/cc/framework/scope.h" -#include "tensorflow/cc/ops/standard_ops.h" -#include "TFApplyGraph.h" -#include <TFConvertImageToTensor.h> -#include <TFExtractTopLabels.h> - -#include "../TestBase.h" - -TEST_CASE("TensorFlow: Apply Graph", "[tfApplyGraph]") { // NOLINT - TestController testController; - - LogTestController::getInstance().setTrace<TestPlan>(); - LogTestController::getInstance().setTrace<processors::TFApplyGraph>(); - LogTestController::getInstance().setTrace<processors::PutFile>(); - LogTestController::getInstance().setTrace<processors::GetFile>(); - LogTestController::getInstance().setTrace<processors::LogAttribute>(); - - auto plan = testController.createPlan(); - auto repo = std::make_shared<TestRepository>(); - - // Define directory for input protocol buffers - std::string in_dir = testController.createTempDirectory(); - - // Define input graph protocol buffer file - std::string in_graph_file(in_dir); - in_graph_file.append("/in_graph.pb"); - - // Define input tensor protocol buffer file - std::string in_tensor_file(in_dir); - in_tensor_file.append("/tensor.pb"); - - // Define directory for output protocol buffers - std::string out_dir = testController.createTempDirectory(); - - // Define output tensor protocol buffer file - std::string out_tensor_file(out_dir); - out_tensor_file.append("/tensor.pb"); - - // Build MiNiFi processing graph - auto get_file = plan->addProcessor( - "GetFile", - "Get Proto"); - plan->setProperty( - get_file, - processors::GetFile::Directory, in_dir); - plan->setProperty( - get_file, - processors::GetFile::KeepSourceFile, - "false"); - plan->addProcessor( - "LogAttribute", - "Log Pre Graph Apply", - core::Relationship("success", "description"), - true); - auto tf_apply = plan->addProcessor( - "TFApplyGraph", - "Apply Graph", - core::Relationship("success", "description"), - true); - plan->addProcessor( - "LogAttribute", - "Log Post Graph Apply", - core::Relationship("success", "description"), - true); - plan->setProperty( - tf_apply, - processors::TFApplyGraph::InputNode.getName(), - "Input"); - plan->setProperty( - tf_apply, - processors::TFApplyGraph::OutputNode.getName(), - "Output"); - auto put_file = plan->addProcessor( - "PutFile", - "Put Output Tensor", - core::Relationship("success", "description"), - true); - plan->setProperty( - put_file, - processors::PutFile::Directory, - out_dir); - plan->setProperty( - put_file, - processors::PutFile::ConflictResolution, - processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE); - - // Build test TensorFlow graph - { - tensorflow::Scope root = tensorflow::Scope::NewRootScope(); - auto d = tensorflow::ops::Placeholder(root.WithOpName("Input"), tensorflow::DT_FLOAT); - auto v = tensorflow::ops::Add(root.WithOpName("Output"), d, d); - tensorflow::GraphDef graph; - - // Write test TensorFlow graph - REQUIRE(root.ToGraphDef(&graph).ok()); - std::ofstream in_file_stream(in_graph_file); - graph.SerializeToOstream(&in_file_stream); - } - - // Read test TensorFlow graph into TFApplyGraph - plan->runNextProcessor([&in_graph_file](const std::shared_ptr<core::ProcessContext> /*context*/, - const std::shared_ptr<core::ProcessSession> session) { - // Intercept the call so that we can add an attr (won't be required when we have UpdateAttribute processor) - auto flow_file = session->create(); - session->import(in_graph_file, flow_file, false); - flow_file->addAttribute("tf.type", "graph"); - session->transfer(flow_file, processors::GetFile::Success); - session->commit(); - }); - - plan->runNextProcessor(); // Log - plan->runNextProcessor(); // ApplyGraph (loads graph) - - // Write test input tensor - { - tensorflow::Tensor input(tensorflow::DT_FLOAT, {1, 1}); - input.flat<float>().data()[0] = 2.0f; - tensorflow::TensorProto tensor_proto; - input.AsProtoTensorContent(&tensor_proto); - - std::ofstream in_file_stream(in_tensor_file); - tensor_proto.SerializeToOstream(&in_file_stream); - } - - plan->reset(); - plan->runNextProcessor(); // GetFile - plan->runNextProcessor(); // Log - plan->runNextProcessor(); // ApplyGraph (applies graph) - plan->runNextProcessor(); // Log - plan->runNextProcessor(); // PutFile - - // Read test output tensor - { - std::ifstream out_file_stream(out_tensor_file); - tensorflow::TensorProto tensor_proto; - tensor_proto.ParseFromIstream(&out_file_stream); - tensorflow::Tensor tensor; - REQUIRE(tensor.FromProto(tensor_proto)); - - // Verify output tensor - float tensor_val = tensor.flat<float>().data()[0]; - REQUIRE(tensor_val == 4.0f); - } -} - -TEST_CASE("TensorFlow: ConvertImageToTensor", "[tfConvertImageToTensor]") { // NOLINT - TestController testController; - - LogTestController::getInstance().setTrace<TestPlan>(); - LogTestController::getInstance().setTrace<processors::TFConvertImageToTensor>(); - LogTestController::getInstance().setTrace<processors::PutFile>(); - LogTestController::getInstance().setTrace<processors::GetFile>(); - LogTestController::getInstance().setTrace<processors::LogAttribute>(); - - auto plan = testController.createPlan(); - auto repo = std::make_shared<TestRepository>(); - - // Define directory for input protocol buffers - std::string in_dir = testController.createTempDirectory(); - - // Define input tensor protocol buffer file - std::string in_img_file(in_dir); - in_img_file.append("/img"); - - // Define directory for output protocol buffers - std::string out_dir = testController.createTempDirectory(); - - // Define output tensor protocol buffer file - std::string out_tensor_file(out_dir); - out_tensor_file.append("/img"); - - // Build MiNiFi processing graph - auto get_file = plan->addProcessor( - "GetFile", - "Get Proto"); - plan->setProperty( - get_file, - processors::GetFile::Directory, in_dir); - plan->setProperty( - get_file, - processors::GetFile::KeepSourceFile, - "false"); - plan->addProcessor( - "LogAttribute", - "Log Pre Graph Apply", - core::Relationship("success", "description"), - true); - auto tf_apply = plan->addProcessor( - "TFConvertImageToTensor", - "Convert Image", - core::Relationship("success", "description"), - true); - plan->addProcessor( - "LogAttribute", - "Log Post Graph Apply", - core::Relationship("success", "description"), - true); - plan->setProperty( - tf_apply, - processors::TFConvertImageToTensor::ImageFormat.getName(), - "RAW"); - plan->setProperty( - tf_apply, - processors::TFConvertImageToTensor::InputWidth.getName(), - "2"); - plan->setProperty( - tf_apply, - processors::TFConvertImageToTensor::InputHeight.getName(), - "2"); - plan->setProperty( - tf_apply, - processors::TFConvertImageToTensor::OutputWidth.getName(), - "10"); - plan->setProperty( - tf_apply, - processors::TFConvertImageToTensor::OutputHeight.getName(), - "10"); - plan->setProperty( - tf_apply, - processors::TFConvertImageToTensor::NumChannels.getName(), - "1"); - auto put_file = plan->addProcessor( - "PutFile", - "Put Output Tensor", - core::Relationship("success", "description"), - true); - plan->setProperty( - put_file, - processors::PutFile::Directory, - out_dir); - plan->setProperty( - put_file, - processors::PutFile::ConflictResolution, - processors::PutFile::CONFLICT_RESOLUTION_STRATEGY_REPLACE); - - // Write test input image - { - // 2x2 single-channel 8 bit per channel - const uint8_t in_img_raw[2 * 2] = {0, 0, - 0, 0}; - - std::ofstream in_file_stream(in_img_file); - in_file_stream << in_img_raw; - } - - plan->reset(); - plan->runNextProcessor(); // GetFile - plan->runNextProcessor(); // Log - plan->runNextProcessor(); // TFConvertImageToTensor - plan->runNextProcessor(); // Log - plan->runNextProcessor(); // PutFile - - // Read test output tensor - { - std::ifstream out_file_stream(out_tensor_file); - tensorflow::TensorProto tensor_proto; - tensor_proto.ParseFromIstream(&out_file_stream); - tensorflow::Tensor tensor; - REQUIRE(tensor.FromProto(tensor_proto)); - - // Verify output tensor - auto shape = tensor.shape(); - auto shapeString = shape.DebugString(); - - // Ensure output tensor is of the expected shape - REQUIRE(shape.IsSameSize({1, // Batch size - 10, // Width - 10, // Height - 1})); // Channels - } -} - -TEST_CASE("TensorFlow: Extract Top Labels", "[tfExtractTopLabels]") { // NOLINT - TestController testController; - - LogTestController::getInstance().setTrace<TestPlan>(); - LogTestController::getInstance().setTrace<processors::TFExtractTopLabels>(); - LogTestController::getInstance().setTrace<processors::GetFile>(); - LogTestController::getInstance().setTrace<processors::LogAttribute>(); - - auto plan = testController.createPlan(); - auto repo = std::make_shared<TestRepository>(); - - // Define directory for input protocol buffers - std::string in_dir = testController.createTempDirectory(); - - // Define input labels file - std::string in_labels_file(in_dir); - in_labels_file.append("/in_labels"); - - // Define input tensor protocol buffer file - std::string in_tensor_file(in_dir); - in_tensor_file.append("/tensor.pb"); - - // Build MiNiFi processing graph - auto get_file = plan->addProcessor( - "GetFile", - "Get Input"); - plan->setProperty( - get_file, - processors::GetFile::Directory, in_dir); - plan->setProperty( - get_file, - processors::GetFile::KeepSourceFile, - "false"); - plan->addProcessor( - "LogAttribute", - "Log Pre Extract", - core::Relationship("success", "description"), - true); - auto tf_apply = plan->addProcessor( - "TFExtractTopLabels", - "Extract", - core::Relationship("success", "description"), - true); - plan->addProcessor( - "LogAttribute", - "Log Post Extract", - core::Relationship("success", "description"), - true); - - // Build test labels - { - // Write labels - std::ofstream in_file_stream(in_labels_file); - in_file_stream << "label_a\nlabel_b\nlabel_c\nlabel_d\nlabel_e\nlabel_f\nlabel_g\nlabel_h\nlabel_i\nlabel_j\n"; - } - - // Read labels - plan->runNextProcessor([&in_labels_file](const std::shared_ptr<core::ProcessContext> /*context*/, - const std::shared_ptr<core::ProcessSession> session) { - // Intercept the call so that we can add an attr (won't be required when we have UpdateAttribute processor) - auto flow_file = session->create(); - session->import(in_labels_file, flow_file, false); - flow_file->addAttribute("tf.type", "labels"); - session->transfer(flow_file, processors::GetFile::Success); - session->commit(); - }); - - plan->runNextProcessor(); // Log - plan->runNextProcessor(); // Extract (loads labels) - - // Write input tensor - { - tensorflow::Tensor input(tensorflow::DT_FLOAT, {10}); - input.flat<float>().data()[0] = 0.000f; - input.flat<float>().data()[1] = 0.400f; - input.flat<float>().data()[2] = 0.100f; - input.flat<float>().data()[3] = 0.005f; - input.flat<float>().data()[4] = 1.000f; - input.flat<float>().data()[5] = 0.500f; - input.flat<float>().data()[6] = 0.200f; - input.flat<float>().data()[7] = 0.000f; - input.flat<float>().data()[8] = 0.300f; - input.flat<float>().data()[9] = 0.000f; - tensorflow::TensorProto tensor_proto; - input.AsProtoTensorContent(&tensor_proto); - - std::ofstream in_file_stream(in_tensor_file); - tensor_proto.SerializeToOstream(&in_file_stream); - } - - plan->reset(); - plan->runNextProcessor(); // GetFile - plan->runNextProcessor(); // Log - plan->runNextProcessor(); // Extract - plan->runNextProcessor(); // Log - - // Verify labels - REQUIRE(LogTestController::getInstance().contains("key:tf.top_label_0 value:label_e")); - REQUIRE(LogTestController::getInstance().contains("key:tf.top_label_1 value:label_f")); - REQUIRE(LogTestController::getInstance().contains("key:tf.top_label_2 value:label_b")); - REQUIRE(LogTestController::getInstance().contains("key:tf.top_label_3 value:label_i")); - REQUIRE(LogTestController::getInstance().contains("key:tf.top_label_4 value:label_g")); -} diff --git a/run_clang_tidy.sh b/run_clang_tidy.sh index 4079d76e3..1a6f5b2b1 100755 --- a/run_clang_tidy.sh +++ b/run_clang_tidy.sh @@ -4,7 +4,7 @@ set -uo pipefail FILE=$1 -EXCLUDED_EXTENSIONS=("pdh" "windows-event-log" "tensorflow") +EXCLUDED_EXTENSIONS=("pdh" "windows-event-log") EXCLUDED_DIRECTORY=("nanofi") for excluded_extension in "${EXCLUDED_EXTENSIONS[@]}"; do diff --git a/win_build_vs.bat b/win_build_vs.bat index e40c8dbd9..734cfd2bc 100755 --- a/win_build_vs.bat +++ b/win_build_vs.bat @@ -92,7 +92,6 @@ for %%x in (%*) do ( if [%%~x] EQU [/PCAP] set enable_pcap=ON if [%%~x] EQU [/PYTHON_SCRIPTING] set enable_python_scripting=ON if [%%~x] EQU [/SENSORS] set enable_sensors=ON - if [%%~x] EQU [/TENSORFLOW] set enable_tensorflow=ON if [%%~x] EQU [/USB_CAMERA] set enable_usb_camera=ON if [%%~x] EQU [/64] set build_platform=x64 if [%%~x] EQU [/D] set cmake_build_type=RelWithDebInfo @@ -121,7 +120,7 @@ cmake -G %generator% %build_platform_cmd% -DINSTALLER_MERGE_MODULES=%installer_m -DENABLE_NANOFI=%build_nanofi% -DENABLE_OPENCV=%build_opencv% -DENABLE_PROMETHEUS=%build_prometheus% -DENABLE_ELASTICSEARCH=%build_ELASTIC% -DUSE_SHARED_LIBS=OFF -DENABLE_CONTROLLER=OFF ^ -DENABLE_BUSTACHE=%enable_bustache% -DENABLE_COAP=%enable_coap% -DENABLE_ENCRYPT_CONFIG=%enable_encrypt_config% -DENABLE_GPS=%enable_gps% -DENABLE_LUA_SCRIPTING=%enable_lua_scripting% ^ -DENABLE_MQTT=%enable_mqtt% -DENABLE_OPC=%enable_opc% -DENABLE_OPENWSMAN=%enable_openwsman% -DENABLE_OPS=%enable_ops% -DENABLE_PCAP=%enable_pcap% ^ - -DENABLE_PYTHON_SCRIPTING=%enable_python_scripting% -DENABLE_SENSORS=%enable_sensors% -DENABLE_TENSORFLOW=%enable_tensorflow% -DENABLE_USB_CAMERA=%enable_usb_camera% ^ + -DENABLE_PYTHON_SCRIPTING=%enable_python_scripting% -DENABLE_SENSORS=%enable_sensors% -DENABLE_USB_CAMERA=%enable_usb_camera% ^ -DBUILD_ROCKSDB=ON -DFORCE_WINDOWS=ON -DUSE_SYSTEM_UUID=OFF -DDISABLE_LIBARCHIVE=OFF -DENABLE_WEL=ON -DFAIL_ON_WARNINGS=OFF -DSKIP_TESTS=%skiptests% ^ %strict_gsl_checks% %redist% %sccache_arg% %EXTRA_CMAKE_ARGUMENTS% "%scriptdir%" && %buildcmd% IF %ERRORLEVEL% NEQ 0 EXIT /b %ERRORLEVEL%