This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch feature/migration-documentation-consolidation
in repository https://gitbox.apache.org/repos/asf/doris-website.git
commit a4131a90726a86b99156b62fb2c8007e01d3e3dd Author: Yongqiang YANG <[email protected]> AuthorDate: Thu Feb 5 15:40:12 2026 -0800 Add comprehensive migration documentation section Create consolidated migration guides covering: - Overview page with migration path comparison table - PostgreSQL to Doris (JDBC Catalog, Flink CDC, Export/Import) - MySQL to Doris (Flink CDC, JDBC Catalog, DataX) - Elasticsearch to Doris (ES Catalog, inverted index migration) - Other OLAP systems (ClickHouse, Greenplum, Hive/Iceberg/Hudi) Each guide includes data type mappings, step-by-step instructions, and troubleshooting for common issues. Chinese translations included. Co-Authored-By: Claude Opus 4.5 <[email protected]> --- docs/migration/elasticsearch-to-doris.md | 450 ++++++++++++++++++++ docs/migration/mysql-to-doris.md | 408 ++++++++++++++++++ docs/migration/other-olap-to-doris.md | 457 +++++++++++++++++++++ docs/migration/overview.md | 150 +++++++ docs/migration/postgresql-to-doris.md | 433 +++++++++++++++++++ .../current/migration/elasticsearch-to-doris.md | 352 ++++++++++++++++ .../current/migration/mysql-to-doris.md | 364 ++++++++++++++++ .../current/migration/other-olap-to-doris.md | 441 ++++++++++++++++++++ .../current/migration/overview.md | 150 +++++++ .../current/migration/postgresql-to-doris.md | 421 +++++++++++++++++++ sidebars.ts | 12 + 11 files changed, 3638 insertions(+) diff --git a/docs/migration/elasticsearch-to-doris.md b/docs/migration/elasticsearch-to-doris.md new file mode 100644 index 00000000000..aac32175ed8 --- /dev/null +++ b/docs/migration/elasticsearch-to-doris.md @@ -0,0 +1,450 @@ +--- +{ + "title": "Elasticsearch to Doris", + "language": "en", + "description": "Comprehensive guide to migrating data from Elasticsearch to Apache Doris" +} +--- + +This guide covers migrating data from Elasticsearch to Apache Doris. Doris can serve as a powerful alternative to Elasticsearch for log analytics, full-text search, and general OLAP workloads, often with better performance and lower operational complexity. + +## Why Migrate from Elasticsearch to Doris? + +| Aspect | Elasticsearch | Apache Doris | +|--------|---------------|--------------| +| Query Language | DSL (JSON-based) | Standard SQL | +| JOINs | Limited | Full SQL JOINs | +| Storage Efficiency | Higher storage usage | Columnar compression | +| Operational Complexity | Complex cluster management | Simpler operations | +| Full-text Search | Native inverted index | Inverted index support | +| Real-time Analytics | Good | Excellent | + +## Considerations + +1. **Full-text Search**: Doris supports [Inverted Index](../table-design/index/inverted-index/overview.md) for full-text search capabilities similar to Elasticsearch. + +2. **Index to Table Mapping**: Each Elasticsearch index typically maps to a Doris table. + +3. **Nested Documents**: Elasticsearch nested/object types map to Doris JSON type. + +4. **Array Handling**: Elasticsearch arrays require explicit configuration in Doris. 
+ +## Data Type Mapping + +| Elasticsearch Type | Doris Type | Notes | +|--------------------|------------|-------| +| null | NULL | | +| boolean | BOOLEAN | | +| byte | TINYINT | | +| short | SMALLINT | | +| integer | INT | | +| long | BIGINT | | +| unsigned_long | LARGEINT | | +| float | FLOAT | | +| half_float | FLOAT | | +| double | DOUBLE | | +| scaled_float | DOUBLE | | +| keyword | STRING | | +| text | STRING | Consider inverted index in Doris | +| date | DATE or DATETIME | See [Date Handling](#handling-date-types) | +| ip | STRING | | +| nested | JSON | | +| object | JSON | | +| flattened | JSON | Supported since Doris 3.1.4, 4.0.3 | +| geo_point | STRING | Store as "lat,lon" string | +| geo_shape | STRING | Store as GeoJSON string | + +## Migration Options + +### Option 1: ES Catalog (Recommended) + +The ES Catalog provides direct access to Elasticsearch data from Doris, enabling both querying and migration. + +#### Prerequisites + +- Elasticsearch 5.x or higher +- Network connectivity between Doris FE/BE nodes and Elasticsearch + +#### Step 1: Create ES Catalog + +```sql +CREATE CATALOG es_catalog PROPERTIES ( + 'type' = 'es', + 'hosts' = 'http://es-node1:9200,http://es-node2:9200', + 'user' = 'elastic', + 'password' = 'password' +); +``` + +With additional options: + +```sql +CREATE CATALOG es_catalog PROPERTIES ( + 'type' = 'es', + 'hosts' = 'http://es-node1:9200', + 'user' = 'elastic', + 'password' = 'password', + 'doc_value_scan' = 'true', + 'keyword_sniff' = 'true', + 'nodes_discovery' = 'true', + 'ssl' = 'false', + 'mapping_es_id' = 'true' +); +``` + +#### Step 2: Explore Elasticsearch Data + +```sql +-- Switch to ES catalog +SWITCH es_catalog; + +-- ES creates a default_db database +USE default_db; + +-- List indices as tables +SHOW TABLES; + +-- Preview data +SELECT * FROM logs_index LIMIT 10; + +-- Check field mappings +DESC logs_index; +``` + +#### Step 3: Design Doris Table + +Based on your Elasticsearch index, design an appropriate Doris table: + +```sql +-- Example: Log data table +SWITCH internal; + +CREATE TABLE logs ( + `@timestamp` DATETIME NOT NULL, + log_id VARCHAR(64), + level VARCHAR(16), + message TEXT, + host VARCHAR(128), + service VARCHAR(64), + trace_id VARCHAR(64), + INDEX idx_message (message) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true"), + INDEX idx_level (level) USING INVERTED, + INDEX idx_service (service) USING INVERTED +) +DUPLICATE KEY(`@timestamp`, log_id) +PARTITION BY RANGE(`@timestamp`) () +DISTRIBUTED BY HASH(log_id) BUCKETS 16 +PROPERTIES ( + "dynamic_partition.enable" = "true", + "dynamic_partition.time_unit" = "DAY", + "dynamic_partition.start" = "-30", + "dynamic_partition.end" = "3", + "dynamic_partition.prefix" = "p", + "replication_num" = "3" +); +``` + +`★ Insight ─────────────────────────────────────` +1. **DUPLICATE KEY model** is best for log data where append-only writes are common +2. **Inverted indexes** enable full-text search similar to Elasticsearch +3. 
**Dynamic partitioning** automatically manages time-based data lifecycle +`─────────────────────────────────────────────────` + +#### Step 4: Migrate Data + +```sql +-- Basic migration +INSERT INTO internal.analytics_db.logs +SELECT + `@timestamp`, + _id as log_id, + level, + message, + host, + service, + trace_id +FROM es_catalog.default_db.logs_index; +``` + +For large indices, migrate by time range: + +```sql +-- Migrate by day +INSERT INTO internal.analytics_db.logs +SELECT * FROM es_catalog.default_db.logs_index +WHERE `@timestamp` >= '2024-01-01' AND `@timestamp` < '2024-01-02'; +``` + +#### Step 5: Configure Array Fields + +Elasticsearch doesn't have explicit array types. To read arrays correctly, configure the ES index mapping: + +```bash +# Add array field metadata to ES index +curl -X PUT "localhost:9200/logs_index/_mapping" -H 'Content-Type: application/json' -d '{ + "_meta": { + "doris": { + "array_fields": ["tags", "ip_addresses"] + } + } +}' +``` + +Then in Doris: + +```sql +-- Array fields will be correctly recognized +SELECT tags, ip_addresses FROM es_catalog.default_db.logs_index LIMIT 5; +``` + +### Option 2: Logstash Pipeline + +Use Logstash to read from Elasticsearch and write to Doris via HTTP. + +#### Logstash Configuration + +```ruby +input { + elasticsearch { + hosts => ["http://es-node:9200"] + index => "logs-*" + query => '{ "query": { "range": { "@timestamp": { "gte": "now-7d" } } } }' + size => 1000 + scroll => "5m" + docinfo => true + } +} + +filter { + mutate { + rename => { "[@metadata][_id]" => "doc_id" } + } + date { + match => ["@timestamp", "ISO8601"] + target => "@timestamp" + } +} + +output { + http { + url => "http://doris-fe:8030/api/analytics_db/logs/_stream_load" + http_method => "put" + headers => { + "Authorization" => "Basic cm9vdDo=" + "Expect" => "100-continue" + "format" => "json" + "strip_outer_array" => "true" + } + format => "json_batch" + pool_max => 10 + } +} +``` + +### Option 3: Custom Script with Scroll API + +For more control, use a custom script with Elasticsearch Scroll API: + +```python +from elasticsearch import Elasticsearch +import requests +import json + +es = Elasticsearch(['http://es-node:9200']) +doris_url = 'http://doris-fe:8030/api/db/table/_stream_load' + +# Scroll through Elasticsearch +resp = es.search( + index='logs-*', + scroll='5m', + size=10000, + body={'query': {'match_all': {}}} +) + +scroll_id = resp['_scroll_id'] +hits = resp['hits']['hits'] + +while hits: + # Transform and load to Doris + docs = [hit['_source'] for hit in hits] + + response = requests.put( + doris_url, + headers={ + 'Content-Type': 'application/json', + 'Authorization': 'Basic cm9vdDo=', + 'format': 'json', + 'strip_outer_array': 'true' + }, + data=json.dumps(docs) + ) + + # Continue scrolling + resp = es.scroll(scroll_id=scroll_id, scroll='5m') + scroll_id = resp['_scroll_id'] + hits = resp['hits']['hits'] +``` + +## Migrating Full-text Search + +Doris's inverted index provides full-text search capabilities similar to Elasticsearch. 
+ +### Creating Inverted Indexes + +```sql +-- Create table with inverted index for full-text search +CREATE TABLE articles ( + id BIGINT, + title VARCHAR(256), + content TEXT, + author VARCHAR(64), + published_at DATETIME, + tags ARRAY<STRING>, + INDEX idx_title (title) USING INVERTED PROPERTIES("parser" = "unicode"), + INDEX idx_content (content) USING INVERTED PROPERTIES( + "parser" = "unicode", + "support_phrase" = "true" + ), + INDEX idx_tags (tags) USING INVERTED +) +DUPLICATE KEY(id) +DISTRIBUTED BY HASH(id) BUCKETS 8; +``` + +### Full-text Search Queries + +```sql +-- Match query (similar to ES match) +SELECT * FROM articles +WHERE content MATCH 'apache doris'; + +-- Phrase match (similar to ES match_phrase) +SELECT * FROM articles +WHERE content MATCH_PHRASE 'real-time analytics'; + +-- Multiple conditions +SELECT * FROM articles +WHERE title MATCH 'database' + AND content MATCH 'performance' + AND published_at > '2024-01-01'; +``` + +### DSL to SQL Conversion Examples + +| Elasticsearch DSL | Doris SQL | +|-------------------|-----------| +| `{"match": {"title": "doris"}}` | `WHERE title MATCH 'doris'` | +| `{"match_phrase": {"content": "real time"}}` | `WHERE content MATCH_PHRASE 'real time'` | +| `{"term": {"status": "active"}}` | `WHERE status = 'active'` | +| `{"terms": {"tag": ["a", "b"]}}` | `WHERE tag IN ('a', 'b')` | +| `{"range": {"price": {"gte": 10}}}` | `WHERE price >= 10` | +| `{"bool": {"must": [...]}}` | `WHERE ... AND ...` | +| `{"bool": {"should": [...]}}` | `WHERE ... OR ...` | +| `{"exists": {"field": "email"}}` | `WHERE email IS NOT NULL` | + +## Handling Common Issues + +### Handling Date Types + +Elasticsearch dates can have multiple formats. Ensure consistent handling: + +```sql +-- Doris table with datetime +CREATE TABLE events ( + event_id VARCHAR(64), + event_time DATETIME, + event_data JSON +) +DUPLICATE KEY(event_id) +DISTRIBUTED BY HASH(event_id) BUCKETS 8; + +-- Migration with date conversion +INSERT INTO events +SELECT + _id, + CAST(`@timestamp` AS DATETIME), + event_data +FROM es_catalog.default_db.events_index; +``` + +### Handling Nested Documents + +Elasticsearch nested objects map to Doris JSON: + +```sql +-- ES document with nested data +-- { "user": { "name": "John", "address": { "city": "NYC" } } } + +-- Doris table +CREATE TABLE users ( + id VARCHAR(64), + user_data JSON +) +DISTRIBUTED BY HASH(id) BUCKETS 8; + +-- Query nested data in Doris +SELECT + id, + JSON_EXTRACT(user_data, '$.name') as name, + JSON_EXTRACT(user_data, '$.address.city') as city +FROM users; +``` + +### Handling _id Field + +To preserve Elasticsearch `_id`: + +```sql +-- Enable _id mapping in catalog +CREATE CATALOG es_catalog PROPERTIES ( + 'type' = 'es', + 'hosts' = 'http://es-node:9200', + 'mapping_es_id' = 'true' +); + +-- Query with _id +SELECT _id, * FROM es_catalog.default_db.index_name LIMIT 10; +``` + +### Performance Optimization + +For better ES Catalog read performance: + +```sql +-- Enable columnar scan (doc_value) +CREATE CATALOG es_catalog PROPERTIES ( + 'type' = 'es', + 'hosts' = 'http://es-node:9200', + 'doc_value_scan' = 'true' +); +``` + +Note: `text` fields don't support doc_value, so they'll fall back to `_source`. 
+ +## Validation + +After migration, validate: + +```sql +-- Compare document counts +SELECT COUNT(*) FROM es_catalog.default_db.logs_index; +SELECT COUNT(*) FROM internal.analytics_db.logs; + +-- Verify full-text search works +SELECT COUNT(*) FROM internal.analytics_db.logs +WHERE message MATCH 'error'; + +-- Compare against ES catalog query +SELECT COUNT(*) FROM es_catalog.default_db.logs_index +WHERE message = 'error'; + +-- Spot check specific documents +SELECT * FROM internal.analytics_db.logs +WHERE log_id = 'specific-doc-id'; +``` + +## Next Steps + +- [Inverted Index](../table-design/index/inverted-index/overview.md) - Full-text search in Doris +- [ES Catalog](../lakehouse/catalogs/es-catalog.md) - Complete ES Catalog reference +- [Log Storage Analysis](../log-storage-analysis.md) - Optimizing log analytics in Doris diff --git a/docs/migration/mysql-to-doris.md b/docs/migration/mysql-to-doris.md new file mode 100644 index 00000000000..3eb099d1841 --- /dev/null +++ b/docs/migration/mysql-to-doris.md @@ -0,0 +1,408 @@ +--- +{ + "title": "MySQL to Doris", + "language": "en", + "description": "Comprehensive guide to migrating data from MySQL to Apache Doris" +} +--- + +This guide covers migrating data from MySQL to Apache Doris. MySQL is one of the most common migration sources, and Doris provides excellent compatibility with MySQL protocol, making migration straightforward. + +## Considerations + +1. **Protocol Compatibility**: Doris is MySQL protocol compatible, so existing MySQL clients and tools work with Doris. + +2. **Real-time Requirements**: If you need real-time synchronization, Flink CDC is the recommended approach with support for automatic table creation and schema changes. + +3. **Full Database Sync**: The Flink Doris Connector supports synchronizing entire MySQL databases including DDL operations. + +## Data Type Mapping + +| MySQL Type | Doris Type | Notes | +|------------|------------|-------| +| BOOLEAN / TINYINT(1) | BOOLEAN | | +| TINYINT | TINYINT | | +| SMALLINT | SMALLINT | | +| MEDIUMINT | INT | | +| INT / INTEGER | INT | | +| BIGINT | BIGINT | | +| FLOAT | FLOAT | | +| DOUBLE | DOUBLE | | +| DECIMAL(P, S) | DECIMAL(P, S) | | +| DATE | DATE | | +| DATETIME | DATETIME | | +| TIMESTAMP | DATETIME | Stored as UTC, converted on read | +| TIME | STRING | Doris does not support TIME type | +| YEAR | INT | | +| CHAR(N) | CHAR(N) | | +| VARCHAR(N) | VARCHAR(N) | | +| TEXT / MEDIUMTEXT / LONGTEXT | STRING | | +| BINARY / VARBINARY | STRING | | +| BLOB / MEDIUMBLOB / LONGBLOB | STRING | | +| JSON | JSON | | +| ENUM | STRING | | +| SET | STRING | | +| BIT | BOOLEAN / BIGINT | BIT(1) maps to BOOLEAN | + +## Migration Options + +### Option 1: Flink CDC (Recommended for Real-time Sync) + +Flink CDC captures MySQL binlog changes and streams them to Doris. 
This is the recommended method for: + +- Real-time data synchronization +- Full database migration with automatic table creation +- Continuous sync with schema evolution support + +#### Prerequisites + +- MySQL 5.7+ or 8.0+ with binlog enabled +- Flink 1.15+ with Flink CDC 3.x and Flink Doris Connector + +#### Step 1: Configure MySQL Binlog + +Ensure these settings in MySQL: + +```ini +[mysqld] +server-id = 1 +log_bin = mysql-bin +binlog_format = ROW +binlog_row_image = FULL +expire_logs_days = 7 +``` + +Create a user for CDC: + +```sql +CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'password'; +GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'%'; +FLUSH PRIVILEGES; +``` + +#### Step 2: Single Table Sync with Flink SQL + +```sql +-- Source: MySQL CDC +CREATE TABLE mysql_orders ( + order_id INT, + customer_id INT, + order_date DATE, + total_amount DECIMAL(10, 2), + status STRING, + created_at TIMESTAMP(3), + PRIMARY KEY (order_id) NOT ENFORCED +) WITH ( + 'connector' = 'mysql-cdc', + 'hostname' = 'mysql-host', + 'port' = '3306', + 'username' = 'flink_cdc', + 'password' = 'password', + 'database-name' = 'source_db', + 'table-name' = 'orders', + 'server-time-zone' = 'UTC' +); + +-- Sink: Doris +CREATE TABLE doris_orders ( + order_id INT, + customer_id INT, + order_date DATE, + total_amount DECIMAL(10, 2), + status STRING, + created_at DATETIME +) WITH ( + 'connector' = 'doris', + 'fenodes' = 'doris-fe:8030', + 'table.identifier' = 'target_db.orders', + 'username' = 'doris_user', + 'password' = 'doris_password', + 'sink.enable-2pc' = 'true', + 'sink.label-prefix' = 'mysql_orders_sync' +); + +-- Start synchronization +INSERT INTO doris_orders SELECT * FROM mysql_orders; +``` + +#### Step 3: Full Database Sync with Flink Doris Connector + +The Flink Doris Connector provides a powerful whole-database sync feature: + +```shell +<FLINK_HOME>/bin/flink run \ + -c org.apache.doris.flink.tools.cdc.CdcTools \ + flink-doris-connector-1.18-25.1.0.jar \ + mysql-sync-database \ + --database source_db \ + --mysql-conf hostname=mysql-host \ + --mysql-conf port=3306 \ + --mysql-conf username=flink_cdc \ + --mysql-conf password=password \ + --mysql-conf database-name=source_db \ + --doris-conf fenodes=doris-fe:8030 \ + --doris-conf username=doris_user \ + --doris-conf password=doris_password \ + --doris-conf jdbc-url=jdbc:mysql://doris-fe:9030 \ + --table-conf replication_num=3 \ + --including-tables "orders|customers|products" +``` + +Key options: + +| Parameter | Description | +|-----------|-------------| +| `--including-tables` | Regex pattern for tables to include | +| `--excluding-tables` | Regex pattern for tables to exclude | +| `--multi-to-one-origin` | Map multiple source tables to one target | +| `--create-table-only` | Only create tables without syncing data | + +### Option 2: JDBC Catalog + +The JDBC Catalog allows direct querying and batch migration from MySQL. 
+ +#### Step 1: Download MySQL JDBC Driver + +```bash +wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.33/mysql-connector-java-8.0.33.jar +cp mysql-connector-java-8.0.33.jar $DORIS_HOME/fe/jdbc_drivers/ +cp mysql-connector-java-8.0.33.jar $DORIS_HOME/be/jdbc_drivers/ +``` + +#### Step 2: Create MySQL Catalog + +```sql +CREATE CATALOG mysql_catalog PROPERTIES ( + 'type' = 'jdbc', + 'user' = 'mysql_user', + 'password' = 'mysql_password', + 'jdbc_url' = 'jdbc:mysql://mysql-host:3306/source_db', + 'driver_url' = 'mysql-connector-java-8.0.33.jar', + 'driver_class' = 'com.mysql.cj.jdbc.Driver' +); +``` + +#### Step 3: Query and Migrate + +```sql +-- Explore source data +SWITCH mysql_catalog; +SHOW DATABASES; +USE source_db; +SHOW TABLES; +SELECT * FROM orders LIMIT 10; + +-- Create target table in Doris +SWITCH internal; +CREATE TABLE target_db.orders ( + order_id INT, + customer_id INT, + order_date DATE NOT NULL, + total_amount DECIMAL(10, 2), + status VARCHAR(32) +) +UNIQUE KEY(order_id, order_date) +PARTITION BY RANGE(order_date) () +DISTRIBUTED BY HASH(order_id) BUCKETS 16 +PROPERTIES ( + "dynamic_partition.enable" = "true", + "dynamic_partition.time_unit" = "DAY", + "dynamic_partition.end" = "3", + "dynamic_partition.prefix" = "p", + "replication_num" = "3" +); + +-- Migrate data +INSERT INTO internal.target_db.orders +SELECT order_id, customer_id, order_date, total_amount, status +FROM mysql_catalog.source_db.orders; +``` + +### Option 3: DataX + +[DataX](https://github.com/alibaba/DataX) is a widely-used data synchronization tool that supports MySQL to Doris migration. + +#### DataX Job Configuration + +```json +{ + "job": { + "setting": { + "speed": { + "channel": 4 + } + }, + "content": [{ + "reader": { + "name": "mysqlreader", + "parameter": { + "username": "mysql_user", + "password": "mysql_password", + "connection": [{ + "querySql": ["SELECT order_id, customer_id, order_date, total_amount, status FROM orders"], + "jdbcUrl": ["jdbc:mysql://mysql-host:3306/source_db"] + }] + } + }, + "writer": { + "name": "doriswriter", + "parameter": { + "feLoadUrl": ["doris-fe:8030"], + "jdbcUrl": "jdbc:mysql://doris-fe:9030/", + "database": "target_db", + "table": "orders", + "username": "doris_user", + "password": "doris_password", + "loadProps": { + "format": "json", + "strip_outer_array": true + } + } + } + }] + } +} +``` + +Run the job: + +```bash +python datax.py mysql_to_doris.json +``` + +## Handling Common Issues + +### Auto Increment Columns + +MySQL AUTO_INCREMENT columns should map to Doris's auto-increment feature: + +```sql +-- Doris table with auto increment +CREATE TABLE users ( + user_id BIGINT AUTO_INCREMENT, + username VARCHAR(64), + email VARCHAR(128) +) +UNIQUE KEY(user_id) +DISTRIBUTED BY HASH(user_id) BUCKETS 8; +``` + +For migration, you may want to preserve original IDs: + +```sql +-- Disable auto increment during migration +INSERT INTO users (user_id, username, email) +SELECT user_id, username, email +FROM mysql_catalog.source_db.users; +``` + +### Handling ENUM and SET Types + +MySQL ENUM and SET types are migrated as STRING in Doris: + +```sql +-- MySQL source +CREATE TABLE products ( + id INT, + status ENUM('active', 'inactive', 'pending'), + tags SET('featured', 'sale', 'new') +); + +-- Doris target +CREATE TABLE products ( + id INT, + status VARCHAR(32), + tags VARCHAR(128) +) +DISTRIBUTED BY HASH(id) BUCKETS 8; +``` + +### Handling Binary Data + +Binary data (BLOB, BINARY) is typically stored as base64-encoded STRING: + +```sql +-- Use HEX encoding 
for binary data +INSERT INTO doris_table +SELECT + id, + HEX(binary_col) as binary_hex +FROM mysql_catalog.source_db.table_with_binary; +``` + +### Large Table Migration Performance + +For tables with billions of rows: + +1. **Increase Flink parallelism**: +```sql +SET 'parallelism.default' = '8'; +``` + +2. **Tune Doris write buffer**: +```sql +-- In Flink sink configuration +'sink.buffer-size' = '1048576', +'sink.buffer-count' = '3' +``` + +3. **Use batch mode for initial load**: +```sql +-- Flink sink batch configuration +'sink.enable-2pc' = 'false', +'sink.properties.format' = 'json' +``` + +## Multi-Tenant Migration + +For MySQL instances with multiple databases: + +```shell +# Sync multiple databases +<FLINK_HOME>/bin/flink run \ + -c org.apache.doris.flink.tools.cdc.CdcTools \ + flink-doris-connector.jar \ + mysql-sync-database \ + --database "db1|db2|db3" \ + --mysql-conf hostname=mysql-host \ + --mysql-conf database-name="db1|db2|db3" \ + --doris-conf fenodes=doris-fe:8030 \ + --including-tables ".*" +``` + +## Validation + +After migration, validate data integrity: + +```sql +-- Row count comparison +SELECT + 'mysql' as source, + COUNT(*) as cnt +FROM mysql_catalog.source_db.orders +UNION ALL +SELECT + 'doris' as source, + COUNT(*) as cnt +FROM internal.target_db.orders; + +-- Checksum validation (sample) +SELECT + SUM(order_id) as id_sum, + SUM(total_amount) as amount_sum, + COUNT(DISTINCT customer_id) as unique_customers +FROM internal.target_db.orders; + +-- Compare with MySQL +SELECT + SUM(order_id) as id_sum, + SUM(total_amount) as amount_sum, + COUNT(DISTINCT customer_id) as unique_customers +FROM mysql_catalog.source_db.orders; +``` + +## Next Steps + +- [Flink Doris Connector](../ecosystem/flink-doris-connector.md) - Detailed connector documentation +- [Loading Data](../data-operate/import/load-manual.md) - Alternative import methods +- [Data Model](../table-design/data-model/overview.md) - Choose the right table model diff --git a/docs/migration/other-olap-to-doris.md b/docs/migration/other-olap-to-doris.md new file mode 100644 index 00000000000..20b3fbd96c3 --- /dev/null +++ b/docs/migration/other-olap-to-doris.md @@ -0,0 +1,457 @@ +--- +{ + "title": "Other OLAP Systems to Doris", + "language": "en", + "description": "Guide to migrating data from ClickHouse, Greenplum, Hive, Iceberg, Hudi and other OLAP systems to Apache Doris" +} +--- + +This guide covers migrating data from various OLAP systems to Apache Doris, including ClickHouse, Greenplum, and data lake technologies like Hive, Iceberg, and Hudi. + +## Migration Methods Overview + +| Source System | Recommended Method | Notes | +|---------------|-------------------|-------| +| ClickHouse | JDBC Catalog + SQL Convertor | Schema and SQL syntax conversion needed | +| Greenplum | JDBC Catalog | PostgreSQL-compatible | +| Hive | Multi-Catalog (Hive Catalog) | Direct metadata integration | +| Iceberg | Multi-Catalog (Iceberg Catalog) | Table format native support | +| Hudi | Multi-Catalog (Hudi Catalog) | Table format native support | +| Spark/Flink Tables | Spark/Flink Doris Connector | Batch or streaming | + +## ClickHouse + +ClickHouse and Doris are both columnar OLAP databases with some similarities but different SQL dialects and data types. 
+ +### Data Type Mapping + +| ClickHouse Type | Doris Type | Notes | +|-----------------|------------|-------| +| Int8 | TINYINT | | +| Int16 | SMALLINT | | +| Int32 | INT | | +| Int64 | BIGINT | | +| UInt8 | SMALLINT | Unsigned to signed | +| UInt16 | INT | | +| UInt32 | BIGINT | | +| UInt64 | LARGEINT | | +| Float32 | FLOAT | | +| Float64 | DOUBLE | | +| Decimal(P, S) | DECIMAL(P, S) | | +| String | STRING | | +| FixedString(N) | CHAR(N) | | +| Date | DATE | | +| DateTime | DATETIME | | +| DateTime64 | DATETIME(precision) | | +| UUID | VARCHAR(36) | | +| Array(T) | ARRAY<T> | | +| Tuple | STRUCT | | +| Map(K, V) | MAP<K, V> | | +| Nullable(T) | T (nullable) | | +| LowCardinality(T) | T | No special handling needed | +| Enum8/Enum16 | TINYINT/SMALLINT or STRING | | + +### Migration with JDBC Catalog + +#### Step 1: Set Up ClickHouse JDBC Driver + +```bash +# Download ClickHouse JDBC driver +wget https://github.com/ClickHouse/clickhouse-java/releases/download/v0.6.0/clickhouse-jdbc-0.6.0-all.jar + +# Deploy to Doris +cp clickhouse-jdbc-0.6.0-all.jar $DORIS_HOME/fe/jdbc_drivers/ +cp clickhouse-jdbc-0.6.0-all.jar $DORIS_HOME/be/jdbc_drivers/ +``` + +#### Step 2: Create ClickHouse Catalog + +```sql +CREATE CATALOG clickhouse_catalog PROPERTIES ( + 'type' = 'jdbc', + 'user' = 'default', + 'password' = 'password', + 'jdbc_url' = 'jdbc:clickhouse://clickhouse-host:8123/default', + 'driver_url' = 'clickhouse-jdbc-0.6.0-all.jar', + 'driver_class' = 'com.clickhouse.jdbc.ClickHouseDriver' +); +``` + +#### Step 3: Explore and Migrate + +```sql +-- Explore ClickHouse data +SWITCH clickhouse_catalog; +SHOW DATABASES; +USE default; +SHOW TABLES; + +-- Preview table +SELECT * FROM events LIMIT 10; + +-- Create Doris table +SWITCH internal; +CREATE TABLE analytics.events ( + event_id BIGINT, + event_time DATETIME, + user_id BIGINT, + event_type VARCHAR(64), + properties JSON +) +DUPLICATE KEY(event_id) +PARTITION BY RANGE(event_time) () +DISTRIBUTED BY HASH(event_id) BUCKETS 16 +PROPERTIES ( + "dynamic_partition.enable" = "true", + "dynamic_partition.time_unit" = "DAY", + "replication_num" = "3" +); + +-- Migrate data +INSERT INTO internal.analytics.events +SELECT + event_id, + event_time, + user_id, + event_type, + properties +FROM clickhouse_catalog.default.events; +``` + +### SQL Syntax Conversion + +Common ClickHouse to Doris SQL conversions: + +| ClickHouse | Doris | +|------------|-------| +| `toDate(datetime)` | `DATE(datetime)` | +| `toDateTime(string)` | `CAST(string AS DATETIME)` | +| `formatDateTime(dt, '%Y-%m')` | `DATE_FORMAT(dt, '%Y-%m')` | +| `arrayJoin(arr)` | `EXPLODE(arr)` with LATERAL VIEW | +| `groupArray(col)` | `COLLECT_LIST(col)` | +| `argMax(col1, col2)` | `MAX_BY(col1, col2)` | +| `argMin(col1, col2)` | `MIN_BY(col1, col2)` | +| `uniq(col)` | `APPROX_COUNT_DISTINCT(col)` | +| `uniqExact(col)` | `COUNT(DISTINCT col)` | +| `JSONExtract(json, 'key', 'String')` | `JSON_EXTRACT(json, '$.key')` | +| `multiIf(cond1, val1, cond2, val2, default)` | `CASE WHEN cond1 THEN val1 WHEN cond2 THEN val2 ELSE default END` | + +### Table Engine Mapping + +| ClickHouse Engine | Doris Model | Notes | +|-------------------|-------------|-------| +| MergeTree | DUPLICATE | Append-only analytics | +| ReplacingMergeTree | UNIQUE | Deduplication by key | +| SummingMergeTree | AGGREGATE | Pre-aggregation | +| AggregatingMergeTree | AGGREGATE | Complex aggregations | +| CollapsingMergeTree | UNIQUE | With delete support | + +## Greenplum + +Greenplum is PostgreSQL-based, so migration is similar to 
PostgreSQL. + +### Data Type Mapping + +Use the [PostgreSQL type mapping](./postgresql-to-doris.md#data-type-mapping) as reference. Additional Greenplum-specific types: + +| Greenplum Type | Doris Type | Notes | +|----------------|------------|-------| +| INT2/INT4/INT8 | SMALLINT/INT/BIGINT | | +| FLOAT4/FLOAT8 | FLOAT/DOUBLE | | +| NUMERIC | DECIMAL | | +| TEXT | STRING | | +| BYTEA | STRING | | +| TIMESTAMP | DATETIME | | +| INTERVAL | STRING | | + +### Migration with JDBC Catalog + +```sql +-- Create Greenplum catalog (uses PostgreSQL driver) +CREATE CATALOG gp_catalog PROPERTIES ( + 'type' = 'jdbc', + 'user' = 'gpadmin', + 'password' = 'password', + 'jdbc_url' = 'jdbc:postgresql://gp-master:5432/database', + 'driver_url' = 'postgresql-42.5.6.jar', + 'driver_class' = 'org.postgresql.Driver' +); + +-- Query Greenplum data +SWITCH gp_catalog; +USE public; +SELECT * FROM large_table LIMIT 10; + +-- Migrate with partitioning +INSERT INTO internal.target_db.large_table +SELECT * FROM gp_catalog.public.large_table +WHERE partition_col >= '2024-01-01'; +``` + +### Parallel Migration + +For large Greenplum tables, leverage parallel export: + +```bash +# Export from Greenplum to files +psql -h gp-master -c "COPY large_table TO '/tmp/data.csv' WITH CSV" + +# Or use gpfdist for parallel export +# Then load to Doris using Stream Load or Broker Load +``` + +## Data Lake (Hive, Iceberg, Hudi) {#data-lake} + +Doris's Multi-Catalog feature provides native integration with data lake table formats. + +### Hive Migration + +#### Step 1: Create Hive Catalog + +```sql +CREATE CATALOG hive_catalog PROPERTIES ( + 'type' = 'hms', + 'hive.metastore.uris' = 'thrift://hive-metastore:9083', + 'hadoop.username' = 'hadoop' +); +``` + +For S3-based Hive: + +```sql +CREATE CATALOG hive_catalog PROPERTIES ( + 'type' = 'hms', + 'hive.metastore.uris' = 'thrift://hive-metastore:9083', + 's3.endpoint' = 's3.amazonaws.com', + 's3.region' = 'us-east-1', + 's3.access_key' = 'your_ak', + 's3.secret_key' = 'your_sk' +); +``` + +#### Step 2: Query and Migrate + +```sql +-- Browse Hive tables +SWITCH hive_catalog; +SHOW DATABASES; +USE warehouse; +SHOW TABLES; + +-- Query Hive data directly +SELECT * FROM hive_catalog.warehouse.fact_sales LIMIT 10; + +-- Migrate to Doris +INSERT INTO internal.analytics.fact_sales +SELECT * FROM hive_catalog.warehouse.fact_sales +WHERE dt >= '2024-01-01'; +``` + +### Iceberg Migration + +```sql +-- Create Iceberg catalog +CREATE CATALOG iceberg_catalog PROPERTIES ( + 'type' = 'iceberg', + 'iceberg.catalog.type' = 'hms', + 'hive.metastore.uris' = 'thrift://hive-metastore:9083' +); + +-- Or with REST catalog +CREATE CATALOG iceberg_catalog PROPERTIES ( + 'type' = 'iceberg', + 'iceberg.catalog.type' = 'rest', + 'uri' = 'http://iceberg-rest:8181' +); + +-- Query Iceberg tables +SELECT * FROM iceberg_catalog.db.table_name; + +-- Time travel query +SELECT * FROM iceberg_catalog.db.table_name +FOR VERSION AS OF 123456789; + +-- Migrate data +INSERT INTO internal.target_db.target_table +SELECT * FROM iceberg_catalog.source_db.source_table; +``` + +### Hudi Migration + +```sql +-- Create Hudi catalog +CREATE CATALOG hudi_catalog PROPERTIES ( + 'type' = 'hms', + 'hive.metastore.uris' = 'thrift://hive-metastore:9083' +); + +-- Query Hudi tables (read-optimized) +SELECT * FROM hudi_catalog.db.hudi_table; + +-- Migrate data +INSERT INTO internal.target_db.target_table +SELECT * FROM hudi_catalog.db.hudi_table; +``` + +## Spark/Flink Connector Migration + +For systems not directly supported by catalogs, use 
Spark or Flink connectors. + +### Spark Doris Connector + +```scala +// Read from any Spark-supported source +val sourceDF = spark.read + .format("source_format") + .load("source_path") + +// Write to Doris +sourceDF.write + .format("doris") + .option("doris.table.identifier", "db.table") + .option("doris.fenodes", "doris-fe:8030") + .option("user", "root") + .option("password", "") + .save() +``` + +### Flink Doris Connector + +```sql +-- Read from source +CREATE TABLE source_table (...) WITH ('connector' = 'source-connector', ...); + +-- Write to Doris +CREATE TABLE doris_sink (...) WITH ( + 'connector' = 'doris', + 'fenodes' = 'doris-fe:8030', + 'table.identifier' = 'db.table', + 'username' = 'root', + 'password' = '' +); + +INSERT INTO doris_sink SELECT * FROM source_table; +``` + +## Export-Import Method + +For air-gapped environments or when direct connectivity isn't possible: + +### Step 1: Export to Files + +```bash +# From ClickHouse +clickhouse-client --query "SELECT * FROM table FORMAT Parquet" > data.parquet + +# From Greenplum +psql -c "\COPY table TO 'data.csv' WITH CSV HEADER" + +# From Hive +hive -e "INSERT OVERWRITE DIRECTORY '/tmp/export' ROW FORMAT DELIMITED SELECT * FROM table" +``` + +### Step 2: Upload to Object Storage + +```bash +# Upload to S3 +aws s3 cp data.parquet s3://bucket/migration/ + +# Or to HDFS +hdfs dfs -put data.parquet /migration/ +``` + +### Step 3: Load into Doris + +```sql +-- S3 Load +LOAD LABEL migration_job +( + DATA INFILE("s3://bucket/migration/data.parquet") + INTO TABLE target_table + FORMAT AS "parquet" +) +WITH S3 ( + "provider" = "AWS", + "s3.endpoint" = "s3.amazonaws.com", + "s3.region" = "us-east-1", + "s3.access_key" = "ak", + "s3.secret_key" = "sk" +); +``` + +## Best Practices + +### Schema Design Considerations + +When migrating from other OLAP systems: + +1. **Choose the right data model**: + - DUPLICATE for append-only event data + - UNIQUE for dimension tables with updates + - AGGREGATE for pre-aggregated metrics + +2. **Partition strategy**: + - Time-based partitioning for time-series data + - Match source partitioning when possible + +3. 
**Bucket count**: + - Start with 8-16 buckets per partition + - Scale based on data volume and query patterns + +### Incremental Migration + +For continuous sync from data lakes: + +```sql +-- Track last sync timestamp +CREATE TABLE sync_metadata ( + table_name VARCHAR(128), + last_sync_time DATETIME +) +DISTRIBUTED BY HASH(table_name) BUCKETS 1; + +-- Incremental load +INSERT INTO internal.analytics.fact_sales +SELECT * FROM hive_catalog.warehouse.fact_sales +WHERE updated_at > ( + SELECT last_sync_time FROM sync_metadata + WHERE table_name = 'fact_sales' +); + +-- Update sync metadata +INSERT INTO sync_metadata VALUES ('fact_sales', NOW()) +ON DUPLICATE KEY UPDATE last_sync_time = NOW(); +``` + +## Validation + +After migration: + +```sql +-- Row count validation +SELECT + 'source' as system, + COUNT(*) as cnt +FROM source_catalog.db.table +UNION ALL +SELECT + 'doris' as system, + COUNT(*) as cnt +FROM internal.db.table; + +-- Aggregation validation +SELECT SUM(amount), COUNT(DISTINCT user_id) +FROM internal.db.table; + +-- Compare with source +SELECT SUM(amount), COUNT(DISTINCT user_id) +FROM source_catalog.db.table; +``` + +## Next Steps + +- [Lakehouse Overview](../lakehouse/lakehouse-overview.md) - Multi-Catalog capabilities +- [Hive Catalog](../lakehouse/catalogs/hive-catalog.md) - Hive integration details +- [Iceberg Catalog](../lakehouse/catalogs/iceberg-catalog.md) - Iceberg integration +- [Spark Doris Connector](../ecosystem/spark-doris-connector.md) - Spark integration +- [Flink Doris Connector](../ecosystem/flink-doris-connector.md) - Flink integration diff --git a/docs/migration/overview.md b/docs/migration/overview.md new file mode 100644 index 00000000000..1997ea86882 --- /dev/null +++ b/docs/migration/overview.md @@ -0,0 +1,150 @@ +--- +{ + "title": "Migration Overview", + "language": "en", + "description": "Guide to migrating data from various databases and data systems to Apache Doris" +} +--- + +Apache Doris provides multiple methods to migrate data from various source systems. This guide helps you choose the best migration approach based on your source system and requirements. + +## Migration Paths + +| Source System | Recommended Method | Real-time Sync | Full Migration | Incremental | +|---------------|-------------------|----------------|----------------|-------------| +| [PostgreSQL](./postgresql-to-doris.md) | JDBC Catalog / Flink CDC | Yes | Yes | Yes | +| [MySQL](./mysql-to-doris.md) | Flink CDC / JDBC Catalog | Yes | Yes | Yes | +| [Elasticsearch](./elasticsearch-to-doris.md) | ES Catalog | No | Yes | Manual | +| [ClickHouse](./other-olap-to-doris.md#clickhouse) | JDBC Catalog | No | Yes | Manual | +| [Greenplum](./other-olap-to-doris.md#greenplum) | JDBC Catalog | No | Yes | Manual | +| [Hive/Iceberg/Hudi](./other-olap-to-doris.md#data-lake) | Multi-Catalog | No | Yes | Yes | + +## Choosing a Migration Method + +### Catalog-Based Migration (Recommended) + +Doris's [Multi-Catalog](../lakehouse/lakehouse-overview.md) feature allows you to directly query external data sources without data movement. 
This is the recommended approach for: + +- **Initial exploration**: Query source data before deciding on migration strategy +- **Hybrid queries**: Join data across Doris and external sources +- **Incremental migration**: Gradually move data while keeping source accessible + +```sql +-- Create a catalog to connect to your source +CREATE CATALOG pg_catalog PROPERTIES ( + 'type' = 'jdbc', + 'user' = 'username', + 'password' = 'password', + 'jdbc_url' = 'jdbc:postgresql://host:5432/database', + 'driver_url' = 'postgresql-42.5.6.jar', + 'driver_class' = 'org.postgresql.Driver' +); + +-- Query source data directly +SELECT * FROM pg_catalog.schema_name.table_name LIMIT 10; + +-- Migrate data with INSERT INTO SELECT +INSERT INTO doris_db.doris_table +SELECT * FROM pg_catalog.schema_name.source_table; +``` + +### Flink CDC (Real-time Synchronization) + +[Flink CDC](../ecosystem/flink-doris-connector.md) is ideal for: + +- **Real-time data sync**: Capture changes as they happen +- **Full database migration**: Sync entire databases with automatic table creation +- **Zero-downtime migration**: Keep source and Doris in sync during transition + +### Export-Import Method + +For scenarios where direct connectivity is limited: + +1. Export data from source system to files (CSV, Parquet, JSON) +2. Stage files in object storage (S3, GCS, HDFS) +3. Load into Doris using [S3 Load](../data-operate/import/data-source/amazon-s3.md) or [Broker Load](../data-operate/import/import-way/broker-load-manual.md) + +## Migration Planning Checklist + +Before migrating, consider the following: + +1. **Data Volume Assessment** + - Total data size and row count + - Daily/hourly data growth rate + - Historical data retention requirements + +2. **Schema Design** + - Choose appropriate [Data Model](../table-design/data-model/overview.md) (Duplicate, Unique, Aggregate) + - Plan [Partitioning](../table-design/data-partitioning/data-distribution.md) strategy + - Define [Bucketing](../table-design/data-partitioning/data-bucketing.md) keys + +3. **Data Type Mapping** + - Review type compatibility (see migration guides for specific mappings) + - Handle special types (arrays, JSON, timestamps with timezone) + +4. **Performance Requirements** + - Query latency expectations + - Concurrent query load + - Data freshness requirements + +5. **Migration Window** + - Acceptable downtime (if any) + - Sync vs. async migration needs + +## Best Practices + +### Start with a Pilot Table + +Before migrating your entire database, test with a representative table: + +```sql +-- 1. Create the Doris table with appropriate schema +CREATE TABLE pilot_table ( + id INT, + created_at DATETIME, + data VARCHAR(255) +) +UNIQUE KEY(id) +DISTRIBUTED BY HASH(id) BUCKETS 8; + +-- 2. Migrate data +INSERT INTO pilot_table +SELECT id, created_at, data +FROM source_catalog.db.source_table; + +-- 3. 
Validate row counts +SELECT COUNT(*) FROM pilot_table; +SELECT COUNT(*) FROM source_catalog.db.source_table; +``` + +### Batch Large Migrations + +For tables with billions of rows, migrate in batches: + +```sql +-- Migrate by date range +INSERT INTO doris_table +SELECT * FROM source_catalog.db.source_table +WHERE created_at >= '2024-01-01' AND created_at < '2024-02-01'; +``` + +### Monitor Migration Progress + +Track load jobs using: + +```sql +-- Check active load jobs +SHOW LOAD WHERE STATE = 'LOADING'; + +-- Check recent load history +SHOW LOAD ORDER BY CreateTime DESC LIMIT 10; +``` + +## Next Steps + +Choose your source system to see detailed migration instructions: + +- [PostgreSQL to Doris](./postgresql-to-doris.md) +- [MySQL to Doris](./mysql-to-doris.md) +- [Elasticsearch to Doris](./elasticsearch-to-doris.md) +- [Other OLAP Systems to Doris](./other-olap-to-doris.md) diff --git a/docs/migration/postgresql-to-doris.md b/docs/migration/postgresql-to-doris.md new file mode 100644 index 00000000000..39c8f39f1fc --- /dev/null +++ b/docs/migration/postgresql-to-doris.md @@ -0,0 +1,433 @@ +--- +{ + "title": "PostgreSQL to Doris", + "language": "en", + "description": "Comprehensive guide to migrating data from PostgreSQL to Apache Doris" +} +--- + +This guide covers migrating data from PostgreSQL to Apache Doris. You can choose from several migration methods depending on your requirements for real-time sync, data volume, and operational complexity. + +## Considerations + +1. **Schema Design**: Before migration, select an appropriate Doris [Data Model](../table-design/data-model/overview.md) and plan your [Partitioning](../table-design/data-partitioning/data-distribution.md) and [Bucketing](../table-design/data-partitioning/data-bucketing.md) strategies. + +2. **Data Types**: Review the type mapping table below. Some PostgreSQL types require special handling (arrays, timestamps with timezone, JSON). + +3. **Primary Keys**: PostgreSQL's serial/identity columns map to Doris INT/BIGINT types. For unique constraints, use Doris's UNIQUE KEY model. + +## Data Type Mapping + +| PostgreSQL Type | Doris Type | Notes | +|-----------------|------------|-------| +| boolean | BOOLEAN | | +| smallint / int2 | SMALLINT | | +| integer / int4 | INT | | +| bigint / int8 | BIGINT | | +| decimal / numeric | DECIMAL(P,S) | Numeric without precision maps to STRING | +| real / float4 | FLOAT | | +| double precision | DOUBLE | | +| smallserial | SMALLINT | | +| serial | INT | | +| bigserial | BIGINT | | +| char(n) | CHAR(N) | | +| varchar / text | STRING | | +| timestamp | DATETIME | | +| timestamptz | DATETIME | Converted to local timezone; see [Timezone Issues](#handling-timezone-issues) | +| date | DATE | | +| time | STRING | Doris does not support TIME type | +| interval | STRING | | +| json / jsonb | JSON or STRING | Use STRING for better query performance | +| uuid | STRING | | +| bytea | STRING | | +| array | ARRAY | See [Handling Arrays](#handling-arrays) | +| inet / cidr / macaddr | STRING | | +| point / line / polygon | STRING | Geometric types stored as strings | + +## Migration Options + +### Option 1: JDBC Catalog (Recommended) + +The JDBC Catalog provides direct access to PostgreSQL data from Doris. This is the simplest approach for both querying and migrating data. 
+ +#### Prerequisites + +- PostgreSQL 11.x or higher +- [PostgreSQL JDBC Driver](https://jdbc.postgresql.org/) version 42.5.x or above +- Network connectivity between Doris FE/BE nodes and PostgreSQL (port 5432) + +#### Step 1: Download and Deploy JDBC Driver + +```bash +# Download the driver +wget https://jdbc.postgresql.org/download/postgresql-42.5.6.jar + +# Copy to Doris FE and BE jdbc_drivers directories +cp postgresql-42.5.6.jar $DORIS_HOME/fe/jdbc_drivers/ +cp postgresql-42.5.6.jar $DORIS_HOME/be/jdbc_drivers/ +``` + +#### Step 2: Create the PostgreSQL Catalog + +```sql +CREATE CATALOG pg_catalog PROPERTIES ( + 'type' = 'jdbc', + 'user' = 'postgres_user', + 'password' = 'postgres_password', + 'jdbc_url' = 'jdbc:postgresql://pg-host:5432/database_name', + 'driver_url' = 'postgresql-42.5.6.jar', + 'driver_class' = 'org.postgresql.Driver' +); +``` + +For SSL connections: + +```sql +CREATE CATALOG pg_catalog PROPERTIES ( + 'type' = 'jdbc', + 'user' = 'postgres_user', + 'password' = 'postgres_password', + 'jdbc_url' = 'jdbc:postgresql://pg-host:5432/database_name?ssl=true&sslmode=require', + 'driver_url' = 'postgresql-42.5.6.jar', + 'driver_class' = 'org.postgresql.Driver' +); +``` + +#### Step 3: Explore Source Data + +```sql +-- Switch to the catalog +SWITCH pg_catalog; + +-- List available schemas (databases in Doris) +SHOW DATABASES; + +-- Use a schema +USE public; + +-- List tables +SHOW TABLES; + +-- Preview data +SELECT * FROM source_table LIMIT 10; + +-- Check row count +SELECT COUNT(*) FROM source_table; +``` + +#### Step 4: Create Doris Target Table + +```sql +-- Switch back to internal catalog +SWITCH internal; +USE target_db; + +-- Create table based on source schema +CREATE TABLE orders ( + order_id INT, + customer_id INT, + order_date DATE NOT NULL, + total_amount DECIMAL(10, 2), + status VARCHAR(32), + created_at DATETIME +) +UNIQUE KEY(order_id, order_date) +PARTITION BY RANGE(order_date) () +DISTRIBUTED BY HASH(order_id) BUCKETS 16 +PROPERTIES ( + "dynamic_partition.enable" = "true", + "dynamic_partition.time_unit" = "MONTH", + "dynamic_partition.start" = "-12", + "dynamic_partition.end" = "3", + "dynamic_partition.prefix" = "p", + "dynamic_partition.buckets" = "16", + "replication_num" = "3" +); +``` + +#### Step 5: Migrate Data + +For small to medium tables: + +```sql +INSERT INTO internal.target_db.orders +SELECT + order_id, + customer_id, + order_date, + total_amount, + status, + created_at +FROM pg_catalog.public.orders; +``` + +For large tables, migrate in batches: + +```sql +-- Batch by date range +INSERT INTO internal.target_db.orders +SELECT * FROM pg_catalog.public.orders +WHERE order_date >= '2024-01-01' AND order_date < '2024-04-01'; + +INSERT INTO internal.target_db.orders +SELECT * FROM pg_catalog.public.orders +WHERE order_date >= '2024-04-01' AND order_date < '2024-07-01'; +``` + +#### Step 6: Validate Migration + +```sql +-- Compare row counts +SELECT 'doris' as source, COUNT(*) as cnt FROM internal.target_db.orders +UNION ALL +SELECT 'postgres' as source, COUNT(*) as cnt FROM pg_catalog.public.orders; + +-- Spot check specific records +SELECT * FROM internal.target_db.orders WHERE order_id = 12345; +SELECT * FROM pg_catalog.public.orders WHERE order_id = 12345; +``` + +### Option 2: Flink CDC (Real-time Sync) + +Flink CDC captures changes from PostgreSQL WAL (Write-Ahead Log) and streams them to Doris in real-time. This is ideal for continuous synchronization. 
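Before setting up the pipeline, it is worth confirming that the source database actually allows logical replication. A quick check you can run in psql (a sketch that verifies the settings listed under Prerequisites and Step 1 below):

```sql
-- Run against the source PostgreSQL instance
SHOW wal_level;              -- must return 'logical' for CDC to work
SHOW max_replication_slots;  -- ensure at least one free slot is available
SHOW max_wal_senders;

-- Existing replication slots (the Flink job will create its own, e.g. 'flink_slot')
SELECT slot_name, plugin, active FROM pg_replication_slots;
```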
+ +#### Prerequisites + +- PostgreSQL with logical replication enabled (`wal_level = logical`) +- Flink 1.15+ with Flink CDC and Flink Doris Connector +- A replication slot in PostgreSQL + +#### Step 1: Configure PostgreSQL + +Ensure these settings in `postgresql.conf`: + +```properties +wal_level = logical +max_replication_slots = 10 +max_wal_senders = 10 +``` + +Create a replication user and grant permissions: + +```sql +-- Create user with replication privilege +CREATE USER flink_cdc WITH REPLICATION PASSWORD 'password'; + +-- Grant access to tables +GRANT SELECT ON ALL TABLES IN SCHEMA public TO flink_cdc; +GRANT USAGE ON SCHEMA public TO flink_cdc; +``` + +#### Step 2: Create Flink CDC Job + +Using Flink SQL: + +```sql +-- Source: PostgreSQL CDC +CREATE TABLE pg_orders ( + order_id INT, + customer_id INT, + order_date DATE, + total_amount DECIMAL(10, 2), + status STRING, + created_at TIMESTAMP(3), + PRIMARY KEY (order_id) NOT ENFORCED +) WITH ( + 'connector' = 'postgres-cdc', + 'hostname' = 'pg-host', + 'port' = '5432', + 'username' = 'flink_cdc', + 'password' = 'password', + 'database-name' = 'source_db', + 'schema-name' = 'public', + 'table-name' = 'orders', + 'slot.name' = 'flink_slot', + 'decoding.plugin.name' = 'pgoutput' +); + +-- Sink: Doris +CREATE TABLE doris_orders ( + order_id INT, + customer_id INT, + order_date DATE, + total_amount DECIMAL(10, 2), + status STRING, + created_at DATETIME +) WITH ( + 'connector' = 'doris', + 'fenodes' = 'doris-fe:8030', + 'table.identifier' = 'target_db.orders', + 'username' = 'doris_user', + 'password' = 'doris_password', + 'sink.enable-2pc' = 'true', + 'sink.label-prefix' = 'pg_orders_sync' +); + +-- Start sync +INSERT INTO doris_orders SELECT * FROM pg_orders; +``` + +#### Step 3: Full Database Sync + +For synchronizing multiple tables or entire schemas: + +```sql +-- Use Flink Doris Connector's database sync feature +CREATE DATABASE IF NOT EXISTS sync_db; + +-- FlinkCDC whole database sync configuration +-- See Flink Doris Connector documentation for complete setup +``` + +### Option 3: Export and Load + +For air-gapped environments or when direct connectivity is not possible. + +#### Step 1: Export from PostgreSQL + +```bash +# Export to CSV +psql -h pg-host -U user -d database -c "\COPY orders TO '/tmp/orders.csv' WITH CSV HEADER" + +# Export to Parquet using DuckDB or pandas +duckdb -c "COPY (SELECT * FROM postgres_scan('postgresql://user:pass@host/db', 'public', 'orders')) TO '/tmp/orders.parquet'" +``` + +#### Step 2: Upload to Object Storage + +```bash +# Upload to S3 +aws s3 cp /tmp/orders.parquet s3://bucket/migration/orders.parquet + +# Or to HDFS +hdfs dfs -put /tmp/orders.parquet /migration/orders.parquet +``` + +#### Step 3: Load into Doris + +```sql +-- Using S3 Load +LOAD LABEL orders_migration +( + DATA INFILE("s3://bucket/migration/orders.parquet") + INTO TABLE orders + FORMAT AS "parquet" +) +WITH S3 ( + "provider" = "AWS", + "s3.endpoint" = "s3.amazonaws.com", + "s3.region" = "us-east-1", + "s3.access_key" = "your_ak", + "s3.secret_key" = "your_sk" +); + +-- Check load status +SHOW LOAD WHERE LABEL = "orders_migration"\G +``` + +## Handling Common Issues + +### Handling Timezone Issues + +PostgreSQL `timestamptz` stores timestamps in UTC and converts to session timezone on read. Doris `DATETIME` does not carry timezone information. 
+ +**Recommendation**: Convert timestamps explicitly during migration: + +```sql +-- Convert to specific timezone in PostgreSQL query +INSERT INTO doris_table +SELECT + id, + created_at AT TIME ZONE 'UTC' as created_at_utc +FROM pg_catalog.schema.table; +``` + +Also ensure JVM timezone consistency in Doris BE by setting in `be.conf`: + +```properties +JAVA_OPTS="-Duser.timezone=UTC ..." +``` + +### Handling Arrays + +PostgreSQL arrays map to Doris ARRAY type, but dimension detection requires existing data: + +```sql +-- PostgreSQL source +CREATE TABLE pg_table ( + id INT, + tags TEXT[] +); + +-- Doris target +CREATE TABLE doris_table ( + id INT, + tags ARRAY<STRING> +) +DISTRIBUTED BY HASH(id) BUCKETS 8; +``` + +If array dimension cannot be determined, cast explicitly: + +```sql +INSERT INTO doris_table +SELECT + id, + CAST(tags AS ARRAY<STRING>) +FROM pg_catalog.schema.pg_table; +``` + +### Handling JSON/JSONB + +For complex JSON queries, map to Doris STRING and use JSON functions: + +```sql +-- Query JSON fields +SELECT + id, + JSON_EXTRACT(json_col, '$.name') as name, + JSON_EXTRACT(json_col, '$.address.city') as city +FROM table_name; +``` + +### Large Table Migration + +For tables with hundreds of millions of rows: + +1. **Partition the migration**: Migrate by time ranges or ID ranges +2. **Increase parallelism**: Use multiple INSERT statements concurrently +3. **Monitor resources**: Check Doris BE memory and disk usage + +```sql +-- Parallel migration script (run concurrently) +-- Session 1 +INSERT INTO orders SELECT * FROM pg_catalog.public.orders +WHERE order_id BETWEEN 0 AND 10000000; + +-- Session 2 +INSERT INTO orders SELECT * FROM pg_catalog.public.orders +WHERE order_id BETWEEN 10000001 AND 20000000; +``` + +## Validation Checklist + +After migration, validate: + +- [ ] Row counts match between source and target +- [ ] Sample records are identical +- [ ] Null values are preserved correctly +- [ ] Numeric precision is maintained +- [ ] Date/time values are correct (check timezone) +- [ ] Array and JSON fields are queryable + +```sql +-- Comprehensive validation query +SELECT + 'rows' as check_type, + CASE WHEN s.cnt = t.cnt THEN 'PASS' ELSE 'FAIL' END as result, + s.cnt as source_count, + t.cnt as target_count +FROM + (SELECT COUNT(*) cnt FROM pg_catalog.public.orders) s, + (SELECT COUNT(*) cnt FROM internal.target_db.orders) t; +``` diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/elasticsearch-to-doris.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/elasticsearch-to-doris.md new file mode 100644 index 00000000000..fc37d7d7c95 --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/elasticsearch-to-doris.md @@ -0,0 +1,352 @@ +--- +{ + "title": "Elasticsearch 迁移到 Doris", + "language": "zh-CN", + "description": "从 Elasticsearch 迁移数据到 Apache Doris 的完整指南" +} +--- + +本指南介绍如何将数据从 Elasticsearch 迁移到 Apache Doris。Doris 可以作为 Elasticsearch 的强大替代方案,用于日志分析、全文搜索和通用 OLAP 工作负载,通常具有更好的性能和更低的运维复杂度。 + +## 为什么从 Elasticsearch 迁移到 Doris? + +| 方面 | Elasticsearch | Apache Doris | +|------|---------------|--------------| +| 查询语言 | DSL(基于 JSON) | 标准 SQL | +| JOIN | 有限支持 | 完整 SQL JOIN | +| 存储效率 | 存储使用较高 | 列式压缩 | +| 运维复杂度 | 集群管理复杂 | 运维更简单 | +| 全文搜索 | 原生倒排索引 | 支持倒排索引 | +| 实时分析 | 良好 | 优秀 | + +## 注意事项 + +1. **全文搜索**:Doris 支持[倒排索引](../table-design/index/inverted-index/overview.md),提供类似 Elasticsearch 的全文搜索能力。 + +2. **索引到表映射**:每个 Elasticsearch 索引通常映射到一个 Doris 表。 + +3. **嵌套文档**:Elasticsearch nested/object 类型映射到 Doris JSON 类型。 + +4. 
**数组处理**:Elasticsearch 数组需要在 Doris 中显式配置。 + +## 数据类型映射 + +| Elasticsearch 类型 | Doris 类型 | 说明 | +|--------------------|------------|------| +| null | NULL | | +| boolean | BOOLEAN | | +| byte | TINYINT | | +| short | SMALLINT | | +| integer | INT | | +| long | BIGINT | | +| unsigned_long | LARGEINT | | +| float | FLOAT | | +| half_float | FLOAT | | +| double | DOUBLE | | +| scaled_float | DOUBLE | | +| keyword | STRING | | +| text | STRING | 考虑在 Doris 中使用倒排索引 | +| date | DATE 或 DATETIME | 参见[日期处理](#处理日期类型) | +| ip | STRING | | +| nested | JSON | | +| object | JSON | | +| flattened | JSON | Doris 3.1.4、4.0.3 起支持 | +| geo_point | STRING | 存储为 "lat,lon" 字符串 | +| geo_shape | STRING | 存储为 GeoJSON 字符串 | + +## 迁移选项 + +### 选项 1:ES Catalog(推荐) + +ES Catalog 提供从 Doris 直接访问 Elasticsearch 数据的能力,支持查询和迁移。 + +#### 前提条件 + +- Elasticsearch 5.x 或更高版本 +- Doris FE/BE 节点与 Elasticsearch 之间的网络连接 + +#### 步骤 1:创建 ES Catalog + +```sql +CREATE CATALOG es_catalog PROPERTIES ( + 'type' = 'es', + 'hosts' = 'http://es-node1:9200,http://es-node2:9200', + 'user' = 'elastic', + 'password' = 'password' +); +``` + +带有更多选项: + +```sql +CREATE CATALOG es_catalog PROPERTIES ( + 'type' = 'es', + 'hosts' = 'http://es-node1:9200', + 'user' = 'elastic', + 'password' = 'password', + 'doc_value_scan' = 'true', + 'keyword_sniff' = 'true', + 'nodes_discovery' = 'true', + 'ssl' = 'false', + 'mapping_es_id' = 'true' +); +``` + +#### 步骤 2:探索 Elasticsearch 数据 + +```sql +-- 切换到 ES catalog +SWITCH es_catalog; + +-- ES 创建一个 default_db 数据库 +USE default_db; + +-- 列出索引作为表 +SHOW TABLES; + +-- 预览数据 +SELECT * FROM logs_index LIMIT 10; + +-- 检查字段映射 +DESC logs_index; +``` + +#### 步骤 3:设计 Doris 表 + +基于您的 Elasticsearch 索引,设计合适的 Doris 表: + +```sql +-- 示例:日志数据表 +SWITCH internal; + +CREATE TABLE logs ( + `@timestamp` DATETIME NOT NULL, + log_id VARCHAR(64), + level VARCHAR(16), + message TEXT, + host VARCHAR(128), + service VARCHAR(64), + trace_id VARCHAR(64), + INDEX idx_message (message) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true"), + INDEX idx_level (level) USING INVERTED, + INDEX idx_service (service) USING INVERTED +) +DUPLICATE KEY(`@timestamp`, log_id) +PARTITION BY RANGE(`@timestamp`) () +DISTRIBUTED BY HASH(log_id) BUCKETS 16 +PROPERTIES ( + "dynamic_partition.enable" = "true", + "dynamic_partition.time_unit" = "DAY", + "dynamic_partition.start" = "-30", + "dynamic_partition.end" = "3", + "dynamic_partition.prefix" = "p", + "replication_num" = "3" +); +``` + +#### 步骤 4:迁移数据 + +```sql +-- 基本迁移 +INSERT INTO internal.analytics_db.logs +SELECT + `@timestamp`, + _id as log_id, + level, + message, + host, + service, + trace_id +FROM es_catalog.default_db.logs_index; +``` + +对于大型索引,按时间范围迁移: + +```sql +-- 按天迁移 +INSERT INTO internal.analytics_db.logs +SELECT * FROM es_catalog.default_db.logs_index +WHERE `@timestamp` >= '2024-01-01' AND `@timestamp` < '2024-01-02'; +``` + +#### 步骤 5:配置数组字段 + +Elasticsearch 没有显式的数组类型。要正确读取数组,需要配置 ES 索引映射: + +```bash +# 向 ES 索引添加数组字段元数据 +curl -X PUT "localhost:9200/logs_index/_mapping" -H 'Content-Type: application/json' -d '{ + "_meta": { + "doris": { + "array_fields": ["tags", "ip_addresses"] + } + } +}' +``` + +然后在 Doris 中: + +```sql +-- 数组字段将被正确识别 +SELECT tags, ip_addresses FROM es_catalog.default_db.logs_index LIMIT 5; +``` + +## 迁移全文搜索 + +Doris 的倒排索引提供类似 Elasticsearch 的全文搜索能力。 + +### 创建倒排索引 + +```sql +-- 创建带倒排索引的表用于全文搜索 +CREATE TABLE articles ( + id BIGINT, + title VARCHAR(256), + content TEXT, + author VARCHAR(64), + published_at DATETIME, + tags ARRAY<STRING>, + INDEX idx_title (title) 
USING INVERTED PROPERTIES("parser" = "unicode"), + INDEX idx_content (content) USING INVERTED PROPERTIES( + "parser" = "unicode", + "support_phrase" = "true" + ), + INDEX idx_tags (tags) USING INVERTED +) +DUPLICATE KEY(id) +DISTRIBUTED BY HASH(id) BUCKETS 8; +``` + +### 全文搜索查询 + +```sql +-- Match 查询(类似 ES match) +SELECT * FROM articles +WHERE content MATCH 'apache doris'; + +-- 短语匹配(类似 ES match_phrase) +SELECT * FROM articles +WHERE content MATCH_PHRASE 'real-time analytics'; + +-- 多条件组合 +SELECT * FROM articles +WHERE title MATCH 'database' + AND content MATCH 'performance' + AND published_at > '2024-01-01'; +``` + +### DSL 到 SQL 转换示例 + +| Elasticsearch DSL | Doris SQL | +|-------------------|-----------| +| `{"match": {"title": "doris"}}` | `WHERE title MATCH 'doris'` | +| `{"match_phrase": {"content": "real time"}}` | `WHERE content MATCH_PHRASE 'real time'` | +| `{"term": {"status": "active"}}` | `WHERE status = 'active'` | +| `{"terms": {"tag": ["a", "b"]}}` | `WHERE tag IN ('a', 'b')` | +| `{"range": {"price": {"gte": 10}}}` | `WHERE price >= 10` | +| `{"bool": {"must": [...]}}` | `WHERE ... AND ...` | +| `{"bool": {"should": [...]}}` | `WHERE ... OR ...` | +| `{"exists": {"field": "email"}}` | `WHERE email IS NOT NULL` | + +## 处理常见问题 + +### 处理日期类型 + +Elasticsearch 日期可以有多种格式。确保一致处理: + +```sql +-- 带 datetime 的 Doris 表 +CREATE TABLE events ( + event_id VARCHAR(64), + event_time DATETIME, + event_data JSON +) +DUPLICATE KEY(event_id) +DISTRIBUTED BY HASH(event_id) BUCKETS 8; + +-- 带日期转换的迁移 +INSERT INTO events +SELECT + _id, + CAST(`@timestamp` AS DATETIME), + event_data +FROM es_catalog.default_db.events_index; +``` + +### 处理嵌套文档 + +Elasticsearch 嵌套对象映射到 Doris JSON: + +```sql +-- ES 文档带嵌套数据 +-- { "user": { "name": "John", "address": { "city": "NYC" } } } + +-- Doris 表 +CREATE TABLE users ( + id VARCHAR(64), + user_data JSON +) +DISTRIBUTED BY HASH(id) BUCKETS 8; + +-- 在 Doris 中查询嵌套数据 +SELECT + id, + JSON_EXTRACT(user_data, '$.name') as name, + JSON_EXTRACT(user_data, '$.address.city') as city +FROM users; +``` + +### 处理 _id 字段 + +要保留 Elasticsearch `_id`: + +```sql +-- 在 catalog 中启用 _id 映射 +CREATE CATALOG es_catalog PROPERTIES ( + 'type' = 'es', + 'hosts' = 'http://es-node:9200', + 'mapping_es_id' = 'true' +); + +-- 带 _id 查询 +SELECT _id, * FROM es_catalog.default_db.index_name LIMIT 10; +``` + +### 性能优化 + +提升 ES Catalog 读取性能: + +```sql +-- 启用列式扫描(doc_value) +CREATE CATALOG es_catalog PROPERTIES ( + 'type' = 'es', + 'hosts' = 'http://es-node:9200', + 'doc_value_scan' = 'true' +); +``` + +注意:`text` 字段不支持 doc_value,会回退到 `_source`。 + +## 验证 + +迁移后,验证: + +```sql +-- 比较文档数 +SELECT COUNT(*) FROM es_catalog.default_db.logs_index; +SELECT COUNT(*) FROM internal.analytics_db.logs; + +-- 验证全文搜索工作正常 +SELECT COUNT(*) FROM internal.analytics_db.logs +WHERE message MATCH 'error'; + +-- 抽查特定文档 +SELECT * FROM internal.analytics_db.logs +WHERE log_id = 'specific-doc-id'; +``` + +## 下一步 + +- [倒排索引](../table-design/index/inverted-index/overview.md) - Doris 中的全文搜索 +- [ES Catalog](../lakehouse/catalogs/es-catalog.md) - 完整的 ES Catalog 参考 +- [日志存储分析](../log-storage-analysis.md) - 优化 Doris 中的日志分析 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/mysql-to-doris.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/mysql-to-doris.md new file mode 100644 index 00000000000..ed2de1ca1ae --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/mysql-to-doris.md @@ -0,0 +1,364 @@ +--- +{ + "title": "MySQL 迁移到 Doris", + "language": "zh-CN", + "description": "从 
MySQL 迁移数据到 Apache Doris 的完整指南" +} +--- + +本指南介绍如何将数据从 MySQL 迁移到 Apache Doris。MySQL 是最常见的迁移源之一,Doris 对 MySQL 协议有很好的兼容性,使迁移变得简单。 + +## 注意事项 + +1. **协议兼容**:Doris 兼容 MySQL 协议,因此现有的 MySQL 客户端和工具可以与 Doris 配合使用。 + +2. **实时需求**:如果需要实时同步,推荐使用 Flink CDC,支持自动建表和 Schema 变更。 + +3. **全库同步**:Flink Doris Connector 支持同步整个 MySQL 数据库,包括 DDL 操作。 + +## 数据类型映射 + +| MySQL 类型 | Doris 类型 | 说明 | +|------------|------------|------| +| BOOLEAN / TINYINT(1) | BOOLEAN | | +| TINYINT | TINYINT | | +| SMALLINT | SMALLINT | | +| MEDIUMINT | INT | | +| INT / INTEGER | INT | | +| BIGINT | BIGINT | | +| FLOAT | FLOAT | | +| DOUBLE | DOUBLE | | +| DECIMAL(P, S) | DECIMAL(P, S) | | +| DATE | DATE | | +| DATETIME | DATETIME | | +| TIMESTAMP | DATETIME | 以 UTC 存储,读取时转换 | +| TIME | STRING | Doris 不支持 TIME 类型 | +| YEAR | INT | | +| CHAR(N) | CHAR(N) | | +| VARCHAR(N) | VARCHAR(N) | | +| TEXT / MEDIUMTEXT / LONGTEXT | STRING | | +| BINARY / VARBINARY | STRING | | +| BLOB / MEDIUMBLOB / LONGBLOB | STRING | | +| JSON | JSON | | +| ENUM | STRING | | +| SET | STRING | | +| BIT | BOOLEAN / BIGINT | BIT(1) 映射为 BOOLEAN | + +## 迁移选项 + +### 选项 1:Flink CDC(推荐用于实时同步) + +Flink CDC 捕获 MySQL binlog 变更并流式传输到 Doris。这是以下场景的推荐方法: + +- 实时数据同步 +- 自动建表的全库迁移 +- 支持 Schema 演进的持续同步 + +#### 前提条件 + +- MySQL 5.7+ 或 8.0+,启用 binlog +- Flink 1.15+ 配合 Flink CDC 3.x 和 Flink Doris Connector + +#### 步骤 1:配置 MySQL Binlog + +确保 MySQL 中有以下设置: + +```ini +[mysqld] +server-id = 1 +log_bin = mysql-bin +binlog_format = ROW +binlog_row_image = FULL +expire_logs_days = 7 +``` + +创建 CDC 用户: + +```sql +CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'password'; +GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'%'; +FLUSH PRIVILEGES; +``` + +#### 步骤 2:使用 Flink SQL 单表同步 + +```sql +-- 源:MySQL CDC +CREATE TABLE mysql_orders ( + order_id INT, + customer_id INT, + order_date DATE, + total_amount DECIMAL(10, 2), + status STRING, + created_at TIMESTAMP(3), + PRIMARY KEY (order_id) NOT ENFORCED +) WITH ( + 'connector' = 'mysql-cdc', + 'hostname' = 'mysql-host', + 'port' = '3306', + 'username' = 'flink_cdc', + 'password' = 'password', + 'database-name' = 'source_db', + 'table-name' = 'orders', + 'server-time-zone' = 'UTC' +); + +-- 目标:Doris +CREATE TABLE doris_orders ( + order_id INT, + customer_id INT, + order_date DATE, + total_amount DECIMAL(10, 2), + status STRING, + created_at DATETIME +) WITH ( + 'connector' = 'doris', + 'fenodes' = 'doris-fe:8030', + 'table.identifier' = 'target_db.orders', + 'username' = 'doris_user', + 'password' = 'doris_password', + 'sink.enable-2pc' = 'true', + 'sink.label-prefix' = 'mysql_orders_sync' +); + +-- 开始同步 +INSERT INTO doris_orders SELECT * FROM mysql_orders; +``` + +#### 步骤 3:使用 Flink Doris Connector 全库同步 + +Flink Doris Connector 提供强大的整库同步功能: + +```shell +<FLINK_HOME>/bin/flink run \ + -c org.apache.doris.flink.tools.cdc.CdcTools \ + flink-doris-connector-1.18-25.1.0.jar \ + mysql-sync-database \ + --database source_db \ + --mysql-conf hostname=mysql-host \ + --mysql-conf port=3306 \ + --mysql-conf username=flink_cdc \ + --mysql-conf password=password \ + --mysql-conf database-name=source_db \ + --doris-conf fenodes=doris-fe:8030 \ + --doris-conf username=doris_user \ + --doris-conf password=doris_password \ + --doris-conf jdbc-url=jdbc:mysql://doris-fe:9030 \ + --table-conf replication_num=3 \ + --including-tables "orders|customers|products" +``` + +关键选项: + +| 参数 | 说明 | +|------|------| +| `--including-tables` | 要包含的表的正则表达式 | +| `--excluding-tables` | 要排除的表的正则表达式 | +| `--multi-to-one-origin` | 
多源表映射到一个目标表 | +| `--create-table-only` | 仅创建表不同步数据 | + +### 选项 2:JDBC Catalog + +JDBC Catalog 允许从 MySQL 直接查询和批量迁移。 + +#### 步骤 1:下载 MySQL JDBC 驱动 + +```bash +wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.33/mysql-connector-java-8.0.33.jar +cp mysql-connector-java-8.0.33.jar $DORIS_HOME/fe/jdbc_drivers/ +cp mysql-connector-java-8.0.33.jar $DORIS_HOME/be/jdbc_drivers/ +``` + +#### 步骤 2:创建 MySQL Catalog + +```sql +CREATE CATALOG mysql_catalog PROPERTIES ( + 'type' = 'jdbc', + 'user' = 'mysql_user', + 'password' = 'mysql_password', + 'jdbc_url' = 'jdbc:mysql://mysql-host:3306/source_db', + 'driver_url' = 'mysql-connector-java-8.0.33.jar', + 'driver_class' = 'com.mysql.cj.jdbc.Driver' +); +``` + +#### 步骤 3:查询和迁移 + +```sql +-- 探索源数据 +SWITCH mysql_catalog; +SHOW DATABASES; +USE source_db; +SHOW TABLES; +SELECT * FROM orders LIMIT 10; + +-- 在 Doris 中创建目标表 +SWITCH internal; +CREATE TABLE target_db.orders ( + order_id INT, + customer_id INT, + order_date DATE NOT NULL, + total_amount DECIMAL(10, 2), + status VARCHAR(32) +) +UNIQUE KEY(order_id, order_date) +PARTITION BY RANGE(order_date) () +DISTRIBUTED BY HASH(order_id) BUCKETS 16 +PROPERTIES ( + "dynamic_partition.enable" = "true", + "dynamic_partition.time_unit" = "DAY", + "dynamic_partition.end" = "3", + "dynamic_partition.prefix" = "p", + "replication_num" = "3" +); + +-- 迁移数据 +INSERT INTO internal.target_db.orders +SELECT order_id, customer_id, order_date, total_amount, status +FROM mysql_catalog.source_db.orders; +``` + +### 选项 3:DataX + +[DataX](https://github.com/alibaba/DataX) 是一个广泛使用的数据同步工具,支持 MySQL 到 Doris 的迁移。 + +#### DataX 任务配置 + +```json +{ + "job": { + "setting": { + "speed": { + "channel": 4 + } + }, + "content": [{ + "reader": { + "name": "mysqlreader", + "parameter": { + "username": "mysql_user", + "password": "mysql_password", + "connection": [{ + "querySql": ["SELECT order_id, customer_id, order_date, total_amount, status FROM orders"], + "jdbcUrl": ["jdbc:mysql://mysql-host:3306/source_db"] + }] + } + }, + "writer": { + "name": "doriswriter", + "parameter": { + "feLoadUrl": ["doris-fe:8030"], + "jdbcUrl": "jdbc:mysql://doris-fe:9030/", + "database": "target_db", + "table": "orders", + "username": "doris_user", + "password": "doris_password", + "loadProps": { + "format": "json", + "strip_outer_array": true + } + } + } + }] + } +} +``` + +运行任务: + +```bash +python datax.py mysql_to_doris.json +``` + +## 处理常见问题 + +### 自增列 + +MySQL AUTO_INCREMENT 列应映射到 Doris 的自增功能: + +```sql +-- 带自增的 Doris 表 +CREATE TABLE users ( + user_id BIGINT AUTO_INCREMENT, + username VARCHAR(64), + email VARCHAR(128) +) +UNIQUE KEY(user_id) +DISTRIBUTED BY HASH(user_id) BUCKETS 8; +``` + +迁移时,您可能希望保留原始 ID: + +```sql +-- 迁移时保留原 ID +INSERT INTO users (user_id, username, email) +SELECT user_id, username, email +FROM mysql_catalog.source_db.users; +``` + +### 处理 ENUM 和 SET 类型 + +MySQL ENUM 和 SET 类型在 Doris 中作为 STRING 迁移: + +```sql +-- MySQL 源表 +CREATE TABLE products ( + id INT, + status ENUM('active', 'inactive', 'pending'), + tags SET('featured', 'sale', 'new') +); + +-- Doris 目标表 +CREATE TABLE products ( + id INT, + status VARCHAR(32), + tags VARCHAR(128) +) +DISTRIBUTED BY HASH(id) BUCKETS 8; +``` + +### 大表迁移性能 + +对于数十亿行的表: + +1. **增加 Flink 并行度**: +```sql +SET 'parallelism.default' = '8'; +``` + +2. 
**调整 Doris 写缓冲**: +```sql +-- 在 Flink sink 配置中 +'sink.buffer-size' = '1048576', +'sink.buffer-count' = '3' +``` + +## 验证 + +迁移后,验证数据完整性: + +```sql +-- 行数比较 +SELECT + 'mysql' as source, + COUNT(*) as cnt +FROM mysql_catalog.source_db.orders +UNION ALL +SELECT + 'doris' as source, + COUNT(*) as cnt +FROM internal.target_db.orders; + +-- 校验和验证(示例) +SELECT + SUM(order_id) as id_sum, + SUM(total_amount) as amount_sum, + COUNT(DISTINCT customer_id) as unique_customers +FROM internal.target_db.orders; +``` + +## 下一步 + +- [Flink Doris Connector](../ecosystem/flink-doris-connector.md) - 详细的连接器文档 +- [数据导入](../data-operate/import/load-manual.md) - 其他导入方法 +- [数据模型](../table-design/data-model/overview.md) - 选择正确的表模型 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/other-olap-to-doris.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/other-olap-to-doris.md new file mode 100644 index 00000000000..a4384331ddb --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/other-olap-to-doris.md @@ -0,0 +1,441 @@ +--- +{ + "title": "其他 OLAP 系统迁移到 Doris", + "language": "zh-CN", + "description": "从 ClickHouse、Greenplum、Hive、Iceberg、Hudi 等 OLAP 系统迁移数据到 Apache Doris 的指南" +} +--- + +本指南介绍如何从各种 OLAP 系统迁移数据到 Apache Doris,包括 ClickHouse、Greenplum 以及数据湖技术如 Hive、Iceberg 和 Hudi。 + +## 迁移方法概述 + +| 源系统 | 推荐方法 | 说明 | +|--------|---------|------| +| ClickHouse | JDBC Catalog + SQL 转换 | 需要 Schema 和 SQL 语法转换 | +| Greenplum | JDBC Catalog | 兼容 PostgreSQL | +| Hive | Multi-Catalog(Hive Catalog) | 直接元数据集成 | +| Iceberg | Multi-Catalog(Iceberg Catalog) | 原生表格式支持 | +| Hudi | Multi-Catalog(Hudi Catalog) | 原生表格式支持 | +| Spark/Flink 表 | Spark/Flink Doris Connector | 批量或流式 | + +## ClickHouse + +ClickHouse 和 Doris 都是列式 OLAP 数据库,有一些相似之处,但 SQL 方言和数据类型不同。 + +### 数据类型映射 + +| ClickHouse 类型 | Doris 类型 | 说明 | +|-----------------|------------|------| +| Int8 | TINYINT | | +| Int16 | SMALLINT | | +| Int32 | INT | | +| Int64 | BIGINT | | +| UInt8 | SMALLINT | 无符号转有符号 | +| UInt16 | INT | | +| UInt32 | BIGINT | | +| UInt64 | LARGEINT | | +| Float32 | FLOAT | | +| Float64 | DOUBLE | | +| Decimal(P, S) | DECIMAL(P, S) | | +| String | STRING | | +| FixedString(N) | CHAR(N) | | +| Date | DATE | | +| DateTime | DATETIME | | +| DateTime64 | DATETIME(precision) | | +| UUID | VARCHAR(36) | | +| Array(T) | ARRAY<T> | | +| Tuple | STRUCT | | +| Map(K, V) | MAP<K, V> | | +| Nullable(T) | T(可空) | | +| LowCardinality(T) | T | 无需特殊处理 | +| Enum8/Enum16 | TINYINT/SMALLINT 或 STRING | | + +### 使用 JDBC Catalog 迁移 + +#### 步骤 1:设置 ClickHouse JDBC 驱动 + +```bash +# 下载 ClickHouse JDBC 驱动 +wget https://github.com/ClickHouse/clickhouse-java/releases/download/v0.6.0/clickhouse-jdbc-0.6.0-all.jar + +# 部署到 Doris +cp clickhouse-jdbc-0.6.0-all.jar $DORIS_HOME/fe/jdbc_drivers/ +cp clickhouse-jdbc-0.6.0-all.jar $DORIS_HOME/be/jdbc_drivers/ +``` + +#### 步骤 2:创建 ClickHouse Catalog + +```sql +CREATE CATALOG clickhouse_catalog PROPERTIES ( + 'type' = 'jdbc', + 'user' = 'default', + 'password' = 'password', + 'jdbc_url' = 'jdbc:clickhouse://clickhouse-host:8123/default', + 'driver_url' = 'clickhouse-jdbc-0.6.0-all.jar', + 'driver_class' = 'com.clickhouse.jdbc.ClickHouseDriver' +); +``` + +#### 步骤 3:探索和迁移 + +```sql +-- 探索 ClickHouse 数据 +SWITCH clickhouse_catalog; +SHOW DATABASES; +USE default; +SHOW TABLES; + +-- 预览表 +SELECT * FROM events LIMIT 10; + +-- 创建 Doris 表 +SWITCH internal; +CREATE TABLE analytics.events ( + event_id BIGINT, + event_time DATETIME, + user_id BIGINT, + event_type VARCHAR(64), + properties JSON +) +DUPLICATE 
KEY(event_id) +PARTITION BY RANGE(event_time) () +DISTRIBUTED BY HASH(event_id) BUCKETS 16 +PROPERTIES ( + "dynamic_partition.enable" = "true", + "dynamic_partition.time_unit" = "DAY", + "replication_num" = "3" +); + +-- 迁移数据 +INSERT INTO internal.analytics.events +SELECT + event_id, + event_time, + user_id, + event_type, + properties +FROM clickhouse_catalog.default.events; +``` + +### SQL 语法转换 + +常见的 ClickHouse 到 Doris SQL 转换: + +| ClickHouse | Doris | +|------------|-------| +| `toDate(datetime)` | `DATE(datetime)` | +| `toDateTime(string)` | `CAST(string AS DATETIME)` | +| `formatDateTime(dt, '%Y-%m')` | `DATE_FORMAT(dt, '%Y-%m')` | +| `arrayJoin(arr)` | `EXPLODE(arr)` 配合 LATERAL VIEW | +| `groupArray(col)` | `COLLECT_LIST(col)` | +| `argMax(col1, col2)` | `MAX_BY(col1, col2)` | +| `argMin(col1, col2)` | `MIN_BY(col1, col2)` | +| `uniq(col)` | `APPROX_COUNT_DISTINCT(col)` | +| `uniqExact(col)` | `COUNT(DISTINCT col)` | +| `JSONExtract(json, 'key', 'String')` | `JSON_EXTRACT(json, '$.key')` | +| `multiIf(cond1, val1, cond2, val2, default)` | `CASE WHEN cond1 THEN val1 WHEN cond2 THEN val2 ELSE default END` | + +### 表引擎映射 + +| ClickHouse 引擎 | Doris 模型 | 说明 | +|-----------------|------------|------| +| MergeTree | DUPLICATE | 仅追加分析 | +| ReplacingMergeTree | UNIQUE | 按键去重 | +| SummingMergeTree | AGGREGATE | 预聚合 | +| AggregatingMergeTree | AGGREGATE | 复杂聚合 | +| CollapsingMergeTree | UNIQUE | 支持删除 | + +## Greenplum + +Greenplum 基于 PostgreSQL,因此迁移与 PostgreSQL 类似。 + +### 数据类型映射 + +参考 [PostgreSQL 类型映射](./postgresql-to-doris.md#数据类型映射)。Greenplum 特有的类型: + +| Greenplum 类型 | Doris 类型 | 说明 | +|----------------|------------|------| +| INT2/INT4/INT8 | SMALLINT/INT/BIGINT | | +| FLOAT4/FLOAT8 | FLOAT/DOUBLE | | +| NUMERIC | DECIMAL | | +| TEXT | STRING | | +| BYTEA | STRING | | +| TIMESTAMP | DATETIME | | +| INTERVAL | STRING | | + +### 使用 JDBC Catalog 迁移 + +```sql +-- 创建 Greenplum catalog(使用 PostgreSQL 驱动) +CREATE CATALOG gp_catalog PROPERTIES ( + 'type' = 'jdbc', + 'user' = 'gpadmin', + 'password' = 'password', + 'jdbc_url' = 'jdbc:postgresql://gp-master:5432/database', + 'driver_url' = 'postgresql-42.5.6.jar', + 'driver_class' = 'org.postgresql.Driver' +); + +-- 查询 Greenplum 数据 +SWITCH gp_catalog; +USE public; +SELECT * FROM large_table LIMIT 10; + +-- 带分区迁移 +INSERT INTO internal.target_db.large_table +SELECT * FROM gp_catalog.public.large_table +WHERE partition_col >= '2024-01-01'; +``` + +## 数据湖(Hive、Iceberg、Hudi) {#data-lake} + +Doris 的 Multi-Catalog 功能提供与数据湖表格式的原生集成。 + +### Hive 迁移 + +#### 步骤 1:创建 Hive Catalog + +```sql +CREATE CATALOG hive_catalog PROPERTIES ( + 'type' = 'hms', + 'hive.metastore.uris' = 'thrift://hive-metastore:9083', + 'hadoop.username' = 'hadoop' +); +``` + +基于 S3 的 Hive: + +```sql +CREATE CATALOG hive_catalog PROPERTIES ( + 'type' = 'hms', + 'hive.metastore.uris' = 'thrift://hive-metastore:9083', + 's3.endpoint' = 's3.amazonaws.com', + 's3.region' = 'us-east-1', + 's3.access_key' = 'your_ak', + 's3.secret_key' = 'your_sk' +); +``` + +#### 步骤 2:查询和迁移 + +```sql +-- 浏览 Hive 表 +SWITCH hive_catalog; +SHOW DATABASES; +USE warehouse; +SHOW TABLES; + +-- 直接查询 Hive 数据 +SELECT * FROM hive_catalog.warehouse.fact_sales LIMIT 10; + +-- 迁移到 Doris +INSERT INTO internal.analytics.fact_sales +SELECT * FROM hive_catalog.warehouse.fact_sales +WHERE dt >= '2024-01-01'; +``` + +### Iceberg 迁移 + +```sql +-- 创建 Iceberg catalog +CREATE CATALOG iceberg_catalog PROPERTIES ( + 'type' = 'iceberg', + 'iceberg.catalog.type' = 'hms', + 'hive.metastore.uris' = 'thrift://hive-metastore:9083' +); + +-- 或使用 
REST catalog +CREATE CATALOG iceberg_catalog PROPERTIES ( + 'type' = 'iceberg', + 'iceberg.catalog.type' = 'rest', + 'uri' = 'http://iceberg-rest:8181' +); + +-- 查询 Iceberg 表 +SELECT * FROM iceberg_catalog.db.table_name; + +-- 时间旅行查询 +SELECT * FROM iceberg_catalog.db.table_name +FOR VERSION AS OF 123456789; + +-- 迁移数据 +INSERT INTO internal.target_db.target_table +SELECT * FROM iceberg_catalog.source_db.source_table; +``` + +### Hudi 迁移 + +```sql +-- 创建 Hudi catalog +CREATE CATALOG hudi_catalog PROPERTIES ( + 'type' = 'hms', + 'hive.metastore.uris' = 'thrift://hive-metastore:9083' +); + +-- 查询 Hudi 表(读优化) +SELECT * FROM hudi_catalog.db.hudi_table; + +-- 迁移数据 +INSERT INTO internal.target_db.target_table +SELECT * FROM hudi_catalog.db.hudi_table; +``` + +## Spark/Flink Connector 迁移 + +对于 catalog 不直接支持的系统,使用 Spark 或 Flink connector。 + +### Spark Doris Connector + +```scala +// 从任何 Spark 支持的源读取 +val sourceDF = spark.read + .format("source_format") + .load("source_path") + +// 写入 Doris +sourceDF.write + .format("doris") + .option("doris.table.identifier", "db.table") + .option("doris.fenodes", "doris-fe:8030") + .option("user", "root") + .option("password", "") + .save() +``` + +### Flink Doris Connector + +```sql +-- 从源读取 +CREATE TABLE source_table (...) WITH ('connector' = 'source-connector', ...); + +-- 写入 Doris +CREATE TABLE doris_sink (...) WITH ( + 'connector' = 'doris', + 'fenodes' = 'doris-fe:8030', + 'table.identifier' = 'db.table', + 'username' = 'root', + 'password' = '' +); + +INSERT INTO doris_sink SELECT * FROM source_table; +``` + +## 导出-导入方法 + +对于网络隔离环境或无法直接连接时: + +### 步骤 1:导出到文件 + +```bash +# 从 ClickHouse +clickhouse-client --query "SELECT * FROM table FORMAT Parquet" > data.parquet + +# 从 Greenplum +psql -c "\COPY table TO 'data.csv' WITH CSV HEADER" + +# 从 Hive +hive -e "INSERT OVERWRITE DIRECTORY '/tmp/export' ROW FORMAT DELIMITED SELECT * FROM table" +``` + +### 步骤 2:上传到对象存储 + +```bash +# 上传到 S3 +aws s3 cp data.parquet s3://bucket/migration/ + +# 或上传到 HDFS +hdfs dfs -put data.parquet /migration/ +``` + +### 步骤 3:加载到 Doris + +```sql +-- S3 Load +LOAD LABEL migration_job +( + DATA INFILE("s3://bucket/migration/data.parquet") + INTO TABLE target_table + FORMAT AS "parquet" +) +WITH S3 ( + "provider" = "AWS", + "s3.endpoint" = "s3.amazonaws.com", + "s3.region" = "us-east-1", + "s3.access_key" = "ak", + "s3.secret_key" = "sk" +); +``` + +## 最佳实践 + +### Schema 设计考虑 + +从其他 OLAP 系统迁移时: + +1. **选择正确的数据模型**: + - DUPLICATE 用于仅追加的事件数据 + - UNIQUE 用于带更新的维表 + - AGGREGATE 用于预聚合指标 + +2. **分区策略**: + - 对时间序列数据使用基于时间的分区 + - 尽可能匹配源分区 + +3. 
**分桶数**: + - 每个分区从 8-16 个桶开始 + - 根据数据量和查询模式扩展 + +### 增量迁移 + +从数据湖持续同步: + +```sql +-- 跟踪最后同步时间戳 +CREATE TABLE sync_metadata ( + table_name VARCHAR(128), + last_sync_time DATETIME +) +DISTRIBUTED BY HASH(table_name) BUCKETS 1; + +-- 增量加载 +INSERT INTO internal.analytics.fact_sales +SELECT * FROM hive_catalog.warehouse.fact_sales +WHERE updated_at > ( + SELECT last_sync_time FROM sync_metadata + WHERE table_name = 'fact_sales' +); + +-- 更新同步元数据 +INSERT INTO sync_metadata VALUES ('fact_sales', NOW()) +ON DUPLICATE KEY UPDATE last_sync_time = NOW(); +``` + +## 验证 + +迁移后: + +```sql +-- 行数验证 +SELECT + 'source' as system, + COUNT(*) as cnt +FROM source_catalog.db.table +UNION ALL +SELECT + 'doris' as system, + COUNT(*) as cnt +FROM internal.db.table; + +-- 聚合验证 +SELECT SUM(amount), COUNT(DISTINCT user_id) +FROM internal.db.table; +``` + +## 下一步 + +- [湖仓一体概述](../lakehouse/lakehouse-overview.md) - Multi-Catalog 功能 +- [Hive Catalog](../lakehouse/catalogs/hive-catalog.md) - Hive 集成详情 +- [Iceberg Catalog](../lakehouse/catalogs/iceberg-catalog.md) - Iceberg 集成 +- [Spark Doris Connector](../ecosystem/spark-doris-connector.md) - Spark 集成 +- [Flink Doris Connector](../ecosystem/flink-doris-connector.md) - Flink 集成 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/overview.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/overview.md new file mode 100644 index 00000000000..7c19c4acb9b --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/overview.md @@ -0,0 +1,150 @@ +--- +{ + "title": "迁移概述", + "language": "zh-CN", + "description": "从各种数据库和数据系统迁移数据到 Apache Doris 的指南" +} +--- + +Apache Doris 提供多种方法从各种源系统迁移数据。本指南帮助您根据源系统和需求选择最佳的迁移方式。 + +## 迁移路径 + +| 源系统 | 推荐方法 | 实时同步 | 全量迁移 | 增量同步 | +|--------|---------|---------|---------|---------| +| [PostgreSQL](./postgresql-to-doris.md) | JDBC Catalog / Flink CDC | 支持 | 支持 | 支持 | +| [MySQL](./mysql-to-doris.md) | Flink CDC / JDBC Catalog | 支持 | 支持 | 支持 | +| [Elasticsearch](./elasticsearch-to-doris.md) | ES Catalog | 不支持 | 支持 | 手动 | +| [ClickHouse](./other-olap-to-doris.md#clickhouse) | JDBC Catalog | 不支持 | 支持 | 手动 | +| [Greenplum](./other-olap-to-doris.md#greenplum) | JDBC Catalog | 不支持 | 支持 | 手动 | +| [Hive/Iceberg/Hudi](./other-olap-to-doris.md#data-lake) | Multi-Catalog | 不支持 | 支持 | 支持 | + +## 选择迁移方法 + +### 基于 Catalog 的迁移(推荐) + +Doris 的 [Multi-Catalog](../lakehouse/lakehouse-overview.md) 功能允许您直接查询外部数据源而无需数据移动。这是推荐的方法,适用于: + +- **初步探索**:在决定迁移策略之前查询源数据 +- **混合查询**:跨 Doris 和外部源进行 JOIN 查询 +- **增量迁移**:在保持源可访问的同时逐步迁移数据 + +```sql +-- 创建 Catalog 连接到您的源 +CREATE CATALOG pg_catalog PROPERTIES ( + 'type' = 'jdbc', + 'user' = 'username', + 'password' = 'password', + 'jdbc_url' = 'jdbc:postgresql://host:5432/database', + 'driver_url' = 'postgresql-42.5.6.jar', + 'driver_class' = 'org.postgresql.Driver' +); + +-- 直接查询源数据 +SELECT * FROM pg_catalog.schema_name.table_name LIMIT 10; + +-- 使用 INSERT INTO SELECT 迁移数据 +INSERT INTO doris_db.doris_table +SELECT * FROM pg_catalog.schema_name.source_table; +``` + +### Flink CDC(实时同步) + +[Flink CDC](../ecosystem/flink-doris-connector.md) 适用于: + +- **实时数据同步**:实时捕获变更 +- **全库迁移**:自动建表同步整个数据库 +- **零停机迁移**:在迁移过程中保持源和 Doris 同步 + +### 导出-导入方法 + +对于直接连接受限的场景: + +1. 从源系统导出数据到文件(CSV、Parquet、JSON) +2. 将文件存储到对象存储(S3、GCS、HDFS) +3. 使用 [S3 Load](../data-operate/import/data-source/amazon-s3.md) 或 [Broker Load](../data-operate/import/import-way/broker-load-manual.md) 加载到 Doris + +## 迁移规划清单 + +迁移前,请考虑以下事项: + +1. **数据量评估** + - 总数据大小和行数 + - 每日/每小时数据增长率 + - 历史数据保留要求 + +2. 
**Schema 设计** + - 选择合适的[数据模型](../table-design/data-model/overview.md)(Duplicate、Unique、Aggregate) + - 规划[分区](../table-design/data-partitioning/data-distribution.md)策略 + - 定义[分桶](../table-design/data-partitioning/data-bucketing.md)键 + +3. **数据类型映射** + - 检查类型兼容性(参见各迁移指南的具体映射) + - 处理特殊类型(数组、JSON、带时区的时间戳) + +4. **性能要求** + - 查询延迟预期 + - 并发查询负载 + - 数据新鲜度要求 + +5. **迁移窗口** + - 可接受的停机时间(如果有) + - 同步与异步迁移需求 + +## 最佳实践 + +### 从试点表开始 + +在迁移整个数据库之前,先用一个代表性的表进行测试: + +```sql +-- 1. 创建具有适当 schema 的 Doris 表 +CREATE TABLE pilot_table ( + id INT, + created_at DATETIME, + data VARCHAR(255) +) +UNIQUE KEY(id) +DISTRIBUTED BY HASH(id) BUCKETS 8; + +-- 2. 迁移数据 +INSERT INTO pilot_table +SELECT id, created_at, data +FROM source_catalog.db.source_table; + +-- 3. 验证行数 +SELECT COUNT(*) FROM pilot_table; +SELECT COUNT(*) FROM source_catalog.db.source_table; +``` + +### 批量大规模迁移 + +对于数十亿行的表,分批迁移: + +```sql +-- 按日期范围迁移 +INSERT INTO doris_table +SELECT * FROM source_catalog.db.source_table +WHERE created_at >= '2024-01-01' AND created_at < '2024-02-01'; +``` + +### 监控迁移进度 + +使用以下命令跟踪加载任务: + +```sql +-- 检查活动的加载任务 +SHOW LOAD WHERE STATE = 'LOADING'; + +-- 检查最近的加载历史 +SHOW LOAD ORDER BY CreateTime DESC LIMIT 10; +``` + +## 下一步 + +选择您的源系统查看详细的迁移说明: + +- [PostgreSQL 迁移到 Doris](./postgresql-to-doris.md) +- [MySQL 迁移到 Doris](./mysql-to-doris.md) +- [Elasticsearch 迁移到 Doris](./elasticsearch-to-doris.md) +- [其他 OLAP 系统迁移到 Doris](./other-olap-to-doris.md) diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/postgresql-to-doris.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/postgresql-to-doris.md new file mode 100644 index 00000000000..c4406c90ca6 --- /dev/null +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/migration/postgresql-to-doris.md @@ -0,0 +1,421 @@ +--- +{ + "title": "PostgreSQL 迁移到 Doris", + "language": "zh-CN", + "description": "从 PostgreSQL 迁移数据到 Apache Doris 的完整指南" +} +--- + +本指南介绍如何将数据从 PostgreSQL 迁移到 Apache Doris。您可以根据实时同步需求、数据量和运维复杂度选择多种迁移方法。 + +## 注意事项 + +1. **Schema 设计**:迁移前,选择合适的 Doris [数据模型](../table-design/data-model/overview.md)并规划您的[分区](../table-design/data-partitioning/data-distribution.md)和[分桶](../table-design/data-partitioning/data-bucketing.md)策略。 + +2. **数据类型**:查看下面的类型映射表。某些 PostgreSQL 类型需要特殊处理(数组、带时区的时间戳、JSON)。 + +3. 
**主键**:PostgreSQL 的 serial/identity 列映射到 Doris 的 INT/BIGINT 类型。对于唯一约束,使用 Doris 的 UNIQUE KEY 模型。 + +## 数据类型映射 + +| PostgreSQL 类型 | Doris 类型 | 说明 | +|-----------------|------------|------| +| boolean | BOOLEAN | | +| smallint / int2 | SMALLINT | | +| integer / int4 | INT | | +| bigint / int8 | BIGINT | | +| decimal / numeric | DECIMAL(P,S) | 无精度的 Numeric 映射为 STRING | +| real / float4 | FLOAT | | +| double precision | DOUBLE | | +| smallserial | SMALLINT | | +| serial | INT | | +| bigserial | BIGINT | | +| char(n) | CHAR(N) | | +| varchar / text | STRING | | +| timestamp | DATETIME | | +| timestamptz | DATETIME | 转换为本地时区;参见[时区问题](#处理时区问题) | +| date | DATE | | +| time | STRING | Doris 不支持 TIME 类型 | +| interval | STRING | | +| json / jsonb | JSON 或 STRING | 使用 STRING 可获得更好的查询性能 | +| uuid | STRING | | +| bytea | STRING | | +| array | ARRAY | 参见[处理数组](#处理数组) | +| inet / cidr / macaddr | STRING | | +| point / line / polygon | STRING | 几何类型存储为字符串 | + +## 迁移选项 + +### 选项 1:JDBC Catalog(推荐) + +JDBC Catalog 提供从 Doris 直接访问 PostgreSQL 数据的能力。这是查询和迁移数据最简单的方法。 + +#### 前提条件 + +- PostgreSQL 11.x 或更高版本 +- [PostgreSQL JDBC 驱动](https://jdbc.postgresql.org/) 42.5.x 或更高版本 +- Doris FE/BE 节点与 PostgreSQL 之间的网络连接(端口 5432) + +#### 步骤 1:下载并部署 JDBC 驱动 + +```bash +# 下载驱动 +wget https://jdbc.postgresql.org/download/postgresql-42.5.6.jar + +# 复制到 Doris FE 和 BE 的 jdbc_drivers 目录 +cp postgresql-42.5.6.jar $DORIS_HOME/fe/jdbc_drivers/ +cp postgresql-42.5.6.jar $DORIS_HOME/be/jdbc_drivers/ +``` + +#### 步骤 2:创建 PostgreSQL Catalog + +```sql +CREATE CATALOG pg_catalog PROPERTIES ( + 'type' = 'jdbc', + 'user' = 'postgres_user', + 'password' = 'postgres_password', + 'jdbc_url' = 'jdbc:postgresql://pg-host:5432/database_name', + 'driver_url' = 'postgresql-42.5.6.jar', + 'driver_class' = 'org.postgresql.Driver' +); +``` + +对于 SSL 连接: + +```sql +CREATE CATALOG pg_catalog PROPERTIES ( + 'type' = 'jdbc', + 'user' = 'postgres_user', + 'password' = 'postgres_password', + 'jdbc_url' = 'jdbc:postgresql://pg-host:5432/database_name?ssl=true&sslmode=require', + 'driver_url' = 'postgresql-42.5.6.jar', + 'driver_class' = 'org.postgresql.Driver' +); +``` + +#### 步骤 3:探索源数据 + +```sql +-- 切换到 catalog +SWITCH pg_catalog; + +-- 列出可用的 schema(Doris 中的数据库) +SHOW DATABASES; + +-- 使用某个 schema +USE public; + +-- 列出表 +SHOW TABLES; + +-- 预览数据 +SELECT * FROM source_table LIMIT 10; + +-- 检查行数 +SELECT COUNT(*) FROM source_table; +``` + +#### 步骤 4:创建 Doris 目标表 + +```sql +-- 切换回内部 catalog +SWITCH internal; +USE target_db; + +-- 根据源 schema 创建表 +CREATE TABLE orders ( + order_id INT, + customer_id INT, + order_date DATE NOT NULL, + total_amount DECIMAL(10, 2), + status VARCHAR(32), + created_at DATETIME +) +UNIQUE KEY(order_id, order_date) +PARTITION BY RANGE(order_date) () +DISTRIBUTED BY HASH(order_id) BUCKETS 16 +PROPERTIES ( + "dynamic_partition.enable" = "true", + "dynamic_partition.time_unit" = "MONTH", + "dynamic_partition.start" = "-12", + "dynamic_partition.end" = "3", + "dynamic_partition.prefix" = "p", + "dynamic_partition.buckets" = "16", + "replication_num" = "3" +); +``` + +#### 步骤 5:迁移数据 + +对于中小型表: + +```sql +INSERT INTO internal.target_db.orders +SELECT + order_id, + customer_id, + order_date, + total_amount, + status, + created_at +FROM pg_catalog.public.orders; +``` + +对于大表,分批迁移: + +```sql +-- 按日期范围分批 +INSERT INTO internal.target_db.orders +SELECT * FROM pg_catalog.public.orders +WHERE order_date >= '2024-01-01' AND order_date < '2024-04-01'; + +INSERT INTO internal.target_db.orders +SELECT * FROM pg_catalog.public.orders +WHERE order_date >= 
'2024-04-01' AND order_date < '2024-07-01'; +``` + +#### 步骤 6:验证迁移 + +```sql +-- 比较行数 +SELECT 'doris' as source, COUNT(*) as cnt FROM internal.target_db.orders +UNION ALL +SELECT 'postgres' as source, COUNT(*) as cnt FROM pg_catalog.public.orders; + +-- 抽查特定记录 +SELECT * FROM internal.target_db.orders WHERE order_id = 12345; +SELECT * FROM pg_catalog.public.orders WHERE order_id = 12345; +``` + +### 选项 2:Flink CDC(实时同步) + +Flink CDC 从 PostgreSQL WAL(预写日志)捕获变更并实时流式传输到 Doris。这适用于持续同步场景。 + +#### 前提条件 + +- 启用逻辑复制的 PostgreSQL(`wal_level = logical`) +- Flink 1.15+ 配合 Flink CDC 和 Flink Doris Connector +- PostgreSQL 中的复制槽 + +#### 步骤 1:配置 PostgreSQL + +确保 `postgresql.conf` 中有以下设置: + +```properties +wal_level = logical +max_replication_slots = 10 +max_wal_senders = 10 +``` + +创建复制用户并授予权限: + +```sql +-- 创建具有复制权限的用户 +CREATE USER flink_cdc WITH REPLICATION PASSWORD 'password'; + +-- 授予表访问权限 +GRANT SELECT ON ALL TABLES IN SCHEMA public TO flink_cdc; +GRANT USAGE ON SCHEMA public TO flink_cdc; +``` + +#### 步骤 2:创建 Flink CDC 任务 + +使用 Flink SQL: + +```sql +-- 源:PostgreSQL CDC +CREATE TABLE pg_orders ( + order_id INT, + customer_id INT, + order_date DATE, + total_amount DECIMAL(10, 2), + status STRING, + created_at TIMESTAMP(3), + PRIMARY KEY (order_id) NOT ENFORCED +) WITH ( + 'connector' = 'postgres-cdc', + 'hostname' = 'pg-host', + 'port' = '5432', + 'username' = 'flink_cdc', + 'password' = 'password', + 'database-name' = 'source_db', + 'schema-name' = 'public', + 'table-name' = 'orders', + 'slot.name' = 'flink_slot', + 'decoding.plugin.name' = 'pgoutput' +); + +-- 目标:Doris +CREATE TABLE doris_orders ( + order_id INT, + customer_id INT, + order_date DATE, + total_amount DECIMAL(10, 2), + status STRING, + created_at DATETIME +) WITH ( + 'connector' = 'doris', + 'fenodes' = 'doris-fe:8030', + 'table.identifier' = 'target_db.orders', + 'username' = 'doris_user', + 'password' = 'doris_password', + 'sink.enable-2pc' = 'true', + 'sink.label-prefix' = 'pg_orders_sync' +); + +-- 开始同步 +INSERT INTO doris_orders SELECT * FROM pg_orders; +``` + +### 选项 3:导出和加载 + +适用于网络隔离环境或无法直接连接的情况。 + +#### 步骤 1:从 PostgreSQL 导出 + +```bash +# 导出为 CSV +psql -h pg-host -U user -d database -c "\COPY orders TO '/tmp/orders.csv' WITH CSV HEADER" + +# 使用 DuckDB 或 pandas 导出为 Parquet +duckdb -c "COPY (SELECT * FROM postgres_scan('postgresql://user:pass@host/db', 'public', 'orders')) TO '/tmp/orders.parquet'" +``` + +#### 步骤 2:上传到对象存储 + +```bash +# 上传到 S3 +aws s3 cp /tmp/orders.parquet s3://bucket/migration/orders.parquet + +# 或上传到 HDFS +hdfs dfs -put /tmp/orders.parquet /migration/orders.parquet +``` + +#### 步骤 3:加载到 Doris + +```sql +-- 使用 S3 Load +LOAD LABEL orders_migration +( + DATA INFILE("s3://bucket/migration/orders.parquet") + INTO TABLE orders + FORMAT AS "parquet" +) +WITH S3 ( + "provider" = "AWS", + "s3.endpoint" = "s3.amazonaws.com", + "s3.region" = "us-east-1", + "s3.access_key" = "your_ak", + "s3.secret_key" = "your_sk" +); + +-- 检查加载状态 +SHOW LOAD WHERE LABEL = "orders_migration"\G +``` + +## 处理常见问题 + +### 处理时区问题 + +PostgreSQL `timestamptz` 以 UTC 存储时间戳并在读取时转换为会话时区。Doris `DATETIME` 不携带时区信息。 + +**建议**:迁移时显式转换时间戳: + +```sql +-- 在 PostgreSQL 查询中转换为特定时区 +INSERT INTO doris_table +SELECT + id, + created_at AT TIME ZONE 'UTC' as created_at_utc +FROM pg_catalog.schema.table; +``` + +同时确保 Doris BE 中 JVM 时区一致,在 `be.conf` 中设置: + +```properties +JAVA_OPTS="-Duser.timezone=UTC ..." 
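+# 注：以上仅为示例写法，假设 BE 使用 JDK 8 的默认 JAVA_OPTS 配置项；
+# 较新版本的 be.conf 中对应变量可能为 JAVA_OPTS_FOR_JDK_17，请以实际配置文件中的变量名为准。
+# 修改 be.conf 后需重启 BE 节点生效，并建议将 Doris 的 time_zone 会话/全局变量设置为相同时区以保持一致。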
+``` + +### 处理数组 + +PostgreSQL 数组映射到 Doris ARRAY 类型,但维度检测需要现有数据: + +```sql +-- PostgreSQL 源表 +CREATE TABLE pg_table ( + id INT, + tags TEXT[] +); + +-- Doris 目标表 +CREATE TABLE doris_table ( + id INT, + tags ARRAY<STRING> +) +DISTRIBUTED BY HASH(id) BUCKETS 8; +``` + +如果无法确定数组维度,显式转换: + +```sql +INSERT INTO doris_table +SELECT + id, + CAST(tags AS ARRAY<STRING>) +FROM pg_catalog.schema.pg_table; +``` + +### 处理 JSON/JSONB + +对于复杂的 JSON 查询,映射到 Doris STRING 并使用 JSON 函数: + +```sql +-- 查询 JSON 字段 +SELECT + id, + JSON_EXTRACT(json_col, '$.name') as name, + JSON_EXTRACT(json_col, '$.address.city') as city +FROM table_name; +``` + +### 大表迁移 + +对于数亿行的表: + +1. **分区迁移**:按时间范围或 ID 范围迁移 +2. **增加并行度**:同时运行多个 INSERT 语句 +3. **监控资源**:检查 Doris BE 内存和磁盘使用情况 + +```sql +-- 并行迁移脚本(并发运行) +-- 会话 1 +INSERT INTO orders SELECT * FROM pg_catalog.public.orders +WHERE order_id BETWEEN 0 AND 10000000; + +-- 会话 2 +INSERT INTO orders SELECT * FROM pg_catalog.public.orders +WHERE order_id BETWEEN 10000001 AND 20000000; +``` + +## 验证清单 + +迁移后,验证: + +- [ ] 源和目标的行数匹配 +- [ ] 样本记录相同 +- [ ] NULL 值正确保留 +- [ ] 数值精度保持 +- [ ] 日期/时间值正确(检查时区) +- [ ] 数组和 JSON 字段可查询 + +```sql +-- 综合验证查询 +SELECT + 'rows' as check_type, + CASE WHEN s.cnt = t.cnt THEN 'PASS' ELSE 'FAIL' END as result, + s.cnt as source_count, + t.cnt as target_count +FROM + (SELECT COUNT(*) cnt FROM pg_catalog.public.orders) s, + (SELECT COUNT(*) cnt FROM internal.target_db.orders) t; +``` diff --git a/sidebars.ts b/sidebars.ts index cfce6a2f6bf..c70e5b3c84f 100644 --- a/sidebars.ts +++ b/sidebars.ts @@ -20,6 +20,18 @@ const sidebars: SidebarsConfig = { }, ], }, + { + type: 'category', + label: 'Migration', + collapsed: false, + items: [ + 'migration/overview', + 'migration/postgresql-to-doris', + 'migration/mysql-to-doris', + 'migration/elasticsearch-to-doris', + 'migration/other-olap-to-doris', + ], + }, { type: 'category', label: 'Guides', --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]