@Sagar

Although we could get rid of locks on the SourceTask object by using an AtomicBoolean (or a volatile field, for that matter), we would still need to wait for it to become true. The SourceTask.poll() contract states that "this method should block but return control to the caller regularly (by returning null) in order for the task to transition to the PAUSED state if requested to do so". Unfortunately, the AtomicBoolean API does not provide such a blocking operation, so we would have to implement it ourselves.

We could poll the AtomicBoolean value; the naive implementation would look something like this:

  while (!fetch.get()) {}

and would burn one CPU core. We could suspend the thread by:

  while (!fetch.get()) { Thread.sleep(duration); }

but how do you choose a good value for duration? If it is too long, the effective polling rate would be lower than specified, and if it is too short, we would again waste cycles.

The basic idiom for this kind of synchronization problem is a simple wait/notify scheme, which has in fact been deployed in the code.
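
As a rough illustration of that idiom (a minimal sketch only; the class name FetchGate and the method signalFetch() are hypothetical names chosen for this example, not the actual connector code), a wait/notify scheme with a timeout could look like this:

```java
import java.util.concurrent.TimeUnit;

// Illustrative sketch; FetchGate and signalFetch are hypothetical names.
final class FetchGate {
    private boolean fetch = false; // guarded by this object's monitor

    // Called by the timer thread when the next fetch is due.
    synchronized void signalFetch() {
        fetch = true;
        notifyAll(); // wake up a poll thread blocked in awaitFetch()
    }

    // Blocks until the flag is set or the timeout elapses.
    // Returns true (and clears the flag) if a fetch is due, false on timeout.
    synchronized boolean awaitFetch(long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!fetch) { // the loop guards against spurious wakeups
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            // wait() releases the monitor while blocked; round up so wait(0) is never called
            wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos) + 1);
        }
        fetch = false;
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        FetchGate gate = new FetchGate();
        // Stand-in for the timer thread: signal once after 100 ms.
        Thread timer = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            gate.signalFetch();
        });
        timer.start();
        System.out.println("fetch due: " + gate.awaitFetch(5_000));
        timer.join();
    }
}
```

In poll(), one would then return null whenever awaitFetch(timeout) returns false, which hands control back to the framework as the contract requires, without spinning and without guessing a sleep duration.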

Other synchronization primitives have been considered, but rejected for the following reasons:

 - The CountDownLatch cannot be reset

 - The CyclicBarrier would block the timer thread (which sets the fetch flag to true), distorting the effective polling rate again

 - The Phaser could work in this scenario, but its API is relatively complex and it would be overkill to employ it for this kind of problem
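
To make the first point concrete (a small sketch; LatchDemo is a hypothetical name used only here), a CountDownLatch that has reached zero stays open, so it could gate only a single polling cycle:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative sketch; LatchDemo is a hypothetical name.
final class LatchDemo {
    // Returns true if a second await on an already released latch passes at once.
    static boolean secondAwaitReturnsImmediately() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown(); // the timer thread would signal "fetch now"
        latch.await();     // the poll thread passes, the count is now 0
        // CountDownLatch has no reset method; the count stays at 0,
        // so every later await returns without blocking.
        return latch.await(0, TimeUnit.MILLISECONDS);
    }
}
```

A fresh latch would have to be created and published to both threads for every cycle, which brings back the need for a lock or a volatile field.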

I hope that helps! :)

Best regards,

Andrey

On 09/11/2018 07:47 PM, Sagar wrote:
@Andrey,

I had a quick question regarding the source connector for Kafka Connect.

I see that the awaitFetch() method has been created with a synchronized
block within which we are just checking whether a particular boolean flag has
been set to true or not. Considering it's just this and not any other
synchronization that's needed per se, wouldn't it make sense to change the
variable fetch to type AtomicBoolean and remove the synchronized from the
method? Considering the only atomic operation to be performed is setting or
getting the value of the flag, can we avoid having synchronized method
blocks?

Thanks!
Sagar.


On Sun, Sep 9, 2018 at 6:24 PM Christofer Dutz <christofer.d...@c-ware.de>
wrote:

Hi Sagar,

As I didn't hear anything from you, I encouraged colleagues of mine to
help with the adapter. In parallel, I merged everything into the
feature/api-refactoring-chris-c branch. As soon as the last drivers are
refactored, we'll merge that back to master. So if you want to have a look,
I would suggest that branch.

Chris


________________________________
From: Sagar <sagarmeansoc...@gmail.com>
Sent: Sunday, September 9, 2018 1:15:35 PM
To: dev@plc4x.apache.org
Subject: Re: Kafka Connect Integration

Hi Christofer,

Looking at the other e-mail that you sent about the work that has happened,
it looks like a lot of great progress has been made.

I just got caught up with some other things, so I could never get started after
our discussions here :(

I wanted to understand, once you have some bandwidth, what the next steps are
with the k-connect integration. Can I sync up with someone and start
looking at some of the pieces?

Thanks!
Sagar.

On Thu, Aug 30, 2018 at 12:47 AM Christofer Dutz <christofer.d...@c-ware.de>
wrote:

Hi Sagar,

thanks for the info ... this way I learn more and more :-)

Looking forward to answering the questions as they come.

Chris

On 29.08.18, 19:28, "Sagar" <sagarmeansoc...@gmail.com> wrote:

     Hi Chris,

     Thanks. Typically, the Kafka cluster will be a separate set of nodes, and so
     will the k-connect workers, which connect to the PLC devices, databases,
     or whatever the source is, and push to Kafka.

     I will start off with this information and extend your feature branch.
     I will keep asking questions along the way.

     Sagar.

     On Wed, Aug 29, 2018 at 7:51 PM Christofer Dutz <christofer.d...@c-ware.de>
     wrote:

     > Hi Sagar,
     >
     > Great that we seem to be on the same page now ;-)
     >
     > Regarding the "Kafka Connecting" ... what I meant is that the
     > Kafka-Connect-PLC4X instance connects ... I was assuming the driver to be
     > running on a Kafka node, but that's just due to my limited knowledge of
     > everything ;-)
     >
     > Well, the code for actively reading stuff from a PLC should already be in
     > my example implementation. It should work out of the box this way ... As I
     > have seen several mock drivers implemented in PLC4X, I am currently
     > thinking of implementing one that you should be able to just import and use
     > ... however, I'm currently working hard on refactoring the API completely,
     > so I would postpone that until after these changes are in. But rest
     > assured ... I would handle the refactoring so you could just assume that it
     > works.
     >
     > Alternatively, I could have an account for our IoT VPN created for you. Then
     > you could log in to our VPN and talk to some real PLCs ...
     >
     > I think I wanted to create an account for Julian, but the person responsible
     > for creating them was on holiday ... I will re-check this.
     >
     > Chris
     >
     >
     >
     > On 29.08.18, 16:11, "Sagar" <sagarmeansoc...@gmail.com> wrote:
     >
     >     Hi Chris,
     >
     >     That's perfectly fine :)
     >
     >     So, the way I understand this now is that we will have a bunch of worker
     >     nodes (in Kafka Connect terminology, a worker is a JVM process which runs a
     >     set of connectors/tasks to poll a source and push data to Kafka).
     >
     >     So, vis-a-vis a JDBC connection, we will have a connection URL which will
     >     let us connect to these PLC devices, poll them (poll in the sense you meant
     >     above), and then push data to Kafka. If this looks fine, can you give
     >     me some documentation to refer to, and tell me how I can start testing
     >     these?
     >
     >     And just one thing I wanted to clarify: you mention Kafka nodes connecting
     >     to devices. That's something which doesn't happen. Kafka doesn't connect to
     >     any device. I think you just mean it in a more abstract way, right?
     >
     >     @Julian,
     >
     >     Thanks, I was going through the link you sent. So, you're saying that via
     >     scraping, we can push events to Kafka? Is that already happening, and
     >     should we look to move this functionality out?
     >
     >     Thanks!
     >     Sagar.
     >
     >
     >     On Wed, Aug 29, 2018 at 11:05 AM Julian Feinauer <
     >     j.feina...@pragmaticminds.de> wrote:
     >
     >     > Hey Sagar,
     >     > hey Chris,
     >     >
     >     > I want to join your discussion for part b3, as this is something we usually
     >     > require.
     >     >
     >     > We use a module we call the plc-scraper for that task (the term scraping
     >     > in that context is borrowed from Prometheus, where it is used extensively
     >     > [1]).
     >     >
     >     > Generally speaking, the scraper takes a config containing addresses
     >     > and scrape rates, then runs as a daemon and pushes the scrape
     >     > results downstream (usually Kafka, but we also use other "queues").
     >     >
     >     > As we are currently rewriting this scraper, I already considered donating
     >     > it to plc4x in the form of an example or perhaps even as a standalone
     >     > module.
     >     >
     >     > So I agree with Chris that this should not be part of the PlcDriver level
     >     > but rather on another layer "on top", and I would be more interested in the
     >     > specification of a "line protocol" which describes how messages are
     >     > serialized for Kafka (or other sources).
     >     >
     >     > Can we come up with a common "schema" which fits many use cases?
     >     >
     >     > Our messages contain the following information:
     >     >
     >     > - timestamp
     >     > - source
     >     > - values
     >     > - additional tags
     >     >
     >     > Best
     >     > Julian
     >     >
     >     > [1] https://prometheus.io/docs/prometheus/latest/getting_started/
     >     >
     >     >
     >     >
     >     > On 28.08.18, 22:53, "Christofer Dutz" <christofer.d...@c-ware.de> wrote:
     >     >
     >     >     Hi Sagar,
     >     >
     >     >     sorry for not responding ... your mail must have slipped past me ...
     >     >     sorry for that.
     >     >
     >     >     a) PLC4X works exactly the same way ... it consists of plc4x-core,
     >     >     which only contains the DriverManager, and plc4x-api, which contains the
     >     >     API classes. So with these two jars you can build a full PLC4X application.
     >     >
     >     >     In order to connect to a PLC you need to add the jar containing the
     >     >     required driver to the classpath.
     >     >
     >     >     b) Well, if using something other than the connection url as partition
     >     >     key, it is possible that multiple kafka connect nodes would connect to
     >     >     the same PLC. In that case it could be a problem to control the order.
     >     >     I guess using the timestamp when receiving a response (probably
     >     >     generated by the KC plc4x driver) could be a valid approach.
     >     >
     >     >     b2) Regarding the infinite loop ... I think we won't need such a
     >     >     mechanism. If we think of a set of fields from a PLC, we can think of a
     >     >     PLC as a one-row database table. Producing diffs should be a lot simpler
     >     >     that way.
     >     >
     >     >     b3) Regarding push events ... PLC4X has a subscription mode next to
     >     >     the polling. So it would be possible to also define PLC4X datasources
     >     >     that actively push events to kafka ... I have to admit that this would be
     >     >     the mode I would prefer most. But as not all protocols and PLCs support
     >     >     this mode, I think it would be safest to use polling and to add push
     >     >     support after that.
     >     >
     >     >     Regarding different languages: Currently we are concentrating mainly
     >     >     on the Java implementation, as the biggest challenge is to understand and
     >     >     implement the protocols. Porting them to other languages (especially C
     >     >     and C++) shouldn't be as hard as implementing the first version. But
     >     >     that's currently a base uncovered, as we don't have the resources to
     >     >     implement all of them at once.
     >     >
     >     >     And there are no stupid questions :-)
     >     >
     >     >     Hope I could answer all of yours. If not, just ask and I'll probably
     >     >     not miss that one ;-)
     >     >
     >     >     Chris
     >     >
     >     >
     >     >
     >     >
     >     >
     >     >     On 23.08.18, 19:52, "Sagar" <sagarmeansoc...@gmail.com> wrote:
     >     >
     >     >         Hi Christofer,
     >     >
     >     >         Thanks for the detailed responses. I would like to ask a couple
     >     >         more questions (which may be borderline naive or stupid :D ).
     >     >
     >     >         First thing that I would like to know (ignore my lack of knowledge
     >     >         on PLCs): from what I understand, they are small devices used to
     >     >         execute program instructions. These would have very small memory
     >     >         footprints as well, I believe? Also, when you say the Siemens one can
     >     >         handle 20 connections, would these be from different devices
     >     >         connecting to it? The reason I ask these questions is this ->
     >     >
     >     >         a) The way the kafka-connect framework is executed is by installing
     >     >         the whole framework with all the relevant jars needed on the
     >     >         classpath. So, if you talk about the JDBC connector for K-Connect, it
     >     >         would need the mysql driver jar (for example) and other jars needed to
     >     >         support the framework. If we, say, choose to use avro, then we would
     >     >         need more jars to support that. Would we be able to install all that?
     >     >
     >     >         b) Also, if multiple devices do connect to it, then won't we have
     >     >         events arriving out of order from them? Does the ordering matter
     >     >         amongst events that are being pushed?
     >     >
     >     >         Regarding the infinite loop question, the reason the JDBC connector
     >     >         uses that is that it creates tasks for a given table and fires queries
     >     >         to find deltas. So, if the polling frequency is 2 seconds and it last
     >     >         ran at 12.00.00, then it would run at 12.00.02 to figure out what
     >     >         changed in that time frame. So, the way PlcReader's read() runs, would
     >     >         it keep returning newer data?
     >     >
     >     >         We can skip over the rest of the parts, but looking at parts a and b
     >     >         above, would it make sense to have something like a kafka-connect
     >     >         framework for pushing data to Kafka? Also, from the github link, the
     >     >         drivers are to be supported in 3 languages as well. How would that
     >     >         play out?
     >     >
     >     >         Again, apologies if the questions seem stupid.
     >     >
     >     >         Thanks!
     >     >         Sagar.
     >     >
     >     >
     >     >
     >     >         On Wed, Aug 22, 2018 at 10:39 PM Christofer Dutz <christofer.d...@c-ware.de>
     >     >         wrote:
     >     >
     >     >         > Hi Sagar,
     >     >         >
     >     >         > great that you managed to have a look ... I'll try to answer your
     >     >         > questions.
     >     >         > (I like to answer them postfix, as whenever emails are sort of answered
     >     >         > in-line, they are extremely hard to read and follow on mobile email
     >     >         > clients)
     >     >         >
     >     >         > First of all, I created the original plugin via the archetype for
     >     >         > kafka-connect plugins. The next thing I did was to have a look at the
     >     >         > code of the JDBC Kafka Connect plugin (as you might have guessed), as I
     >     >         > thought that it would have a similar structure to ours. Unfortunately, I
     >     >         > think the JDBC plugin is far more complex than the plc4x connector will
     >     >         > have to be. I sort of picked some of the things I liked from the
     >     >         > archetype and some I liked from the jdbc plugin ... if there was a
     >     >         > third, even cooler option ... I will definitely have missed that. So if
     >     >         > you think there is a thing worth changing ... you can change anything
     >     >         > you like.
     >     >         >
     >     >         > 1)
     >     >         > The code of the jdbc plugin showed such a while(true) loop; however, I
     >     >         > think this was because the jdbc query could return a lot of rows and
     >     >         > thereby Kafka events. In our case we have one request and get one
     >     >         > response. The code in my example directly calls "get()" on the request
     >     >         > and is therefore blocking. I don't know if this is good, but from
     >     >         > reading the jdbc example, this should be blocking too ...
     >     >         > So the PlcReader's read() method returns a completable future ... this
     >     >         > could be completed asynchronously and the callback could fire the kafka
     >     >         > events, but I didn't know if this was ok with kafka. If it is possible,
     >     >         > please have a look at this example code:
     >     >         >
     >     >         > https://github.com/apache/incubator-plc4x/blob/master/plc4j/protocols/s7/src/test/java/org/apache/plc4x/java/s7/S7PlcReaderSample.java
     >     >         >
     >     >         > It demonstrates with comments the different usage types.
     >     >         >
     >     >         > While at it ... is there also an option for a Kafka connector that is
     >     >         > able to push data? So if an incoming event arrives, this is
     >     >         > automatically pushed without a fixed polling interval?
     >     >         >
     >     >         > 2)
     >     >         > I have absolutely no idea, as I am not quite familiar with the concepts
     >     >         > inside kafka. What I do know is that probably the partition-key should
     >     >         > be based upon the connection url. The problem is that with kafka I could
     >     >         > have 1000 nodes connecting to one PLC. While Kafka wouldn't have
     >     >         > problems with that, the PLCs have very limited resources. So as far as I
     >     >         > decoded the responses of my Siemens S7 1200, it can handle up to 20
     >     >         > connections (usually a control system is already consuming 2-3 of them)
     >     >         > ... I think it would be ideal if on one Kafka node (or partition) there
     >     >         > would be one PlcConnection ... this connection should then be shared
     >     >         > among all requests to a PLC with a shared connection url (I hope I'm not
     >     >         > writing nonsense). So if a workerTask is responsible for managing all
     >     >         > requests to one partition, then I'd say it should be 1 ... otherwise the
     >     >         > number could be bigger.
     >     >         >
     >     >         > If it makes things easier, I'm absolutely fine with using those
     >     >         > ConnectorUtils.
     >     >         >
     >     >         > Regarding the connector offsets ... are you referring to that counter
     >     >         > Kafka uses to let the clients know the sequence of events and which they
     >     >         > use to sort of say: "Hi, I have number 237367 of topic 'ABC', please
     >     >         > continue" ... is that what you are referring to? If it is, well ... I
     >     >         > have to admit ... I don't know ... ok ... if it isn't, then probably
     >     >         > also ;-)
     >     >         > How do other plugins do this?
     >     >         >
     >     >         > 3)
     >     >         > Well, I guess both options would be cool ... JSON is definitely simpler,
     >     >         > but for high volume transports the binary counterparts definitely are
     >     >         > worth consideration. Currently PLC4X tries to deliver what you request,
     >     >         > but that's actually something we're currently discussing refactoring.
     >     >         > But for the moment - as shown in the example code I referenced a few
     >     >         > lines above - you do a TypedRequest and for example ask for an Integer,
     >     >         > then you will receive an array (probably of size 1) of Integers.
     >     >         >
     >     >         > 4)
     >     >         > Well, I agree ... well, at least I can't even say that I make a secret
     >     >         > about where I stole things from ;-)
     >     >         >
     >     >         > If I can be of any assistance ... just ask.
     >     >         >
     >     >         > Thanks for taking the time.
     >     >         >
     >     >         > Chris
     >     >
     >     >         >
     >     >
     >     >         >
     >     >
     >     >         >
     >     >
     >     >         > On 22.08.18, 17:55, "Sagar" <sagarmeansoc...@gmail.com> wrote:
     >     >         >
     >     >         >     Hi All,
     >     >         >
     >     >         >     I was going through the K-Connect stubs created by Chris in the
     >     >         >     kafka feature branch.
     >     >         >
     >     >         >     Some of my findings are here (let me know if they are valid or
     >     >         >     not):
     >     >         >
     >     >         >     1)
     >     >         >     https://github.com/apache/incubator-plc4x/blob/feature/apache-kafka/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java#L98
     >     >         >
     >     >         >     Should this block of code be within an infinite loop like
     >     >         >     while(true)? I am not exactly sure of the semantics of the
     >     >         >     PlcReader, hence asking this question.
     >     >         >
     >     >         >     2) Another question is, what are the maxTasks that we envision here?
     >     >         >
     >     >         >     https://github.com/apache/incubator-plc4x/blob/feature/apache-kafka/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceConnector.java#L46
     >     >         >
     >     >         >     Also, as part of documentation, there's a utility called
     >     >         >     ConnectorUtils which typically should be used to create the configs
     >     >         >     (not a hard and fast rule though):
     >     >         >
     >     >         >     https://docs.confluent.io/current/connect/javadocs/index.html?org/apache/kafka/connect/util/ConnectorUtils.html
     >     >         >
     >     >         >     If we go that route, then we also need to specify how the offsets
     >     >         >     would be stored in the offsets topic (by using the task name). So, if
     >     >         >     it can be figured out how the connectors would be set up, then
     >     >         >     that'll be helpful.
     >     >         >
     >     >         >     3) While building the SourceRecord ->
     >     >         >
     >     >         >     https://github.com/apache/incubator-plc4x/blob/feature/apache-kafka/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java#L109
     >     >         >
     >     >         >     , we would also need some DataConverter layer to have them mapped to
     >     >         >     the connect types. Also, which message types would be supported?
     >     >         >     JSON or binary protocols like Avro/protobuf etc., or some other
     >     >         >     protocols? Those things might also need to be factored in.
     >     >         >
     >     >         >     4) Lastly, we need to remove the JdbcSourceTask from the catch block
     >     >         >     here :) ->
     >     >         >
     >     >         >     https://github.com/apache/incubator-plc4x/blob/feature/apache-kafka/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/source/Plc4xSourceTask.java#L67
     >     >         >
     >     >         >     Thanks!
     >     >         >     Sagar.
     >     >
     >     >         >
     >     >
     >     >         >
     >     >
     >     >         >
     >     >
     >     >
     >     >
     >     >
     >     >
     >     >
     >     >
     >     >
     >     >
     >
     >
     >



