[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function
[ https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

longwang0616 updated FLINK-22969:
---------------------------------
    Attachment: image-2021-07-13-11-20-22-134.png

> Validate the topic is not null or empty string when create kafka source/sink
> function
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-22969
>                 URL: https://issues.apache.org/jira/browse/FLINK-22969
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Table SQL / Ecosystem
>    Affects Versions: 1.14.0
>            Reporter: Shengkai Fang
>            Priority: Major
>              Labels: pull-request-available, starter
>         Attachments: image-2021-07-06-18-55-22-235.png,
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png,
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png,
> image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png,
> image-2021-07-06-19-05-48-964.png, image-2021-07-06-19-07-01-607.png,
> image-2021-07-06-19-07-27-936.png, image-2021-07-06-22-41-52-089.png,
> image-2021-07-13-11-02-37-451.png, image-2021-07-13-11-03-30-740.png,
> image-2021-07-13-11-15-06-977.png, image-2021-07-13-11-15-47-392.png,
> image-2021-07-13-11-20-22-134.png
>
> Add a test in UpsertKafkaTableITCase:
> {code:java}
> @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
>     // we always use a different topic name for each parameterized topic,
>     // in order to make sure the topic can be created.
>     final String topic = "key_partial_value_topic_" + format;
>     createTestTopic(topic, 1, 1); // use single partition to guarantee orders in tests
>
>     // ---------- Produce an event time stream into Kafka -------------------
>     String bootstraps = standardProps.getProperty("bootstrap.servers");
>
>     // k_user_id and user_id have different data types to verify the correct mapping,
>     // fields are reordered on purpose
>     final String createTable =
>             String.format(
>                     "CREATE TABLE upsert_kafka (\n"
>                             + "  `k_user_id` BIGINT,\n"
>                             + "  `name` STRING,\n"
>                             + "  `timestamp` TIMESTAMP(3) METADATA,\n"
>                             + "  `k_event_id` BIGINT,\n"
>                             + "  `user_id` INT,\n"
>                             + "  `payload` STRING,\n"
>                             + "  PRIMARY KEY (k_event_id, k_user_id) NOT ENFORCED"
>                             + ") WITH (\n"
>                             + "  'connector' = 'upsert-kafka',\n"
>                             + "  'topic' = '%s',\n"
>                             + "  'properties.bootstrap.servers' = '%s',\n"
>                             + "  'key.format' = '%s',\n"
>                             + "  'key.fields-prefix' = 'k_',\n"
>                             + "  'value.format' = '%s',\n"
>                             + "  'value.fields-include' = 'EXCEPT_KEY'\n"
>                             + ")",
>                     "", bootstraps, format, format); // the 'topic' option is filled with the empty string
>     tEnv.executeSql(createTable);
>
>     String initialValues =
>             "INSERT INTO upsert_kafka\n"
>                     + "VALUES\n"
>                     + " (1, 'name 1', TIMESTAMP '2020-03-08 13:12:11.123', 100, 41, 'payload 1'),\n"
>                     + " (2, 'name 2', TIMESTAMP '2020-03-09 13:12:11.123', 101, 42, 'payload 2'),\n"
>                     + " (3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 43, 'payload 3'),\n"
>                     + " (2, 'name 2', TIMESTAMP '2020-03-11 13:12:11.123', 101, 42, 'payload')";
>     tEnv.executeSql(initialValues).await();
>
>     // ---------- Consume stream from Kafka -------------------
>     final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM upsert_kafka"), 5);
>     final List<Row> expected =
>             Arrays.asList(
>                     changelogRow(
>                             "+I",
>                             1L,
>                             "name 1",
>                             LocalDateTime.parse("2020-03-08T13:12:11.123"),
>                             100L,
>                             41,
>                             "payload 1"),
>                     changelogRow(
>                             "+I",
>                             2L,
>                             "name 2",
>                             LocalDateTime.parse("2020-03-09T13:12:11.123"),
>                             101L,
>                             42,
>                             "payload 2"),
>                     changelogRow(
>                             "+I",
>                             3L,
>                             "name 3",
>                             LocalDateTime.parse("2020-03-10T13:12:11.123"),
>                             102L,
>                             43,
>                             ...
> {code}
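The test above configures the upsert-kafka table with `'topic' = ''`, and the ticket asks that such a configuration be rejected eagerly rather than failing later at runtime. A minimal sketch of the requested check is below; the class and method names are illustrative, not the actual Flink connector code:

```java
import java.util.List;

/** Illustrative sketch of the validation requested by FLINK-22969. */
public final class TopicValidation {

    /** Rejects a null or empty topic list, or any blank topic name, with a descriptive error. */
    public static void validateTopics(List<String> topics) {
        if (topics == null || topics.isEmpty()) {
            throw new IllegalArgumentException(
                    "Option 'topic' must be set to at least one topic name.");
        }
        for (String topic : topics) {
            if (topic == null || topic.trim().isEmpty()) {
                throw new IllegalArgumentException(
                        "Option 'topic' must not contain null or empty topic names, but was: "
                                + topics);
            }
        }
    }
}
```

Calling a check like this from the table factory before the source/sink function is constructed would surface the misconfiguration at table creation time with a clear message, instead of a less obvious failure once the job runs.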
longwang0616 updated FLINK-22969: Attachment: image-2021-07-13-11-15-47-392.png
longwang0616 updated FLINK-22969: Attachment: image-2021-07-13-11-15-06-977.png
longwang0616 updated FLINK-22969: Attachment: image-2021-07-13-11-03-30-740.png
longwang0616 updated FLINK-22969: Attachment: image-2021-07-13-11-02-37-451.png
ASF GitHub Bot updated FLINK-22969: Labels: pull-request-available starter (was: starter)
longwang0616 updated FLINK-22969: Attachment: image-2021-07-06-19-07-01-607.png
longwang0616 updated FLINK-22969: Attachment: image-2021-07-06-19-07-27-936.png
longwang0616 updated FLINK-22969: Attachment: image-2021-07-06-19-05-48-964.png
[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function
[ https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

longwang0616 updated FLINK-22969: Attachment: image-2021-07-06-19-04-53-651.png

Validate the topic is not null or empty string when create kafka source/sink function
-------------------------------------------------------------------------------------

Key: FLINK-22969
URL: https://issues.apache.org/jira/browse/FLINK-22969
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka, Table SQL / Ecosystem
Affects Versions: 1.14.0
Reporter: Shengkai Fang
Priority: Major
Labels: starter
Attachments: image-2021-07-06-18-55-22-235.png, image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png

Add a test in UpsertKafkaTableITCase:

{code:java}
@Test
public void testSourceSinkWithKeyAndPartialValue() throws Exception {
    // we always use a different topic name for each parameterized topic,
    // in order to make sure the topic can be created.
    final String topic = "key_partial_value_topic_" + format;
    createTestTopic(topic, 1, 1); // use single partition to guarantee orders in tests

    // ---------- Produce an event time stream into Kafka ----------
    String bootstraps = standardProps.getProperty("bootstrap.servers");

    // k_user_id and user_id have different data types to verify the correct mapping,
    // fields are reordered on purpose
    final String createTable =
            String.format(
                    "CREATE TABLE upsert_kafka (\n"
                            + "  `k_user_id` BIGINT,\n"
                            + "  `name` STRING,\n"
                            + "  `timestamp` TIMESTAMP(3) METADATA,\n"
                            + "  `k_event_id` BIGINT,\n"
                            + "  `user_id` INT,\n"
                            + "  `payload` STRING,\n"
                            + "  PRIMARY KEY (k_event_id, k_user_id) NOT ENFORCED"
                            + ") WITH (\n"
                            + "  'connector' = 'upsert-kafka',\n"
                            + "  'topic' = '%s',\n"
                            + "  'properties.bootstrap.servers' = '%s',\n"
                            + "  'key.format' = '%s',\n"
                            + "  'key.fields-prefix' = 'k_',\n"
                            + "  'value.format' = '%s',\n"
                            + "  'value.fields-include' = 'EXCEPT_KEY'\n"
                            + ")",
                    "", bootstraps, format, format); // note: the topic argument is the empty string
    tEnv.executeSql(createTable);

    String initialValues =
            "INSERT INTO upsert_kafka\n"
                    + "VALUES\n"
                    + " (1, 'name 1', TIMESTAMP '2020-03-08 13:12:11.123', 100, 41, 'payload 1'),\n"
                    + " (2, 'name 2', TIMESTAMP '2020-03-09 13:12:11.123', 101, 42, 'payload 2'),\n"
                    + " (3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 43, 'payload 3'),\n"
                    + " (2, 'name 2', TIMESTAMP '2020-03-11 13:12:11.123', 101, 42, 'payload')";
    tEnv.executeSql(initialValues).await();

    // ---------- Consume stream from Kafka ----------
    final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM upsert_kafka"), 5);
    final List<Row> expected =
            Arrays.asList(
                    changelogRow(
                            "+I",
                            1L,
                            "name 1",
                            LocalDateTime.parse("2020-03-08T13:12:11.123"),
                            100L,
                            41,
                            "payload 1"),
                    changelogRow(
                            "+I",
                            2L,
                            "name 2",
                            LocalDateTime.parse("2020-03-09T13:12:11.123"),
                            101L,
                            42,
                            "payload 2"),
                    changelogRow(
                            "+I",
                            3L,
                            "name 3",
                            LocalDateTime.parse("2020-03-10T13:12:11.123"),
                            102L,
                            43,
                            "payload 3"),
                    changelogRow(
                            "-U",
                            2L,
                            "name 2",
                            LocalDateTime.parse("2020-03-09T13:12:11.123"),
                            101L,
                            42,
                            // ... remainder of the quoted test is truncated in the original message
{code}
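The test above deliberately passes the empty string as the 'topic' option, which today is accepted silently. The validation this issue asks for can be sketched as a simple precondition check; the class and method names below are hypothetical illustrations, not the actual Flink connector code:

```java
// Hypothetical sketch of the requested check; the real fix would live in the
// Kafka table factory / source / sink creation path.
public class TopicValidation {

    /** Returns the topic unchanged, or throws if it is null or blank. */
    public static String validateTopic(String topic) {
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Option 'topic' must be a non-empty string, but was: "
                            + (topic == null ? "null" : "'" + topic + "'"));
        }
        return topic;
    }

    public static void main(String[] args) {
        // A valid topic passes through unchanged.
        System.out.println(validateTopic("key_partial_value_topic_json"));

        // An empty topic (as produced by String.format with "" in the test
        // above) fails fast with a clear message instead of being handed
        // on to the Kafka client.
        try {
            validateTopic("");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With a check like this in place, the CREATE TABLE in the test would fail at table creation time rather than at runtime against a nonexistent topic.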
longwang0616 updated FLINK-22969: Attachment: image-2021-07-06-19-04-16-530.png
longwang0616 updated FLINK-22969: Attachment: image-2021-07-06-19-03-32-050.png
longwang0616 updated FLINK-22969: Attachment: image-2021-07-06-19-03-22-899.png
longwang0616 updated FLINK-22969: Attachment: image-2021-07-06-19-01-22-483.png
longwang0616 updated FLINK-22969: Attachment: image-2021-07-06-18-55-54-109.png
longwang0616 updated FLINK-22969: Attachment: image-2021-07-06-18-55-22-235.png
Johannes Moser updated FLINK-22969: Labels: starter (was: )
Jark Wu updated FLINK-22969: Component/s: Table SQL / Ecosystem, Connectors / Kafka
Shengkai Fang updated FLINK-22969: Issue Type: Improvement (was: Bug)
Shengkai Fang updated FLINK-22969: Affects Version/s: 1.14.0