[ https://issues.apache.org/jira/browse/FLINK-3872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324346#comment-15324346 ]
ASF GitHub Bot commented on FLINK-3872:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66603012

--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -735,6 +738,123 @@ public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> o
 		}
 	}
 
+	/**
+	 * Runs a table source test with JSON data.
+	 *
+	 * The table source needs to parse the following JSON fields:
+	 * - "long" -> number
+	 * - "string" -> "string"
+	 * - "boolean" -> true|false
+	 * - "double" -> fraction
+	 */
+	public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception {
+		final ObjectMapper mapper = new ObjectMapper();
+
+		final int numElements = 1024;
+		final long[] longs = new long[numElements];
+		final String[] strings = new String[numElements];
+		final boolean[] booleans = new boolean[numElements];
+		final double[] doubles = new double[numElements];
+
+		final byte[][] serializedJson = new byte[numElements][];
+
+		ThreadLocalRandom random = ThreadLocalRandom.current();
+
+		for (int i = 0; i < numElements; i++) {
+			longs[i] = random.nextLong();
+			strings[i] = Integer.toHexString(random.nextInt());
+			booleans[i] = random.nextBoolean();
+			doubles[i] = random.nextDouble();
+
+			ObjectNode entry = mapper.createObjectNode();
+			entry.put("long", longs[i]);
+			entry.put("string", strings[i]);
+			entry.put("boolean", booleans[i]);
+			entry.put("double", doubles[i]);
+
+			serializedJson[i] = mapper.writeValueAsBytes(entry);
+		}
+
+		// Produce serialized JSON data
+		createTestTopic(topic, 1, 1);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+
+		env.addSource(new SourceFunction<byte[]>() {
+			@Override
+			public void run(SourceContext<byte[]> ctx) throws Exception {
+				for (int i = 0; i < numElements; i++) {
+					ctx.collect(serializedJson[i]);
+				}
+			}
+
+			@Override
+			public void cancel() {
+			}
+		}).addSink(kafkaServer.getProducer(
+				topic,
+				new ByteArraySerializationSchema(),
+				standardProps,
+				null));
+
+		// Execute blocks
+		env.execute();
+
+		// Register as table source
+		StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
+		tableEnvironment.registerTableSource("kafka", kafkaTableSource);
+
+		Table result = tableEnvironment.ingest("kafka");
+
+		tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {
+
+			int i = 0;
+
+			@Override
+			public void invoke(Row value) throws Exception {
+				if (i > numElements) {
+					throw new IllegalStateException("Received too many rows.");
+				}
+
+				assertEquals(longs[i], value.productElement(0));
+				assertEquals(strings[i], value.productElement(1));
+				assertEquals(booleans[i], value.productElement(2));
+				assertEquals(doubles[i], value.productElement(3));
+
+				if (i == numElements-1) {
+					throw new SuccessException();
--- End diff --

Doesn't this prevent checking whether the source emits too many records? I.e., the check in line 817 would never evaluate to `true`, right?
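For illustration, one way to address the concern is to increment the counter after verifying each row and signal success only once all expected rows have arrived, so the over-count branch stays reachable for duplicates that show up before the last expected element. This is a sketch only, not the PR's code: it assumes the surrounding test's longs/strings/booleans/doubles arrays, numElements, and SuccessException, and introduces a hypothetical rowsReceived counter in place of i.

	// Sketch only (assumed context from the test above, not the code under review)
	tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {

		int rowsReceived = 0;  // hypothetical counter name

		@Override
		public void invoke(Row value) throws Exception {
			// Fail if more rows arrive than were produced. Reachable because
			// success is only signalled after rowsReceived reaches numElements.
			if (rowsReceived >= numElements) {
				throw new IllegalStateException("Received too many rows.");
			}

			assertEquals(longs[rowsReceived], value.productElement(0));
			assertEquals(strings[rowsReceived], value.productElement(1));
			assertEquals(booleans[rowsReceived], value.productElement(2));
			assertEquals(doubles[rowsReceived], value.productElement(3));

			rowsReceived++;

			// Signal success only after every expected row has been verified.
			if (rowsReceived == numElements) {
				throw new SuccessException();
			}
		}
	});

Even with this change, rows arriving after the success signal would still go unnoticed, which is the limitation the review comment points out.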
> Add Kafka TableSource with JSON serialization
> ---------------------------------------------
>
>                 Key: FLINK-3872
>                 URL: https://issues.apache.org/jira/browse/FLINK-3872
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API
>            Reporter: Fabian Hueske
>            Assignee: Ufuk Celebi
>             Fix For: 1.1.0
>
>
> Add a Kafka TableSource which reads JSON serialized data.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)