Github user markap14 commented on the pull request:

https://github.com/apache/nifi/pull/420#issuecomment-218531630

@ToivoAdams After looking at this a bit more, I've got a few very high-level thoughts.

First, what you have built here is incredibly awesome and powerful! I think FilterCSVColumns is the wrong name for this Processor. Perhaps it should be TransformCSV, as leveraging Calcite allows us to run some pretty powerful queries, such as "SELECT * FROM CSV.A WHERE AMOUNT2 < 99", and I imagine that this type of use case will be very common.

I do have a concern, though: if you attempt to perform a JOIN operation, for example "SELECT X.AMOUNT2, X.AMOUNT3 FROM CSV.A AS X JOIN CSV.A AS Y ON X.AMOUNT2=Y.AMOUNT2", we end up with an IOException: Stream closed. This is because Calcite has to read the data multiple times in order to perform the JOIN, and the single InputStream it was given has already been consumed and closed after the first pass.

I think we can get around this by changing CsvSchemaFactory2 to be something like CsvInputStreamFactory. Rather than receiving the InputStream directly, that class would be passed the FlowFile and ProcessSession and could create the InputStream on demand. This would allow the data to be read multiple times, with a separate InputStream created for each pass. The nice thing is that if this runs on a system with sufficient RAM, the operating system's disk cache will generally mean that we don't even have to go to disk for the second pass unless it's a really massive amount of CSV.

Additionally, I think we need some sort of validator for the SQL Select Statement property. Right now, if the query is invalid, the processor is still considered valid and just routes everything to failure.
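To make the on-demand idea a bit more concrete, here is a rough sketch in plain Java with no NiFi or Calcite dependencies. The names `OnDemandStreamSketch`, `InputStreamFactory`, and `countLines` are made up for illustration and are not part of this PR. The assumption is that, in the real processor, `open()` would be backed by the FlowFile and ProcessSession instead of a byte array.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class OnDemandStreamSketch {

    /** Hypothetical stand-in for the proposed CsvInputStreamFactory. */
    public interface InputStreamFactory {
        /** Opens a fresh stream positioned at the start of the data. */
        InputStream open() throws IOException;
    }

    /** Simulates one full pass over the data: opens a new stream, reads it to the end, closes it. */
    public static int countLines(InputStreamFactory factory) throws IOException {
        int lines = 0;
        try (InputStream in = factory.open()) {
            int b;
            while ((b = in.read()) != -1) {
                if (b == '\n') {
                    lines++;
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: a byte array stands in for the FlowFile content.
        byte[] csv = "AMOUNT2,AMOUNT3\n1,2\n3,4\n".getBytes(StandardCharsets.UTF_8);
        InputStreamFactory factory = () -> new ByteArrayInputStream(csv);

        // Two independent passes, as a self-JOIN would need.
        int firstPass = countLines(factory);
        int secondPass = countLines(factory);
        System.out.println(firstPass + " " + secondPass);
    }
}
```

Because each pass asks the factory for its own stream, closing the stream at the end of the first pass does not affect the second one, which is the behaviour a self-JOIN needs.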