Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66606216
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -735,6 +738,123 @@ public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> o
                }
        }
     
    +   /**
    +    * Runs a table source test with JSON data.
    +    *
    +    * The table source needs to parse the following JSON fields:
    +    * - "long" -> number
    +    * - "string" -> "string"
    +    * - "boolean" -> true|false
    +    * - "double" -> fraction
    +    */
    +   public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception {
    +           final ObjectMapper mapper = new ObjectMapper();
    +
    +           final int numElements = 1024;
    +           final long[] longs = new long[numElements];
    +           final String[] strings = new String[numElements];
    +           final boolean[] booleans = new boolean[numElements];
    +           final double[] doubles = new double[numElements];
    +
    +           final byte[][] serializedJson = new byte[numElements][];
    +
    +           ThreadLocalRandom random = ThreadLocalRandom.current();
    +
    +           for (int i = 0; i < numElements; i++) {
    +                   longs[i] = random.nextLong();
    +                   strings[i] = Integer.toHexString(random.nextInt());
    +                   booleans[i] = random.nextBoolean();
    +                   doubles[i] = random.nextDouble();
    +
    +                   ObjectNode entry = mapper.createObjectNode();
    +                   entry.put("long", longs[i]);
    +                   entry.put("string", strings[i]);
    +                   entry.put("boolean", booleans[i]);
    +                   entry.put("double", doubles[i]);
    +
    +                   serializedJson[i] = mapper.writeValueAsBytes(entry);
    +           }
    +
    +           // Produce serialized JSON data
    +           createTestTopic(topic, 1, 1);
    +
    +           StreamExecutionEnvironment env = StreamExecutionEnvironment
    +                           .createRemoteEnvironment("localhost", flinkPort);
    +           env.getConfig().disableSysoutLogging();
    +
    +           env.addSource(new SourceFunction<byte[]>() {
    +                   @Override
    +                   public void run(SourceContext<byte[]> ctx) throws Exception {
    +                           for (int i = 0; i < numElements; i++) {
    +                                   ctx.collect(serializedJson[i]);
    +                           }
    +                   }
    +
    +                   @Override
    +                   public void cancel() {
    +                   }
    +           }).addSink(kafkaServer.getProducer(
    +                           topic,
    +                           new ByteArraySerializationSchema(),
    +                           standardProps,
    +                           null));
    +
    +           // Execute blocks
    +           env.execute();
    +
    +           // Register as table source
    +           StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
    +           tableEnvironment.registerTableSource("kafka", kafkaTableSource);
    +
    +           Table result = tableEnvironment.ingest("kafka");
    +
    +           tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {
    +
    +                   int i = 0;
    +
    +                   @Override
    +                   public void invoke(Row value) throws Exception {
    +                           if (i > numElements) {
    +                                   throw new IllegalStateException("Received too many rows.");
    +                           }
    +
    +                           assertEquals(longs[i], value.productElement(0));
    +                           assertEquals(strings[i], value.productElement(1));
    +                           assertEquals(booleans[i], value.productElement(2));
    +                           assertEquals(doubles[i], value.productElement(3));
    +
    +                           if (i == numElements-1) {
    +                                   throw new SuccessException();
    --- End diff ---
    
    Yes, the first check can be removed. I think it's fine to just have the 
exact check in place.
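    
    For illustration, a minimal sketch of what the sink's invoke() could look like with only the exact check kept. This is not part of the diff; it assumes the counter i is incremented once per received row, which the truncated hunk does not show:
    
        @Override
        public void invoke(Row value) throws Exception {
            assertEquals(longs[i], value.productElement(0));
            assertEquals(strings[i], value.productElement(1));
            assertEquals(booleans[i], value.productElement(2));
            assertEquals(doubles[i], value.productElement(3));
    
            // Exact check only: signal success once the last expected row has been verified.
            if (++i == numElements) {
                throw new SuccessException();
            }
        }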

