martinzink commented on code in PR #2186:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2186#discussion_r3339532007
##########
minifi_rust/minifi_native/src/api/processor_wrappers/flow_file_stream_transform.rs:
##########
@@ -0,0 +1,175 @@
+use crate::api::process_session::IoState;
+use crate::api::processor::AdvancedProcessorFeatures;
+use
crate::api::processor_wrappers::utils::context_session_flowfile_bundle::ContextSessionFlowFileBundle;
+use crate::api::raw_processor::{MultiThreadedTrigger, SingleThreadedTrigger};
+use crate::{
+ CalculateMetrics, Concurrent, Exclusive, GetAttribute,
GetControllerService, GetProperty,
+ InputStream, LogLevel, Logger, MinifiError, OnTriggerResult, OutputStream,
ProcessContext,
+ ProcessSession, Processor, Relationship, Schedule,
+};
+use std::collections::HashMap;
+
+pub struct TransformStreamResult {
+ target_relationship_name: &'static str,
+ attributes_to_add: HashMap<String, String>,
+ write_status: IoState,
+}
+
+impl TransformStreamResult {
+ pub fn new(
+ target_relationship: &Relationship,
+ attributes_to_add: HashMap<String, String>,
+ ) -> Self {
+ Self {
+ target_relationship_name: target_relationship.name,
+ attributes_to_add,
+ write_status: IoState::Ok,
+ }
+ }
+
+ pub fn route_without_changes(target_relationship: &Relationship) -> Self {
+ Self {
+ target_relationship_name: target_relationship.name,
+ attributes_to_add: HashMap::new(),
+ write_status: IoState::Cancel,
+ }
+ }
+
+ pub fn target_relationship_name(&self) -> &'static str {
+ self.target_relationship_name
+ }
+
+ pub fn get_attribute(&self, name: &str) -> Option<String> {
+ self.attributes_to_add.get(name).cloned()
+ }
+
+ pub fn write_status(&self) -> IoState {
+ self.write_status
+ }
+}
+
+pub trait FlowFileStreamTransform {
+ fn transform<Ctx: GetProperty + GetControllerService + GetAttribute,
LoggerImpl: Logger>(
+ &self,
+ context: &Ctx,
+ input_stream: &mut dyn InputStream,
+ output_stream: &mut dyn OutputStream,
+ logger: &LoggerImpl,
+ ) -> Result<TransformStreamResult, MinifiError>;
+}
+
+pub trait MutFlowFileStreamTransform {
+ fn transform<Ctx: GetProperty + GetControllerService + GetAttribute,
LoggerImpl: Logger>(
+ &mut self,
+ context: &Ctx,
+ input_stream: &mut dyn InputStream,
+ output_stream: &mut dyn OutputStream,
+ logger: &LoggerImpl,
+ ) -> Result<TransformStreamResult, MinifiError>;
+}
+
+pub struct FlowFileStreamTransformProcessorType {}
+
+fn handle_stream_transform<PC, PS, L, F>(
+ context: &mut PC,
+ session: &mut PS,
+ logger: &L,
+ mut transform_fn: F,
+) -> Result<OnTriggerResult, MinifiError>
+where
+ PC: ProcessContext,
+ PS: ProcessSession<FlowFile = PC::FlowFile>,
+ L: Logger,
+ F: FnMut(
+ &ContextSessionFlowFileBundle<PC, PS>,
+ &mut dyn InputStream,
+ &mut dyn OutputStream,
+ ) -> Result<TransformStreamResult, MinifiError>,
+{
+ if let Some(mut flow_file) = session.get() {
+ let simple_context = ContextSessionFlowFileBundle::new(context,
session, Some(&flow_file));
+
+ let (relationship, attrs) = session.read_stream(&flow_file,
|input_stream| {
+ session.write_stream(&flow_file, |output_stream| {
+ let transformed = transform_fn(&simple_context, input_stream,
output_stream)?;
+
+ Ok((
+ (
+ transformed.target_relationship_name,
+ transformed.attributes_to_add,
+ ),
+ transformed.write_status,
+ ))
+ })
+ })?;
+
+ for (k, v) in attrs {
+ session.set_attribute(&mut flow_file, &k, &v)?;
+ }
+
+ session.transfer(flow_file, relationship)?;
+
+ Ok(OnTriggerResult::Ok)
+ } else {
+ logger.log(LogLevel::Trace, format_args!("No flowfile to transform"));
+ Ok(OnTriggerResult::Yield)
+ }
+}
+
+// Concurrent Implementation (Multi-Threaded)
+impl<'a, Implementation, L> MultiThreadedTrigger
+ for Processor<Implementation, FlowFileStreamTransformProcessorType,
Concurrent, L>
Review Comment:
This is a higher-level API for streaming-type transformers: it simultaneously
reads and writes the flowfiles (this is supported in all content repos, afaik),
which keeps the memory footprint manageable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]