I have one config file inside my nifi environment, in which i should change, check and update values , but i should make this operation in independent threads so that(threads should't flush) here are several things i am interested in: 1. Can i use Synchronized code blocks inside executeScript processor? 2.How can i use Thread locks inside nifi?
here is my code but it doesn't work properly what should i change to make my logic work? class Main2 extends Thread{ static String content = ""; static File file = new File("C:/Users/Desktop/test/conf.xml"); static BufferedReader s; static BufferedWriter w; static RandomAccessFile ini= new RandomAccessFile(file, "rwd"); static FileLock lock= ini.getChannel().lock(); synchronized void read() { try { String sCurrentLine; s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8")); while ((sCurrentLine = s.readLine()) != null) { content += sCurrentLine; } ini.seek(0); /* def flowFile1 = session.create() flowFile1 = session.putAttribute(flowFile1, "filename", "conf.xml"); session.write(flowFile1, new StreamCallback() { @Override public void process(InputStream inputStream1, OutputStream outputStream) throws IOException { outputStream.write(content.getBytes(StandardCharsets.UTF_8)) } }); session.transfer(flowFile1, REL_SUCCESS);*/ def xml = new XmlParser().parseText(content); xml.'**'.findAll{it.name() == 'runAs'}.each{ it.replaceBody 'false'}; def newxml1 = XmlUtil.serialize(xml); String data = newxml1; if (!data.isEmpty()) { ini.setLength(0); w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8")); w.write(data); lock.release(); w.close(); } println data; }catch (FileNotFoundException e) { TimeUnit.SECONDS.sleep(50000); e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch(OverlappingFileLockException e){ TimeUnit.SECONDS.sleep(50000); lock.release(); e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } finally { w.close(); ini.close(); } } synchronized void write() {try { String sCurrentLine; s = new BufferedReader(Channels.newReader(ini.getChannel(), "UTF-8")); while ((sCurrentLine = s.readLine()) != null) { content += sCurrentLine; } // println content; ini.seek(0); /* def flowFile = session.get(); if (flowFile != null) return; def serviceName = flowFile.getAttribute('serviceName'); def date = flowFile.getAttribute('filename').substring(0, 10); def xml = new XmlParser().parseText(content) if (serviceName == 'borderCrossDecl') { xml.RS.borderCrossDecl.details.findAll({ p -> p.runAs[0].text() == "false" && p.start[0].text() == date.toString(); }).each({ p -> p.start[0].value = addDays(p.start[0].text()) p.runAs[0].value = "true" }) }*/ def newXml = groovy.xml.XmlUtil.serialize(content) String data = newXml.toString(); if (!data.isEmpty()) { ini.setLength(0); w = new BufferedWriter(Channels.newWriter(ini.getChannel(), "UTF-8")); w.write(data); lock.release(); w.close(); } }catch (FileNotFoundException e) { TimeUnit.SECONDS.sleep(50000); e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch(OverlappingFileLockException e){ TimeUnit.SECONDS.sleep(50000); lock.release(); e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } finally { w.close(); ini.close(); } } public static void main(String [] args){ Main2 r = new Main2(); Thread one = new Thread(r); one.start(); } } -- Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/