asfgit closed pull request #19: Add URL Field to Key Schema in Kafka Source Connector. URL: https://github.com/apache/incubator-plc4x/pull/19
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java index c354a1ea2..7d0ed8621 100644 --- a/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java +++ b/integrations/apache-kafka/src/main/java/org/apache/plc4x/kafka/Plc4xSourceTask.java @@ -20,6 +20,8 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -32,10 +34,7 @@ Licensed to the Apache Software Foundation (ASF) under one import org.apache.plc4x.java.api.types.PlcResponseCode; import org.apache.plc4x.kafka.util.VersionUtil; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.*; /** @@ -48,6 +47,15 @@ Licensed to the Apache Software Foundation (ASF) under one private final static long WAIT_LIMIT_MILLIS = 100; private final static long TIMEOUT_LIMIT_MILLIS = 5000; + private final static String URL_FIELD = "url"; + private final static String QUERY_FIELD = "query"; + + private final static Schema KEY_SCHEMA = + new SchemaBuilder(Schema.Type.STRUCT) + .field(URL_FIELD, Schema.STRING_SCHEMA) + .field(QUERY_FIELD, Schema.STRING_SCHEMA) + .build(); + private String topic; private String url; private List<String> queries; @@ -56,6 
+64,8 @@ Licensed to the Apache Software Foundation (ASF) under one private PlcReader plcReader; private PlcReadRequest plcRequest; + + // TODO: should we use shared (static) thread pool for this? private ScheduledExecutorService scheduler; private ScheduledFuture<?> timer; @@ -166,11 +176,16 @@ private synchronized boolean awaitFetch(long milliseconds) throws InterruptedExc continue; } - Object rawValue = response.getObject(query); - Schema valueSchema = getSchema(rawValue.getClass()); - Object value = valueSchema.equals(Schema.STRING_SCHEMA) ? rawValue.toString() : rawValue; + Struct key = new Struct(KEY_SCHEMA) + .put(URL_FIELD, url) + .put(QUERY_FIELD, query); + + Object value = response.getObject(query); + Schema valueSchema = getSchema(value); Long timestamp = System.currentTimeMillis(); - Map<String, String> sourcePartition = Collections.singletonMap("url", url); + Map<String, String> sourcePartition = new HashMap<>(); + sourcePartition.put("url", url); + sourcePartition.put("query", query); Map<String, Long> sourceOffset = Collections.singletonMap("offset", timestamp); SourceRecord record = @@ -178,8 +193,8 @@ private synchronized boolean awaitFetch(long milliseconds) throws InterruptedExc sourcePartition, sourceOffset, topic, - Schema.STRING_SCHEMA, - query, + KEY_SCHEMA, + key, valueSchema, value ); @@ -190,20 +205,38 @@ private synchronized boolean awaitFetch(long milliseconds) throws InterruptedExc return result; } - private Schema getSchema(Class<?> type) { - if (type.equals(Byte.class)) + private Schema getSchema(Object value) { + Objects.requireNonNull(value); + + if (value instanceof Byte) return Schema.INT8_SCHEMA; - if (type.equals(Short.class)) + if (value instanceof Short) return Schema.INT16_SCHEMA; - if (type.equals(Integer.class)) + if (value instanceof Integer) return Schema.INT32_SCHEMA; - if (type.equals(Long.class)) + if (value instanceof Long) return Schema.INT64_SCHEMA; - return Schema.STRING_SCHEMA; // default case; invoke .toString on 
value + if (value instanceof Float) + return Schema.FLOAT32_SCHEMA; + + if (value instanceof Double) + return Schema.FLOAT64_SCHEMA; + + if (value instanceof Boolean) + return Schema.BOOLEAN_SCHEMA; + + if (value instanceof String) + return Schema.STRING_SCHEMA; + + if (value instanceof byte[]) + return Schema.BYTES_SCHEMA; + + // TODO: add support for collective and complex types + throw new ConnectException(String.format("Unsupported data type %s", value.getClass().getName())); } } \ No newline at end of file diff --git a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java index 3966deb8a..b93e545e4 100644 --- a/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java +++ b/plc4j/protocols/test/src/main/java/org/apache/plc4x/java/test/TestDevice.java @@ -69,22 +69,45 @@ void set(TestField field, FieldItem value) { private FieldItem randomValue(Class<?> type) { Object result = null; - // TODO: implement for further data types + if (type.equals(Byte.class)) + result = (byte) random.nextInt(1 << 8); - if (type == Integer.class) + if (type.equals(Short.class)) + result = (short) random.nextInt(1 << 16); + + if (type.equals(Integer.class)) result = random.nextInt(); - if (type == Byte.class) { - byte[] bytes = new byte[1]; - random.nextBytes(bytes); - result = bytes[0]; + if (type.equals(Long.class)) + result = random.nextLong(); + + if (type.equals(Float.class)) + result = random.nextFloat(); + + if (type.equals(Double.class)) + result = random.nextDouble(); + + if (type.equals(Boolean.class)) + result = random.nextBoolean(); + + if (type.equals(String.class)) { + int length = random.nextInt(100); + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + char c = (char)('a' + random.nextInt(26)); + sb.append(c); + } + result = sb.toString(); } - if (type == Short.class) { - result = random.nextInt(1 << 16); + if 
(type.equals(byte[].class)) { + int length = random.nextInt(100); + byte[] bytes = new byte[length]; + random.nextBytes(bytes); + result = bytes; } - return new TestFieldItem(new Object[]{result}); + return new TestFieldItem(new Object[] { result }); } @Override ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services