Hello, thanks everyone for the prompt response.

With your aid I was able to figure it out.

My main difficulty was understanding the difference between the Grouping
Regular Expression and the expression that extracts the date, which in my
case are pretty much the same expression.

I also have to admit that the RouteText.Group attribute was not easy to
find, even in the documentation.
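
For anyone who finds this thread later, here is a rough Python sketch (not
NiFi code) of how I understand the grouping to behave. It reuses the Grouping
Regular Expression and output directory from the attached template; the
helper name group_lines_by_date and the sample log lines are made up for
illustration, and I am assuming each record starts with an ISO-8601 style
timestamp. Lines that do not match the expression are simply skipped here,
which is a simplification of what RouteText does.

```python
import re
from collections import defaultdict

# Same pattern as the template's Grouping Regular Expression.
GROUPING_REGEX = re.compile(r"([0-9\-]+)T.*")


def group_lines_by_date(lines):
    """Hypothetical helper: bucket lines by the first capture group.

    The captured value plays the role of the RouteText.Group attribute.
    """
    groups = defaultdict(list)
    for line in lines:
        match = GROUPING_REGEX.fullmatch(line.strip())
        if match is None:
            # Simplification: lines without a group are skipped in this sketch.
            continue
        groups[match.group(1)].append(line)
    return dict(groups)


# Made-up sample records (assumption: timestamp at the start of each line).
sample = [
    "2016-01-01T10:00:00 first event",
    "2016-01-02T09:30:00 second event",
    "2016-01-01T23:59:59 third event",
    "no timestamp here",
]

grouped = group_lines_by_date(sample)
for group, members in sorted(grouped.items()):
    # Mirrors the PutFile Directory: /tmp/outbound/recordText/${RouteText.Group}
    print(f"/tmp/outbound/recordText/{group}: {len(members)} line(s)")
```

The value captured by the parentheses is what later shows up as
RouteText.Group, which is why the same attribute can serve as the
MergeContent correlation attribute and as part of the output directory.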

I feel that reading logs from a TCP connection and storing them partitioned
directly into a Hive table should be a fairly common use case, so I'm
attaching the template as my grain-of-sand contribution.

Thanks again


2016-11-02 22:10 GMT-03:00 Joe Witt <[email protected]>:

> I agree with James.  The general pattern here is
>
> Split with Grouping:
>   Take a look at RouteText.  This allows you to efficiently split up
> line oriented data into groups based on matching values rather than
> SplitText, which will be a line-for-line split.
>
> Merge Grouped Data:
>   MergeContent processor will do the trick and you can use correlation
> feature to align only those which are from the same group/pattern.
>
> Write to destination:
>   You can write directly to HDFS using PutHDFS or you can prepare the
> data and write to Hive.
>
> Thanks
> Joe
>
> On Wed, Nov 2, 2016 at 9:01 PM, James Wing <[email protected]> wrote:
> > This is absolutely possible.  A sample sequence of processors might
> > include:
> >
> > 1. UpdateAttribute - to extract a record date from the flowfile content
> > into an attribute, 'recordgroup' for example
> > 2. MergeContent - to group related records together, setting the
> > Correlation Attribute Name property to use 'recordgroup'
> > 3. UpdateAttribute - (optional) to apply the 'recordgroup' attribute to
> > the 'path' and/or 'filename' attributes, depending on how you do #4.  May be
> > useful to get customized filenames with extensions.
> > 4. Put* - to write the grouped file to storage (PutFile, PutHDFS,
> > PutS3Object, etc.).  With PutHDFS for example, use Expression Language in
> > the Directory property to apply your grouping - like
> > '/tmp/hive/records/${recordgroup}' to get '/tmp/hive/records/2016-01-01'.
> >
> > In concept, it's that simple.  The #2 MergeContent step can be more
> > complicated as you consider how many files should be output from the
> > stream, how big they should be, how frequently, and how many bins are
> > likely to be open collecting files at any one time.  You might also
> > consider compressing the files.
> >
> > Thanks,
> >
> > James
> >
> > On Wed, Nov 2, 2016 at 5:34 PM, Santiago Ciciliani
> > <[email protected]> wrote:
> >>
> >> I'm trying to split a stream of data into multiple different files based
> >> on the content date.
> >>
> >> So imagine that you are receiving streams of logs and you want to save
> >> as a Hive partitioned table so for example all records with date 2016-01-01
> >> into directory dt=2016-01-01.
> >>
> >> Is this even possible?
> >>
> >> Thanks
> >>
> >
>
<?xml version="1.0" ?>
<template encoding-version="1.0">
  <description></description>
  <groupId>01581012-335d-1808-2052-62a3fef49c9a</groupId>
  <name>RecordTextToPartition</name>
  <snippet>
    <processors>
      <id>01581003-335d-1808-0000-000000000000</id>
      <parentGroupId>01581012-335d-1808-0000-000000000000</parentGroupId>
      <position>
        <x>0.0</x>
        <y>0.0</y>
      </position>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>Input Directory</key>
            <value>
              <name>Input Directory</name>
            </value>
          </entry>
          <entry>
            <key>File Filter</key>
            <value>
              <name>File Filter</name>
            </value>
          </entry>
          <entry>
            <key>Path Filter</key>
            <value>
              <name>Path Filter</name>
            </value>
          </entry>
          <entry>
            <key>Batch Size</key>
            <value>
              <name>Batch Size</name>
            </value>
          </entry>
          <entry>
            <key>Keep Source File</key>
            <value>
              <name>Keep Source File</name>
            </value>
          </entry>
          <entry>
            <key>Recurse Subdirectories</key>
            <value>
              <name>Recurse Subdirectories</name>
            </value>
          </entry>
          <entry>
            <key>Polling Interval</key>
            <value>
              <name>Polling Interval</name>
            </value>
          </entry>
          <entry>
            <key>Ignore Hidden Files</key>
            <value>
              <name>Ignore Hidden Files</name>
            </value>
          </entry>
          <entry>
            <key>Minimum File Age</key>
            <value>
              <name>Minimum File Age</name>
            </value>
          </entry>
          <entry>
            <key>Maximum File Age</key>
            <value>
              <name>Maximum File Age</name>
            </value>
          </entry>
          <entry>
            <key>Minimum File Size</key>
            <value>
              <name>Minimum File Size</name>
            </value>
          </entry>
          <entry>
            <key>Maximum File Size</key>
            <value>
              <name>Maximum File Size</name>
            </value>
          </entry>
        </descriptors>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>Input Directory</key>
            <value>/tmp/inbound</value>
          </entry>
          <entry>
            <key>File Filter</key>
            <value>[^\.].*</value>
          </entry>
          <entry>
            <key>Path Filter</key>
          </entry>
          <entry>
            <key>Batch Size</key>
            <value>10</value>
          </entry>
          <entry>
            <key>Keep Source File</key>
            <value>false</value>
          </entry>
          <entry>
            <key>Recurse Subdirectories</key>
            <value>true</value>
          </entry>
          <entry>
            <key>Polling Interval</key>
            <value>0 sec</value>
          </entry>
          <entry>
            <key>Ignore Hidden Files</key>
            <value>true</value>
          </entry>
          <entry>
            <key>Minimum File Age</key>
            <value>0 sec</value>
          </entry>
          <entry>
            <key>Maximum File Age</key>
          </entry>
          <entry>
            <key>Minimum File Size</key>
            <value>0 B</value>
          </entry>
          <entry>
            <key>Maximum File Size</key>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>GetFile</name>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>success</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.standard.GetFile</type>
    </processors>
    <processors>
      <id>01581004-335d-1808-0000-000000000000</id>
      <parentGroupId>01581012-335d-1808-0000-000000000000</parentGroupId>
      <position>
        <x>4.759607738080376</x>
        <y>231.64192111939514</y>
      </position>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>Routing Strategy</key>
            <value>
              <name>Routing Strategy</name>
            </value>
          </entry>
          <entry>
            <key>Matching Strategy</key>
            <value>
              <name>Matching Strategy</name>
            </value>
          </entry>
          <entry>
            <key>Character Set</key>
            <value>
              <name>Character Set</name>
            </value>
          </entry>
          <entry>
            <key>Ignore Leading/Trailing Whitespace</key>
            <value>
              <name>Ignore Leading/Trailing Whitespace</name>
            </value>
          </entry>
          <entry>
            <key>Ignore Case</key>
            <value>
              <name>Ignore Case</name>
            </value>
          </entry>
          <entry>
            <key>Grouping Regular Expression</key>
            <value>
              <name>Grouping Regular Expression</name>
            </value>
          </entry>
          <entry>
            <key>recordData</key>
            <value>
              <name>recordData</name>
            </value>
          </entry>
        </descriptors>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>Routing Strategy</key>
            <value>Route to 'matched' if line matches all conditions</value>
          </entry>
          <entry>
            <key>Matching Strategy</key>
            <value>Matches Regular Expression</value>
          </entry>
          <entry>
            <key>Character Set</key>
            <value>UTF-8</value>
          </entry>
          <entry>
            <key>Ignore Leading/Trailing Whitespace</key>
            <value>true</value>
          </entry>
          <entry>
            <key>Ignore Case</key>
            <value>false</value>
          </entry>
          <entry>
            <key>Grouping Regular Expression</key>
            <value>([0-9\-]+)T.*</value>
          </entry>
          <entry>
            <key>recordData</key>
            <value>.*</value>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>RouteText</name>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>matched</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>original</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>unmatched</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.standard.RouteText</type>
    </processors>
    <processors>
      <id>01581007-335d-1808-0000-000000000000</id>
      <parentGroupId>01581012-335d-1808-0000-000000000000</parentGroupId>
      <position>
        <x>7.29680870128459</x>
        <y>931.8918446950502</y>
      </position>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>Directory</key>
            <value>
              <name>Directory</name>
            </value>
          </entry>
          <entry>
            <key>Conflict Resolution Strategy</key>
            <value>
              <name>Conflict Resolution Strategy</name>
            </value>
          </entry>
          <entry>
            <key>Create Missing Directories</key>
            <value>
              <name>Create Missing Directories</name>
            </value>
          </entry>
          <entry>
            <key>Maximum File Count</key>
            <value>
              <name>Maximum File Count</name>
            </value>
          </entry>
          <entry>
            <key>Last Modified Time</key>
            <value>
              <name>Last Modified Time</name>
            </value>
          </entry>
          <entry>
            <key>Permissions</key>
            <value>
              <name>Permissions</name>
            </value>
          </entry>
          <entry>
            <key>Owner</key>
            <value>
              <name>Owner</name>
            </value>
          </entry>
          <entry>
            <key>Group</key>
            <value>
              <name>Group</name>
            </value>
          </entry>
        </descriptors>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>Directory</key>
            <value>/tmp/outbound/recordText/${RouteText.Group}</value>
          </entry>
          <entry>
            <key>Conflict Resolution Strategy</key>
            <value>fail</value>
          </entry>
          <entry>
            <key>Create Missing Directories</key>
            <value>true</value>
          </entry>
          <entry>
            <key>Maximum File Count</key>
          </entry>
          <entry>
            <key>Last Modified Time</key>
          </entry>
          <entry>
            <key>Permissions</key>
          </entry>
          <entry>
            <key>Owner</key>
          </entry>
          <entry>
            <key>Group</key>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>PutFile</name>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>failure</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>success</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.standard.PutFile</type>
    </processors>
    <processors>
      <id>01581009-335d-1808-0000-000000000000</id>
      <parentGroupId>01581012-335d-1808-0000-000000000000</parentGroupId>
      <position>
        <x>13.49750423570822</x>
        <y>468.9516633428093</y>
      </position>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>Merge Strategy</key>
            <value>
              <name>Merge Strategy</name>
            </value>
          </entry>
          <entry>
            <key>Merge Format</key>
            <value>
              <name>Merge Format</name>
            </value>
          </entry>
          <entry>
            <key>Attribute Strategy</key>
            <value>
              <name>Attribute Strategy</name>
            </value>
          </entry>
          <entry>
            <key>Correlation Attribute Name</key>
            <value>
              <name>Correlation Attribute Name</name>
            </value>
          </entry>
          <entry>
            <key>Minimum Number of Entries</key>
            <value>
              <name>Minimum Number of Entries</name>
            </value>
          </entry>
          <entry>
            <key>Maximum Number of Entries</key>
            <value>
              <name>Maximum Number of Entries</name>
            </value>
          </entry>
          <entry>
            <key>Minimum Group Size</key>
            <value>
              <name>Minimum Group Size</name>
            </value>
          </entry>
          <entry>
            <key>Maximum Group Size</key>
            <value>
              <name>Maximum Group Size</name>
            </value>
          </entry>
          <entry>
            <key>Max Bin Age</key>
            <value>
              <name>Max Bin Age</name>
            </value>
          </entry>
          <entry>
            <key>Maximum number of Bins</key>
            <value>
              <name>Maximum number of Bins</name>
            </value>
          </entry>
          <entry>
            <key>Delimiter Strategy</key>
            <value>
              <name>Delimiter Strategy</name>
            </value>
          </entry>
          <entry>
            <key>Header File</key>
            <value>
              <name>Header File</name>
            </value>
          </entry>
          <entry>
            <key>Footer File</key>
            <value>
              <name>Footer File</name>
            </value>
          </entry>
          <entry>
            <key>Demarcator File</key>
            <value>
              <name>Demarcator File</name>
            </value>
          </entry>
          <entry>
            <key>Compression Level</key>
            <value>
              <name>Compression Level</name>
            </value>
          </entry>
          <entry>
            <key>Keep Path</key>
            <value>
              <name>Keep Path</name>
            </value>
          </entry>
        </descriptors>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>Merge Strategy</key>
            <value>Bin-Packing Algorithm</value>
          </entry>
          <entry>
            <key>Merge Format</key>
            <value>Binary Concatenation</value>
          </entry>
          <entry>
            <key>Attribute Strategy</key>
            <value>Keep Only Common Attributes</value>
          </entry>
          <entry>
            <key>Correlation Attribute Name</key>
            <value>RouteText.Group</value>
          </entry>
          <entry>
            <key>Minimum Number of Entries</key>
            <value>1</value>
          </entry>
          <entry>
            <key>Maximum Number of Entries</key>
          </entry>
          <entry>
            <key>Minimum Group Size</key>
            <value>0 B</value>
          </entry>
          <entry>
            <key>Maximum Group Size</key>
          </entry>
          <entry>
            <key>Max Bin Age</key>
          </entry>
          <entry>
            <key>Maximum number of Bins</key>
            <value>100</value>
          </entry>
          <entry>
            <key>Delimiter Strategy</key>
            <value>Filename</value>
          </entry>
          <entry>
            <key>Header File</key>
          </entry>
          <entry>
            <key>Footer File</key>
          </entry>
          <entry>
            <key>Demarcator File</key>
          </entry>
          <entry>
            <key>Compression Level</key>
            <value>1</value>
          </entry>
          <entry>
            <key>Keep Path</key>
            <value>false</value>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>MergeContent</name>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>failure</name>
      </relationships>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>merged</name>
      </relationships>
      <relationships>
        <autoTerminate>true</autoTerminate>
        <name>original</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.standard.MergeContent</type>
    </processors>
    <processors>
      <id>0158100f-335d-1808-0000-000000000000</id>
      <parentGroupId>01581012-335d-1808-0000-000000000000</parentGroupId>
      <position>
        <x>12.036232483661479</x>
        <y>707.0195028030091</y>
      </position>
      <config>
        <bulletinLevel>WARN</bulletinLevel>
        <comments></comments>
        <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount>
        <descriptors>
          <entry>
            <key>Delete Attributes Expression</key>
            <value>
              <name>Delete Attributes Expression</name>
            </value>
          </entry>
          <entry>
            <key>filename</key>
            <value>
              <name>filename</name>
            </value>
          </entry>
        </descriptors>
        <lossTolerant>false</lossTolerant>
        <penaltyDuration>30 sec</penaltyDuration>
        <properties>
          <entry>
            <key>Delete Attributes Expression</key>
          </entry>
          <entry>
            <key>filename</key>
            <value>${RouteText.Group}</value>
          </entry>
        </properties>
        <runDurationMillis>0</runDurationMillis>
        <schedulingPeriod>0 sec</schedulingPeriod>
        <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
        <yieldDuration>1 sec</yieldDuration>
      </config>
      <name>UpdateAttribute</name>
      <relationships>
        <autoTerminate>false</autoTerminate>
        <name>success</name>
      </relationships>
      <style></style>
      <type>org.apache.nifi.processors.attributes.UpdateAttribute</type>
    </processors>
  </snippet>
  <timestamp>11/03/2016 00:25:53 ART</timestamp>
</template>
