This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fa84ba4 Try to extract sharding columns from target config if it's
available. (#7660)
fa84ba4 is described below
commit fa84ba463e8c4b752149e7e379b3edf0fa9a1cb8
Author: 邱鹿 Lucas <[email protected]>
AuthorDate: Tue Sep 29 17:45:17 2020 +0800
Try to extract sharding columns from target config if it's available.
(#7660)
* Update usage document.
* Try to extract sharding columns from target config if it's available.
* Fix NPE bug in DataSourceConfiguration.getDatabaseType().
* add @Override
* add log
* flip ==
Co-authored-by: qiulu3 <Lucas209910>
---
.../user-manual/shardingsphere-scaling/usage.cn.md | 4 ++--
.../user-manual/shardingsphere-scaling/usage.en.md | 4 ++--
.../core/config/JDBCDataSourceConfiguration.java | 10 ++++++++-
.../config/ShardingSphereJDBCConfiguration.java | 12 +++++++++--
.../core/schedule/ScalingTaskScheduler.java | 2 ++
.../scaling/core/utils/SyncConfigurationUtil.java | 25 ++++++++++++++++++----
6 files changed, 46 insertions(+), 11 deletions(-)
diff --git
a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
index bca4f70..4a2d728 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.cn.md
@@ -80,7 +80,7 @@ type = jdbc
| 参数 | 描述
|
| ------------------------------------------------- |
------------------------------------------------------------ |
| name | jdbc 名称
|
-| ruleConfiguration.targetDataSources.url | jdbc 连接
|
+| ruleConfiguration.targetDataSources.jdbcUrl | jdbc 连接
|
| ruleConfiguration.targetDataSources.username | jdbc 用户
|
| ruleConfiguration.targetDataSources.password | jdbc 密码
|
@@ -148,7 +148,7 @@ curl -X POST \
"parameter": {
"username": "root",
"password": "root",
- "url":
"jdbc:mysql://127.0.0.1:3307/sharding_db?serverTimezone=UTC&useSSL=false"
+ "jdbcUrl":
"jdbc:mysql://127.0.0.1:3307/sharding_db?serverTimezone=UTC&useSSL=false"
}
}
},
diff --git
a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
index 2d6f586..b9c5cdd 100644
--- a/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
+++ b/docs/document/content/user-manual/shardingsphere-scaling/usage.en.md
@@ -80,7 +80,7 @@ type = jdbc
| Parameter | Describe
|
| ------------------------------------------------- |
------------------------------------------------------------ |
| name | jdbc name
|
-| ruleConfiguration.targetDataSources.url | jdbc url
|
+| ruleConfiguration.targetDataSources.jdbcUrl | jdbc url
|
| ruleConfiguration.targetDataSources.username | jdbc username
|
| ruleConfiguration.targetDataSources.password | jdbc password
|
@@ -148,7 +148,7 @@ curl -X POST \
"parameter": {
"username": "root",
"password": "root",
- "url":
"jdbc:mysql://127.0.0.1:3307/sharding_db?serverTimezone=UTC&useSSL=false"
+ "jdbcUrl":
"jdbc:mysql://127.0.0.1:3307/sharding_db?serverTimezone=UTC&useSSL=false"
}
}
},
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JDBCDataSourceConfiguration.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JDBCDataSourceConfiguration.java
index ce21f7e..905634f 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JDBCDataSourceConfiguration.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JDBCDataSourceConfiguration.java
@@ -43,6 +43,14 @@ public final class JDBCDataSourceConfiguration implements
DataSourceConfiguratio
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
- databaseType = DatabaseTypes.getDatabaseTypeByURL(jdbcUrl);
+ databaseType = getDatabaseType();
+ }
+
+ @Override
+ public DatabaseType getDatabaseType() {
+ if (null == databaseType) {
+ databaseType = DatabaseTypes.getDatabaseTypeByURL(jdbcUrl);
+ }
+ return databaseType;
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ShardingSphereJDBCConfiguration.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ShardingSphereJDBCConfiguration.java
index 9f8a81f..3054a4f 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ShardingSphereJDBCConfiguration.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/ShardingSphereJDBCConfiguration.java
@@ -40,7 +40,15 @@ public class ShardingSphereJDBCConfiguration implements
DataSourceConfiguration
public ShardingSphereJDBCConfiguration(final String dataSource, final
String rule) {
this.dataSource = dataSource;
this.rule = rule;
- Map<String,
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration>
sourceDataSource =
ConfigurationYamlConverter.loadDataSourceConfigurations(dataSource);
- this.databaseType =
DatabaseTypes.getDatabaseTypeByURL(sourceDataSource.values().iterator().next().getProps().get("jdbcUrl").toString());
+ this.databaseType = getDatabaseType();
+ }
+
+ @Override
+ public DatabaseType getDatabaseType() {
+ if (null == databaseType) {
+ Map<String,
org.apache.shardingsphere.infra.config.datasource.DataSourceConfiguration>
sourceDataSource =
ConfigurationYamlConverter.loadDataSourceConfigurations(dataSource);
+ databaseType =
DatabaseTypes.getDatabaseTypeByURL(sourceDataSource.values().iterator().next().getProps().get("jdbcUrl").toString());
+ }
+ return databaseType;
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
index 461b428..2cf4e31 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/schedule/ScalingTaskScheduler.java
@@ -88,6 +88,7 @@ public final class ScalingTaskScheduler implements Runnable {
@Override
public void onFailure(final Throwable throwable) {
+ log.error("Inventory task execute failed.", throwable);
stop();
shardingScalingJob.setStatus(SyncTaskControlStatus.MIGRATE_INVENTORY_DATA_FAILURE.name());
}
@@ -117,6 +118,7 @@ public final class ScalingTaskScheduler implements Runnable
{
@Override
public void onFailure(final Throwable throwable) {
+ log.error("Incremental task execute failed.", throwable);
stop();
shardingScalingJob.setStatus(SyncTaskControlStatus.SYNCHRONIZE_INCREMENTAL_DATA_FAILURE.name());
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
index 3c84d07..7bf1e20 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.core.utils;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.AccessLevel;
@@ -45,6 +46,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
/**
@@ -61,20 +63,35 @@ public final class SyncConfigurationUtil {
*/
public static Collection<SyncConfiguration> toSyncConfigurations(final
ScalingConfiguration scalingConfiguration) {
Collection<SyncConfiguration> result = new LinkedList<>();
- ShardingSphereJDBCConfiguration shardingSphereJDBCConfiguration =
(ShardingSphereJDBCConfiguration)
scalingConfiguration.getRuleConfiguration().getSource().toTypedDataSourceConfiguration();
- Map<String, DataSourceConfiguration> sourceDataSource =
ConfigurationYamlConverter.loadDataSourceConfigurations(shardingSphereJDBCConfiguration.getDataSource());
- ShardingRuleConfiguration sourceRule =
ConfigurationYamlConverter.loadShardingRuleConfiguration(shardingSphereJDBCConfiguration.getRule());
+ ShardingSphereJDBCConfiguration sourceConfiguration =
getSourceConfiguration(scalingConfiguration);
+ Map<String, DataSourceConfiguration> sourceDataSource =
ConfigurationYamlConverter.loadDataSourceConfigurations(sourceConfiguration.getDataSource());
+ ShardingRuleConfiguration sourceRule =
ConfigurationYamlConverter.loadShardingRuleConfiguration(sourceConfiguration.getRule());
Map<String, Map<String, String>> dataSourceTableNameMap =
toDataSourceTableNameMap(sourceRule, sourceDataSource.keySet());
+ Optional<ShardingRuleConfiguration> targetRule =
getTargetRuleConfiguration(scalingConfiguration);
filterByShardingDataSourceTables(dataSourceTableNameMap,
scalingConfiguration.getJobConfiguration());
for (Entry<String, Map<String, String>> entry :
dataSourceTableNameMap.entrySet()) {
DumperConfiguration dumperConfiguration =
createDumperConfiguration(entry.getKey(), sourceDataSource.get(entry.getKey()),
entry.getValue());
- ImporterConfiguration importerConfiguration =
createImporterConfiguration(scalingConfiguration, sourceRule);
+ ImporterConfiguration importerConfiguration =
createImporterConfiguration(scalingConfiguration,
targetRule.orElse(sourceRule));
importerConfiguration.setRetryTimes(scalingConfiguration.getJobConfiguration().getRetryTimes());
result.add(new
SyncConfiguration(scalingConfiguration.getJobConfiguration().getConcurrency(),
dumperConfiguration, importerConfiguration));
}
return result;
}
+ private static ShardingSphereJDBCConfiguration
getSourceConfiguration(final ScalingConfiguration scalingConfiguration) {
+ org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration
dataSourceConfiguration =
scalingConfiguration.getRuleConfiguration().getSource().toTypedDataSourceConfiguration();
+ Preconditions.checkArgument(dataSourceConfiguration instanceof
ShardingSphereJDBCConfiguration, "Only support ShardingSphere source data
source.");
+ return (ShardingSphereJDBCConfiguration) dataSourceConfiguration;
+ }
+
+ private static Optional<ShardingRuleConfiguration>
getTargetRuleConfiguration(final ScalingConfiguration scalingConfiguration) {
+ org.apache.shardingsphere.scaling.core.config.DataSourceConfiguration
dataSourceConfiguration =
scalingConfiguration.getRuleConfiguration().getTarget().toTypedDataSourceConfiguration();
+ if (dataSourceConfiguration instanceof
ShardingSphereJDBCConfiguration) {
+ return
Optional.of(ConfigurationYamlConverter.loadShardingRuleConfiguration(((ShardingSphereJDBCConfiguration)
dataSourceConfiguration).getRule()));
+ }
+ return Optional.empty();
+ }
+
private static void filterByShardingDataSourceTables(final Map<String,
Map<String, String>> dataSourceTableNameMap, final JobConfiguration
jobConfiguration) {
if (null == jobConfiguration.getShardingTables()) {
return;