Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66606216

--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
@@ -735,6 +738,123 @@ public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> o
 		}
 	}
 
+	/**
+	 * Runs a table source test with JSON data.
+	 *
+	 * The table source needs to parse the following JSON fields:
+	 * - "long" -> number
+	 * - "string" -> "string"
+	 * - "boolean" -> true|false
+	 * - "double" -> fraction
+	 */
+	public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception {
+		final ObjectMapper mapper = new ObjectMapper();
+
+		final int numElements = 1024;
+		final long[] longs = new long[numElements];
+		final String[] strings = new String[numElements];
+		final boolean[] booleans = new boolean[numElements];
+		final double[] doubles = new double[numElements];
+
+		final byte[][] serializedJson = new byte[numElements][];
+
+		ThreadLocalRandom random = ThreadLocalRandom.current();
+
+		for (int i = 0; i < numElements; i++) {
+			longs[i] = random.nextLong();
+			strings[i] = Integer.toHexString(random.nextInt());
+			booleans[i] = random.nextBoolean();
+			doubles[i] = random.nextDouble();
+
+			ObjectNode entry = mapper.createObjectNode();
+			entry.put("long", longs[i]);
+			entry.put("string", strings[i]);
+			entry.put("boolean", booleans[i]);
+			entry.put("double", doubles[i]);
+
+			serializedJson[i] = mapper.writeValueAsBytes(entry);
+		}
+
+		// Produce serialized JSON data
+		createTestTopic(topic, 1, 1);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createRemoteEnvironment("localhost", flinkPort);
+		env.getConfig().disableSysoutLogging();
+
+		env.addSource(new SourceFunction<byte[]>() {
+			@Override
+			public void run(SourceContext<byte[]> ctx) throws Exception {
+				for (int i = 0; i < numElements; i++) {
+					ctx.collect(serializedJson[i]);
+				}
+			}
+
+			@Override
+			public void cancel() {
+			}
+		}).addSink(kafkaServer.getProducer(
+				topic,
+				new ByteArraySerializationSchema(),
+				standardProps,
+				null));
+
+		// Execute blocks
+		env.execute();
+
+		// Register as table source
+		StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
+		tableEnvironment.registerTableSource("kafka", kafkaTableSource);
+
+		Table result = tableEnvironment.ingest("kafka");
+
+		tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {
+
+			int i = 0;
+
+			@Override
+			public void invoke(Row value) throws Exception {
+				if (i > numElements) {
+					throw new IllegalStateException("Received too many rows.");
+				}
+
+				assertEquals(longs[i], value.productElement(0));
+				assertEquals(strings[i], value.productElement(1));
+				assertEquals(booleans[i], value.productElement(2));
+				assertEquals(doubles[i], value.productElement(3));
+
+				if (i == numElements-1) {
+					throw new SuccessException();
--- End diff --

Yes, the first check can be removed. I think it's fine to just have the exact check in place.
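For illustration, a minimal sketch of the sink's invoke method with only the exact check kept. The counter increment is cut off in the quoted diff, so its placement at the end of the method is an assumption; all other names (i, numElements, the expected-value arrays, SuccessException) come from the diff above:

	@Override
	public void invoke(Row value) throws Exception {
		// Compare the received row against the expected values for this index.
		assertEquals(longs[i], value.productElement(0));
		assertEquals(strings[i], value.productElement(1));
		assertEquals(booleans[i], value.productElement(2));
		assertEquals(doubles[i], value.productElement(3));

		// Exact check only: signal success on the last expected row.
		if (i == numElements - 1) {
			throw new SuccessException();
		}

		// Assumption: i is incremented per row (not visible in the truncated diff).
		i++;
	}

Presumably the upper-bound guard is dead code anyway, since the SuccessException ends the job before i could run past the last expected index.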