Re: AutoCloseable for Reader / Writer

2018-09-11 Thread Christofer Dutz
No objections :-)

Download Outlook for Android


From: Julian Feinauer 
Sent: Tuesday, September 11, 2018 8:57:26 PM
To: dev@plc4x.apache.org
Subject: AutoCloseable for Reader / Writer

Hey Everybody,

we just had the short notice about the „Connection Pool“ earlier today, and one 
thing I wanted to ask with regard to Chris' refactoring is whether there is a 
reason behind the fact that PlcConnection is AutoCloseable but Reader and 
Writer are not.
Shouldn't they have a lifecycle which is strongly bound to “their” PlcConnection 
(for S7 it's pretty obvious)?

The reason behind this question is that without AutoCloseable I would not like 
to use it in a ConnectionPool as the chance is pretty high that clients forget 
to give back the connection.

If you agree, I think now would be a good time to decide, as the current 
refactoring will change many things, so this should be no issue.

Best
Julian


Re: Kafka Connect Integration

2018-09-11 Thread Andrey Skorikov

@Sagar

Although we could get rid of locks on the SourceTask object by using an 
AtomicBoolean (or a volatile field, for that matter), we would still 
need to wait for it to become true. The SourceTask.poll() contract 
states that "this method should block but return control to the caller 
regularly (by returning null) in order for the task to transition to the 
PAUSED state if requested to do so". Unfortunately, the AtomicBoolean 
API does not provide such a blocking operation, hence we would be 
required to implement it ourselves.


We could poll the AtomicBoolean value; the naive implementation would 
look something like this:


  while (!fetch.get()) {}

and would burn one CPU core. We could suspend the thread by:

  while (!fetch.get()) { Thread.sleep(duration); }

but how do you choose a good value for duration? If it is too long, the 
effective polling rate would be lower than specified, and if it is too 
short, we would again waste cycles.


The basic idiom for this kind of synchronization problem is a simple 
wait/notify scheme, which has in fact been deployed in the code.
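
For illustration, a minimal sketch of that idiom in isolation (the class 
and names below are made up, not the actual connector code):

  // Timer thread calls fire() at the configured polling rate;
  // poll() calls await() and may return null when it times out.
  class FetchSignal {
      private boolean fetch = false;

      synchronized void fire() {
          fetch = true;
          notifyAll(); // wake the thread blocked in await()
      }

      synchronized boolean await(long maxWaitMillis) throws InterruptedException {
          long deadline = System.currentTimeMillis() + maxWaitMillis;
          long remaining = maxWaitMillis;
          while (!fetch && remaining > 0) {
              wait(remaining); // releases the lock while waiting
              remaining = deadline - System.currentTimeMillis();
          }
          boolean due = fetch;
          fetch = false; // consume the signal
          return due;
      }
  }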


Other synchronization primitives have been considered, but rejected for 
the following reasons:


 - The CountDownLatch cannot be reset

 - The CyclicBarrier would block the timer thread (which sets the fetch 
flag to true), distorting the effective polling rate again


 - The Phaser could work in this scenario, but its API is relatively 
complex and it would be overkill to employ it for this kind of problem


I hope that helps! :)

Best regards,

Andrey

On 09/11/2018 07:47 PM, Sagar wrote:

@Andrey,

Had a quick question regarding the Source Connector for Kafka Connect.

I see that the awaitFetch() method has been created with a synchronized
block within which we are just checking if a particular boolean flag has
been set to true or not. Considering it's just this and not any other
synchronization that's needed per se, wouldn't it make sense to change the
variable fetch to type AtomicBoolean and remove the synchronized from the
method? Considering the only atomic operation to be performed is setting or
getting the value of the flag, can we avoid having synchronized method
blocks?

Thanks!
Sagar.


On Sun, Sep 9, 2018 at 6:24 PM Christofer Dutz 
wrote:


Hi Sagar,

As I didn't hear anything from you, I encouraged colleagues of mine to
help with the adapter. In parallel I merged everything into the
feature/api-refactoring-chris-c branch. As soon as the last drivers are
refactored, we'll merge that back to master. So if you want to have a look,
I would suggest that branch.

Chridutz

Download Outlook for Android


From: Sagar 
Sent: Sunday, September 9, 2018 1:15:35 PM
To: dev@plc4x.apache.org
Subject: Re: Kafka Connect Integration

Hi Christofer,

Looking at the other e-mail that you sent for the work that has happened,
looks like a lot of great progress has been made.

I just got caught up with some other things so could never start off post
our discussions here :(

Wanted to understand, once you have some bandwidth, what are the next steps
with the k-connect integration? Can I sync up with someone and start
looking at some of the pieces?

Thanks!
Sagar.

On Thu, Aug 30, 2018 at 12:47 AM Christofer Dutz <
christofer.d...@c-ware.de>
wrote:


Hi Sagar,

thanks for the info ... this way I learn more and more :-)

Looking forward to answering the questions as they come.

Chris

On 29.08.18, 19:28, "Sagar" wrote:

 Hi Chris,

 Thanks. Typically a Kafka cluster will be a separate set of nodes, and so
 would be the k-connect workers, which will connect to the PLC devices or
 databases or whatever the source is, and push to Kafka.

 I will start off with this information and extend your feature

branch.

 Would keep asking questions along the way

 Sagar.

 On Wed, Aug 29, 2018 at 7:51 PM Christofer Dutz <
christofer.d...@c-ware.de>
 wrote:

 > Hi Sagar,
 >
 > Great that we seem to be on the same page now ;-)
 >
 > Regarding the "Kafka Connecting" ... what I meant is that the
 > Kafka-Connect-PLC4X-Instance connects ... I was assuming the driver
to be
 > running on a Kafka Node, but that's just due to my limited

knowledge

of
 > everything ;-)
 >
 > Well the code for actively reading stuff from a PLC should already
be in
 > my example implementation. It should work out of the box this way
... As I
 > have seen several Mock Drivers implemented in PLC4X, I am currently
 > thinking of implementing one that you should be able to just import
and use
 > ... however I'm currently working hard on refactoring the API
completely,
 > so I would postpone that to after these changes are in there. But
rest
 > assured ... I would handle the refactoring so you could just assume
that it
 > works.
 >
 > Alternatively I could have an account for our IoT VPN created for you.
Then
 > you 

AutoCloseable for Reader / Writer

2018-09-11 Thread Julian Feinauer
Hey Everybody,

we just had the short notice about the „Connection Pool“ earlier today, and one 
thing I wanted to ask with regard to Chris' refactoring is whether there is a 
reason behind the fact that PlcConnection is AutoCloseable but Reader and 
Writer are not.
Shouldn't they have a lifecycle which is strongly bound to “their” PlcConnection 
(for S7 it's pretty obvious)?

The reason behind this question is that without AutoCloseable I would not like 
to use it in a ConnectionPool as the chance is pretty high that clients forget 
to give back the connection.
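
With AutoCloseable on both, borrowing and returning could collapse into a 
try-with-resources block. A sketch (the ConnectionPool and its API are 
hypothetical here, and it assumes an AutoCloseable Reader, which is exactly 
what is proposed):

  // Sketch only; the pool API and AutoCloseable PlcReader are assumptions.
  try (PlcConnection connection = pool.borrowConnection("s7://10.10.64.20/1/1");
       PlcReader reader = connection.getReader().orElseThrow(IllegalStateException::new)) {
      // build and execute read requests here
  } // both close automatically; the pool gets its connection back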

If you agree, I think now would be a good time to decide, as the current 
refactoring will change many things, so this should be no issue.

Best
Julian


Re: Kafka Connect Integration

2018-09-11 Thread Sagar
@Andrey,

Had a quick question regarding the Source Connector for Kafka Connect.

I see that the awaitFetch() method has been created with a synchronized
block within which we are just checking if a particular boolean flag has
been set to true or not. Considering it's just this and not any other
synchronization that's needed per se, wouldn't it make sense to change the
variable fetch to type AtomicBoolean and remove the synchronized from the
method? Considering the only atomic operation to be performed is setting or
getting the value of the flag, can we avoid having synchronized method
blocks?

Thanks!
Sagar.


On Sun, Sep 9, 2018 at 6:24 PM Christofer Dutz 
wrote:

> Hi Sagar,
>
> As I didn't hear anything from you, I encouraged colleagues of mine to
> help with the adapter. In parallel I merged everything into the
> feature/api-refactoring-chris-c branch. As soon as the last drivers are
> refactored, we'll merge that back to master. So if you want to have a look,
> I would suggest that branch.
>
> Chridutz
>
> Download Outlook for Android
>
> 
> From: Sagar 
> Sent: Sunday, September 9, 2018 1:15:35 PM
> To: dev@plc4x.apache.org
> Subject: Re: Kafka Connect Integration
>
> Hi Christofer,
>
> Looking at the other e-mail that you sent for the work that has happened,
> looks like a lot of great progress has been made.
>
> I just got caught up with some other things so could never start off post
> our discussions here :(
>
> Wanted to understand, once you have some bandwidth, what are the next steps
> with the k-connect integration? Can I sync up with someone and start
> looking at some of the pieces?
>
> Thanks!
> Sagar.
>
> On Thu, Aug 30, 2018 at 12:47 AM Christofer Dutz <
> christofer.d...@c-ware.de>
> wrote:
>
> > Hi Sagar,
> >
> > thanks for the info ... this way I learn more and more :-)
> >
> > Looking forward to answering the questions as they come.
> >
> > Chris
> >
> > On 29.08.18, 19:28, "Sagar" wrote:
> >
> > Hi Chris,
> >
> > Thanks. Typically a Kafka cluster will be a separate set of nodes, and so
> > would be the k-connect workers, which will connect to the PLC devices or
> > databases or whatever the source is, and push to Kafka.
> >
> > I will start off with this information and extend your feature
> branch.
> > Would keep asking questions along the way
> >
> > Sagar.
> >
> > On Wed, Aug 29, 2018 at 7:51 PM Christofer Dutz <
> > christofer.d...@c-ware.de>
> > wrote:
> >
> > > Hi Sagar,
> > >
> > > Great that we seem to be on the same page now ;-)
> > >
> > > Regarding the "Kafka Connecting" ... what I meant is that the
> > > Kafka-Connect-PLC4X-Instance connects ... I was assuming the driver
> > to be
> > > running on a Kafka Node, but that's just due to my limited
> knowledge
> > of
> > > everything ;-)
> > >
> > > Well the code for actively reading stuff from a PLC should already
> > be in
> > > my example implementation. It should work out of the box this way
> > ... As I
> > > have seen several Mock Drivers implemented in PLC4X, I am currently
> > > thinking of implementing one that you should be able to just import
> > and use
> > > ... however I'm currently working hard on refactoring the API
> > completely,
> > > so I would postpone that to after these changes are in there. But
> > rest
> > > assured ... I would handle the refactoring so you could just assume
> > that it
> > > works.
> > >
> > > Alternatively I could have an account for our IoT VPN created for you.
> > Then
> > > you could log in to our VPN and talk to some real PLCs ...
> > >
> > > I think I wanted to create an account for Julian, but my guy
> > > responsible for creating them was on holiday ... will re-check this.
> > >
> > > Chris
> > >
> > >
> > >
> > On 29.08.18, 16:11, "Sagar" wrote:
> > >
> > > Hi Chris,
> > >
> > > That's perfectly fine :)
> > >
> > > So, the way I understand this now is, we will have a bunch of worker
> > > nodes (in Kafka Connect terminology, a worker is a JVM process which
> > > runs a set of connectors/tasks to poll a source and push data to
> > > Kafka).
> > >
> > > So, vis-a-vis a JDBC connection, we will have a connection URL which
> > > will let us connect to these PLC devices, poll (poll in the sense you
> > > meant it above), and then push data to Kafka. If this looks fine, then
> > > can you give me some documentation to refer to, and also how can I
> > > start testing these?
> > >
> > > And just one thing I wanted to clarify: when you say Kafka nodes
> > > connecting to devices, that's something which doesn't happen. Kafka
> > > doesn't connect to any device. I think you just mean

AW: Feature prospect of project plc4x

2018-09-11 Thread Uschold Andreas
Hi Chris and Julian,

I'm glad to hear that pooling and caching are not out of scope. This topic is 
pretty complex and one of our most disruptive obstacles.

Regarding fragmentation of too-big ReadRequests, I didn't dig too deep through 
the source. Instead I did a simple test, just reading the first byte of 30 
different data blocks. I filed two issues, PLC4X-47 and PLC4X-48 (an NPE thrown 
when parsing the read response when multiple items are included in one 
ReadRequest). Maybe the second issue is the cause of the first one.

With hierarchic namespaces I simply mean structured, nestable containers with a 
unique and qualifying name for data points. Since S7 does not provide a lot of 
information about this aspect via the S7 protocol, this feature is not very 
powerful there. OPC on the other hand makes heavy use of this concept.
Our product is used by people with a minor technical background. If we can 
offer a UI which provides all the available data points in a 
human-understandable way, we provide valuable assistance to our users and enable 
them to work without the need to inspect the PLC software to find out where 
which piece of information is located.
A very basic approach might even be to just use plain SEQ files with symbolic 
information.
I agree that a namespace feature has nothing to do with the native 
communication and should be realized in a superordinate layer.

Best regards
Andreas

-Original Message-
Hey,

this is also what I thought about, but I would suggest using a syntax like 
Apache Calcite uses for the JDBC connection [1]:

s7://10.10.64.20/1/1;symbolicAddressFile=/some/path/to/a/file;..

Julian

[1] https://calcite.apache.org/avatica/docs/client_reference.html

On 11.09.18, 16:36, "Christofer Dutz" wrote:

Hi Julian and Andreas,

one way to provide such lists to a connection manually would be to have 
that information in a file and to pass that in as an option to the driver manager

s7://10.10.64.20/1/1?symbolicAddressFile=/some/path/to/a/file

The drivers and the driver manager can already process such options ... these 
could be used to override some defaults (a smaller PDU size, longer 
timeouts, a specified freshness factor for all requests, or the just 
mentioned address files)

Chris

On 11.09.18, 16:28, "Julian Feinauer" wrote:

Hi Andreas,

a warm welcome also from my side!

Regarding your second aspect, I agree with Chris and the interface 
where one could integrate arbitrarily complex logic.

Regarding your first aspect, I am not sure what you mean by the 
"hierarchical" interface. I mean, it is surely possible to define a generic 
data model where all PLCs can be integrated somehow, but the question is how 
useful it will be in the end.
Considering S7 I agree with you that there is something missing (unless 
you like the raw byte arrays from DBs; then SZL is sufficient).
I spent some time looking at TIA's ap14 files to be able to extract all 
type information from there (not there yet), and I would totally agree with you 
to define a format where one can extract the DB layouts from TIA, which can 
then be read in.
This would mainly solve the addressing issues with S7 and give some 
"symbolic" addressing features.
But I agree with Chris that this should be one layer on top of the 
"PlcConnection" layer (but should be part of the project, I think).

Regarding the suggestion with the event collapsing, this is something 
very interesting which we also thought about. Currently we are implementing and 
testing a ConnectionPool for PlcConnections, which is very useful when many 
threads want to communicate with very few PLCs. As this pool sees all requests 
from all threads, it could be used for more advanced features like request 
pooling and caching (just return cached results if the value is not older than 
X).
We have use cases where we want to use such a feature, but we also have 
use cases with scrape rates of a few ms, where we want our values as fresh 
as possible.

I'm looking forward to hearing your thoughts on this, and I like that more 
and more "use cases" are joining the discussion.

Best
Julian

On 11.09.18, 16:18, "Christofer Dutz" wrote:

Hi Andreas,

first of all welcome here :-)

It makes me very happy to hear that you are thinking about 
eventually joining forces. We can definitely use every helping hand and gladly 
welcome new contributors.

Regarding the two aspects:

I initially defined the PlcLister interface as I thought it was 
important. However I had no usable information on how to implement this for the 
S7 devices and as no other driver implementation had such functionality 
implemented, I even recently removed it in the 

[GitHub] asfgit closed pull request #19: Add URL Field to Key Schema in Kafka Source Connector

2018-09-11 Thread GitBox
asfgit closed pull request #19: Add URL Field to Key Schema in Kafka Source 
Connector
URL: https://github.com/apache/incubator-plc4x/pull/19
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
index c354a1ea2..7d0ed8621 100644
--- 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
+++ 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java
@@ -20,6 +20,8 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -32,10 +34,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import org.apache.plc4x.java.api.types.PlcResponseCode;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.*;
 
 /**
@@ -48,6 +47,15 @@ Licensed to the Apache Software Foundation (ASF) under one
 private final static long WAIT_LIMIT_MILLIS = 100;
 private final static long TIMEOUT_LIMIT_MILLIS = 5000;
 
+private final static String URL_FIELD = "url";
+private final static String QUERY_FIELD = "query";
+
+private final static Schema KEY_SCHEMA =
+new SchemaBuilder(Schema.Type.STRUCT)
+.field(URL_FIELD, Schema.STRING_SCHEMA)
+.field(QUERY_FIELD, Schema.STRING_SCHEMA)
+.build();
+
 private String topic;
 private String url;
 private List<String> queries;
@@ -56,6 +64,8 @@ Licensed to the Apache Software Foundation (ASF) under one
 private PlcReader plcReader;
 private PlcReadRequest plcRequest;
 
+
+
 // TODO: should we use shared (static) thread pool for this?
 private ScheduledExecutorService scheduler;
 private ScheduledFuture<?> timer;
@@ -166,11 +176,16 @@ private synchronized boolean awaitFetch(long 
milliseconds) throws InterruptedExc
 continue;
 }
 
-Object rawValue = response.getObject(query);
-Schema valueSchema = getSchema(rawValue.getClass());
-Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? 
rawValue.toString() : rawValue;
+Struct key = new Struct(KEY_SCHEMA)
+.put(URL_FIELD, url)
+.put(QUERY_FIELD, query);
+
+Object value = response.getObject(query);
+Schema valueSchema = getSchema(value);
 Long timestamp = System.currentTimeMillis();
-Map<String, String> sourcePartition = 
Collections.singletonMap("url", url);
+Map<String, String> sourcePartition = new HashMap<>();
+sourcePartition.put("url", url);
+sourcePartition.put("query", query);
 Map<String, Long> sourceOffset = 
Collections.singletonMap("offset", timestamp);
 
 SourceRecord record =
@@ -178,8 +193,8 @@ private synchronized boolean awaitFetch(long milliseconds) 
throws InterruptedExc
 sourcePartition,
 sourceOffset,
 topic,
-Schema.STRING_SCHEMA,
-query,
+KEY_SCHEMA,
+key,
 valueSchema,
 value
 );
@@ -190,20 +205,38 @@ private synchronized boolean awaitFetch(long 
milliseconds) throws InterruptedExc
 return result;
 }
 
-private Schema getSchema(Class<?> type) {
-if (type.equals(Byte.class))
+private Schema getSchema(Object value) {
+Objects.requireNonNull(value);
+
+if (value instanceof Byte)
 return Schema.INT8_SCHEMA;
 
-if (type.equals(Short.class))
+if (value instanceof Short)
 return Schema.INT16_SCHEMA;
 
-if (type.equals(Integer.class))
+if (value instanceof Integer)
 return Schema.INT32_SCHEMA;
 
-if (type.equals(Long.class))
+if (value instanceof Long)
 return Schema.INT64_SCHEMA;
 
-return Schema.STRING_SCHEMA; // default case; invoke .toString on value
+if (value instanceof Float)
+return Schema.FLOAT32_SCHEMA;
+
+if (value instanceof Double)
+return Schema.FLOAT64_SCHEMA;
+
+if (value instanceof Boolean)
+  

Re: Feature prospect of project plc4x

2018-09-11 Thread Julian Feinauer
Hey,

this is also what I thought about, but I would suggest using a syntax like 
Apache Calcite uses for the JDBC connection [1]:

s7://10.10.64.20/1/1;symbolicAddressFile=/some/path/to/a/file;..
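
Splitting such options off is straightforward; a rough sketch (the 
separator is the one from the example above, everything else is 
illustrative):

  import java.util.HashMap;
  import java.util.Map;

  // Splits "s7://host/1/1;key=value;key2=value2" into an option map;
  // segment 0 (the base URL) is left to the driver's own URL parsing.
  static Map<String, String> parseOptions(String connectionString) {
      Map<String, String> options = new HashMap<>();
      String[] segments = connectionString.split(";");
      for (int i = 1; i < segments.length; i++) {
          String[] kv = segments[i].split("=", 2);
          if (kv.length == 2) {
              options.put(kv[0], kv[1]);
          }
      }
      return options;
  }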

Julian

[1] https://calcite.apache.org/avatica/docs/client_reference.html

On 11.09.18, 16:36, "Christofer Dutz" wrote:

Hi Julian and Andreas,

one way to provide such lists to a connection manually would be to have 
that information in a file and to pass that in as an option to the driver manager

s7://10.10.64.20/1/1?symbolicAddressFile=/some/path/to/a/file

The drivers and the driver manager can already process such options ... these 
could be used to override some defaults (a smaller PDU size, longer 
timeouts, a specified freshness factor for all requests, or the just 
mentioned address files)

Chris

On 11.09.18, 16:28, "Julian Feinauer" wrote:

Hi Andreas,

a warm welcome also from my side!

Regarding your second aspect, I agree with Chris and the interface 
where one could integrate arbitrarily complex logic.

Regarding your first aspect, I am not sure what you mean by the 
"hierarchical" interface. I mean, it is surely possible to define a generic 
data model where all PLCs can be integrated somehow, but the question is how 
useful it will be in the end.
Considering S7 I agree with you that there is something missing (unless 
you like the raw byte arrays from DBs; then SZL is sufficient).
I spent some time looking at TIA's ap14 files to be able to extract all 
type information from there (not there yet), and I would totally agree with you 
to define a format where one can extract the DB layouts from TIA, which can 
then be read in.
This would mainly solve the addressing issues with S7 and give some 
"symbolic" addressing features.
But I agree with Chris that this should be one layer on top of the 
"PlcConnection" layer (but should be part of the project, I think).

Regarding the suggestion with the event collapsing, this is something 
very interesting which we also thought about. Currently we are implementing and 
testing a ConnectionPool for PlcConnections, which is very useful when many 
threads want to communicate with very few PLCs. As this pool sees all requests 
from all threads, it could be used for more advanced features like request 
pooling and caching (just return cached results if the value is not older than 
X).
We have use cases where we want to use such a feature, but we also have 
use cases with scrape rates of a few ms, where we want our values as fresh 
as possible.

I'm looking forward to hearing your thoughts on this, and I like that more 
and more "use cases" are joining the discussion.

Best
Julian

On 11.09.18, 16:18, "Christofer Dutz" wrote:

Hi Andreas,

first of all welcome here :-)

It makes me very happy to hear that you are thinking about 
eventually joining forces. We can definitely use every helping hand and gladly 
welcome new contributors.

Regarding the two aspects:

I initially defined the PlcLister interface as I thought it was 
important. However I had no usable information on how to implement this for the 
S7 devices and as no other driver implementation had such functionality 
implemented, I even recently removed it in the API refactoring. If you could 
help us with implementing this functionality, I would be more than happy to 
have that included as it does simplify things quite a bit. Don't know if 
PlcLister is a good name, but we can name it whatever seems fit.

Regarding the second aspect. I am a little surprised as I have 
implemented exactly the functionality of the PDU splitting. If you have a look 
at the S7Driver the code itself doesn't do any splitting up. I outsourced this 
to the S7MessageProcessor. This was mainly due to the fact that I knew that 
this optimization can become quite complex, and I wanted to be able to exchange 
optimization strategies. Here the DefaultS7MessageProcessor keeps track of the 
sizes and splits them up. When having a look at the 
DefaultS7MessageProcessorTest you should see it doing its magic.

In parallel I am trying to convince a good friend of mine - a 
mathematician specialized in optimization problems - to contribute in this 
sector. Especially rewriting queries, as you suggested, is on my to-do list.

Right now only the S7 Driver contains such optimizations but I 
would really like to come up with a strategy to generally allow a lot of these 
optimizations no matter what protocol is being used. 

I would consider your "reasonably fresh" optimization as something 
different however. I agree it makes s

Re: Feature prospect of project plc4x

2018-09-11 Thread Christofer Dutz
Hi Julian and Andreas,

one way to provide such lists to a connection manually would be to have that 
information in a file and to pass that in as an option to the driver manager

s7://10.10.64.20/1/1?symbolicAddressFile=/some/path/to/a/file

The drivers and the driver manager can already process such options ... these could 
be used to override some defaults (a smaller PDU size, longer timeouts, a 
specified freshness factor for all requests, or the just mentioned 
address files)
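
For example (a sketch; symbolicAddressFile is the option proposed above, 
not something that is implemented yet):

  PlcConnection connection = new PlcDriverManager()
      .getConnection("s7://10.10.64.20/1/1?symbolicAddressFile=/some/path/to/a/file");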

Chris

On 11.09.18, 16:28, "Julian Feinauer" wrote:

Hi Andreas,

a warm welcome also from my side!

Regarding your second aspect, I agree with Chris and the interface 
where one could integrate arbitrarily complex logic.

Regarding your first aspect, I am not sure what you mean by the 
"hierarchical" interface. I mean, it is surely possible to define a generic 
data model where all PLCs can be integrated somehow, but the question is how 
useful it will be in the end.
Considering S7 I agree with you that there is something missing (unless 
you like the raw byte arrays from DBs; then SZL is sufficient).
I spent some time looking at TIA's ap14 files to be able to extract all 
type information from there (not there yet), and I would totally agree with you 
to define a format where one can extract the DB layouts from TIA, which can 
then be read in.
This would mainly solve the addressing issues with S7 and give some 
"symbolic" addressing features.
But I agree with Chris that this should be one layer on top of the 
"PlcConnection" layer (but should be part of the project, I think).

Regarding the suggestion with the event collapsing, this is something 
very interesting which we also thought about. Currently we are implementing and 
testing a ConnectionPool for PlcConnections, which is very useful when many 
threads want to communicate with very few PLCs. As this pool sees all requests 
from all threads, it could be used for more advanced features like request 
pooling and caching (just return cached results if the value is not older than 
X).
We have use cases where we want to use such a feature, but we also have 
use cases with scrape rates of a few ms, where we want our values as fresh 
as possible.

I'm looking forward to hearing your thoughts on this, and I like that more 
and more "use cases" are joining the discussion.

Best
Julian

On 11.09.18, 16:18, "Christofer Dutz" wrote:

Hi Andreas,

first of all welcome here :-)

It makes me very happy to hear that you are thinking about eventually 
joining forces. We can definitely use every helping hand and gladly welcome new 
contributors.

Regarding the two aspects:

I initially defined the PlcLister interface as I thought it was 
important. However I had no usable information on how to implement this for the 
S7 devices and as no other driver implementation had such functionality 
implemented, I even recently removed it in the API refactoring. If you could 
help us with implementing this functionality, I would be more than happy to 
have that included as it does simplify things quite a bit. Don't know if 
PlcLister is a good name, but we can name it whatever seems fit.

Regarding the second aspect. I am a little surprised as I have 
implemented exactly the functionality of the PDU splitting. If you have a look 
at the S7Driver the code itself doesn't do any splitting up. I outsourced this 
to the S7MessageProcessor. This was mainly due to the fact that I knew that 
this optimization can become quite complex, and I wanted to be able to exchange 
optimization strategies. Here the DefaultS7MessageProcessor keeps track of the 
sizes and splits them up. When having a look at the 
DefaultS7MessageProcessorTest you should see it doing its magic.

In parallel I am trying to convince a good friend of mine - a 
mathematician specialized in optimization problems - to contribute in this 
sector. Especially rewriting queries, as you suggested, is on my to-do list.

Right now only the S7 Driver contains such optimizations but I would 
really like to come up with a strategy to generally allow a lot of these 
optimizations no matter what protocol is being used. 

I would consider your "reasonably fresh" optimization as something 
different however. I agree it makes sense to implement something like this. 
However I would probably add that as a driver independent layer within PLC4X.

You can see ... we have achieved quite a bit in our first year, we're 
still on it, but there's still a lot to do. So if you want to join the team, 
you're more than welcome.

Chris 



On 11.09.18, 15:26, "Uschold Andreas" wrote:

Hi all,

In early 2017 we started the development of a Java based 
v

Re: Feature prospect of project plc4x

2018-09-11 Thread Julian Feinauer
Hi Andreas,

a warm welcome also from my side!

Regarding your second aspect, I agree with Chris and the interface where one 
could integrate arbitrarily complex logic.

Regarding your first aspect, I am not sure what you mean by the 
"hierarchical" interface. I mean, it is surely possible to define a generic 
data model where all PLCs can be integrated somehow, but the question is how 
useful it will be in the end.
Considering S7 I agree with you that there is something missing (unless you 
like the raw byte arrays from DBs; then SZL is sufficient).
I spent some time looking at TIA's ap14 files to be able to extract all type 
information from there (not there yet), and I would totally agree with you to 
define a format where one can extract the DB layouts from TIA, which can then 
be read in.
This would mainly solve the addressing issues with S7 and give some "symbolic" 
addressing features.
But I agree with Chris that this should be one layer on top of the 
"PlcConnection" layer (but should be part of the project, I think).

Regarding the suggestion with the event collapsing, this is something very 
interesting which we also thought about. Currently we are implementing and 
testing a ConnectionPool for PlcConnections, which is very useful when many 
threads want to communicate with very few PLCs. As this pool sees all requests 
from all threads, it could be used for more advanced features like request 
pooling and caching (just return cached results if the value is not older than 
X).
We have use cases where we want to use such a feature, but we also have use 
cases with scrape rates of a few ms, where we want our values as fresh as 
possible.
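
As a sketch of what the caching part of such a pool could look like (all 
names are made up for illustration; this is not the actual implementation):

  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;

  class CachingReader {
      private static final class CachedValue {
          final Object value;
          final long readAt;
          CachedValue(Object value, long readAt) { this.value = value; this.readAt = readAt; }
      }

      private final Map<String, CachedValue> cache = new ConcurrentHashMap<>();

      // Return a cached value if it is younger than maxAgeMillis,
      // otherwise read through to the PLC ("freshness factor").
      Object read(String field, long maxAgeMillis) {
          long now = System.currentTimeMillis();
          CachedValue hit = cache.get(field);
          if (hit != null && now - hit.readAt <= maxAgeMillis) {
              return hit.value; // still recent enough
          }
          Object value = readFromPlc(field); // the real, pooled request
          cache.put(field, new CachedValue(value, now));
          return value;
      }

      private Object readFromPlc(String field) {
          // placeholder for the actual request through the pool
          throw new UnsupportedOperationException("sketch only");
      }
  }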

I'm looking forward to hearing your thoughts on this, and I like that more and 
more "use cases" are joining the discussion.

Best
Julian

On 11.09.18, 16:18, "Christofer Dutz" wrote:

Hi Andreas,

first of all welcome here :-)

It makes me very happy to hear that you are thinking about eventually 
joining forces. We can definitely use every helping hand and gladly welcome new 
contributors.

Regarding the two aspects:

I initially defined the PlcLister interface as I thought it was important. 
However I had no usable information on how to implement this for the S7 devices 
and as no other driver implementation had such functionality implemented, I 
even recently removed it in the API refactoring. If you could help us with 
implementing this functionality, I would be more than happy to have that 
included as it does simplify things quite a bit. Don't know if PlcLister is a 
good name, but we can name it whatever seems fit.

Regarding the second aspect. I am a little surprised as I have implemented 
exactly the functionality of the PDU splitting. If you have a look at the 
S7Driver the code itself doesn't do any splitting up. I outsourced this to the 
S7MessageProcessor. This was mainly due to the fact that I knew that this 
optimization can become quite complex, and I wanted to be able to exchange 
optimization strategies. Here the DefaultS7MessageProcessor keeps track of the 
sizes and splits them up. When having a look at the 
DefaultS7MessageProcessorTest you should see it doing its magic.

In parallel I am trying to convince a good friend of mine - a mathematician 
specialized in optimization problems - to contribute in this sector. Especially 
rewriting queries, as you suggested, is on my to-do list.

Right now only the S7 Driver contains such optimizations but I would really 
like to come up with a strategy to generally allow a lot of these optimizations 
no matter what protocol is being used. 

I would consider your "reasonably fresh" optimization as something 
different however. I agree it makes sense to implement something like this. 
However I would probably add that as a driver independent layer within PLC4X.

You can see ... we have achieved quite a bit in our first year, we're still 
on it, but there's still a lot to do. So if you want to join the team, 
you're more than welcome.

Chris 



On 11.09.18, 15:26, "Uschold Andreas" wrote:

Hi all,

In early 2017 we started the development of a Java-based visualization 
/ SCADA in the context of warehouses and logistics centers. One design goal was 
(and still is) to be able to use the system in a transparent way with any kind 
of data source, not only PLCs, but also network equipment, embedded systems, 
building technology, IT hardware and high-level IT processes (e.g. ERP). So we 
developed a modular platform very similar to what is available in PLC4X now. We 
already implemented plug-ins for the S7 and Beckhoff AMS communication from 
scratch; Allen Bradley's CIP and SNMP are next in line.
The goals and the design of PLC4X seem to be pretty similar to ours. We 
want to evaluate whether we can expect mid-term or long-term benefits by 
switching over to PLC4X and participating in

Re: Feature prospect of project plc4x

2018-09-11 Thread Christofer Dutz
Hi Andreas,

first of all welcome here :-)

It makes me very happy to hear that you are thinking about eventually joining 
forces. We can definitely use every helping hand and gladly welcome new 
contributors.

Regarding the two aspects:

I initially defined the PlcLister interface as I thought it was important. 
However I had no usable information on how to implement this for the S7 devices 
and as no other driver implementation had such functionality implemented, I 
even recently removed it in the API refactoring. If you could help us with 
implementing this functionality, I would be more than happy to have that 
included as it does simplify things quite a bit. Don't know if PlcLister is a 
good name, but we can name it whatever seems fit.

Regarding the second aspect. I am a little surprised as I have implemented 
exactly the functionality of the PDU splitting. If you have a look at the 
S7Driver the code itself doesn't do any splitting up. I outsourced this to the 
S7MessageProcessor. This was mainly due to the fact that I knew that this 
optimization can become quite complex, and I wanted to be able to exchange 
optimization strategies. Here the DefaultS7MessageProcessor keeps track of the 
sizes and splits them up. When having a look at the 
DefaultS7MessageProcessorTest you should see it doing its magic.
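
In spirit, the splitting step amounts to something like this (a much 
simplified sketch, not the actual DefaultS7MessageProcessor code, which 
also accounts for the byte size of each item):

  import java.util.ArrayList;
  import java.util.List;

  // Split the request items into chunks that each fit into one PDU;
  // the partial responses are joined back together afterwards.
  static <T> List<List<T>> split(List<T> items, int maxItemsPerPdu) {
      List<List<T>> chunks = new ArrayList<>();
      for (int i = 0; i < items.size(); i += maxItemsPerPdu) {
          chunks.add(items.subList(i, Math.min(i + maxItemsPerPdu, items.size())));
      }
      return chunks;
  }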

In parallel I am trying to convince a good friend of mine - a mathematician 
specialized in optimization problems - to contribute in this sector. Especially 
rewriting queries, as you suggested, is on my to-do list.

Right now only the S7 Driver contains such optimizations but I would really 
like to come up with a strategy to generally allow a lot of these optimizations 
no matter what protocol is being used. 

I would consider your "reasonably fresh" optimization as something different 
however. I agree it makes sense to implement something like this. However I 
would probably add that as a driver independent layer within PLC4X.

You can see ... we have achieved quite a bit in our first year, we're still on 
it, but there's still a lot to do. So if you want to join the team, you're 
more than welcome.

Chris 



On 11.09.18, 15:26, "Uschold Andreas" wrote:

Hi all,

In early 2017 we started the development of a Java-based visualization / 
SCADA in the context of warehouses and logistics centers. One design goal was 
(and still is) to be able to use the system in a transparent way with any kind 
of data source, not only PLCs, but also network equipment, embedded systems, 
building technology, IT hardware and high-level IT processes (e.g. ERP). So we 
developed a modular platform very similar to what is available in PLC4X now. We 
already implemented plug-ins for the S7 and Beckhoff AMS communication from 
scratch; Allen Bradley's CIP and SNMP are next in line.
The goals and the design of PLC4X seem to be pretty similar to ours. We 
want to evaluate whether we can expect mid-term or long-term benefits by 
switching over to PLC4X and participating in its development process.

So far I have identified two functional aspects PLC4X doesn't seem to address 
which are very important to us.

The first aspect is a normalized, hierarchic, human-readable namespace, 
just like a table of contents for any device.
Each variable on a PLC can be represented as a "data point" and provides 
information on how to gather the real data from a data source (e.g. just a 
mapping to an address (s7://...) or even more complex transformations). In case 
of AMS or OPC it is straightforward to read the namespace information from a 
device. With S7 you can still make use of the information provided by the SZL 
and offer "raw namespaces", representing I/Q/F/T/C/DB as big byte arrays / word 
arrays. Importing the information from a Step7 project is also no witchcraft.
Mostly frontends benefit from this concept.
I know about the PlcLister interface, but it is just empty and even the 
AdsTcpPlcConnection doesn't provide an implementation. Are namespaces as 
described above on the roadmap or at least in scope?

The second aspect is throughput optimization and request reorganization.
Protocols like S7 and CIP have very strong restrictions on the amount 
of data which can be fetched with one telegram. If one request addresses too 
much information, it should be up to the connection to split the request 
transparently and join the partial responses. The current state of PLC4J's 
s7-driver just cuts off after 19 items in one telegram. All surplus items are 
silently ignored. Thus a user must know about the limitations of the underlying 
protocol and must design requests accordingly.
On the other hand, most requests are probably very small (ints, words, even 
only a bit). Reading those individually, each with one call, reduces the 
effective bandwidth to nearly zero.
Often the fetched data is adjacent to other relevant information. Requests 
could be merged so effectively more data c

[GitHub] skorikov opened a new pull request #19: Add URL Field to Key Schema in Kafka Source Connector

2018-09-11 Thread GitBox
skorikov opened a new pull request #19: Add URL Field to Key Schema in Kafka 
Source Connector
URL: https://github.com/apache/incubator-plc4x/pull/19
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


Feature prospect of project plc4x

2018-09-11 Thread Uschold Andreas
Hi all,

In early 2017 we started the development of a Java-based visualization / SCADA 
in the context of warehouses and logistics centers. One design goal was (and 
still is) to be able to use the system in a transparent way with any kind of 
data source, not only PLCs, but also network equipment, embedded systems, 
building technology, IT hardware and high-level IT processes (e.g. ERP). So we 
developed a modular platform very similar to what is available in PLC4X now. We 
already implemented plug-ins for the S7 and Beckhoff AMS communication from 
scratch; Allen Bradley's CIP and SNMP are next in line.
The goals and the design of PLC4X seem to be pretty similar to ours. We want to 
evaluate whether we can expect mid-term or long-term benefits by switching over 
to PLC4X and participating in its development process.

So far I have identified two functional aspects PLC4X doesn't seem to address 
which are very important to us.

The first aspect is a normalized, hierarchic, human-readable namespace, just 
like a table of contents for any device.
Each variable on a PLC can be represented as a "data point" and provides 
information on how to gather the real data from a data source (e.g. just a 
mapping to an address (s7://...) or even more complex transformations). In case 
of AMS or OPC it is straightforward to read the namespace information from a 
device. With S7 you can still make use of the information provided by the SZL 
and offer "raw namespaces", representing I/Q/F/T/C/DB as big byte arrays / word 
arrays. Importing the information from a Step7 project is also no witchcraft.
Mostly frontends benefit from this concept.
I know about the PlcLister interface, but it is just empty and even the 
AdsTcpPlcConnection doesn't provide an implementation. Are namespaces as 
described above on the roadmap or at least in scope?
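
To make the idea concrete, the kind of API meant here would be roughly (a 
sketch; these interfaces do not exist in PLC4X and only illustrate the idea):

  import java.util.List;

  // A browsable, hierarchic "table of contents" for a device.
  interface DataPoint {
      String name();    // human readable, e.g. "Conveyor1/MotorState"
      String address(); // how to fetch the value, e.g. an s7:// field address
  }

  interface Namespace {
      List<DataPoint> children(String path); // browse one level of the tree
  }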

The second aspect is throughput optimization and request reorganization.
Protocols like S7 and CIP have very strong restrictions on the amount of 
data which can be fetched with one telegram. If one request addresses too much 
information, it should be up to the connection to split the request 
transparently and join the partial responses. The current state of PLC4J's 
s7-driver just cuts off after 19 items in one telegram. All surplus items are 
silently ignored. Thus a user must know about the limitations of the underlying 
protocol and must design requests accordingly.
On the other hand, most requests are probably very small (ints, words, even 
only a bit). Reading those individually, each with one call, reduces the 
effective bandwidth to nearly zero.
Often the fetched data is adjacent to other relevant information. Requests 
could be merged so that effectively more data can be fetched within one 
telegram.
In most of our scenarios it is not important to have the "fresh" read value, 
but to have a _recent_ value. A value read up to three seconds ago is 
sufficient for most of our use cases, especially since most values can be 
considered volatile on the PLC side. We have a bunch of independent systems 
which need to gather a lot of information from one PLC. Some information is 
gathered only by one system, some information by all systems. Sometimes 
relevant information is accumulated in one memory area, sometimes it is 
fragmented over the entire PLC.
With a central component coordinating and optimizing all current requests, it 
is possible to take the burden of caring about throughput off the user of 
the library, optimize the throughput for everyone, and eliminate duplicate 
reads from different systems by sharing data.
Right now I don't see any efforts regarding this in PLC4J.
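
As an illustration of the merging idea (a sketch only; byte ranges within 
one data block, each int[]{start, end} with an exclusive end):

  import java.util.ArrayList;
  import java.util.Comparator;
  import java.util.List;

  // Merge overlapping/adjacent ranges so they can be fetched as one item.
  static List<int[]> mergeRanges(List<int[]> ranges) {
      ranges.sort(Comparator.comparingInt(r -> r[0]));
      List<int[]> merged = new ArrayList<>();
      for (int[] r : ranges) {
          if (!merged.isEmpty() && r[0] <= merged.get(merged.size() - 1)[1]) {
              int[] last = merged.get(merged.size() - 1);
              last[1] = Math.max(last[1], r[1]); // extend the previous range
          } else {
              merged.add(new int[]{r[0], r[1]});
          }
      }
      return merged;
  }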

I would like to hear your opinions on these two aspects and whether they are 
relevant for the PLC4X project.


Best regards
Andreas



[GitHub] asfgit closed pull request #18: Add support for multiple tasks in kafka sink connector

2018-09-11 Thread GitBox
asfgit closed pull request #18: Add support for multiple tasks in kafka sink 
connector
URL: https://github.com/apache/incubator-plc4x/pull/18
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotElasticsearchFactory.java
 
b/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotElasticsearchFactory.java
index 41090f8f5..35e78f4eb 100644
--- 
a/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotElasticsearchFactory.java
+++ 
b/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotElasticsearchFactory.java
@@ -148,7 +148,7 @@ private void runFactory() {
 
 // Define the event stream.
 // 1) PLC4X source generating a stream of bytes.
-Supplier<Byte> plcSupplier = PlcFunctions.byteSupplier(plcAdapter, 
"OUTPUTS/0");
+Supplier<Byte> plcSupplier = PlcFunctions.byteSupplier(plcAdapter, 
"%Q0:BYTE");
 // 2) Use polling to get an item from the byte-stream in regular 
intervals.
 TStream<Byte> plcOutputStates = top.poll(plcSupplier, 100, 
TimeUnit.MILLISECONDS);
 
diff --git 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 45ae926eb..fa2e32d62 100644
--- 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -18,26 +18,20 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class Plc4xSinkConnector extends SinkConnector {
 static final String URL_CONFIG = "url";
 private static final String URL_DOC = "Connection string used by PLC4X to 
connect to the PLC";
 
-static final String QUERY_CONFIG = "query";
-private static final String QUERY_DOC = "Field query to be sent to the 
PLC";
-
-private static final ConfigDef CONFIG_DEF = new ConfigDef()
-.define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, 
URL_DOC)
-.define(QUERY_CONFIG, ConfigDef.Type.STRING, 
ConfigDef.Importance.HIGH, QUERY_DOC);
+static final ConfigDef CONFIG_DEF = new ConfigDef()
+.define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, 
URL_DOC);
 
 private String url;
 private String query;
@@ -49,18 +43,19 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 @Override
 public List<Map<String, String>> taskConfigs(int maxTasks) {
-Map<String, String> taskConfig = new HashMap<>();
-taskConfig.put(URL_CONFIG, url);
-taskConfig.put(QUERY_CONFIG, query);
-
-// Only one task will be created; ignoring maxTasks for now
-return Collections.singletonList(taskConfig);
+List<Map<String, String>> configs = new LinkedList<>();
+for (int i = 0; i < maxTasks; i++) {
+Map<String, String> taskConfig = new HashMap<>();
+taskConfig.put(URL_CONFIG, url);
+configs.add(taskConfig);
+}
+return configs;
 }
 
 @Override
 public void start(Map<String, String> props) {
-url = props.get(URL_CONFIG);
-query = props.get(QUERY_CONFIG);
+AbstractConfig config = new 
AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+url = config.getString(URL_CONFIG);
 }
 
 @Override
diff --git 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index b29418f18..648a32e4a 100644
--- 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -33,10 +34,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.util.concurrent.ExecutionException;
 
 public class Plc4xSinkTask extends SinkTask {
-private fi

[GitHub] skorikov opened a new pull request #18: Add support for multiple tasks in kafka sink connector

2018-09-11 Thread GitBox
skorikov opened a new pull request #18: Add support for multiple tasks in kafka 
sink connector
URL: https://github.com/apache/incubator-plc4x/pull/18
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] asfgit closed pull request #17: Add support for multiple queries in kafka source connector

2018-09-11 Thread GitBox
asfgit closed pull request #17: Add support for multiple queries in kafka 
source connector
URL: https://github.com/apache/incubator-plc4x/pull/17
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
index 45ae926eb..189920886 100644
--- 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
+++ 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkConnector.java
@@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.sink.SinkConnector;
@@ -35,7 +36,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 static final String QUERY_CONFIG = "query";
 private static final String QUERY_DOC = "Field query to be sent to the 
PLC";
 
-private static final ConfigDef CONFIG_DEF = new ConfigDef()
+static final ConfigDef CONFIG_DEF = new ConfigDef()
 .define(URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, 
URL_DOC)
 .define(QUERY_CONFIG, ConfigDef.Type.STRING, 
ConfigDef.Importance.HIGH, QUERY_DOC);
 
@@ -59,8 +60,9 @@ Licensed to the Apache Software Foundation (ASF) under one
 
 @Override
 public void start(Map<String, String> props) {
-url = props.get(URL_CONFIG);
-query = props.get(QUERY_CONFIG);
+AbstractConfig config = new 
AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+url = config.getString(URL_CONFIG);
+query = config.getString(QUERY_CONFIG);
 }
 
 @Override
diff --git 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
index b29418f18..a54d5b08b 100644
--- 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
+++ 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSinkTask.java
@@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -33,8 +34,6 @@ Licensed to the Apache Software Foundation (ASF) under one
 import java.util.concurrent.ExecutionException;
 
 public class Plc4xSinkTask extends SinkTask {
-private final static String FIELD_KEY = "key"; // TODO: is this really 
necessary?
-
 private String url;
 private String query;
 
@@ -48,8 +47,9 @@ public String version() {
 
 @Override
 public void start(Map<String, String> props) {
-url = props.get(Plc4xSinkConnector.URL_CONFIG);
-query = props.get(Plc4xSinkConnector.QUERY_CONFIG);
+AbstractConfig config = new 
AbstractConfig(Plc4xSinkConnector.CONFIG_DEF, props);
+url = config.getString(Plc4xSinkConnector.URL_CONFIG);
+query = config.getString(Plc4xSinkConnector.QUERY_CONFIG);
 
 openConnection();
 
@@ -66,7 +66,7 @@ public void stop() {
 public void put(Collection<SinkRecord> records) {
 for (SinkRecord record: records) {
 String value = record.value().toString(); // TODO: implement other 
data types
-PlcWriteRequest plcRequest = 
plcWriter.writeRequestBuilder().addItem(FIELD_KEY, query, value).build();
+PlcWriteRequest plcRequest = 
plcWriter.writeRequestBuilder().addItem(query, query, value).build();
 doWrite(plcRequest);
 }
 }
diff --git 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
index 4d1d9d026..4d014a535 100644
--- 
a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
+++ 
b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java
@@ -18,13 +18,15 @@ Licensed to the Apache Software Foundation (ASF) under one
 */
 package org.apache.plc4x.kafka;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
+import org.apache.kafka.connect.util.ConnectorUtils;
 import org.apache.plc4x.kafka.util.VersionUtil;
 
-import java.util.Collections;
 im

[GitHub] skorikov opened a new pull request #17: Add support for multiple queries in kafka source connector

2018-09-11 Thread GitBox
skorikov opened a new pull request #17: Add support for multiple queries in 
kafka source connector
URL: https://github.com/apache/incubator-plc4x/pull/17
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services