Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2069#discussion_r66603012
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -735,6 +738,123 @@ public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> o
                }
        }
     
    +   /**
    +    * Runs a table source test with JSON data.
    +    *
    +    * The table source needs to parse the following JSON fields:
    +    * - "long" -> number
    +    * - "string" -> "string"
    +    * - "boolean" -> true|false
    +    * - "double" -> fraction
    +    */
    +   public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception {
    +           final ObjectMapper mapper = new ObjectMapper();
    +
    +           final int numElements = 1024;
    +           final long[] longs = new long[numElements];
    +           final String[] strings = new String[numElements];
    +           final boolean[] booleans = new boolean[numElements];
    +           final double[] doubles = new double[numElements];
    +
    +           final byte[][] serializedJson = new byte[numElements][];
    +
    +           ThreadLocalRandom random = ThreadLocalRandom.current();
    +
    +           for (int i = 0; i < numElements; i++) {
    +                   longs[i] = random.nextLong();
    +                   strings[i] = Integer.toHexString(random.nextInt());
    +                   booleans[i] = random.nextBoolean();
    +                   doubles[i] = random.nextDouble();
    +
    +                   ObjectNode entry = mapper.createObjectNode();
    +                   entry.put("long", longs[i]);
    +                   entry.put("string", strings[i]);
    +                   entry.put("boolean", booleans[i]);
    +                   entry.put("double", doubles[i]);
    +
    +                   serializedJson[i] = mapper.writeValueAsBytes(entry);
    +           }
    +
    +           // Produce serialized JSON data
    +           createTestTopic(topic, 1, 1);
    +
    +           StreamExecutionEnvironment env = StreamExecutionEnvironment
    +                           .createRemoteEnvironment("localhost", flinkPort);
    +           env.getConfig().disableSysoutLogging();
    +
    +           env.addSource(new SourceFunction<byte[]>() {
    +                   @Override
    +                   public void run(SourceContext<byte[]> ctx) throws Exception {
    +                           for (int i = 0; i < numElements; i++) {
    +                                   ctx.collect(serializedJson[i]);
    +                           }
    +                   }
    +
    +                   @Override
    +                   public void cancel() {
    +                   }
    +           }).addSink(kafkaServer.getProducer(
    +                           topic,
    +                           new ByteArraySerializationSchema(),
    +                           standardProps,
    +                           null));
    +
    +           // Execute blocks
    +           env.execute();
    +
    +           // Register as table source
    +           StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
    +           tableEnvironment.registerTableSource("kafka", kafkaTableSource);
    +
    +           Table result = tableEnvironment.ingest("kafka");
    +
    +           tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {
    +
    +                   int i = 0;
    +
    +                   @Override
    +                   public void invoke(Row value) throws Exception {
    +                           if (i > numElements) {
    +                                   throw new IllegalStateException("Received too many rows.");
    +                           }
    +
    +                           assertEquals(longs[i], value.productElement(0));
    +                           assertEquals(strings[i], value.productElement(1));
    +                           assertEquals(booleans[i], value.productElement(2));
    +                           assertEquals(doubles[i], value.productElement(3));
    +
    +                           if (i == numElements-1) {
    +                                   throw new SuccessException();
    --- End diff --
    
    Doesn't this prevent checking whether the source emits too many records? That is, the check in line 817 would never evaluate to `true`, right?
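
To make the concern concrete, here is a minimal stand-alone sketch of the sink's control flow. It is not code from the PR: `RowCountCheckSketch`, `feed`, and the local `SuccessException` are hypothetical names, the value assertions are left out, and it assumes the counter `i` is incremented after the success check (the diff excerpt ends before that point).

```java
// Hypothetical illustration only; none of these names come from the PR.
class RowCountCheckSketch {

    /** Local stand-in for the SuccessException used by the test. */
    static class SuccessException extends RuntimeException {}

    /**
     * Replays the sink's checks for `emitted` incoming rows when
     * `numElements` rows are expected, and reports how the run ended.
     */
    static String feed(int numElements, int emitted) {
        int i = 0;
        try {
            for (int n = 0; n < emitted; n++) {
                // Same guard as in the diff.
                if (i > numElements) {
                    throw new IllegalStateException("Received too many rows.");
                }
                // (per-field assertions omitted)
                // Same success condition as in the diff.
                if (i == numElements - 1) {
                    throw new SuccessException();
                }
                i++; // assumption: increment happens after the success check
            }
        } catch (SuccessException e) {
            return "success after " + (i + 1) + " rows";
        } catch (IllegalStateException e) {
            return "too many rows";
        }
        return "no exception after " + i + " rows";
    }

    public static void main(String[] args) {
        // The source emits far more rows than expected, yet the run ends in success.
        System.out.println(feed(1024, 2000));
        // The source emits too few rows: neither exception is thrown.
        System.out.println(feed(1024, 10));
    }
}
```

Under these assumptions, with `numElements >= 1` the counter never exceeds `numElements - 1` before `SuccessException` ends the run, so the "too many rows" branch is unreachable regardless of how many records the source emits.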

