This is an automated email from the ASF dual-hosted git repository. erobinet pushed a commit to branch feature/scraper in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 912c1effdfc6244b4b7a63f9d316df6ea2710a4d Author: Etienne Robinet <etienne1995robi...@gmail.com> AuthorDate: Tue May 12 08:54:56 2020 +0200 Camel-Scraper --- plc4j/integrations/apache-camel/pom.xml | 18 ++++ .../org/apache/plc4x/camel/Plc4XComponent.java | 8 ++ .../java/org/apache/plc4x/camel/Plc4XConsumer.java | 99 ++++++++++++++++------ .../java/org/apache/plc4x/camel/Plc4XEndpoint.java | 59 ++++++++++--- .../main/java/org/apache/plc4x/camel/TagData.java | 12 +++ plc4j/tools/scraper/pom.xml | 14 +-- .../triggerhandler/TriggerConfiguration.java | 2 - pom.xml | 2 +- 8 files changed, 163 insertions(+), 51 deletions(-) diff --git a/plc4j/integrations/apache-camel/pom.xml b/plc4j/integrations/apache-camel/pom.xml index cfa551a..85cd04f 100644 --- a/plc4j/integrations/apache-camel/pom.xml +++ b/plc4j/integrations/apache-camel/pom.xml @@ -128,6 +128,24 @@ <version>3.1.0</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.plc4x</groupId> + <artifactId>plc4j-connection-pool</artifactId> + <version>0.7.0-SNAPSHOT</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.plc4x</groupId> + <artifactId>plc4j-scraper</artifactId> + <version>0.7.0-SNAPSHOT</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.plc4x</groupId> + <artifactId>plc4j-driver-eip</artifactId> + <version>0.7.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> </dependencies> <dependencyManagement> diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java index 82763ff..f7e705f 100644 --- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java +++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XComponent.java @@ -38,6 +38,14 @@ public class Plc4XComponent extends DefaultComponent { if(tags!=null){ ((Plc4XEndpoint)endpoint).setTags(tags); } + String trigger = getAndRemoveOrResolveReferenceParameter(parameters,"trigger",String.class); + if(trigger!=null){ + ((Plc4XEndpoint)endpoint).setTrigger(trigger); + } + int period = getAndRemoveOrResolveReferenceParameter(parameters,"period",Integer.class); + if(period!=0){ + ((Plc4XEndpoint)endpoint).setPeriod(period); + } setProperties(endpoint,parameters); return endpoint; } diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java index 18161db..132aed8 100644 --- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java +++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XConsumer.java @@ -27,13 +27,19 @@ import org.apache.plc4x.java.api.PlcConnection; import org.apache.plc4x.java.api.exceptions.PlcException; import org.apache.plc4x.java.api.messages.PlcReadRequest; import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse; +import org.apache.plc4x.java.scraper.ScrapeJob; +import org.apache.plc4x.java.scraper.config.JobConfigurationImpl; +import org.apache.plc4x.java.scraper.config.ScraperConfiguration; +import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl; +import org.apache.plc4x.java.scraper.exception.ScraperException; +import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl; +import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl; +import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector; +import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.*; public class Plc4XConsumer extends DefaultConsumer { @@ -42,18 +48,25 @@ public class Plc4XConsumer extends DefaultConsumer { private ExceptionHandler exceptionHandler; private PlcConnection plcConnection; private List<TagData> tags; - private Map parameters; + private Map<String,String> fields; + private String trigger; private PlcSubscriptionResponse subscriptionResponse; private Plc4XEndpoint plc4XEndpoint; private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); private ScheduledFuture<?> future; + private final static String TRIGGER = "TRIGGER_VAR"; + private final static String PLC_NAME = "PLC"; + public Plc4XConsumer(Plc4XEndpoint endpoint, Processor processor) throws PlcException { super(endpoint, processor); plc4XEndpoint =endpoint; this.plcConnection = endpoint.getConnection(); this.tags = endpoint.getTags(); + this.fields = TagData.toMap(this.tags); + this.trigger= endpoint.getTrigger(); + plc4XEndpoint=endpoint; } @Override @@ -76,31 +89,30 @@ public class Plc4XConsumer extends DefaultConsumer { @Override protected void doStart() throws InterruptedException, ExecutionException { - PlcReadRequest.Builder builder = plcConnection.readRequestBuilder(); - if (tags.size()==1){ - TagData tag = tags.get(0); - builder.addItem(tag.getTagName(),tag.getQuery()); - - } - else{ - for(TagData tag : tags){ - builder.addItem(tag.getTagName(),tag.getQuery()); - } - } - PlcReadRequest request = builder.build(); - future = executorService.schedule(() -> { - request.execute().thenAccept(response -> { + if(trigger==null) { + PlcReadRequest.Builder builder = plcConnection.readRequestBuilder(); + if (tags.size() == 1) { + TagData tag = tags.get(0); + builder.addItem(tag.getTagName(), tag.getQuery()); + + } else { + for (TagData tag : tags) { + builder.addItem(tag.getTagName(), tag.getQuery()); + } + } + PlcReadRequest request = builder.build(); + future = executorService.schedule(() -> { + request.execute().thenAccept(response -> { try { Exchange exchange = plc4XEndpoint.createExchange(); - if (tags.size()>1){ + if (tags.size() > 1) { List<TagData> values = new ArrayList<>(); - for(TagData tag : tags){ + for (TagData tag : tags) { tag.setValue(response.getObject(tag.getTagName())); values.add(tag); } exchange.getIn().setBody(values); - } - else { + } else { TagData tag = tags.get(0); tag.setValue(response.getAllObjects(tag.getTagName())); exchange.getIn().setBody(tag); @@ -110,7 +122,46 @@ public class Plc4XConsumer extends DefaultConsumer { exceptionHandler.handleException(e); } }); - }, 500, TimeUnit.MILLISECONDS); + }, 500, TimeUnit.MILLISECONDS); + } + else{ + + ScraperConfiguration configuration = getScraperConfig(TagData.toMap(plc4XEndpoint.getTags())); + TriggerCollector collector = new TriggerCollectorImpl(plc4XEndpoint.getPlcDriverManager()); + try { + TriggeredScraperImpl scraper = new TriggeredScraperImpl(configuration, (job, alias, response) -> { + try { + Exchange exchange = plc4XEndpoint.createExchange(); + if (tags.size() > 1) { + List<TagData> values = new ArrayList<>(); + for (TagData tag : tags) { + tag.setValue(response.get(tag.getTagName())); + values.add(tag); + } + exchange.getIn().setBody(values); + } else { + TagData tag = tags.get(0); + tag.setValue(response.get(tag.getTagName())); + exchange.getIn().setBody(tag); + } + getProcessor().process(exchange); + } catch (Exception e) { + exceptionHandler.handleException(e); + }; + },collector); + scraper.start(); + collector.start(); + } catch (ScraperException e) { + e.printStackTrace(); + } + } + } + + private ScraperConfigurationTriggeredImpl getScraperConfig(Map<String,String> tagList){ + String config = "(TRIGGER_VAR,"+plc4XEndpoint.getPeriod()+",("+ plc4XEndpoint.getTrigger() +")==(true))"; + List<JobConfigurationImpl> job = Collections.singletonList(new JobConfigurationImpl("PLC4X-Camel",config,0,Collections.singletonList(PLC_NAME),tagList)); + Map<String,String> source = Collections.singletonMap(PLC_NAME,plc4XEndpoint.getUri()); + return new ScraperConfigurationTriggeredImpl(source,job); } @Override diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java index c7e6a0e..5e493ec 100644 --- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java +++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/Plc4XEndpoint.java @@ -27,7 +27,7 @@ import org.apache.camel.spi.UriPath; import org.apache.plc4x.java.PlcDriverManager; import org.apache.plc4x.java.api.PlcConnection; import org.apache.plc4x.java.api.exceptions.PlcConnectionException; -import org.slf4j.LoggerFactory; +import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager; import java.util.List; import java.util.Map; @@ -42,24 +42,61 @@ public class Plc4XEndpoint extends DefaultEndpoint { @UriParam private List<TagData> tags; + @UriParam + private String trigger; + + @UriParam + private int period; + + public int getPeriod() { + return period; + } + + public void setPeriod(int period) { + this.period = period; + } private final PlcDriverManager plcDriverManager; private PlcConnection connection; private String uri; + public String getUri() { + return uri; + } + + public String getTrigger() { + return trigger; + } + + public void setTrigger(String trigger) { + this.trigger = trigger; + } + public Plc4XEndpoint(String endpointUri, Component component) { super(endpointUri, component); - plcDriverManager= new PlcDriverManager(); - uri = endpointUri; - - //Here we establish the connection in the endpoint, as it is created once during the context - // to avoid disconnecting and reconnecting for every request - try { + if(trigger==null) { + plcDriverManager = new PlcDriverManager(); + uri = endpointUri; + //Here we establish the connection in the endpoint, as it is created once during the context + // to avoid disconnecting and reconnecting for every request + try { + String plc4xURI = uri.replaceFirst("plc4x:/?/?", ""); + uri=plc4xURI; + connection = plcDriverManager.getConnection(plc4xURI); + + } catch (PlcConnectionException e) { + e.printStackTrace(); + } + } + else { + plcDriverManager = new PooledPlcDriverManager(); String plc4xURI = uri.replaceFirst("plc4x:/?/?", ""); - connection = plcDriverManager.getConnection(plc4xURI); - - } catch (PlcConnectionException e) { - e.printStackTrace(); + uri=plc4xURI; + try { + connection = plcDriverManager.getConnection(plc4xURI); + } catch (PlcConnectionException e) { + e.printStackTrace(); + } } } diff --git a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java index f57774f..c6b5d53 100644 --- a/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java +++ b/plc4j/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/TagData.java @@ -18,7 +18,10 @@ under the License. */ package org.apache.plc4x.camel; +import org.slf4j.LoggerFactory; + import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.Predicate; @@ -114,4 +117,13 @@ public class TagData { } + + public static Map<String,String> toMap(List<TagData> tags){ + Map<String,String> map = new HashMap<>(); + LoggerFactory.getLogger(TagData.class).info("Classloader {} ", Thread.currentThread().getContextClassLoader()); + for(TagData tag : tags){ + map.put(tag.getTagName(),tag.getQuery()); + } + return map; + } } diff --git a/plc4j/tools/scraper/pom.xml b/plc4j/tools/scraper/pom.xml index ce16973..61605f8 100644 --- a/plc4j/tools/scraper/pom.xml +++ b/plc4j/tools/scraper/pom.xml @@ -101,18 +101,7 @@ <version>0.7.0-SNAPSHOT</version> <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.plc4x</groupId> - <artifactId>plc4j-driver-s7</artifactId> - <version>0.7.0-SNAPSHOT</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>org.apache.plc4x</groupId> - <artifactId>plc4j-driver-s7</artifactId> - <version>0.7.0-SNAPSHOT</version> - <scope>compile</scope> - </dependency> + </dependencies> <build> @@ -122,7 +111,6 @@ <artifactId>maven-dependency-plugin</artifactId> <configuration> <usedDependencies combine.children="append"> - <!--usedDependency>org.apache.plc4x:plc4j-driver-s7</usedDependency--> </usedDependencies> </configuration> </plugin> diff --git a/plc4j/tools/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java b/plc4j/tools/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java index 3390432..0f3ddc3 100644 --- a/plc4j/tools/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java +++ b/plc4j/tools/scraper/src/main/java/org/apache/plc4x/java/scraper/triggeredscraper/triggerhandler/TriggerConfiguration.java @@ -20,11 +20,9 @@ package org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler; import org.apache.plc4x.java.PlcDriverManager; -import org.apache.plc4x.java.api.PlcConnection; import org.apache.plc4x.java.api.PlcDriver; import org.apache.plc4x.java.api.exceptions.PlcConnectionException; import org.apache.plc4x.java.api.model.PlcField; -import org.apache.plc4x.java.s7.readwrite.field.S7Field; import org.apache.plc4x.java.scraper.exception.ScraperConfigurationException; import org.apache.plc4x.java.scraper.exception.ScraperException; import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScrapeJobImpl; diff --git a/pom.xml b/pom.xml index 6932577..1af28d2 100644 --- a/pom.xml +++ b/pom.xml @@ -155,7 +155,7 @@ <pcap4j.version>1.8.2</pcap4j.version> <scala.version>2.12.6</scala.version> <slf4j.version>1.7.25</slf4j.version> - <snakeyaml.version>1.23</snakeyaml.version> + <snakeyaml.version>1.24</snakeyaml.version> <spock-reports.version>1.6.1</spock-reports.version> <spock.version>1.2-groovy-2.5</spock.version> <t-digest.version>3.2</t-digest.version>