[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-20 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r326794838
 
 

 ##
 File path: nanofi/ecu/tailfile_delimited.c
 ##
 @@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/ecu.h"
+#include "core/flowfiles.h"
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+int main(int argc, char** argv) {
+
+if (argc < 7) {
+printf("Error: must run ./tailfile_delimited   
   \n");
+exit(1);
+}
+
+tailfile_input_params input_params = init_logaggregate_input(argv);
+
+uint64_t intrvl = 0;
+uint64_t port_num = 0;
+if (validate_input_params(&input_params, &intrvl, &port_num) < 0) {
+return 1;
+}
+
+setup_signal_action();
+nifi_proc_params params = setup_nifi_processor(&input_params, 
"TailFileDelimited", on_trigger_tailfiledelimited);
+
+set_standalone_property(params.processor, "file_path", input_params.file);
+set_standalone_property(params.processor, "delimiter", 
input_params.delimiter);
+
+struct CRawSiteToSiteClient * client = createClient(input_params.instance, 
port_num, input_params.nifi_port_uuid);
+
+char uuid_str[37];
+get_proc_uuid_from_processor(params.processor, uuid_str);
+
+while (!stopped) {
+flow_file_record * new_ff = invoke(params.processor);
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid_str, pp);
+if (pp) {
+transmit_payload(client, pp->ff_list, 1);
+delete_completed_flow_files_from_proc(uuid_str);
 
 Review comment:
   I have investigated this, infact, the function to 
delete_completed_flow_files_from_proc does free all flow files. However, the 
leak was during creating a flow file. Fixed it. Thanks!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320818597
 
 

 ##
 File path: nanofi/src/core/flowfiles.c
 ##
 @@ -0,0 +1,171 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "api/nanofi.h"
+#include "api/ecu.h"
+#include "core/flowfiles.h"
+
+#include "utlist.h"
+#include 
+#include 
+#include 
+#include 
+
+flow_file_list * add_flow_file_record(flow_file_list ** ff_list, 
flow_file_record * record) {
+if (!record) {
+return *ff_list;
+}
+
+struct flow_file_list * new_node = (struct flow_file_list 
*)malloc(sizeof(struct flow_file_list));
+new_node->ff_record = record;
+LL_APPEND(*ff_list, new_node);
+return new_node;
+}
+
+void free_flow_file_list(flow_file_list ** ff_list) {
+if (!*ff_list) {
+return;
+}
+flow_file_list * head = *ff_list;
+while (head) {
+   flow_file_list * tmp = head;
+   free_flowfile(tmp->ff_record);
+   head = head->next;
+   free(tmp);
+}
+}
+
+void add_attributes(flow_file_record * ffr, const char * file_path, uint64_t 
curr_offset) {
+char offset_str[21];
+snprintf(offset_str, sizeof(offset_str), "%llu", curr_offset);
+add_attribute(ffr, "current offset", offset_str, strlen(offset_str));
+char content_location[strlen(ffr->contentLocation) + 1];
+snprintf(content_location, sizeof(content_location), "%s", 
ffr->contentLocation);
+add_attribute(ffr, "content location", content_location, 
strlen(content_location));
+add_attribute(ffr, "tailfile path", (char*)file_path, strlen(file_path));
+}
+
+void update_attributes(flow_file_record * ffr, const char * file_path, 
uint64_t curr_offset) {
+char offset_str[21];
+snprintf(offset_str, sizeof(offset_str), "%llu", curr_offset);
+update_attribute(ffr, "current offset", offset_str, strlen(offset_str));
+char content_location[strlen(ffr->contentLocation) + 1];
+snprintf(content_location, sizeof(content_location), "%s", 
ffr->contentLocation);
+update_attribute(ffr, "content location", content_location, 
strlen(content_location));
+update_attribute(ffr, "tailfile path", (char*)file_path, 
strlen(file_path));
+}
+
+void transmit_flow_files(nifi_instance * instance, flow_file_list * ff_list, 
int complete) {
+if (!instance || !ff_list) {
+return;
+}
+flow_file_list * el = NULL;
+LL_FOREACH(ff_list, el) {
 
 Review comment:
   Refactored.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320818697
 
 

 ##
 File path: nanofi/src/core/file_utils.c
 ##
 @@ -0,0 +1,146 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "core/string_utils.h"
+#include "core/file_utils.h"
+
+#ifdef _MSC_VER
+#ifndef PATH_MAX
+#define PATH_MAX 260
+#endif
+#endif
+
+int is_directory(const char * path) {
+struct stat dir_stat;
+if (stat(path, &dir_stat) < 0) {
+return 0;
+}
+return S_ISDIR(dir_stat.st_mode);
+}
+
+const char * get_separator(int force_posix)
+{
+#ifdef WIN32
+if (!force_posix) {
+return "\\";
+}
+#endif
+return "/";
+}
+
+char * concat_path(const char * parent, const char * child) {
+char * path = (char *)malloc((strlen(parent) + strlen(child) + 2) * 
sizeof(char));
+strcpy(path, parent);
+const char * sep = get_separator(1);
+strcat(path, sep);
+strcat(path, child);
+return path;
+}
+
+void remove_directory(const char * dir_path) {
+
+if (!is_directory(dir_path)) {
+if (unlink(dir_path) == -1) {
+printf("Could not remove file %s\n", dir_path);
+}
+return;
+}
+
+uint64_t path_len = strlen(dir_path);
+struct dirent * dir;
+DIR * d = opendir(dir_path);
+
+while ((dir = readdir(d)) != NULL) {
+char * entry_name = dir->d_name;
+if (!strcmp(entry_name, ".") || !strcmp(entry_name, "..")) {
+continue;
+}
+char * path = concat_path(dir_path, entry_name);
+remove_directory(path);
+free(path);
+}
+
+rmdir(dir_path);
+closedir(d);
+}
+
+uint64_t file_size(const char * path) {
+if (!path) return -1;
+
+struct stat fstat;
+if (stat(path, &fstat) < 0) {
+return -1;
 
 Review comment:
   Refactored.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320818477
 
 

 ##
 File path: nanofi/src/core/flowfiles.c
 ##
 @@ -0,0 +1,171 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "api/nanofi.h"
+#include "api/ecu.h"
+#include "core/flowfiles.h"
+
+#include "utlist.h"
+#include 
+#include 
+#include 
+#include 
+
+flow_file_list * add_flow_file_record(flow_file_list ** ff_list, 
flow_file_record * record) {
+if (!record) {
+return *ff_list;
+}
+
+struct flow_file_list * new_node = (struct flow_file_list 
*)malloc(sizeof(struct flow_file_list));
+new_node->ff_record = record;
+LL_APPEND(*ff_list, new_node);
+return new_node;
+}
+
+void free_flow_file_list(flow_file_list ** ff_list) {
+if (!*ff_list) {
+return;
+}
+flow_file_list * head = *ff_list;
+while (head) {
+   flow_file_list * tmp = head;
+   free_flowfile(tmp->ff_record);
+   head = head->next;
+   free(tmp);
+}
+}
+
+void add_attributes(flow_file_record * ffr, const char * file_path, uint64_t 
curr_offset) {
+char offset_str[21];
+snprintf(offset_str, sizeof(offset_str), "%llu", curr_offset);
+add_attribute(ffr, "current offset", offset_str, strlen(offset_str));
+char content_location[strlen(ffr->contentLocation) + 1];
+snprintf(content_location, sizeof(content_location), "%s", 
ffr->contentLocation);
+add_attribute(ffr, "content location", content_location, 
strlen(content_location));
+add_attribute(ffr, "tailfile path", (char*)file_path, strlen(file_path));
+}
+
+void update_attributes(flow_file_record * ffr, const char * file_path, 
uint64_t curr_offset) {
+char offset_str[21];
+snprintf(offset_str, sizeof(offset_str), "%llu", curr_offset);
+update_attribute(ffr, "current offset", offset_str, strlen(offset_str));
+char content_location[strlen(ffr->contentLocation) + 1];
+snprintf(content_location, sizeof(content_location), "%s", 
ffr->contentLocation);
+update_attribute(ffr, "content location", content_location, 
strlen(content_location));
+update_attribute(ffr, "tailfile path", (char*)file_path, 
strlen(file_path));
+}
+
+void transmit_flow_files(nifi_instance * instance, flow_file_list * ff_list, 
int complete) {
+if (!instance || !ff_list) {
+return;
+}
+flow_file_list * el = NULL;
+LL_FOREACH(ff_list, el) {
+if (el) {
+if (!complete) {
+transmit_flowfile(el->ff_record, instance);
+}
+if (complete && el->complete) {
+transmit_flowfile(el->ff_record, instance);
+}
+}
+}
+}
+
+void read_payload_and_transmit(struct flow_file_list * ffl, struct 
CRawSiteToSiteClient * client) {
+if (!ffl || !client) {
+return;
+}
+
+char * file = ffl->ff_record->contentLocation;
+FILE * fp = fopen(file, "rb");
+
+struct stat statfs;
+stat(file, &statfs);
+size_t file_size = statfs.st_size;
+
+attribute attr;
+attr.key = "current offset";
+if (get_attribute(ffl->ff_record, &attr) < 0) {
+printf("Error looking up flow file attribute %s\n", attr.key);
+return;
+}
+
+errno = 0;
+uint64_t offset = strtoull((const char *)attr.value, NULL, 10);
+if (errno != 0) {
+printf("Error converting flow file offset value\n");
+return;
+}
+uint64_t begin_offset =  offset - file_size;
+if (fp) {
+char * buff = (char *)malloc(sizeof(char) * 4097);
+size_t count = 0;
+while ((count = fread(buff, 1, 4096, fp)) > 0) {
+buff[count] = '\0';
+begin_offset += count;
+char offset_str[21];
+snprintf(offset_str, sizeof(offset_str), "%llu", begin_offset);
+update_attribute(ffl->ff_record, "current offset", offset_str, 
strlen(offset_str));
+
+attribute_set as;
+uint64_t num_attrs = get_attribute_quantity(ffl->ff_record);
+as.size = num_attrs;
+as.attributes = (attribute *)malloc(num_attrs * sizeof(at

[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320818841
 
 

 ##
 File path: nanofi/src/core/file_utils.c
 ##
 @@ -0,0 +1,146 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "core/string_utils.h"
+#include "core/file_utils.h"
+
+#ifdef _MSC_VER
+#ifndef PATH_MAX
+#define PATH_MAX 260
+#endif
+#endif
+
+int is_directory(const char * path) {
+struct stat dir_stat;
+if (stat(path, &dir_stat) < 0) {
+return 0;
+}
+return S_ISDIR(dir_stat.st_mode);
+}
+
+const char * get_separator(int force_posix)
+{
+#ifdef WIN32
+if (!force_posix) {
+return "\\";
+}
+#endif
+return "/";
+}
+
+char * concat_path(const char * parent, const char * child) {
+char * path = (char *)malloc((strlen(parent) + strlen(child) + 2) * 
sizeof(char));
+strcpy(path, parent);
+const char * sep = get_separator(1);
+strcat(path, sep);
+strcat(path, child);
+return path;
+}
+
+void remove_directory(const char * dir_path) {
+
+if (!is_directory(dir_path)) {
+if (unlink(dir_path) == -1) {
+printf("Could not remove file %s\n", dir_path);
+}
+return;
+}
+
+uint64_t path_len = strlen(dir_path);
+struct dirent * dir;
+DIR * d = opendir(dir_path);
+
+while ((dir = readdir(d)) != NULL) {
+char * entry_name = dir->d_name;
+if (!strcmp(entry_name, ".") || !strcmp(entry_name, "..")) {
+continue;
+}
+char * path = concat_path(dir_path, entry_name);
+remove_directory(path);
+free(path);
+}
+
+rmdir(dir_path);
+closedir(d);
+}
+
+uint64_t file_size(const char * path) {
+if (!path) return -1;
+
+struct stat fstat;
+if (stat(path, &fstat) < 0) {
+return -1;
+}
+return fstat.st_size;
+}
+
+int make_dir(const char * path) {
+if (!path) return -1;
+
+errno = 0;
+int ret = mkdir(path, 
S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
+if (ret == 0) {
+return 0;
+}
+
+switch (errno) {
+case ENOENT: {
+char * found = strrchr(path, '/');
+if (!found) {
+return -1;
+}
+int len = found - path;
+char * dir = calloc(len + 1, sizeof(char));
+strncpy(dir, path, len);
+dir[len] = '\0';
+int res = make_dir(dir);
+free(dir);
+if (res < 0) {
+return -1;
+}
+return mkdir(path, 
S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
+}
+case EEXIST: {
+if (is_directory(path)) {
+return 0;
+}
+return -1;
+}
+default:
+return -1;
+}
+}
+
+char * get_current_working_directory() {
+char * cwd = (char *)malloc(PATH_MAX * sizeof(char));
+#ifdef WIN32
+_getcwd(cwd,PATH_MAX);
+#else
+getcwd(cwd, PATH_MAX);
 
 Review comment:
   Refactored.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320800201
 
 

 ##
 File path: nanofi/src/api/nanofi.cpp
 ##
 @@ -124,25 +124,42 @@ nifi_instance *create_instance(const char *url, 
nifi_port *port) {
   return instance;
 }
 
-standalone_processor *create_processor(const char *name) {
+standalone_processor * create_processor(const char *name, nifi_instance * 
instance) {
   NULL_CHECK(nullptr, name);
   auto ptr = ExecutionPlan::createProcessor(name, name);
   if (!ptr) {
 return nullptr;
   }
-  if (standalone_instance == nullptr) {
+  if (instance == NULL) {
 nifi_port port;
 char portnum[] = "98765";
 port.port_id = portnum;
-standalone_instance = create_instance("internal_standalone", &port);
+instance = create_instance("internal_standalone", &port);
   }
-  auto flow = create_new_flow(standalone_instance);
+  auto flow = create_new_flow(instance);
   std::shared_ptr plan(flow);
   plan->addProcessor(ptr, name);
   ExecutionPlan::addProcessorWithPlan(ptr->getUUIDStr(), plan);
   return static_cast(ptr.get());
 }
 
+void initialize_content_repo(processor_context * ctx) {
+ctx->initializeContentRepository(get_current_working_directory());
+}
+
+char * get_proc_uuid_from_processor(standalone_processor * proc) {
+const std::string uuid = proc->getUUIDStr();
+uint64_t len = uuid.size();
+char * uuid_str = (char *)malloc((len+1) * sizeof(char));
+strcpy(uuid_str, uuid.c_str());
+return uuid_str;
+}
+
+char * get_proc_uuid_from_context(processor_context * ctx) {
 
 Review comment:
   Updated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320795903
 
 

 ##
 File path: nanofi/src/api/nanofi.cpp
 ##
 @@ -124,25 +124,42 @@ nifi_instance *create_instance(const char *url, 
nifi_port *port) {
   return instance;
 }
 
-standalone_processor *create_processor(const char *name) {
+standalone_processor * create_processor(const char *name, nifi_instance * 
instance) {
   NULL_CHECK(nullptr, name);
   auto ptr = ExecutionPlan::createProcessor(name, name);
   if (!ptr) {
 return nullptr;
   }
-  if (standalone_instance == nullptr) {
+  if (instance == NULL) {
 nifi_port port;
 char portnum[] = "98765";
 port.port_id = portnum;
-standalone_instance = create_instance("internal_standalone", &port);
+instance = create_instance("internal_standalone", &port);
   }
-  auto flow = create_new_flow(standalone_instance);
+  auto flow = create_new_flow(instance);
   std::shared_ptr plan(flow);
   plan->addProcessor(ptr, name);
   ExecutionPlan::addProcessorWithPlan(ptr->getUUIDStr(), plan);
   return static_cast(ptr.get());
 }
 
+void initialize_content_repo(processor_context * ctx) {
+ctx->initializeContentRepository(get_current_working_directory());
+}
+
+char * get_proc_uuid_from_processor(standalone_processor * proc) {
 
 Review comment:
   Updated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320795781
 
 

 ##
 File path: nanofi/src/api/nanofi.cpp
 ##
 @@ -245,28 +262,55 @@ flow_file_record* create_ff_object_nc() {
   return new_ff;
 }
 
-flow_file_record * generate_flow_file(nifi_instance * instance, 
standalone_processor * proc) {
-if (!instance || !proc) {
-return nullptr;
-}
+flow_file_record * generate_flow(processor_context * ctx) {
 flow_file_record * ffr = create_ff_object_nc();
 
-auto minifi_instance_ref = 
static_cast(instance->instance_ptr);
-auto content_repo = minifi_instance_ref->getContentRepository();
+ffr->crp = static_cast(new 
std::shared_ptr(ctx->getContentRepository()));
+
+auto plan = 
ExecutionPlan::getPlan(ctx->getProcessorNode()->getProcessor()->getUUIDStr());
 
-ffr->crp = static_cast(new 
std::shared_ptr(content_repo));
-auto plan = ExecutionPlan::getPlan(proc->getUUIDStr());
 if (!plan) {
 return nullptr;
 }
 ffr->ffp = static_cast(new 
std::shared_ptr(plan->getCurrentFlowFile()));
 ffr->keepContent = 1;
 auto ff_content_repo_ptr = 
(static_cast*>(ffr->crp));
 auto claim = std::make_shared(*ff_content_repo_ptr);
-const char * full_path = claim->getContentFullPath().c_str();
-int len = strlen(full_path);
-ffr->contentLocation = (char *) malloc(sizeof(char) * (len + 1));
-snprintf(ffr->contentLocation, len + 1, "%s", full_path);
+
+int len = strlen(claim->getContentFullPath().c_str());
 
 Review comment:
   Updated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320795461
 
 

 ##
 File path: nanofi/ecu/log_aggregator.c
 ##
 @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/nanofi.h"
+#include "api/ecu.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+int main(int argc, char** argv) {
+
+if (argc < 7) {
 
 Review comment:
   The rest will be ignored if greater than 7.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320795461
 
 

 ##
 File path: nanofi/ecu/log_aggregator.c
 ##
 @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/nanofi.h"
+#include "api/ecu.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+int main(int argc, char** argv) {
+
+if (argc < 7) {
 
 Review comment:
   The rest will be ignored if greater than or equal to 7.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320794492
 
 

 ##
 File path: nanofi/src/core/string_utils.c
 ##
 @@ -20,67 +20,145 @@
 #include "core/string_utils.h"
 #include 
 #include 
+#include 
 
-tokens tokenize_string(const char * str, char delim, tokenizer_mode_t mode) {
-tokens tks;
-tks.num_strings = 0;
-tks.total_bytes = 0;
+int validate_list(struct token_list * tk_list) {
+if (tk_list && tk_list->head && tk_list->tail && tk_list->size > 0) {
+return 1;
 
 Review comment:
   Updated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320794204
 
 

 ##
 File path: nanofi/src/core/flowfiles.c
 ##
 @@ -0,0 +1,99 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "api/nanofi.h"
+#include "core/flowfiles.h"
+
+#include "utlist.h"
+#include 
+#include 
+#include 
+
+void add_flow_file_record(flow_file_list ** ff_list, flow_file_record * 
record) {
+if (!record) return;
+
+struct flow_file_list * new_node = (struct flow_file_list 
*)malloc(sizeof(struct flow_file_list));
+new_node->ff_record = record;
+LL_APPEND(*ff_list, new_node);
+}
+
+void free_flow_file_list(flow_file_list ** ff_list) {
+if (!*ff_list) {
+return;
+}
+flow_file_list * el = NULL;
+LL_FOREACH(*ff_list, el) {
+if (el) {
+free_flowfile(el->ff_record);
+}
+}
+}
+
+flow_file_record * write_to_flow_file(const char * buff, size_t count, 
nifi_instance * instance, standalone_processor * proc) {
+if (!buff || !instance || !proc) {
 
 Review comment:
   I think this is more readable than using the macro.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320793828
 
 

 ##
 File path: nanofi/src/core/file_utils.c
 ##
 @@ -0,0 +1,138 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "core/string_utils.h"
+#include "core/file_utils.h"
+
+int is_directory(const char * path) {
+struct stat dir_stat;
+if (stat(path, &dir_stat) < 0) {
+return 0;
+}
+return S_ISDIR(dir_stat.st_mode);
+}
+
+char * get_separator(int force_posix)
+{
+char * sep = malloc(2 * sizeof(char));
+strcpy(sep, "/");
+#ifdef WIN32
+if (force_posix) {
+return sep;
+} else {
+char * sep = malloc(2 * sizeof(char));
+strcpy(sep, "\\");
+return sep;
+}
+#else
+return sep;
+#endif
+}
+
+char * concat_path(const char * parent, const char * child) {
+char * path = (char *)malloc((strlen(parent) + strlen(child) + 2) * 
sizeof(char));
+strcpy(path, parent);
+char * sep = get_separator(1);
+strcat(path, sep);
+strcat(path, child);
+free(sep);
+return path;
+}
+
+void remove_directory(const char * dir_path) {
+
+if (!is_directory(dir_path)) {
+if (unlink(dir_path) == -1) {
+printf("Could not remove file %s\n", dir_path);
+}
+return;
+}
+
+uint64_t path_len = strlen(dir_path);
+struct dirent * dir;
+DIR * d = opendir(dir_path);
+
+while ((dir = readdir(d)) != NULL) {
+char * entry_name = dir->d_name;
+if (!strcmp(entry_name, ".") || !strcmp(entry_name, "..")) {
+continue;
+}
+char * path = concat_path(dir_path, entry_name);
+remove_directory(path);
+free(path);
+}
+
+rmdir(dir_path);
+closedir(d);
+}
+
+long int file_size(const char * path) {
+if (!path) return -1;
+
+struct stat fstat;
+if (stat(path, &fstat) < 0) {
+return -1;
+}
+return fstat.st_size;
+}
+
+int make_dir(const char * path) {
+if (!path) return 0;
+
+errno = 0;
+int ret = mkdir(path, 
S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH);
+if (ret == 0) {
+return 0;
+}
+
+switch (errno) {
+case ENOENT: {
+char * found = strrchr(path, '/');
+if (!found) {
+return -1;
+}
+int len = found - path;
+char * dir = calloc(len + 1, sizeof(char));
+strncpy(dir, path, len);
+dir[len] = '\0';
+int res = make_dir(dir);
+if (res < 0) {
+free(dir);
 
 Review comment:
   Ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320793739
 
 

 ##
 File path: nanofi/src/core/file_utils.c
 ##
 @@ -0,0 +1,138 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "core/string_utils.h"
+#include "core/file_utils.h"
+
+int is_directory(const char * path) {
+struct stat dir_stat;
+if (stat(path, &dir_stat) < 0) {
+return 0;
+}
+return S_ISDIR(dir_stat.st_mode);
+}
+
+char * get_separator(int force_posix)
+{
+char * sep = malloc(2 * sizeof(char));
+strcpy(sep, "/");
+#ifdef WIN32
+if (force_posix) {
+return sep;
+} else {
+char * sep = malloc(2 * sizeof(char));
+strcpy(sep, "\\");
+return sep;
+}
+#else
+return sep;
+#endif
+}
+
+char * concat_path(const char * parent, const char * child) {
+char * path = (char *)malloc((strlen(parent) + strlen(child) + 2) * 
sizeof(char));
+strcpy(path, parent);
+char * sep = get_separator(1);
+strcat(path, sep);
+strcat(path, child);
+free(sep);
+return path;
+}
+
+void remove_directory(const char * dir_path) {
+
+if (!is_directory(dir_path)) {
+if (unlink(dir_path) == -1) {
+printf("Could not remove file %s\n", dir_path);
+}
+return;
+}
+
+uint64_t path_len = strlen(dir_path);
+struct dirent * dir;
+DIR * d = opendir(dir_path);
+
+while ((dir = readdir(d)) != NULL) {
+char * entry_name = dir->d_name;
+if (!strcmp(entry_name, ".") || !strcmp(entry_name, "..")) {
+continue;
+}
+char * path = concat_path(dir_path, entry_name);
+remove_directory(path);
+free(path);
+}
+
+rmdir(dir_path);
+closedir(d);
+}
+
+long int file_size(const char * path) {
+if (!path) return -1;
+
+struct stat fstat;
+if (stat(path, &fstat) < 0) {
+return -1;
+}
+return fstat.st_size;
+}
+
+int make_dir(const char * path) {
+if (!path) return 0;
 
 Review comment:
   Updated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320793551
 
 

 ##
 File path: nanofi/src/core/file_utils.c
 ##
 @@ -0,0 +1,138 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "core/string_utils.h"
+#include "core/file_utils.h"
+
+int is_directory(const char * path) {
+struct stat dir_stat;
+if (stat(path, &dir_stat) < 0) {
+return 0;
+}
+return S_ISDIR(dir_stat.st_mode);
+}
+
+char * get_separator(int force_posix)
+{
+char * sep = malloc(2 * sizeof(char));
+strcpy(sep, "/");
+#ifdef WIN32
+if (force_posix) {
+return sep;
+} else {
+char * sep = malloc(2 * sizeof(char));
 
 Review comment:
   Refactored. Please see updated code


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320793071
 
 

 ##
 File path: nanofi/ecu/log_aggregator.c
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/nanofi.h"
+#include "api/ecu.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+int main(int argc, char** argv) {
+
+if (argc < 6) {
+printf("Error: must run ./log_aggregator
 \n");
+exit(1);
+}
+
+char * file = argv[1];
+char * interval = argv[2];
+char * delimiter = argv[3];
+char * instance_str = argv[4];
+char * port_str = argv[5];
+
+if (access(file, F_OK) == -1) {
+printf("Error: %s doesn't exist!\n", file);
+exit(1);
+}
+
+struct stat stats;
+int ret = stat(file, &stats);
+
+if (ret == -1) {
+printf("Error occurred while getting file status {file: %s, error: 
%s}\n", file, strerror(errno));
+exit(1);
+}
+// Check for file existence
+if (S_ISDIR(stats.st_mode)){
+printf("Error: %s is a directory!\n", file);
+exit(1);
+}
+
+errno = 0;
+unsigned long intrvl = strtol(interval, NULL, 10);
+
+if (errno == ERANGE || intrvl == LONG_MAX || intrvl == LONG_MIN) {
 
 Review comment:
   Corrected. Please see updated code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-09-04 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r320789952
 
 

 ##
 File path: nanofi/src/core/flowfiles.c
 ##
 @@ -0,0 +1,169 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "api/nanofi.h"
+#include "api/ecu.h"
+#include "core/flowfiles.h"
+
+#include "utlist.h"
+#include 
+#include 
+#include 
+#include 
+
+flow_file_list * add_flow_file_record(flow_file_list ** ff_list, 
flow_file_record * record) {
+if (!record) {
+return *ff_list;
+}
+
+struct flow_file_list * new_node = (struct flow_file_list 
*)malloc(sizeof(struct flow_file_list));
+new_node->ff_record = record;
+LL_APPEND(*ff_list, new_node);
+return new_node;
+}
+
+void free_flow_file_list(flow_file_list ** ff_list) {
+if (!*ff_list) {
+return;
+}
+flow_file_list * el = NULL;
+LL_FOREACH(*ff_list, el) {
+if (el) {
+free_flowfile(el->ff_record);
+}
+}
+}
+
+void add_attributes(flow_file_record * ffr, const char * file_path, uint64_t 
curr_offset) {
+char offset_str[21];
+snprintf(offset_str, sizeof(offset_str), "%llu", curr_offset);
+add_attribute(ffr, "current offset", offset_str, strlen(offset_str));
+char content_location[strlen(ffr->contentLocation) + 1];
+snprintf(content_location, sizeof(content_location), "%s", 
ffr->contentLocation);
+add_attribute(ffr, "content location", content_location, 
strlen(content_location));
+add_attribute(ffr, "tailfile path", (char*)file_path, strlen(file_path));
+}
+
+void update_attributes(flow_file_record * ffr, const char * file_path, 
uint64_t curr_offset) {
+char offset_str[21];
+snprintf(offset_str, sizeof(offset_str), "%llu", curr_offset);
+update_attribute(ffr, "current offset", offset_str, strlen(offset_str));
+char content_location[strlen(ffr->contentLocation) + 1];
+snprintf(content_location, sizeof(content_location), "%s", 
ffr->contentLocation);
+update_attribute(ffr, "content location", content_location, 
strlen(content_location));
+update_attribute(ffr, "tailfile path", (char*)file_path, 
strlen(file_path));
+}
+
+void transmit_flow_files(nifi_instance * instance, flow_file_list * ff_list, 
int complete) {
+if (!instance || !ff_list) {
+return;
+}
+flow_file_list * el = NULL;
+LL_FOREACH(ff_list, el) {
+if (el) {
+if (!complete) {
+transmit_flowfile(el->ff_record, instance);
+}
+if (complete && el->complete) {
+transmit_flowfile(el->ff_record, instance);
+}
+}
+}
+}
+
+void read_payload_and_transmit(struct flow_file_list * ffl, struct 
CRawSiteToSiteClient * client) {
+if (!ffl || !client) {
+return;
+}
+
+char * file = ffl->ff_record->contentLocation;
+FILE * fp = fopen(file, "rb");
+
+struct stat statfs;
+stat(file, &statfs);
+size_t file_size = statfs.st_size;
+
+attribute attr;
+attr.key = "current offset";
+if (get_attribute(ffl->ff_record, &attr) < 0) {
+printf("Error looking up flow file attribute %s\n", attr.key);
+return;
+}
+
+errno = 0;
+uint64_t offset = strtoull((const char *)attr.value, NULL, 10);
+if (errno != 0) {
+printf("Error converting flow file offset value\n");
+return;
+}
+uint64_t begin_offset =  offset - file_size;
+if (fp) {
+char * buff = (char *)malloc(sizeof(char) * 4097);
+size_t count = 0;
+while ((count = fread(buff, 1, 4096, fp)) > 0) {
+buff[count] = '\0';
+begin_offset += count;
+char offset_str[21];
+snprintf(offset_str, sizeof(offset_str), "%llu", begin_offset);
+update_attribute(ffl->ff_record, "current offset", offset_str, 
strlen(offset_str));
+
+attribute_set as;
+uint64_t num_attrs = get_attribute_quantity(ffl->ff_record);
+as.size = num_attrs;
+as.attributes = (attribute *)malloc(num_attrs * sizeof(attribute));
+get_all_attributes(

[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-08-14 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r314056477
 
 

 ##
 File path: nanofi/include/api/ecu.h
 ##
 @@ -0,0 +1,89 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#ifndef NANOFI_INCLUDE_API_ECU_H_
+#define NANOFI_INCLUDE_API_ECU_H_
+
+#include 
+#include "api/nanofi.h"
+#include "uthash.h"
+#include "utlist.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+typedef struct processor_params {
+char uuid_str[37]; //key
+struct flow_file_list * ff_list;
+uint64_t curr_offset;
+UT_hash_handle hh;
+} processor_params;
+
+extern processor_params * procparams;
 
 Review comment:
   The only other way to deal with this kind of issue to not have globals is to 
modify the internal minifi C++ API we are using for nanofi. Since we will 
anyhow move away from using that, I did not want to deal with modifying that 
and throw away that work later. It wouldn't be a problem to have mutex 
protection when we deal with multiple threads.
   
   Can you please elaborate a use case where we would have to deal with 
multiple threads in an ECU given we offload EFM communication to the 
orchestrator agent ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-08-14 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r314053573
 
 

 ##
 File path: nanofi/src/api/ecu.c
 ##
 @@ -0,0 +1,493 @@
+#include "api/ecu.h"
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+processor_params * procparams = NULL;
+volatile sig_atomic_t stopped = 0;
+
+void free_proc_params(const char * uuid) {
+
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+free_flow_file_list(&pp->ff_list);
+HASH_DEL(procparams, pp);
+free(pp);
+}
+}
+
+void signal_handler(int signum) {
+if (signum == SIGINT || signum == SIGTERM) {
+stopped = 1;
+}
+}
+
+void init_common_input(tailfile_input_params * input_params, char ** args) {
+if (args && *args) {
+input_params->file = args[1];
+input_params->interval = args[2];
+input_params->instance = args[4];
+input_params->tcp_port = args[5];
+input_params->nifi_port_uuid = args[6];
+}
+}
+
+tailfile_input_params init_logaggregate_input(char ** args) {
+tailfile_input_params input_params;
+memset(&input_params, 0, sizeof(input_params));
+init_common_input(&input_params, args);
+input_params.delimiter = args[3];
+return input_params;
+}
+
+tailfile_input_params init_tailfile_chunk_input(char ** args) {
+tailfile_input_params input_params;
+memset(&input_params, 0, sizeof(input_params));
+init_common_input(&input_params, args);
+input_params.chunk_size = args[3];
+return input_params;
+}
+
+int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, 
uint64_t * port_num) {
+if (access(params->file, F_OK) == -1) {
+printf("Error: %s doesn't exist!\n", params->file);
+return -1;
+}
+
+struct stat stats;
+int ret = stat(params->file, &stats);
+
+if (ret == -1) {
+printf("Error occurred while getting file status {file: %s, error: 
%s}\n", params->file, strerror(errno));
+return -1;
+}
+// Check for file existence
+if (S_ISDIR(stats.st_mode)){
+printf("Error: %s is a directory!\n", params->file);
+return -1;
+}
+
+errno = 0;
+*intrvl = (uint64_t)(strtoul(params->interval, NULL, 10));
+
+if (errno != 0) {
+printf("Invalid interval value specified\n");
+return -1;
+}
+
+errno = 0;
+*port_num = (uint64_t)(strtoul(params->tcp_port, NULL, 10));
+if (errno != 0) {
+printf("Cannot convert tcp port to numeric value\n");
+return -1;
+}
+return 0;
+}
+
+void setup_signal_action() {
+struct sigaction action;
+memset(&action, 0, sizeof(sigaction));
+action.sa_handler = signal_handler;
+sigaction(SIGTERM, &action, NULL);
+sigaction(SIGINT, &action, NULL);
+}
+
+nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, 
const char * processor_name, void(*callback)(processor_session *, 
processor_context *)) {
+nifi_proc_params params;
+nifi_port port;
+port.port_id = input_params->nifi_port_uuid;
+
+nifi_instance * instance = create_instance(input_params->instance, &port);
+add_custom_processor(processor_name, callback);
+standalone_processor * proc = create_processor(processor_name, instance);
+params.instance = instance;
+params.processor = proc;
+return params;
+}
+
+void add_to_hash_table(flow_file_record * ffr, uint64_t offset, const char * 
uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp == NULL) {
+pp = (struct processor_params*)malloc(sizeof(struct processor_params));
+memset(pp, 0, sizeof(struct processor_params));
+strcpy(pp->uuid_str, uuid);
+HASH_ADD_STR(procparams, uuid_str, pp);
+}
+
+add_flow_file_record(&pp->ff_list, ffr);
+pp->curr_offset = offset;
+}
+
+void delete_all_flow_files_from_proc(const char * uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+struct flow_file_list * head = pp->ff_list;
+while (head) {
+struct flow_file_list * tmp = head;
+free_flowfile(tmp->ff_record);
+head = head->next;
+free(tmp);
+}
+pp->ff_list = head;
+}
+}
+
+void delete_completed_flow_files_from_proc(const char * uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+struct flow_file_list * head = pp->ff_list;
+while (head) {
+struct flow_file_list * tmp = head;
+if (tmp->complete) {
+free_flowfile(tmp->ff_reco

[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-08-14 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r314053283
 
 

 ##
 File path: nanofi/src/api/ecu.c
 ##
 @@ -0,0 +1,493 @@
+#include "api/ecu.h"
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+processor_params * procparams = NULL;
+volatile sig_atomic_t stopped = 0;
+
+void free_proc_params(const char * uuid) {
+
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+free_flow_file_list(&pp->ff_list);
+HASH_DEL(procparams, pp);
+free(pp);
+}
+}
+
+void signal_handler(int signum) {
+if (signum == SIGINT || signum == SIGTERM) {
+stopped = 1;
+}
+}
+
+void init_common_input(tailfile_input_params * input_params, char ** args) {
+if (args && *args) {
+input_params->file = args[1];
+input_params->interval = args[2];
+input_params->instance = args[4];
+input_params->tcp_port = args[5];
+input_params->nifi_port_uuid = args[6];
+}
+}
+
+tailfile_input_params init_logaggregate_input(char ** args) {
+tailfile_input_params input_params;
+memset(&input_params, 0, sizeof(input_params));
+init_common_input(&input_params, args);
+input_params.delimiter = args[3];
+return input_params;
+}
+
+tailfile_input_params init_tailfile_chunk_input(char ** args) {
+tailfile_input_params input_params;
+memset(&input_params, 0, sizeof(input_params));
+init_common_input(&input_params, args);
+input_params.chunk_size = args[3];
+return input_params;
+}
+
+int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, 
uint64_t * port_num) {
+if (access(params->file, F_OK) == -1) {
+printf("Error: %s doesn't exist!\n", params->file);
+return -1;
+}
+
+struct stat stats;
+int ret = stat(params->file, &stats);
+
+if (ret == -1) {
+printf("Error occurred while getting file status {file: %s, error: 
%s}\n", params->file, strerror(errno));
+return -1;
+}
+// Check for file existence
+if (S_ISDIR(stats.st_mode)){
+printf("Error: %s is a directory!\n", params->file);
+return -1;
+}
+
+errno = 0;
+*intrvl = (uint64_t)(strtoul(params->interval, NULL, 10));
+
+if (errno != 0) {
+printf("Invalid interval value specified\n");
+return -1;
+}
+
+errno = 0;
+*port_num = (uint64_t)(strtoul(params->tcp_port, NULL, 10));
+if (errno != 0) {
+printf("Cannot convert tcp port to numeric value\n");
+return -1;
+}
+return 0;
+}
+
+void setup_signal_action() {
+struct sigaction action;
+memset(&action, 0, sizeof(sigaction));
+action.sa_handler = signal_handler;
+sigaction(SIGTERM, &action, NULL);
+sigaction(SIGINT, &action, NULL);
+}
+
+nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, 
const char * processor_name, void(*callback)(processor_session *, 
processor_context *)) {
+nifi_proc_params params;
+nifi_port port;
+port.port_id = input_params->nifi_port_uuid;
+
+nifi_instance * instance = create_instance(input_params->instance, &port);
+add_custom_processor(processor_name, callback);
+standalone_processor * proc = create_processor(processor_name, instance);
+params.instance = instance;
+params.processor = proc;
+return params;
+}
+
+void add_to_hash_table(flow_file_record * ffr, uint64_t offset, const char * 
uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp == NULL) {
+pp = (struct processor_params*)malloc(sizeof(struct processor_params));
+memset(pp, 0, sizeof(struct processor_params));
+strcpy(pp->uuid_str, uuid);
+HASH_ADD_STR(procparams, uuid_str, pp);
+}
+
+add_flow_file_record(&pp->ff_list, ffr);
+pp->curr_offset = offset;
+}
+
+void delete_all_flow_files_from_proc(const char * uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+struct flow_file_list * head = pp->ff_list;
+while (head) {
+struct flow_file_list * tmp = head;
+free_flowfile(tmp->ff_record);
+head = head->next;
+free(tmp);
+}
+pp->ff_list = head;
+}
+}
+
+void delete_completed_flow_files_from_proc(const char * uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+struct flow_file_list * head = pp->ff_list;
+while (head) {
+struct flow_file_list * tmp = head;
+if (tmp->complete) {
+free_flowfile(tmp->ff_reco

[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-08-14 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r314053340
 
 

 ##
 File path: nanofi/src/api/ecu.c
 ##
 @@ -0,0 +1,493 @@
+#include "api/ecu.h"
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+processor_params * procparams = NULL;
+volatile sig_atomic_t stopped = 0;
+
+void free_proc_params(const char * uuid) {
+
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+free_flow_file_list(&pp->ff_list);
+HASH_DEL(procparams, pp);
+free(pp);
+}
+}
+
+void signal_handler(int signum) {
+if (signum == SIGINT || signum == SIGTERM) {
+stopped = 1;
+}
+}
+
+void init_common_input(tailfile_input_params * input_params, char ** args) {
+if (args && *args) {
+input_params->file = args[1];
+input_params->interval = args[2];
+input_params->instance = args[4];
+input_params->tcp_port = args[5];
+input_params->nifi_port_uuid = args[6];
+}
+}
+
+tailfile_input_params init_logaggregate_input(char ** args) {
+tailfile_input_params input_params;
+memset(&input_params, 0, sizeof(input_params));
+init_common_input(&input_params, args);
+input_params.delimiter = args[3];
+return input_params;
+}
+
+tailfile_input_params init_tailfile_chunk_input(char ** args) {
+tailfile_input_params input_params;
+memset(&input_params, 0, sizeof(input_params));
+init_common_input(&input_params, args);
+input_params.chunk_size = args[3];
+return input_params;
+}
+
+int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, 
uint64_t * port_num) {
+if (access(params->file, F_OK) == -1) {
+printf("Error: %s doesn't exist!\n", params->file);
+return -1;
+}
+
+struct stat stats;
+int ret = stat(params->file, &stats);
+
+if (ret == -1) {
+printf("Error occurred while getting file status {file: %s, error: 
%s}\n", params->file, strerror(errno));
+return -1;
+}
+// Check for file existence
+if (S_ISDIR(stats.st_mode)){
+printf("Error: %s is a directory!\n", params->file);
+return -1;
+}
+
+errno = 0;
+*intrvl = (uint64_t)(strtoul(params->interval, NULL, 10));
+
+if (errno != 0) {
+printf("Invalid interval value specified\n");
+return -1;
+}
+
+errno = 0;
+*port_num = (uint64_t)(strtoul(params->tcp_port, NULL, 10));
+if (errno != 0) {
+printf("Cannot convert tcp port to numeric value\n");
+return -1;
+}
+return 0;
+}
+
+void setup_signal_action() {
+struct sigaction action;
+memset(&action, 0, sizeof(sigaction));
+action.sa_handler = signal_handler;
+sigaction(SIGTERM, &action, NULL);
+sigaction(SIGINT, &action, NULL);
+}
+
+nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, 
const char * processor_name, void(*callback)(processor_session *, 
processor_context *)) {
+nifi_proc_params params;
+nifi_port port;
+port.port_id = input_params->nifi_port_uuid;
+
+nifi_instance * instance = create_instance(input_params->instance, &port);
+add_custom_processor(processor_name, callback);
+standalone_processor * proc = create_processor(processor_name, instance);
+params.instance = instance;
+params.processor = proc;
+return params;
+}
+
+void add_to_hash_table(flow_file_record * ffr, uint64_t offset, const char * 
uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp == NULL) {
+pp = (struct processor_params*)malloc(sizeof(struct processor_params));
+memset(pp, 0, sizeof(struct processor_params));
+strcpy(pp->uuid_str, uuid);
+HASH_ADD_STR(procparams, uuid_str, pp);
+}
+
+add_flow_file_record(&pp->ff_list, ffr);
+pp->curr_offset = offset;
+}
+
+void delete_all_flow_files_from_proc(const char * uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+struct flow_file_list * head = pp->ff_list;
+while (head) {
+struct flow_file_list * tmp = head;
+free_flowfile(tmp->ff_record);
+head = head->next;
+free(tmp);
+}
+pp->ff_list = head;
+}
+}
+
+void delete_completed_flow_files_from_proc(const char * uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+struct flow_file_list * head = pp->ff_list;
+while (head) {
+struct flow_file_list * tmp = head;
+if (tmp->complete) {
+free_flowfile(tmp->ff_reco

[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-08-12 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r313157859
 
 

 ##
 File path: nanofi/src/api/ecu.c
 ##
 @@ -0,0 +1,494 @@
+#include "api/ecu.h"
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+processor_params * procparams = NULL;
+volatile sig_atomic_t stopped = 0;
+
+void free_proc_params(char * uuid) {
+
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+free_flow_file_list(&pp->ff_list);
+free(pp->uuid_str);
+HASH_DEL(procparams, pp);
+free(pp);
+}
+}
+
+void signal_handler(int signum) {
+if (signum == SIGINT || signum == SIGTERM) {
+stopped = 1;
+}
+}
+
+void init_common_input(tailfile_input_params * input_params, char ** args) {
+if (args && *args) {
+input_params->file = args[1];
+input_params->interval = args[2];
+input_params->instance = args[4];
+input_params->tcp_port = args[5];
+input_params->nifi_port_uuid = args[6];
+}
+}
+
+tailfile_input_params init_logaggregate_input(char ** args) {
+tailfile_input_params input_params;
+memset(&input_params, 0, sizeof(input_params));
+init_common_input(&input_params, args);
+input_params.delimiter = args[3];
+return input_params;
+}
+
+tailfile_input_params init_tailfile_chunk_input(char ** args) {
+tailfile_input_params input_params;
+memset(&input_params, 0, sizeof(input_params));
+init_common_input(&input_params, args);
+input_params.chunk_size = args[3];
+return input_params;
+}
+
+int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, 
uint64_t * port_num) {
+if (access(params->file, F_OK) == -1) {
+printf("Error: %s doesn't exist!\n", params->file);
+return -1;
+}
+
+struct stat stats;
+int ret = stat(params->file, &stats);
+
+if (ret == -1) {
+printf("Error occurred while getting file status {file: %s, error: 
%s}\n", params->file, strerror(errno));
+return -1;
+}
+// Check for file existence
+if (S_ISDIR(stats.st_mode)){
+printf("Error: %s is a directory!\n", params->file);
+return -1;
+}
+
+errno = 0;
+*intrvl = (uint64_t)(strtoul(params->interval, NULL, 10));
+
+if (errno != 0) {
+printf("Invalid interval value specified\n");
+return -1;
+}
+
+errno = 0;
+*port_num = (uint64_t)(strtoul(params->tcp_port, NULL, 10));
+if (errno != 0) {
+printf("Cannot convert tcp port to numeric value\n");
+return -1;
+}
+return 0;
+}
+
+void setup_signal_action() {
+struct sigaction action;
+memset(&action, 0, sizeof(sigaction));
+action.sa_handler = signal_handler;
+sigaction(SIGTERM, &action, NULL);
+sigaction(SIGINT, &action, NULL);
+}
+
+nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, 
const char * processor_name, void(*callback)(processor_session *, 
processor_context *)) {
+nifi_proc_params params;
+nifi_port port;
+port.port_id = input_params->nifi_port_uuid;
+
+nifi_instance * instance = create_instance(input_params->instance, &port);
+add_custom_processor(processor_name, callback);
+standalone_processor * proc = create_processor(processor_name, instance);
+params.instance = instance;
+params.processor = proc;
+return params;
+}
+
+void add_to_hash_table(flow_file_record * ffr, uint64_t offset, char * uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp == NULL) {
+pp = (struct processor_params*)malloc(sizeof(struct processor_params));
+memset(pp, 0, sizeof(struct processor_params));
+char * uuid_str = (char *)malloc((strlen(uuid) + 1) * sizeof(char));
+strcpy(uuid_str, uuid);
+pp->uuid_str = uuid_str;
+HASH_ADD_KEYPTR(hh, procparams, pp->uuid_str, strlen(pp->uuid_str), 
pp);
+}
+add_flow_file_record(&pp->ff_list, ffr);
+pp->curr_offset = offset;
+}
+
+void delete_all_flow_files_from_proc(const char * uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+struct flow_file_list * head = pp->ff_list;
+while (head) {
+struct flow_file_list * tmp = head;
+free_flowfile(tmp->ff_record);
+head = head->next;
+free(tmp);
+}
+pp->ff_list = head;
+}
+}
+
+void delete_completed_flow_files_from_proc(const char * uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+struct flow_file_list * head = pp->ff_

[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-08-12 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r313048735
 
 

 ##
 File path: nanofi/src/api/ecu.c
 ##
 @@ -0,0 +1,494 @@
+#include "api/ecu.h"
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+processor_params * procparams = NULL;
+volatile sig_atomic_t stopped = 0;
+
+void free_proc_params(char * uuid) {
+
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+free_flow_file_list(&pp->ff_list);
+free(pp->uuid_str);
+HASH_DEL(procparams, pp);
+free(pp);
+}
+}
+
+void signal_handler(int signum) {
+if (signum == SIGINT || signum == SIGTERM) {
+stopped = 1;
+}
+}
+
+void init_common_input(tailfile_input_params * input_params, char ** args) {
+if (args && *args) {
+input_params->file = args[1];
+input_params->interval = args[2];
+input_params->instance = args[4];
+input_params->tcp_port = args[5];
+input_params->nifi_port_uuid = args[6];
+}
+}
+
+tailfile_input_params init_logaggregate_input(char ** args) {
+tailfile_input_params input_params;
+memset(&input_params, 0, sizeof(input_params));
+init_common_input(&input_params, args);
+input_params.delimiter = args[3];
+return input_params;
+}
+
+tailfile_input_params init_tailfile_chunk_input(char ** args) {
+tailfile_input_params input_params;
+memset(&input_params, 0, sizeof(input_params));
+init_common_input(&input_params, args);
+input_params.chunk_size = args[3];
+return input_params;
+}
+
+int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, 
uint64_t * port_num) {
+if (access(params->file, F_OK) == -1) {
+printf("Error: %s doesn't exist!\n", params->file);
+return -1;
+}
+
+struct stat stats;
+int ret = stat(params->file, &stats);
+
+if (ret == -1) {
+printf("Error occurred while getting file status {file: %s, error: 
%s}\n", params->file, strerror(errno));
+return -1;
+}
+// Check for file existence
+if (S_ISDIR(stats.st_mode)){
+printf("Error: %s is a directory!\n", params->file);
+return -1;
+}
+
+errno = 0;
+*intrvl = (uint64_t)(strtoul(params->interval, NULL, 10));
+
+if (errno != 0) {
+printf("Invalid interval value specified\n");
+return -1;
+}
+
+errno = 0;
+*port_num = (uint64_t)(strtoul(params->tcp_port, NULL, 10));
+if (errno != 0) {
+printf("Cannot convert tcp port to numeric value\n");
+return -1;
+}
+return 0;
+}
+
+void setup_signal_action() {
+struct sigaction action;
+memset(&action, 0, sizeof(sigaction));
+action.sa_handler = signal_handler;
+sigaction(SIGTERM, &action, NULL);
+sigaction(SIGINT, &action, NULL);
+}
+
+nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, 
const char * processor_name, void(*callback)(processor_session *, 
processor_context *)) {
+nifi_proc_params params;
+nifi_port port;
+port.port_id = input_params->nifi_port_uuid;
+
+nifi_instance * instance = create_instance(input_params->instance, &port);
+add_custom_processor(processor_name, callback);
+standalone_processor * proc = create_processor(processor_name, instance);
+params.instance = instance;
+params.processor = proc;
+return params;
+}
+
+void add_to_hash_table(flow_file_record * ffr, uint64_t offset, char * uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp == NULL) {
+pp = (struct processor_params*)malloc(sizeof(struct processor_params));
+memset(pp, 0, sizeof(struct processor_params));
+char * uuid_str = (char *)malloc((strlen(uuid) + 1) * sizeof(char));
+strcpy(uuid_str, uuid);
+pp->uuid_str = uuid_str;
+HASH_ADD_KEYPTR(hh, procparams, pp->uuid_str, strlen(pp->uuid_str), 
pp);
+}
+add_flow_file_record(&pp->ff_list, ffr);
+pp->curr_offset = offset;
+}
+
+void delete_all_flow_files_from_proc(const char * uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+struct flow_file_list * head = pp->ff_list;
+while (head) {
+struct flow_file_list * tmp = head;
+free_flowfile(tmp->ff_record);
+head = head->next;
+free(tmp);
+}
+pp->ff_list = head;
+}
+}
+
+void delete_completed_flow_files_from_proc(const char * uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+struct flow_file_list * head = pp->ff_

[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-08-12 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r313048506
 
 

 ##
 File path: nanofi/src/api/ecu.c
 ##
 @@ -0,0 +1,494 @@
+#include "api/ecu.h"
+#include "api/nanofi.h"
+#include "core/string_utils.h"
+#include "core/cstructs.h"
+#include "core/file_utils.h"
+#include "core/flowfiles.h"
+
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+processor_params * procparams = NULL;
+volatile sig_atomic_t stopped = 0;
+
+void free_proc_params(char * uuid) {
+
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+free_flow_file_list(&pp->ff_list);
+free(pp->uuid_str);
+HASH_DEL(procparams, pp);
+free(pp);
+}
+}
+
+void signal_handler(int signum) {
+if (signum == SIGINT || signum == SIGTERM) {
+stopped = 1;
+}
+}
+
+void init_common_input(tailfile_input_params * input_params, char ** args) {
+if (args && *args) {
+input_params->file = args[1];
+input_params->interval = args[2];
+input_params->instance = args[4];
+input_params->tcp_port = args[5];
+input_params->nifi_port_uuid = args[6];
+}
+}
+
+tailfile_input_params init_logaggregate_input(char ** args) {
+tailfile_input_params input_params;
+memset(&input_params, 0, sizeof(input_params));
+init_common_input(&input_params, args);
+input_params.delimiter = args[3];
+return input_params;
+}
+
+tailfile_input_params init_tailfile_chunk_input(char ** args) {
+tailfile_input_params input_params;
+memset(&input_params, 0, sizeof(input_params));
+init_common_input(&input_params, args);
+input_params.chunk_size = args[3];
+return input_params;
+}
+
+int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, 
uint64_t * port_num) {
+if (access(params->file, F_OK) == -1) {
+printf("Error: %s doesn't exist!\n", params->file);
+return -1;
+}
+
+struct stat stats;
+int ret = stat(params->file, &stats);
+
+if (ret == -1) {
+printf("Error occurred while getting file status {file: %s, error: 
%s}\n", params->file, strerror(errno));
+return -1;
+}
+// Check for file existence
+if (S_ISDIR(stats.st_mode)){
+printf("Error: %s is a directory!\n", params->file);
+return -1;
+}
+
+errno = 0;
+*intrvl = (uint64_t)(strtoul(params->interval, NULL, 10));
+
+if (errno != 0) {
+printf("Invalid interval value specified\n");
+return -1;
+}
+
+errno = 0;
+*port_num = (uint64_t)(strtoul(params->tcp_port, NULL, 10));
+if (errno != 0) {
+printf("Cannot convert tcp port to numeric value\n");
+return -1;
+}
+return 0;
+}
+
+void setup_signal_action() {
+struct sigaction action;
+memset(&action, 0, sizeof(sigaction));
+action.sa_handler = signal_handler;
+sigaction(SIGTERM, &action, NULL);
+sigaction(SIGINT, &action, NULL);
+}
+
+nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, 
const char * processor_name, void(*callback)(processor_session *, 
processor_context *)) {
+nifi_proc_params params;
+nifi_port port;
+port.port_id = input_params->nifi_port_uuid;
+
+nifi_instance * instance = create_instance(input_params->instance, &port);
+add_custom_processor(processor_name, callback);
+standalone_processor * proc = create_processor(processor_name, instance);
+params.instance = instance;
+params.processor = proc;
+return params;
+}
+
+void add_to_hash_table(flow_file_record * ffr, uint64_t offset, char * uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp == NULL) {
+pp = (struct processor_params*)malloc(sizeof(struct processor_params));
+memset(pp, 0, sizeof(struct processor_params));
+char * uuid_str = (char *)malloc((strlen(uuid) + 1) * sizeof(char));
+strcpy(uuid_str, uuid);
+pp->uuid_str = uuid_str;
+HASH_ADD_KEYPTR(hh, procparams, pp->uuid_str, strlen(pp->uuid_str), 
pp);
+}
+add_flow_file_record(&pp->ff_list, ffr);
+pp->curr_offset = offset;
+}
+
+void delete_all_flow_files_from_proc(const char * uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+struct flow_file_list * head = pp->ff_list;
+while (head) {
+struct flow_file_list * tmp = head;
+free_flowfile(tmp->ff_record);
+head = head->next;
+free(tmp);
+}
+pp->ff_list = head;
+}
+}
+
+void delete_completed_flow_files_from_proc(const char * uuid) {
+struct processor_params * pp = NULL;
+HASH_FIND_STR(procparams, uuid, pp);
+if (pp) {
+struct flow_file_list * head = pp->ff_

[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-07-17 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r304568258
 
 

 ##
 File path: nanofi/include/core/cstructs.h
 ##
 @@ -155,9 +155,33 @@ typedef struct cstream {
  * ##
  */
 
-typedef struct tokens {
-char ** str_list;
-uint64_t num_strings;
+typedef struct token_node {
+char * data;
+struct token_node * next;
+} token_node;
+
+typedef struct token_list {
 
 Review comment:
   Yes, I remember that and I mentioned in that PR that if I use utlist, I will 
have to use an external structure to keep track of extra variables. Currently 
it is encapsulated in token_list staying together with the tokens. Using utlist 
would make these variables to be unnecessarily copied into each linked list 
node.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-07-17 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r304564745
 
 

 ##
 File path: nanofi/include/core/cstructs.h
 ##
 @@ -155,9 +155,33 @@ typedef struct cstream {
  * ##
  */
 
-typedef struct tokens {
-char ** str_list;
-uint64_t num_strings;
+typedef struct token_node {
+char * data;
+struct token_node * next;
+} token_node;
+
+typedef struct token_list {
 
 Review comment:
   yes, I remember that and I mentioned in that PR that this token_list has 
some additional members that was important for the log_aggregator ecu. If I use 
utlist I will have to have some other structure outside of the list, to keep 
track of the offset in the file. Currently the token_list encapsulates that so 
that token nodes and the offset stay together and it is better to keep it that 
way.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-07-17 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r304564745
 
 

 ##
 File path: nanofi/include/core/cstructs.h
 ##
 @@ -155,9 +155,33 @@ typedef struct cstream {
  * ##
  */
 
-typedef struct tokens {
-char ** str_list;
-uint64_t num_strings;
+typedef struct token_node {
+char * data;
+struct token_node * next;
+} token_node;
+
+typedef struct token_list {
 
 Review comment:
   yes, I remember that and I mentioned in that PR that this token_list has 
some additional members that was important for the log_aggregator ecu. If I use 
utlist I will have to have some other structure outside of the list, to keep 
track of the offset in the file. Currently the token_list encapsulates that so 
that token nodes and the offset stay together and it is better to keep it that 
way.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-07-17 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r304563238
 
 

 ##
 File path: nanofi/include/api/ecu.h
 ##
 @@ -0,0 +1,57 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#ifndef NANOFI_INCLUDE_API_ECU_H_
+#define NANOFI_INCLUDE_API_ECU_H_
+
+#include 
+#include "api/nanofi.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern nifi_instance * instance;
+extern standalone_processor * proc;
+extern flow_file_list * ff_list;
+extern uint64_t curr_offset;
 
 Review comment:
   Currently these ECUs are used as a binary file run from command line 
supplying arguments as an individual unit. Still no connectivity like a flow 
from one ecu to other. That is the vision of upcoming ECU EFM integration if I 
am not wrong. I take your point that we should avoid the globals in order to 
setup ourselves to integrate EFM later on easily.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-07-17 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r304556732
 
 

 ##
 File path: nanofi/ecu/tailfile_chunk.c
 ##
 @@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#include "api/ecu.h"
+#include "core/flowfiles.h"
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+int main(int argc, char** argv) {
 
 Review comment:
   Good point, Thought about it earlier but forgot to come back to it. Thanks 
for pointing out


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [nifi-minifi-cpp] msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi tailfile delimited processor

2019-07-16 Thread GitBox
msharee9 commented on a change in pull request #613: Minificpp 927 Nanofi 
tailfile delimited processor
URL: https://github.com/apache/nifi-minifi-cpp/pull/613#discussion_r304086842
 
 

 ##
 File path: nanofi/include/api/ecu.h
 ##
 @@ -0,0 +1,57 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+#ifndef NANOFI_INCLUDE_API_ECU_H_
+#define NANOFI_INCLUDE_API_ECU_H_
+
+#include 
+#include "api/nanofi.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern nifi_instance * instance;
+extern standalone_processor * proc;
+extern flow_file_list * ff_list;
+extern uint64_t curr_offset;
 
 Review comment:
   Even though these are global variables, each instance of the ecu will have 
its own variable. Why do you think that these variables will cause any problem? 
I agree that having global variables like these is not a good practice and I 
will try to correct it but I think global variables will not cause any problem 
with one or multiple instances of same or different ecus.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services