[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-12 Thread longwang0616 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

longwang0616 updated FLINK-22969:
-
Attachment: image-2021-07-13-11-20-22-134.png

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
> Labels: pull-request-available, starter
> Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, 
> image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png, 
> image-2021-07-06-19-05-48-964.png, image-2021-07-06-19-07-01-607.png, 
> image-2021-07-06-19-07-27-936.png, image-2021-07-06-22-41-52-089.png, 
> image-2021-07-13-11-02-37-451.png, image-2021-07-13-11-03-30-740.png, 
> image-2021-07-13-11-15-06-977.png, image-2021-07-13-11-15-47-392.png, 
> image-2021-07-13-11-20-22-134.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
>

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-12 Thread longwang0616 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

longwang0616 updated FLINK-22969:
-
Attachment: image-2021-07-13-11-15-47-392.png

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
> Labels: pull-request-available, starter
> Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, 
> image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png, 
> image-2021-07-06-19-05-48-964.png, image-2021-07-06-19-07-01-607.png, 
> image-2021-07-06-19-07-27-936.png, image-2021-07-06-22-41-52-089.png, 
> image-2021-07-13-11-02-37-451.png, image-2021-07-13-11-03-30-740.png, 
> image-2021-07-13-11-15-06-977.png, image-2021-07-13-11-15-47-392.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
>  

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-12 Thread longwang0616 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

longwang0616 updated FLINK-22969:
-
Attachment: image-2021-07-13-11-15-06-977.png

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
> Labels: pull-request-available, starter
> Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, 
> image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png, 
> image-2021-07-06-19-05-48-964.png, image-2021-07-06-19-07-01-607.png, 
> image-2021-07-06-19-07-27-936.png, image-2021-07-06-22-41-52-089.png, 
> image-2021-07-13-11-02-37-451.png, image-2021-07-13-11-03-30-740.png, 
> image-2021-07-13-11-15-06-977.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> 

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-12 Thread longwang0616 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

longwang0616 updated FLINK-22969:
-
Attachment: image-2021-07-13-11-03-30-740.png

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
> Labels: pull-request-available, starter
> Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, 
> image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png, 
> image-2021-07-06-19-05-48-964.png, image-2021-07-06-19-07-01-607.png, 
> image-2021-07-06-19-07-27-936.png, image-2021-07-06-22-41-52-089.png, 
> image-2021-07-13-11-02-37-451.png, image-2021-07-13-11-03-30-740.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-12 Thread longwang0616 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

longwang0616 updated FLINK-22969:
-
Attachment: image-2021-07-13-11-02-37-451.png

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
> Labels: pull-request-available, starter
> Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, 
> image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png, 
> image-2021-07-06-19-05-48-964.png, image-2021-07-06-19-07-01-607.png, 
> image-2021-07-06-19-07-27-936.png, image-2021-07-06-22-41-52-089.png, 
> image-2021-07-13-11-02-37-451.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-07 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-22969:
---
Labels: pull-request-available starter  (was: starter)

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
> Labels: pull-request-available, starter
> Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, 
> image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png, 
> image-2021-07-06-19-05-48-964.png, image-2021-07-06-19-07-01-607.png, 
> image-2021-07-06-19-07-27-936.png, image-2021-07-06-22-41-52-089.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> 

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-06 Thread longwang0616 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

longwang0616 updated FLINK-22969:
-
Attachment: image-2021-07-06-19-07-01-607.png

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
> Labels: starter
> Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, 
> image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png, 
> image-2021-07-06-19-05-48-964.png, image-2021-07-06-19-07-01-607.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> "name 3",
> 
> LocalDateTime.parse("2020-03-10T13:12:11.123"),
>  

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-06 Thread longwang0616 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

longwang0616 updated FLINK-22969:
-
Attachment: image-2021-07-06-19-07-27-936.png

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
> Labels: starter
> Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, 
> image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png, 
> image-2021-07-06-19-05-48-964.png, image-2021-07-06-19-07-01-607.png, 
> image-2021-07-06-19-07-27-936.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> "name 3",
> 
> LocalDateTime

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-06 Thread longwang0616 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

longwang0616 updated FLINK-22969:
-
Attachment: image-2021-07-06-19-05-48-964.png

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
> Labels: starter
> Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, 
> image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png, 
> image-2021-07-06-19-05-48-964.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> "name 3",
> 
> LocalDateTime.parse("2020-03-10T13:12:11.123"),
> 102L

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-06 Thread longwang0616 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

longwang0616 updated FLINK-22969:
-
Attachment: image-2021-07-06-19-04-53-651.png

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
> Labels: starter
> Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, 
> image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> "name 3",
> 
> LocalDateTime.parse("2020-03-10T13:12:11.123"),
> 102L,
> 43

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-06 Thread longwang0616 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

longwang0616 updated FLINK-22969:
-
Attachment: image-2021-07-06-19-04-16-530.png

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
>  Labels: starter
> Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, 
> image-2021-07-06-19-04-16-530.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> "name 3",
> 
> LocalDateTime.parse("2020-03-10T13:12:11.123"),
> 102L,
> 43,
>

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-06 Thread longwang0616 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

longwang0616 updated FLINK-22969:
-
Attachment: image-2021-07-06-19-03-32-050.png

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
>  Labels: starter
> Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, 
> image-2021-07-06-19-04-16-530.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> "name 3",
> 
> LocalDateTime.parse("2020-03-10T13:12:11.123"),
> 102L,
> 43,
>

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-06 Thread longwang0616 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

longwang0616 updated FLINK-22969:
-
Attachment: image-2021-07-06-19-03-22-899.png

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
>  Labels: starter
> Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> "name 3",
> 
> LocalDateTime.parse("2020-03-10T13:12:11.123"),
> 102L,
> 43,
> "payload 3"),
> changelogRow(
> 

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-06 Thread longwang0616 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

longwang0616 updated FLINK-22969:
-
Attachment: image-2021-07-06-19-01-22-483.png

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
>  Labels: starter
> Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> "name 3",
> 
> LocalDateTime.parse("2020-03-10T13:12:11.123"),
> 102L,
> 43,
> "payload 3"),
> changelogRow(
> "-U",
>   

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-06 Thread longwang0616 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

longwang0616 updated FLINK-22969:
-
Attachment: image-2021-07-06-18-55-54-109.png

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
>  Labels: starter
> Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> "name 3",
> 
> LocalDateTime.parse("2020-03-10T13:12:11.123"),
> 102L,
> 43,
> "payload 3"),
> changelogRow(
> "-U",
> 2L,
>

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-07-06 Thread longwang0616 (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

longwang0616 updated FLINK-22969:
-
Attachment: image-2021-07-06-18-55-22-235.png

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
>  Labels: starter
> Attachments: image-2021-07-06-18-55-22-235.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> "name 3",
> 
> LocalDateTime.parse("2020-03-10T13:12:11.123"),
> 102L,
> 43,
> "payload 3"),
> changelogRow(
> "-U",
> 2L,
> "name 2",
>  

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-06-11 Thread Johannes Moser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johannes Moser updated FLINK-22969:
---
Labels: starter  (was: )

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
>  Labels: starter
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> "name 3",
> 
> LocalDateTime.parse("2020-03-10T13:12:11.123"),
> 102L,
> 43,
> "payload 3"),
> changelogRow(
> "-U",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123")

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-06-10 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-22969:

Component/s: Table SQL / Ecosystem
 Connectors / Kafka

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> "name 3",
> 
> LocalDateTime.parse("2020-03-10T13:12:11.123"),
> 102L,
> 43,
> "payload 3"),
> changelogRow(
> "-U",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-06-10 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang updated FLINK-22969:
--
Issue Type: Improvement  (was: Bug)

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
>  Issue Type: Improvement
> Reporter: Shengkai Fang
> Priority: Major
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> "name 3",
> 
> LocalDateTime.parse("2020-03-10T13:12:11.123"),
> 102L,
> 43,
> "payload 3"),
> changelogRow(
> "-U",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "pa

[jira] [Updated] (FLINK-22969) Validate the topic is not null or empty string when create kafka source/sink function

2021-06-10 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang updated FLINK-22969:
--
Affects Version/s: 1.14.0

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
>  Issue Type: Improvement
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
> // -- Produce an event time stream into Kafka 
> ---
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the 
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + "  `k_user_id` BIGINT,\n"
> + "  `name` STRING,\n"
> + "  `timestamp` TIMESTAMP(3) METADATA,\n"
> + "  `k_event_id` BIGINT,\n"
> + "  `user_id` INT,\n"
> + "  `payload` STRING,\n"
> + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
> + ") WITH (\n"
> + "  'connector' = 'upsert-kafka',\n"
> + "  'topic' = '%s',\n"
> + "  'properties.bootstrap.servers' = '%s',\n"
> + "  'key.format' = '%s',\n"
> + "  'key.fields-prefix' = 'k_',\n"
> + "  'value.format' = '%s',\n"
> + "  'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // -- Consume stream from Kafka ---
> final List result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
> final List expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
> 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> "name 3",
> 
> LocalDateTime.parse("2020-03-10T13:12:11.123"),
> 102L,
> 43,
> "payload 3"),
> changelogRow(
> "-U",
> 2L,
> "name 2",
> 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
>