hailin0 commented on code in PR #5581:
URL: https://github.com/apache/seatunnel/pull/5581#discussion_r1366679206


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java:
##########
@@ -67,7 +67,7 @@ public interface JdbcOptions {
     Option<Integer> FETCH_SIZE =
             Options.key("fetch_size")
                     .intType()
-                    .defaultValue(0)
+                    .defaultValue(1024)
                     .withDescription(
                             "For queries that return a large number of 
objects, "

Review Comment:
   fixed



##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java:
##########
@@ -61,91 +53,48 @@ public class JdbcSource
     protected static final Logger LOG = 
LoggerFactory.getLogger(JdbcSource.class);
 
     private JdbcSourceConfig jdbcSourceConfig;
-    private SeaTunnelRowType typeInfo;
-
-    private JdbcDialect jdbcDialect;
-    private JdbcInputFormat inputFormat;
-    private PartitionParameter partitionParameter;
-    private JdbcConnectionProvider jdbcConnectionProvider;
-
-    private String query;
+    private Map<TablePath, JdbcSourceTable> jdbcSourceTables;
 
-    public JdbcSource(
-            JdbcSourceConfig jdbcSourceConfig,
-            SeaTunnelRowType typeInfo,
-            JdbcDialect jdbcDialect,
-            JdbcInputFormat inputFormat,
-            PartitionParameter partitionParameter,
-            JdbcConnectionProvider jdbcConnectionProvider,
-            String query) {
+    @SneakyThrows
+    public JdbcSource(JdbcSourceConfig jdbcSourceConfig) {
         this.jdbcSourceConfig = jdbcSourceConfig;
-        this.typeInfo = typeInfo;
-        this.jdbcDialect = jdbcDialect;
-        this.inputFormat = inputFormat;
-        this.partitionParameter = partitionParameter;
-        this.jdbcConnectionProvider = jdbcConnectionProvider;
-        this.query = query;
+        this.jdbcSourceTables =
+                JdbcCatalogUtils.getTables(
+                        jdbcSourceConfig.getJdbcConnectionConfig(),
+                        jdbcSourceConfig.getTableConfigList());
     }
 
     @Override
     public String getPluginName() {
         return "Jdbc";
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        ReadonlyConfig config = ReadonlyConfig.fromConfig(pluginConfig);
-        ConfigValidator.of(config).validate(new 
JdbcSourceFactory().optionRule());
-        this.jdbcSourceConfig = JdbcSourceConfig.of(config);
-        this.jdbcDialect =
-                JdbcDialectLoader.load(
-                        jdbcSourceConfig.getJdbcConnectionConfig().getUrl(),
-                        
jdbcSourceConfig.getJdbcConnectionConfig().getCompatibleMode());
-        this.jdbcDialect.connectionUrlParse(
-                jdbcSourceConfig.getJdbcConnectionConfig().getUrl(),
-                jdbcSourceConfig.getJdbcConnectionConfig().getProperties(),
-                this.jdbcDialect.defaultParameter());
-        this.jdbcConnectionProvider =
-                new 
SimpleJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig());
-        this.query = jdbcSourceConfig.getQuery();
-        try (Connection connection = 
jdbcConnectionProvider.getOrEstablishConnection()) {
-            this.typeInfo = initTableField(connection);
-            this.partitionParameter =
-                    
createPartitionParameter(jdbcConnectionProvider.getOrEstablishConnection());
-        } catch (Exception e) {
-            throw new PrepareFailException("jdbc", PluginType.SOURCE, 
e.toString());
-        }
-
-        if (partitionParameter != null) {
-            this.query =
-                    JdbcSourceFactory.obtainPartitionSql(
-                            jdbcDialect, partitionParameter, 
jdbcSourceConfig.getQuery());
-        }
-
-        this.inputFormat =
-                new JdbcInputFormat(
-                        jdbcConnectionProvider,
-                        jdbcDialect,
-                        typeInfo,
-                        query,
-                        jdbcSourceConfig.getFetchSize(),
-                        
jdbcSourceConfig.getJdbcConnectionConfig().isAutoCommit());
-    }
-
     @Override
     public Boundedness getBoundedness() {
         return Boundedness.BOUNDED;
     }
 
     @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {

Review Comment:
   revert



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to