I have  one  config   file  inside  my  nifi  environment,   in   which  i 
should change, check and  update  values ,  but  i  should    make  this  
operation in  independent threads   so  that(threads should't   flush)
here  are several  things  i am  interested  in:
1. Can  i  use  Synchronized  code  blocks  inside  executeScript 
processor?
2.How  can i  use   Thread  locks  inside nifi?

here is  my  code   but it  doesn't   work  properly  what  should i  change 
to make  my  logic   work?
class Main2 extends Thread{
    static String content = "";
    static File file = new File("C:/Users/Desktop/test/conf.xml");
    static BufferedReader s;
    static BufferedWriter w;
    static RandomAccessFile ini= new RandomAccessFile(file, "rwd");
    static FileLock lock= ini.getChannel().lock();
    synchronized void read()
    {
        try {
            String sCurrentLine;
            s = new BufferedReader(Channels.newReader(ini.getChannel(),
"UTF-8"));
            while ((sCurrentLine = s.readLine()) != null) {
                content += sCurrentLine;
            }

            ini.seek(0);
            /* def flowFile1 = session.create()
                   flowFile1 = session.putAttribute(flowFile1, "filename",
"conf.xml");
                   session.write(flowFile1, new StreamCallback() {
                       @Override
                       public void process(InputStream inputStream1,
OutputStream outputStream) throws IOException {

                          
outputStream.write(content.getBytes(StandardCharsets.UTF_8))
                       }

                   });
                   session.transfer(flowFile1, REL_SUCCESS);*/

            def xml = new XmlParser().parseText(content);
            xml.'**'.findAll{it.name() == 'runAs'}.each{ it.replaceBody
'false'};
            def newxml1 = XmlUtil.serialize(xml);
            String data = newxml1;
            if (!data.isEmpty()) {
                ini.setLength(0);
                w = new BufferedWriter(Channels.newWriter(ini.getChannel(),
"UTF-8"));
                w.write(data);
                lock.release();
                w.close();

            }
            println data;

        }catch (FileNotFoundException e) {
            TimeUnit.SECONDS.sleep(50000);
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();

        } catch(OverlappingFileLockException e){
            TimeUnit.SECONDS.sleep(50000);
            lock.release();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            w.close();
            ini.close();
        }

    }

    synchronized void write()
    {try {

        String sCurrentLine;
        s = new BufferedReader(Channels.newReader(ini.getChannel(),
"UTF-8"));
        while ((sCurrentLine = s.readLine()) != null) {
            content += sCurrentLine;
        }
        // println content;
        ini.seek(0);

        /* def flowFile = session.get();
         if (flowFile != null) return;
         def serviceName = flowFile.getAttribute('serviceName');
         def date = flowFile.getAttribute('filename').substring(0, 10);
         def xml = new XmlParser().parseText(content)
         if (serviceName == 'borderCrossDecl') {

             xml.RS.borderCrossDecl.details.findAll({ p ->
                 p.runAs[0].text() == "false" && p.start[0].text() ==
date.toString();
             }).each({ p ->
                 p.start[0].value = addDays(p.start[0].text())
                 p.runAs[0].value = "true"
             })
         }*/

        def newXml = groovy.xml.XmlUtil.serialize(content)

        String data = newXml.toString();
        if (!data.isEmpty()) {
            ini.setLength(0);
            w = new BufferedWriter(Channels.newWriter(ini.getChannel(),
"UTF-8"));
            w.write(data);
            lock.release();
            w.close();

        }
    }catch (FileNotFoundException e) {
        TimeUnit.SECONDS.sleep(50000);
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();

    } catch(OverlappingFileLockException e){
        TimeUnit.SECONDS.sleep(50000);
        lock.release();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        w.close();
        ini.close();
    }


    }

    public static  void main(String [] args){
        Main2 r = new Main2();
        Thread one = new Thread(r);
        one.start();

    }
}




--
Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/

Reply via email to