This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 363095e07 [FLINK-38244][hotfix] Fix the full snapshot phase, the field
case is adjusted based on isTableIdCaseInsensitive (#4284)
363095e07 is described below
commit 363095e07b77fb7f3353702a0bd686aece6d1a70
Author: Pei Yu <[email protected]>
AuthorDate: Wed Mar 11 19:51:24 2026 +0800
[FLINK-38244][hotfix] Fix the full snapshot phase, the field case is
adjusted based on isTableIdCaseInsensitive (#4284)
---
.../connectors/mysql/source/MySqlDataSource.java | 9 ++-
.../source/reader/MySqlPipelineRecordEmitter.java | 18 ++++-
.../mysql/source/MySqlDataSourceFactoryTest.java | 3 +-
.../mysql/source/MySqlPipelineITCase.java | 8 +-
.../source/MySqlTableIdCaseInsensitveITCase.java | 91 +++++++++++++++-------
.../src/test/resources/ddl/inventory.sql | 22 +++++-
6 files changed, 114 insertions(+), 37 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
index 62a4a4ced..f492ca4f1 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java
@@ -66,7 +66,7 @@ public class MySqlDataSource implements DataSource {
.getBoolean(
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
false);
-
+ boolean isTableIdCaseInsensitive =
MySqlSchemaUtils.isTableIdCaseInsensitive(sourceConfig);
MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
DebeziumChangelogMode.ALL,
@@ -74,7 +74,7 @@ public class MySqlDataSource implements DataSource {
readableMetadataList,
includeComments,
sourceConfig.isTreatTinyInt1AsBoolean(),
-
MySqlSchemaUtils.isTableIdCaseInsensitive(sourceConfig));
+ isTableIdCaseInsensitive);
MySqlSource<Event> source =
new MySqlSource<>(
@@ -82,7 +82,10 @@ public class MySqlDataSource implements DataSource {
deserializer,
(sourceReaderMetrics, sourceConfig) ->
new MySqlPipelineRecordEmitter(
- deserializer, sourceReaderMetrics,
sourceConfig));
+ deserializer,
+ sourceReaderMetrics,
+ sourceConfig,
+ isTableIdCaseInsensitive));
return FlinkSourceProvider.of(source);
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index eb9e658cf..41befa1f3 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -55,9 +55,11 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
import static
org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection;
import static
org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getTableId;
@@ -80,6 +82,7 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
// Used when startup mode is snapshot
private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
private boolean isBounded = false;
+ private final boolean isTableIdCaseInsensitive;
private final DebeziumDeserializationSchema<Event>
debeziumDeserializationSchema;
@@ -88,7 +91,8 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
public MySqlPipelineRecordEmitter(
DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
MySqlSourceReaderMetrics sourceReaderMetrics,
- MySqlSourceConfig sourceConfig) {
+ MySqlSourceConfig sourceConfig,
+ boolean isTableIdCaseInsensitive) {
super(
debeziumDeserializationSchema,
sourceReaderMetrics,
@@ -102,6 +106,7 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
((DebeziumEventDeserializationSchema)
debeziumDeserializationSchema)
.getCreateTableEventCache();
this.isBounded =
StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
+ this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
}
@Override
@@ -261,7 +266,10 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
- String colName = column.name();
+ String colName =
+ this.isTableIdCaseInsensitive
+ ? column.name().toLowerCase(Locale.ROOT)
+ : column.name();
DataType dataType =
MySqlTypeUtils.fromDbzColumn(column,
sourceConfig.isTreatTinyInt1AsBoolean());
if (!column.isOptional()) {
@@ -277,6 +285,12 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
List<String> primaryKey = table.primaryKeyColumnNames();
if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
+ if (this.isTableIdCaseInsensitive) {
+ primaryKey =
+ primaryKey.stream()
+ .map(key -> key.toLowerCase(Locale.ROOT))
+ .collect(Collectors.toList());
+ }
tableBuilder.primaryKey(primaryKey);
}
return tableBuilder.build();
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
index 74f3ef52a..75fb6590d 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java
@@ -135,7 +135,8 @@ class MySqlDataSourceFactoryTest extends
MySqlSourceTestBase {
Arrays.asList(
inventoryDatabase.getDatabaseName() +
".customers",
inventoryDatabase.getDatabaseName() +
".multi_max_table",
- inventoryDatabase.getDatabaseName() +
".products"));
+ inventoryDatabase.getDatabaseName() +
".products",
+ inventoryDatabase.getDatabaseName() +
".uppercase_products"));
}
@Test
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 2c45c14f4..80388034c 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -292,7 +292,7 @@ class MySqlPipelineITCase extends MySqlSourceTestBase {
+ " (default,\"hammer\",\"14oz
carpenter's hammer\",0.875),\n"
+ " (default,\"hammer\",\"16oz
carpenter's hammer\",1.0),\n"
+ " (default,\"rocks\",\"box of
assorted rocks\",5.3),\n"
- + " (default,\"jacket\",\"water
resistent black wind breaker\",0.1),\n"
+ + " (default,\"jacket\",\"water
resistant black wind breaker\",0.1),\n"
+ " (default,\"spare tire\",\"24
inch spare tire\",22.2);",
StatementUtils.quote(inventoryDatabase.getDatabaseName()),
StatementUtils.quote(sqlInjectionTable)));
@@ -540,8 +540,8 @@ class MySqlPipelineITCase extends MySqlSourceTestBase {
.tableList(databaseName + ".*")
.excludeTableList(
String.format(
- "%s.customers, %s.orders,
%s.multi_max_table",
- databaseName, databaseName,
databaseName))
+ "%s.customers, %s.orders,
%s.multi_max_table, %s.uppercase_products",
+ databaseName, databaseName,
databaseName, databaseName))
.startupOptions(StartupOptions.initial())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC")
@@ -1835,7 +1835,7 @@ class MySqlPipelineITCase extends MySqlSourceTestBase {
108,
BinaryStringData.fromString("jacket"),
BinaryStringData.fromString(
- "water resistent black wind
breaker"),
+ "water resistant black wind
breaker"),
0.1f
})));
snapshotExpected.add(
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java
index a1b9935ae..ac126682b 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java
@@ -44,7 +44,8 @@ import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.testcontainers.lifecycle.Startables;
import java.sql.Connection;
@@ -96,8 +97,10 @@ class MySqlTableIdCaseInsensitveITCase extends
MySqlSourceTestBase {
env.setRestartStrategy(RestartStrategies.noRestart());
}
- @Test
- public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws
Exception {
+ @ParameterizedTest
+ @ValueSource(strings = {"products", "uppercase_products"})
+ public void testParseAlterStatementWhenTableNameAndColumnIsUpper(String
tableName)
+ throws Exception {
env.setParallelism(1);
inventoryDatabase.createAndInitialize();
MySqlSourceConfigFactory configFactory =
@@ -107,7 +110,7 @@ class MySqlTableIdCaseInsensitveITCase extends
MySqlSourceTestBase {
.username(TEST_USER)
.password(TEST_PASSWORD)
.databaseList(inventoryDatabase.getDatabaseName())
- .tableList(inventoryDatabase.getDatabaseName() +
"\\.products")
+ .tableList(inventoryDatabase.getDatabaseName() + "\\."
+ tableName)
.startupOptions(StartupOptions.latest())
.serverId(getServerId(env.getParallelism()))
.serverTimeZone("UTC")
@@ -124,17 +127,17 @@ class MySqlTableIdCaseInsensitveITCase extends
MySqlSourceTestBase {
.executeAndCollect();
Thread.sleep(5_000);
- TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(),
"products");
+ TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(),
tableName);
List<Event> expected = new ArrayList<>();
expected.add(getProductsCreateTableEvent(tableId));
try (Connection connection = inventoryDatabase.getJdbcConnection();
Statement statement = connection.createStatement()) {
- expected.addAll(executeAlterAndProvideExpected(tableId,
statement));
+ expected.addAll(executeAlterAndProvideExpected(tableId, statement,
tableName));
statement.execute(
String.format(
- "ALTER TABLE `%s`.`PRODUCTS` ADD `cols1`
VARCHAR(45);",
- inventoryDatabase.getDatabaseName()));
+ "ALTER TABLE `%s`.`%s` ADD `COLS1` VARCHAR(45);",
+ inventoryDatabase.getDatabaseName(), tableName));
expected.add(
new AddColumnEvent(
tableId,
@@ -147,6 +150,42 @@ class MySqlTableIdCaseInsensitveITCase extends
MySqlSourceTestBase {
assertThat(actual).isEqualTo(expected);
}
+ @ParameterizedTest
+ @ValueSource(strings = {"products", "uppercase_products"})
+ public void testSnapshotModeWhenTableNameAndColumnIsUpper(String
tableName) throws Exception {
+ env.setParallelism(1);
+ inventoryDatabase.createAndInitialize();
+ MySqlSourceConfigFactory configFactory =
+ new MySqlSourceConfigFactory()
+ .hostname(MYSQL8_CONTAINER.getHost())
+ .port(MYSQL8_CONTAINER.getDatabasePort())
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(inventoryDatabase.getDatabaseName())
+ .tableList(inventoryDatabase.getDatabaseName() + "\\."
+ tableName)
+ .startupOptions(StartupOptions.snapshot())
+ .serverId(getServerId(env.getParallelism()))
+ .serverTimeZone("UTC")
+
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider) new
MySqlDataSource(configFactory).getEventSourceProvider();
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ MySqlDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+ Thread.sleep(5_000);
+
+ TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(),
tableName);
+ List<Event> expected = new ArrayList<>();
+ expected.add(getProductsCreateTableEvent(tableId));
+ List<Event> actual = fetchResults(events, expected.size());
+ assertThat(actual).isEqualTo(expected);
+ }
+
private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
return new CreateTableEvent(
tableId,
@@ -172,13 +211,13 @@ class MySqlTableIdCaseInsensitveITCase extends
MySqlSourceTestBase {
* );
* </pre>
*/
- private List<Event> executeAlterAndProvideExpected(TableId tableId,
Statement statement)
- throws SQLException {
+ private List<Event> executeAlterAndProvideExpected(
+ TableId tableId, Statement statement, String tableName) throws
SQLException {
List<Event> expected = new ArrayList<>();
statement.execute(
String.format(
- "ALTER TABLE `%s`.`products` CHANGE COLUMN
`DESCRIPTION` `DESC` VARCHAR(255) NULL DEFAULT NULL;",
- inventoryDatabase.getDatabaseName()));
+ "ALTER TABLE `%s`.`%s` CHANGE COLUMN `DESCRIPTION`
`DESC` VARCHAR(255) NULL DEFAULT NULL;",
+ inventoryDatabase.getDatabaseName(), tableName));
expected.add(
new AlterColumnTypeEvent(
tableId, Collections.singletonMap("description",
DataTypes.VARCHAR(255))));
@@ -187,8 +226,8 @@ class MySqlTableIdCaseInsensitveITCase extends
MySqlSourceTestBase {
statement.execute(
String.format(
- "ALTER TABLE `%s`.`products` CHANGE COLUMN `desc`
`desc2` VARCHAR(400) NULL DEFAULT NULL;",
- inventoryDatabase.getDatabaseName()));
+ "ALTER TABLE `%s`.`%s` CHANGE COLUMN `desc` `desc2`
VARCHAR(400) NULL DEFAULT NULL;",
+ inventoryDatabase.getDatabaseName(), tableName));
expected.add(
new AlterColumnTypeEvent(
tableId, Collections.singletonMap("desc",
DataTypes.VARCHAR(400))));
@@ -196,8 +235,8 @@ class MySqlTableIdCaseInsensitveITCase extends
MySqlSourceTestBase {
statement.execute(
String.format(
- "ALTER TABLE `%s`.`products` ADD COLUMN `DESC1`
VARCHAR(45) NULL AFTER `weight`;",
- inventoryDatabase.getDatabaseName()));
+ "ALTER TABLE `%s`.`%s` ADD COLUMN `DESC1` VARCHAR(45)
NULL AFTER `weight`;",
+ inventoryDatabase.getDatabaseName(), tableName));
expected.add(
new AddColumnEvent(
tableId,
@@ -209,8 +248,8 @@ class MySqlTableIdCaseInsensitveITCase extends
MySqlSourceTestBase {
statement.execute(
String.format(
- "ALTER TABLE `%s`.`products` ADD COLUMN `col1`
VARCHAR(45) NULL AFTER `weight`, ADD COLUMN `COL2` VARCHAR(55) NULL AFTER
`desc1`;",
- inventoryDatabase.getDatabaseName()));
+ "ALTER TABLE `%s`.`%s` ADD COLUMN `col1` VARCHAR(45)
NULL AFTER `weight`, ADD COLUMN `COL2` VARCHAR(55) NULL AFTER `desc1`;",
+ inventoryDatabase.getDatabaseName(), tableName));
expected.add(
new AddColumnEvent(
tableId,
@@ -230,8 +269,8 @@ class MySqlTableIdCaseInsensitveITCase extends
MySqlSourceTestBase {
statement.execute(
String.format(
- "ALTER TABLE `%s`.`products` DROP COLUMN `desc2`,
CHANGE COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;",
- inventoryDatabase.getDatabaseName()));
+ "ALTER TABLE `%s`.`%s` DROP COLUMN `desc2`, CHANGE
COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;",
+ inventoryDatabase.getDatabaseName(), tableName));
expected.add(new DropColumnEvent(tableId,
Collections.singletonList("desc2")));
expected.add(
new AlterColumnTypeEvent(
@@ -240,22 +279,22 @@ class MySqlTableIdCaseInsensitveITCase extends
MySqlSourceTestBase {
// Only available in mysql 8.0
statement.execute(
String.format(
- "ALTER TABLE `%s`.`products` RENAME COLUMN `desc1` TO
`desc3`;",
- inventoryDatabase.getDatabaseName()));
+ "ALTER TABLE `%s`.`%s` RENAME COLUMN `desc1` TO
`desc3`;",
+ inventoryDatabase.getDatabaseName(), tableName));
expected.add(new RenameColumnEvent(tableId,
Collections.singletonMap("desc1", "desc3")));
statement.execute(
String.format(
- "ALTER TABLE `%s`.`products` MODIFY COLUMN `DESC3`
VARCHAR(255) NULL DEFAULT NULL;",
- inventoryDatabase.getDatabaseName()));
+ "ALTER TABLE `%s`.`%s` MODIFY COLUMN `DESC3`
VARCHAR(255) NULL DEFAULT NULL;",
+ inventoryDatabase.getDatabaseName(), tableName));
expected.add(
new AlterColumnTypeEvent(
tableId, Collections.singletonMap("desc3",
DataTypes.VARCHAR(255))));
statement.execute(
String.format(
- "ALTER TABLE `%s`.`products` DROP COLUMN `desc3`;",
- inventoryDatabase.getDatabaseName()));
+ "ALTER TABLE `%s`.`%s` DROP COLUMN `desc3`;",
+ inventoryDatabase.getDatabaseName(), tableName));
expected.add(new DropColumnEvent(tableId,
Collections.singletonList("desc3")));
// Should not catch SchemaChangeEvent of tables other than `products`
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql
index 626107f5c..e9386693f 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql
@@ -34,7 +34,27 @@ VALUES (default,"scooter","Small 2-wheel scooter",3.14),
(default,"hammer","14oz carpenter's hammer",0.875),
(default,"hammer","16oz carpenter's hammer",1.0),
(default,"rocks","box of assorted rocks",5.3),
- (default,"jacket","water resistent black wind breaker",0.1),
+ (default,"jacket","water resistant black wind breaker",0.1),
+ (default,"spare tire","24 inch spare tire",22.2);
+
+-- Create a table where all fields are in uppercase.
+CREATE TABLE uppercase_products (
+ ID INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ NAME VARCHAR(255) NOT NULL DEFAULT 'flink',
+ DESCRIPTION VARCHAR(512),
+ WEIGHT FLOAT(6)
+);
+ALTER TABLE uppercase_products AUTO_INCREMENT = 101;
+
+INSERT INTO uppercase_products
+VALUES (default,"scooter","Small 2-wheel scooter",3.14),
+ (default,"car battery","12V car battery",8.1),
+ (default,"12-pack drill bits","12-pack of drill bits with sizes ranging
from #40 to #3",0.8),
+ (default,"hammer","12oz carpenter's hammer",0.75),
+ (default,"hammer","14oz carpenter's hammer",0.875),
+ (default,"hammer","16oz carpenter's hammer",1.0),
+ (default,"rocks","box of assorted rocks",5.3),
+ (default,"jacket","water resistant black wind breaker",0.1),
(default,"spare tire","24 inch spare tire",22.2);
-- Create some customers ...