[ https://issues.apache.org/jira/browse/FLINK-3872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324381#comment-15324381 ]
ASF GitHub Bot commented on FLINK-3872:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2069#discussion_r66606216
--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -735,6 +738,123 @@ public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> o
}
}
+ /**
+ * Runs a table source test with JSON data.
+ *
+ * The table source needs to parse the following JSON fields:
+ * - "long" -> number
+ * - "string" -> "string"
+ * - "boolean" -> true|false
+ * - "double" -> fraction
+ */
+ public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception {
+ final ObjectMapper mapper = new ObjectMapper();
+
+ final int numElements = 1024;
+ final long[] longs = new long[numElements];
+ final String[] strings = new String[numElements];
+ final boolean[] booleans = new boolean[numElements];
+ final double[] doubles = new double[numElements];
+
+ final byte[][] serializedJson = new byte[numElements][];
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ for (int i = 0; i < numElements; i++) {
+ longs[i] = random.nextLong();
+ strings[i] = Integer.toHexString(random.nextInt());
+ booleans[i] = random.nextBoolean();
+ doubles[i] = random.nextDouble();
+
+ ObjectNode entry = mapper.createObjectNode();
+ entry.put("long", longs[i]);
+ entry.put("string", strings[i]);
+ entry.put("boolean", booleans[i]);
+ entry.put("double", doubles[i]);
+
+ serializedJson[i] = mapper.writeValueAsBytes(entry);
+ }
+
+ // Produce serialized JSON data
+ createTestTopic(topic, 1, 1);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .createRemoteEnvironment("localhost", flinkPort);
+ env.getConfig().disableSysoutLogging();
+
+ env.addSource(new SourceFunction<byte[]>() {
+ @Override
+ public void run(SourceContext<byte[]> ctx) throws Exception {
+ for (int i = 0; i < numElements; i++) {
+ ctx.collect(serializedJson[i]);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ }
+ }).addSink(kafkaServer.getProducer(
+ topic,
+ new ByteArraySerializationSchema(),
+ standardProps,
+ null));
+
+ // Execute blocks
+ env.execute();
+
+ // Register as table source
+ StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
+ tableEnvironment.registerTableSource("kafka", kafkaTableSource);
+
+ Table result = tableEnvironment.ingest("kafka");
+
+ tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {
+
+ int i = 0;
+
+ @Override
+ public void invoke(Row value) throws Exception {
+ if (i > numElements) {
+ throw new IllegalStateException("Received too many rows.");
+ }
+
+ assertEquals(longs[i], value.productElement(0));
+ assertEquals(strings[i], value.productElement(1));
+ assertEquals(booleans[i], value.productElement(2));
+ assertEquals(doubles[i], value.productElement(3));
+
+ if (i == numElements-1) {
+ throw new SuccessException();
--- End diff --
Yes, the first check (i > numElements) can be removed. I think it's fine to keep only the exact check (i == numElements-1).
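To illustrate the suggested simplification, here is a Flink-free sketch of the sink logic with only the exact check kept. It is a sketch under stated assumptions, not the PR's code: it assumes a single sink instance that sees rows in production order, compares one long per row instead of a whole Row, and uses a hypothetical ExactCountCheck class with a local SuccessException stand-in. The increment of i is cut off in the quoted diff, so the sketch assumes it happens after the comparisons.

```java
/**
 * Hypothetical, Flink-free stand-in for the test's SinkFunction<Row>.
 * Only the "exact" check is kept: each row is compared with the expected
 * value at index i, and success is signalled once the last expected row
 * has been seen. Assumes one sink instance receiving rows in order.
 */
final class ExactCountCheck {

    /** Local stand-in for the test's SuccessException (assumption). */
    static final class SuccessException extends RuntimeException {
        SuccessException() {
            super("all rows received");
        }
    }

    private final long[] expected;
    private int i = 0;

    ExactCountCheck(long[] expected) {
        this.expected = expected;
    }

    /** Mirrors SinkFunction#invoke: compare, signal success on the last row, else advance. */
    void invoke(long value) {
        if (value != expected[i]) {
            throw new AssertionError("row " + i + ": expected " + expected[i] + " but got " + value);
        }
        if (i == expected.length - 1) {
            // In the real test this ends the job, so no further row is compared.
            throw new SuccessException();
        }
        i++;
    }

    /** Feeds rows until success or failure; returns how many rows were consumed. */
    static int run(long[] expected, long[] received) {
        ExactCountCheck sink = new ExactCountCheck(expected);
        int consumed = 0;
        try {
            for (long v : received) {
                consumed++;
                sink.invoke(v);
            }
        } catch (SuccessException e) {
            return consumed;
        }
        throw new AssertionError("stream ended after " + consumed + " rows without success");
    }

    public static void main(String[] args) {
        long[] expected = {7L, 8L, 9L};
        // Extra trailing rows are never inspected: success is signalled on row 3.
        System.out.println(run(expected, new long[] {7L, 8L, 9L, 10L, 11L})); // prints 3
    }
}
```

Because success is signalled on the last expected row, the job stops before any extra row would be compared, so a separate upper-bound guard adds nothing. Note also that the original guard, i > numElements, would let i == numElements through to longs[i], which would fail with an ArrayIndexOutOfBoundsException rather than the intended message.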
> Add Kafka TableSource with JSON serialization
> ---------------------------------------------
>
> Key: FLINK-3872
> URL: https://issues.apache.org/jira/browse/FLINK-3872
> Project: Flink
> Issue Type: New Feature
> Components: Table API
> Reporter: Fabian Hueske
> Assignee: Ufuk Celebi
> Fix For: 1.1.0
>
>
> Add a Kafka TableSource which reads JSON serialized data.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)