[ https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
longwang0616 updated FLINK-22969: --------------------------------- Attachment: image-2021-07-13-11-02-37-451.png > Validate the topic is not null or empty string when creating a Kafka source/sink > function > -------------------------------------------------------------------------------------- > > Key: FLINK-22969 > URL: https://issues.apache.org/jira/browse/FLINK-22969 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka, Table SQL / Ecosystem > Affects Versions: 1.14.0 > Reporter: Shengkai Fang > Priority: Major > Labels: pull-request-available, starter > Attachments: image-2021-07-06-18-55-22-235.png, > image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, > image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, > image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png, > image-2021-07-06-19-05-48-964.png, image-2021-07-06-19-07-01-607.png, > image-2021-07-06-19-07-27-936.png, image-2021-07-06-22-41-52-089.png, > image-2021-07-13-11-02-37-451.png > > > Add a test in UpsertKafkaTableITCase > {code:java} > @Test > public void testSourceSinkWithKeyAndPartialValue() throws Exception { > // we always use a different topic name for each parameterized topic, > // in order to make sure the topic can be created. 
> final String topic = "key_partial_value_topic_" + format; > createTestTopic(topic, 1, 1); // use single partition to guarantee > orders in tests > // ---------- Produce an event time stream into Kafka > ------------------- > String bootstraps = standardProps.getProperty("bootstrap.servers"); > // k_user_id and user_id have different data types to verify the > correct mapping, > // fields are reordered on purpose > final String createTable = > String.format( > "CREATE TABLE upsert_kafka (\n" > + " `k_user_id` BIGINT,\n" > + " `name` STRING,\n" > + " `timestamp` TIMESTAMP(3) METADATA,\n" > + " `k_event_id` BIGINT,\n" > + " `user_id` INT,\n" > + " `payload` STRING,\n" > + " PRIMARY KEY (k_event_id, k_user_id) NOT > ENFORCED" > + ") WITH (\n" > + " 'connector' = 'upsert-kafka',\n" > + " 'topic' = '%s',\n" > + " 'properties.bootstrap.servers' = '%s',\n" > + " 'key.format' = '%s',\n" > + " 'key.fields-prefix' = 'k_',\n" > + " 'value.format' = '%s',\n" > + " 'value.fields-include' = 'EXCEPT_KEY'\n" > + ")", > "", bootstraps, format, format); > tEnv.executeSql(createTable); > String initialValues = > "INSERT INTO upsert_kafka\n" > + "VALUES\n" > + " (1, 'name 1', TIMESTAMP '2020-03-08 > 13:12:11.123', 100, 41, 'payload 1'),\n" > + " (2, 'name 2', TIMESTAMP '2020-03-09 > 13:12:11.123', 101, 42, 'payload 2'),\n" > + " (3, 'name 3', TIMESTAMP '2020-03-10 > 13:12:11.123', 102, 43, 'payload 3'),\n" > + " (2, 'name 2', TIMESTAMP '2020-03-11 > 13:12:11.123', 101, 42, 'payload')"; > tEnv.executeSql(initialValues).await(); > // ---------- Consume stream from Kafka ------------------- > final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM > upsert_kafka"), 5); > final List<Row> expected = > Arrays.asList( > changelogRow( > "+I", > 1L, > "name 1", > > LocalDateTime.parse("2020-03-08T13:12:11.123"), > 100L, > 41, > "payload 1"), > changelogRow( > "+I", > 2L, > "name 2", > > LocalDateTime.parse("2020-03-09T13:12:11.123"), > 101L, > 42, > "payload 2"), > changelogRow( > 
"+I", > 3L, > "name 3", > > LocalDateTime.parse("2020-03-10T13:12:11.123"), > 102L, > 43, > "payload 3"), > changelogRow( > "-U", > 2L, > "name 2", > > LocalDateTime.parse("2020-03-09T13:12:11.123"), > 101L, > 42, > "payload 2"), > changelogRow( > "+U", > 2L, > "name 2", > > LocalDateTime.parse("2020-03-11T13:12:11.123"), > 101L, > 42, > "payload")); > assertThat(result, deepEqualTo(expected, true)); > // ------------- cleanup ------------------- > deleteTestTopic(topic); > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)