Hi y'all,
I have a question regarding your Kafka Source Connector. It seems as if the
Connector does not execute the jobs. Ill give you some code and thoughts and I
hope you can tell me whether the mistake is on my side or that might be a bug.
So this is the current kafka connect config file:
{
"config": {
"name": "plc4x-source-sensors-raw-data",
"connector.class": "org.apache.plc4x.kafka.Plc4xSourceConnector",
"default.topic": "default_topic",
"tasks.max": "1",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"sources": "machine",
"sources.machine.connectionString":
"opcua:tcp://10.202.70.13:4840?discovery=false",
"sources.machine.pollReturnInterval": "5000",
"sources.machine.bufferSize": "1000",
"sources.machine.jobReferences": "sensors",
"sources.machine.jobReferences.sensors.topic": "test_plc_4x",
"jobs": "sensors",
"jobs.sensors.interval": "1000",
"jobs.sensors.tags": "test1,test2,test3,test4",
"jobs.sensors.tags.test1": "ns=2;s=cfl.FV.rSupCircCurr;REAL",
"jobs.sensors.tags.test2": "ns=2;s=cfl.FV.rSupCircCounter;REAL",
"jobs.sensors.tags.test3": "ns=2;s=wir.FV.rWRTemp;REAL",
"jobs.sensors.tags.test4": "ns=2;s=wir.FV.rVerlegRePosition;REAL",
"bootstrap.servers": "stargate-kafka:29092",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false",
"offset.storage.file.filename":"/tmp/connect.offsets",
"offset.flush.interval.ms":"10000",
"enable.idempotence":"true",
"acks":"all"
},
"name": "plc4x-source-sensors-raw-data"
}
The log in my Kafka connect displays this message here:
[2023-12-22 22:28:18,792] INFO
WorkerSourceTask{id=plc4x-source-sensors-raw-data-0} Either no records were
produced by the task since the last offset commit, or every record has been
filtered out by a transformation or dropped due to transformation or conversion
errors. (org.apache.kafka.connect.runtime.WorkerSourceTask)
It seems as if there's a job being executed but in fact it's not. This is the
code from the task:
log.info("before the scraper");
scraper = new TriggeredScraperImpl(scraperConfig, (jobName,
sourceName, results) -> {
try {
Long timestamp = System.currentTimeMillis();
Map<String, String> sourcePartition = new HashMap<>();
sourcePartition.put("sourceName", sourceName);
sourcePartition.put("jobName", jobName);
Map<String, Long> sourceOffset =
Collections.singletonMap("offset", timestamp);
log.info("step 1");
String topic = topics.get(jobName);
// Prepare the key structure.
Struct key = new Struct(KEY_SCHEMA)
.put(Constants.SOURCE_NAME_FIELD, sourceName)
.put(Constants.JOB_NAME_FIELD, jobName);
// Build the Schema for the result struct.
SchemaBuilder tagSchemaBuilder = SchemaBuilder.struct()
.name("org.apache.plc4x.kafka.schema.Tag");
log.info("step 2");
for (Map.Entry<String, Object> result : results.entrySet())
{
// Get tag-name and -value from the results.
String tagName = result.getKey();
Object tagValue = result.getValue();
// Get the schema for the given value type.
Schema valueSchema = getSchema(tagValue);
// Add the schema description for the current tag.
tagSchemaBuilder.field(tagName, valueSchema);
}
Schema tagSchema = tagSchemaBuilder.build();
log.info("step 3");
Schema recordSchema = SchemaBuilder.struct()
.name("org.apache.plc4x.kafka.schema.JobResult")
.doc("PLC Job result. This contains all of the
received PLCValues as well as a received timestamp")
.field(Constants.TAGS_CONFIG, tagSchema)
.field(Constants.TIMESTAMP_CONFIG,
Schema.INT64_SCHEMA)
.field(Constants.EXPIRES_CONFIG,
Schema.OPTIONAL_INT64_SCHEMA)
.build();
// Build the struct itself.
Struct tagStruct = new Struct(tagSchema);
for (Map.Entry<String, Object> result : results.entrySet())
{
// Get tag-name and -value from the results.
String tagName = result.getKey();
Object tagValue = result.getValue();
if (tagSchema.field(tagName).schema().type() ==
Schema.Type.ARRAY) {
tagStruct.put(tagName, ((List)
tagValue).stream().map(p -> ((PlcValue) p).getObject())
.collect(Collectors.toList()));
} else {
tagStruct.put(tagName, tagValue);
}
}
Struct recordStruct = new Struct(recordSchema)
.put(Constants.TAGS_CONFIG, tagStruct)
.put(Constants.TIMESTAMP_CONFIG, timestamp);
// Prepare the source-record element.
SourceRecord sourceRecord = new SourceRecord(
sourcePartition, sourceOffset,
topic,
KEY_SCHEMA, key,
recordSchema, recordStruct);
log.info("Thats the record:", sourceRecord);
// Add the new source-record to the buffer.
buffer.add(sourceRecord);
} catch (Exception e) {
log.error("Error while parsing returned values", e);
}
}, triggerCollector);
log.info("after the scraper");
I left some logging statements as you can see and only the statements before
and after scraper are being executed and the steps in between when the scraper
is started are not being displayed, so my assumption is that the ode which
produces the actual message is not being executed or am I wrong here?
For making sure it is not related to the plc and the tags, I used the read plc
as you can see here:
java -jar .\target\plc4j-examples-hello-world-plc4x-read-0.11.0-uber-jar.jar
--connection-string "opcua:tcp://10.202.70.13:4840?discovery=false"
--tag-addresses "ns=2;s=cfl.FV.rSupCircCurr;REAL"
"ns=2;s=cfl.FV.rSupCircCounter;REAL" "ns=2;s=wir.FV.rWRTemp;REAL"
"ns=2;s=wir.FV.rVerlegRePosition;REAL"
23:40:50.121 [main] INFO o.a.p.j.e.h.read.HelloPlc4xRead - Synchronous request
...
23:40:50.170 [main] INFO o.a.p.j.e.h.read.HelloPlc4xRead -
Value[value-ns=2;s=cfl.FV.rSupCircCurr;REAL]: 0.046645667
23:40:50.171 [main] INFO o.a.p.j.e.h.read.HelloPlc4xRead -
Value[value-ns=2;s=cfl.FV.rSupCircCounter;REAL]: 1911.0
23:40:50.171 [main] INFO o.a.p.j.e.h.read.HelloPlc4xRead -
Value[value-ns=2;s=wir.FV.rWRTemp;REAL]: 23.61865
23:40:50.171 [main] INFO o.a.p.j.e.h.read.HelloPlc4xRead -
Value[value-ns=2;s=wir.FV.rVerlegRePosition;REAL]: 235.443
and from my understanding of how the source connector works, it should be the
same because you stated you wanna implement subscribe functionality next year.
So it should use the same continuous read functionality, right?
I hope that you can help me out. Thanks in advance for reading the mail and I
hope you find the time to send me an answer.
Best regards
Peter
Die vorangehende E-Mail-Nachricht, einschlie?lich aller Anlagen, kann
vertrauliche Informationen beinhalten und ist nur f?r den oder die vorgesehenen
Empf?nger bestimmt. Unberechtigter Zugriff, Nachbearbeitung, Verwendung,
Freigabe, Verteilung oder Vervielf?ltigung der Inhalte dieser E-Mail ist
strengstens untersagt. Wenn Sie nicht der beabsichtigte Empf?nger sind,
kontaktieren Sie bitte sofort den Absender und l?schen Sie alle Kopien.
The preceding e-mail message, including any attachments, may contain
confidential information and is for the intended recipient(s) only.
Unauthorized access, review, use, sharing, distribution or reproduction of the
content of this e-mail is strictly prohibited. If you are not the intended
recipient, please contact the sender immediately and delete all copies.
The Mail was scanned by LAPMASTER WOLTERS Security System