Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-04-08 Thread via GitHub


RocMarshal commented on PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-2042712930

   Thanks to everyone involved in this PR. :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-04-08 Thread via GitHub


boring-cyborg[bot] commented on PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-2042575379

   Awesome work, congrats on your first merged pull request!
   





Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-04-08 Thread via GitHub


snuyanzin merged PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78





Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-04-08 Thread via GitHub


snuyanzin commented on PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-2042573923

   thanks @RocMarshal for the hard work 
   thanks a lot everyone for the review





Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-04-08 Thread via GitHub


Jiabao-Sun commented on code in PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1448844127


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceOptions.java:
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+
+/** JDBC source options. */
+public class JdbcSourceOptions implements Serializable {
+
+    private JdbcSourceOptions() {}
+
+    public static final ConfigOption<Integer> READER_FETCH_BATCH_SIZE =
+            ConfigOptions.key("connectors.jdbc.split-reader.fetch-batch-size")
+                    .intType()
+                    .defaultValue(1024);
+    public static final ConfigOption<Integer> RESULTSET_TYPE =
+            ConfigOptions.key("connectors.jdbc.split-reader.resultSetType")
+                    .intType()
+                    .defaultValue(ResultSet.TYPE_FORWARD_ONLY);
+    public static final ConfigOption<Integer> RESULTSET_CONCURRENCY =
+            ConfigOptions.key("connectors.jdbc.split-reader.resultSetConcurrency")
+                    .intType()
+                    .defaultValue(ResultSet.CONCUR_READ_ONLY);
+    public static final ConfigOption<Integer> RESULTSET_FETCH_SIZE =
+            ConfigOptions.key("connectors.jdbc.split-reader.resultSetFetchSize")
+                    .intType()
+                    .defaultValue(0);
+    public static final ConfigOption<Boolean> AUTO_COMMIT =
+            ConfigOptions.key("connectors.jdbc.split-reader.autoCommit")
+                    .booleanType()
+                    .defaultValue(true);

Review Comment:
   How about using a lowercase hyphenated form for the configuration names?
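
   As a rough illustration of the suggestion above: the camelCase segments in
   the quoted keys would become lowercase hyphenated. The sketch below uses a
   hypothetical helper, `toHyphenated`, which is not part of Flink or of this
   PR; it only shows what the renamed keys might look like, assuming a simple
   "hyphen before each uppercase letter" rule.

```java
public class HyphenatedKeys {

    // Hypothetical helper (not a Flink API): rewrites camelCase segments of a
    // configuration key into lowercase hyphenated form.
    static String toHyphenated(String key) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < key.length(); i++) {
            char c = key.charAt(i);
            if (Character.isUpperCase(c)) {
                // Insert a hyphen only when the previous character is a letter
                // or digit, so that '.' and '-' separators are left alone.
                if (i > 0 && Character.isLetterOrDigit(key.charAt(i - 1))) {
                    sb.append('-');
                }
                sb.append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Keys copied from the quoted JdbcSourceOptions diff.
        String[] keys = {
            "connectors.jdbc.split-reader.fetch-batch-size",
            "connectors.jdbc.split-reader.resultSetType",
            "connectors.jdbc.split-reader.resultSetConcurrency",
            "connectors.jdbc.split-reader.resultSetFetchSize",
            "connectors.jdbc.split-reader.autoCommit"
        };
        for (String key : keys) {
            System.out.println(key + " -> " + toHyphenated(key));
        }
    }
}
```

   Under this rule a key that is already hyphenated passes through unchanged.
   Runs of consecutive capitals (for example an acronym) would be split letter
   by letter, so such names would need manual handling.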






Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-04-08 Thread via GitHub


RocMarshal commented on PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-2042297264

   Thank you @eskabetxe @caicancai very much for the review.
   CC @snuyanzin 





Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-03-26 Thread via GitHub


snuyanzin commented on PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-2020733326

   @davidradl @eskabetxe could you please have another look here?
   





Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-03-22 Thread via GitHub


RocMarshal commented on PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-2016325794

   Thank you @snuyanzin for the review.
   I made some changes based on your comments.





Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-03-22 Thread via GitHub


RocMarshal commented on code in PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1536549280


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE;
+
+/**
+ * The JDBC source reader to read data from jdbc splits.
+ *
+ * @param  The type of the record read from the source.
+ */
+public class JdbcSourceSplitReader
+implements SplitReader, JdbcSourceSplit>, ResultTypeQueryable {
+
+private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitReader.class);
+
+private final Configuration config;
+@Nullable private JdbcSourceSplit currentSplit;
+private final Queue splits;
+private final TypeInformation typeInformation;
+private final JdbcConnectionProvider connectionProvider;
+private transient Connection connection;
+private transient PreparedStatement statement;
+private transient ResultSet resultSet;
+
+private final ResultExtractor resultExtractor;
+protected boolean hasNextRecordCurrentSplit;
+private final DeliveryGuarantee deliveryGuarantee;
+
+private final int splitReaderFetchBatchSize;
+
+private final int resultSetType;
+private final int resultSetConcurrency;
+private final int resultSetFetchSize;
+// Boolean to distinguish between default value and explicitly set autoCommit mode.
+private final Boolean autoCommit;
+private int currentSplitOffset;
+
+private final SourceReaderContext context;
+
+public JdbcSourceSplitReader(
+SourceReaderContext context,
+Configuration config,
+TypeInformation typeInformation,
+JdbcConnectionProvider connectionProvider,
+DeliveryGuarantee deliveryGuarantee,
+ResultExtractor resultExtractor) {
+this.context = Preconditions.checkNotNull(context);
+this.config = Preconditions.checkNotNull(config);
+this.typeInformation = Preconditions.checkNotNull(typeInformation);
+this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
+this.resultSetType = config.getInteger(RESULTSET_TYPE);
+

Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-03-22 Thread via GitHub


snuyanzin commented on code in PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1535501299


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java:
##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE;
+
+/**
+ * The JDBC source reader to read data from jdbc splits.
+ *
+ * @param  The type of the record read from the source.
+ */
+public class JdbcSourceSplitReader
+implements SplitReader, JdbcSourceSplit>, ResultTypeQueryable {
+
+private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitReader.class);
+
+private final Configuration config;
+@Nullable private JdbcSourceSplit currentSplit;
+private final Queue splits;
+private final TypeInformation typeInformation;
+private final JdbcConnectionProvider connectionProvider;
+private transient Connection connection;
+private transient PreparedStatement statement;
+private transient ResultSet resultSet;
+
+private final ResultExtractor resultExtractor;
+protected boolean hasNextRecordCurrentSplit;
+private final DeliveryGuarantee deliveryGuarantee;
+
+private final int splitReaderFetchBatchSize;
+
+private final int resultSetType;
+private final int resultSetConcurrency;
+private final int resultSetFetchSize;
+// Boolean to distinguish between default value and explicitly set autoCommit mode.
+private final Boolean autoCommit;
+private int currentSplitOffset;
+
+private final SourceReaderContext context;
+
+public JdbcSourceSplitReader(
+SourceReaderContext context,
+Configuration config,
+TypeInformation typeInformation,
+JdbcConnectionProvider connectionProvider,
+DeliveryGuarantee deliveryGuarantee,
+ResultExtractor resultExtractor) {
+this.context = Preconditions.checkNotNull(context);
+this.config = Preconditions.checkNotNull(config);
+this.typeInformation = Preconditions.checkNotNull(typeInformation);
+this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
+this.resultSetType = config.getInteger(RESULTSET_TYPE);
+

Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-03-22 Thread via GitHub


snuyanzin commented on code in PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1535498978


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java:
##
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE;
+
+/**
+ * A tool is used to build {@link JdbcSource} quickly.
+ *
+ * 
+ * JdbcSource<Row> source = JdbcSource.<Row>builder()
+ *   .setSql(validSql)
+ *   .setResultExtractor(new RowResultExtractor())
+ *   .setDBUrl(dbUrl)
+ *   .setDriverName(driverName)
+ *   .setTypeInformation(new TypeHint<Row>() {}.getTypeInfo())
+ *   .setPassword(password)
+ *   .setUsername(username)
+ *   .build();
+ * 
+ *
+ * In order to query the JDBC source in parallel, you need to provide a parameterized query
+ * template (i.e. a valid {@link PreparedStatement}) and a {@link JdbcParameterValuesProvider} which
+ * provides binding values for the query parameters. E.g.:
+ *
+ * 
+ *
+ * Serializable[][] queryParameters = new String[2][1];
+ * queryParameters[0] = new String[]{"Kumar"};
+ * queryParameters[1] = new String[]{"Tan Ah Teck"};
+ *
+ * JdbcSource<Row> jdbcSource = JdbcSource.<Row>builder()
+ *  .setResultExtractor(new RowResultExtractor())
+ *  .setTypeInformation(new TypeHint<Row>() {}.getTypeInfo())
+ *  .setPassword(password)
+ *  .setUsername(username)
+ *  .setDriverName("org.apache.derby.jdbc.EmbeddedDriver")
+ *  .setDBUrl("jdbc:derby:memory:ebookshop")
+ *  .setSql("select * from books WHERE author = ?")
+ *  .setJdbcParameterValuesProvider(new JdbcGenericParameterValuesProvider(queryParameters))
+ *  .build();
+ * 
+ *
+ * @see Row
+ * @see JdbcParameterValuesProvider
+ * @see PreparedStatement
+ * @see DriverManager
+ * @see JdbcSource
+ */
+@PublicEvolving
+public class JdbcSourceBuilder {
+
+public static final Logger LOG = LoggerFactory.getLogger(JdbcSourceBuilder.class);
+
+private final Configuration configuration;
+
+private int splitReaderFetchBatchSize;
+private int resultSetType;
+private int resultSetConcurrency;
+private int resultSetFetchSize;
+// Boolean to distinguish between default value and explicitly set autoCommit mode.
+private Boolean autoCommit;
+
+// TODO It would need a builder method to render after introducing streaming semantic.

Review Comment:
   should we fix this TODO first?




Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-03-18 Thread via GitHub


RocMarshal commented on PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-2005697292

   Hi @snuyanzin, would you mind taking a look? Thank you very much.





Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-01-04 Thread via GitHub


RocMarshal commented on code in PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441484531


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java:
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Objects;
+
+/** JDBC source. */
+@PublicEvolving
+public class JdbcSource
+implements Source,
+ResultTypeQueryable {
+
+private final Boundedness boundedness;
+private final TypeInformation typeInformation;
+
+private final Configuration configuration;
+private final JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider;
+
+protected JdbcConnectionProvider connectionProvider;
+private final ResultExtractor resultExtractor;
+private final DeliveryGuarantee deliveryGuarantee;
+
+JdbcSource(
+Configuration configuration,
+JdbcConnectionProvider connectionProvider,
+JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
+ResultExtractor resultExtractor,
+@Nullable TypeInformation typeInformation,
+@Nullable DeliveryGuarantee deliveryGuarantee) {

Review Comment:
   Good proposal!  






Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-01-04 Thread via GitHub


eskabetxe commented on code in PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441475081


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java:
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Objects;
+
+/** JDBC source. */
+@PublicEvolving
+public class JdbcSource
+implements Source,
+ResultTypeQueryable {
+
+private final Boundedness boundedness;
+private final TypeInformation typeInformation;
+
+private final Configuration configuration;
+private final JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider;
+
+protected JdbcConnectionProvider connectionProvider;
+private final ResultExtractor resultExtractor;
+private final DeliveryGuarantee deliveryGuarantee;
+
+JdbcSource(
+Configuration configuration,
+JdbcConnectionProvider connectionProvider,
+JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
+ResultExtractor resultExtractor,
+@Nullable TypeInformation typeInformation,
+@Nullable DeliveryGuarantee deliveryGuarantee) {

Review Comment:
   The constructor without DeliveryGuarantee could call the one that takes it,
   passing DeliveryGuarantee.NONE.
   This way we could remove the @Nullable and avoid duplicating the constructor logic.
   
   ```
   JdbcSource(
   Configuration configuration,
   JdbcConnectionProvider connectionProvider,
   JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
   ResultExtractor resultExtractor,
   TypeInformation typeInformation) {
   this(configuration,
  connectionProvider, 
  sqlSplitEnumeratorProvider,
  resultExtractor,
  typeInformation,
  DeliveryGuarantee.NONE);
   }
   
   JdbcSource(
   Configuration configuration,
   JdbcConnectionProvider connectionProvider,
   JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
   ResultExtractor resultExtractor,
   TypeInformation typeInformation,
   DeliveryGuarantee deliveryGuarantee) {
   this.configuration = 

Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-01-04 Thread via GitHub


eskabetxe commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441475081


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java:
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator;
+import 
org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState;
+import 
org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Objects;
+
+/** JDBC source. */
+@PublicEvolving
+public class JdbcSource
+implements Source,
+ResultTypeQueryable {
+
+private final Boundedness boundedness;
+private final TypeInformation typeInformation;
+
+private final Configuration configuration;
+private final JdbcSqlSplitEnumeratorBase.Provider 
sqlSplitEnumeratorProvider;
+
+protected JdbcConnectionProvider connectionProvider;
+private final ResultExtractor resultExtractor;
+private final DeliveryGuarantee deliveryGuarantee;
+
+JdbcSource(
+Configuration configuration,
+JdbcConnectionProvider connectionProvider,
+JdbcSqlSplitEnumeratorBase.Provider 
sqlSplitEnumeratorProvider,
+ResultExtractor resultExtractor,
+@Nullable TypeInformation typeInformation,
+@Nullable DeliveryGuarantee deliveryGuarantee) {

Review Comment:
   The second constructor (without DeliveryGuarantee) could call the first one, 
passing DeliveryGuarantee.NONE.
   That way we could remove the @Nullable and avoid duplicating the constructor 
logic.
   
   `
   JdbcSource(
   Configuration configuration,
   JdbcConnectionProvider connectionProvider,
   JdbcSqlSplitEnumeratorBase.Provider 
sqlSplitEnumeratorProvider,
   ResultExtractor resultExtractor,
   @Nonnull TypeInformation typeInformation) {
   this(configuration,
  connectionProvider, 
  sqlSplitEnumeratorProvider,
  resultExtractor,
  typeInformation,
  DeliveryGuarantee.NONE);
   }
   
   JdbcSource(
   Configuration configuration,
   JdbcConnectionProvider connectionProvider,
   JdbcSqlSplitEnumeratorBase.Provider 
sqlSplitEnumeratorProvider,
   ResultExtractor resultExtractor,
   @Nonnull TypeInformation typeInformation,
   DeliveryGuarantee deliveryGuarantee) {
   this.configuration = 
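
The delegation suggested in this comment can be sketched in isolation as follows. This is only a minimal illustration, not the PR's code: `Guarantee` and `SketchSource` are hypothetical stand-ins for `DeliveryGuarantee` and `JdbcSource`, and a plain `String` stands in for `TypeInformation`. The shorter constructor supplies the default and the full constructor holds the only copy of the null checks.

```java
import java.util.Objects;

// Hypothetical stand-in for DeliveryGuarantee (assumption, not the Flink enum).
enum Guarantee { NONE, AT_LEAST_ONCE, EXACTLY_ONCE }

// Hypothetical stand-in for the source class under review.
final class SketchSource {
    private final String typeInfo;
    private final Guarantee guarantee;

    // Convenience constructor: delegates and supplies the default guarantee.
    SketchSource(String typeInfo) {
        this(typeInfo, Guarantee.NONE);
    }

    // Full constructor: the single place where arguments are validated.
    SketchSource(String typeInfo, Guarantee guarantee) {
        this.typeInfo = Objects.requireNonNull(typeInfo, "typeInfo must not be null");
        this.guarantee = Objects.requireNonNull(guarantee, "guarantee must not be null");
    }

    String typeInfo() {
        return typeInfo;
    }

    Guarantee guarantee() {
        return guarantee;
    }
}
```

With this shape neither parameter needs `@Nullable`: callers that have no guarantee to pass use the shorter constructor, and a null passed to either one fails fast.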

Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-01-03 Thread via GitHub


RocMarshal commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441300360


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source.reader.extractor;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * The Extractor to extract the data from {@link ResultSet}.
+ *
+ * @param <T> The target data type.
+ */
+@PublicEvolving
+public interface ResultExtractor<T> extends Serializable {
+
+/**
+ * Extract the data from the current point line of the result.
+ *
+ * @param resultSet Result set queried from a sql.
+ * @return The data object filled by the current line of the resultSet.
+ * @throws SQLException SQL exception.
+ */
+T extract(ResultSet resultSet) throws SQLException;
+
+/**
+ * The identifier of the extractor.
+ *
+ * @return identifier in {@link String} type.
+ */
+default String identifier() {

Review Comment:
   What do you think about changing the default implementation as follows (and 
removing the overriding method in `RowResultExtractor`)?
   
   ```
   default String identifier() {
   return this.getClass().getSimpleName();
   }
   
   ```
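
   To see how such a default would behave, here is a small self-contained sketch. It does not use the Flink types: `Extractor`, `UpperCaseExtractor` and `IdentifierDemo` are hypothetical names, and `extract` takes a `String` instead of a `ResultSet` so that it runs without a database.

```java
import java.io.Serializable;

// Hypothetical stand-in for the interface under review (not the Flink type).
interface Extractor<T> extends Serializable {
    T extract(String raw);

    // Default identifier derived from the runtime class, as proposed above.
    default String identifier() {
        return this.getClass().getSimpleName();
    }
}

// A named implementation inherits a readable identifier without overriding.
class UpperCaseExtractor implements Extractor<String> {
    @Override
    public String extract(String raw) {
        return raw.toUpperCase();
    }
}

class IdentifierDemo {
    // Builds an anonymous implementation to show the edge case.
    static Extractor<Integer> anonymousLengthExtractor() {
        return new Extractor<Integer>() {
            @Override
            public Integer extract(String raw) {
                return raw.length();
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("named: '" + new UpperCaseExtractor().identifier() + "'");
        System.out.println("anonymous: '" + anonymousLengthExtractor().identifier() + "'");
    }
}
```

   One point to keep in mind with this design: `Class.getSimpleName()` returns an empty string for anonymous classes, and lambdas get a JVM-generated name, so extractors written that way would probably still want to override `identifier()`.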



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-01-03 Thread via GitHub


RocMarshal commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441300360


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source.reader.extractor;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * The Extractor to extract the data from {@link ResultSet}.
+ *
+ * @param <T> The target data type.
+ */
+@PublicEvolving
+public interface ResultExtractor<T> extends Serializable {
+
+/**
+ * Extract the data from the current point line of the result.
+ *
+ * @param resultSet Result set queried from a sql.
+ * @return The data object filled by the current line of the resultSet.
+ * @throws SQLException SQL exception.
+ */
+T extract(ResultSet resultSet) throws SQLException;
+
+/**
+ * The identifier of the extractor.
+ *
+ * @return identifier in {@link String} type.
+ */
+default String identifier() {

Review Comment:
   What do you think about changing the default implementation as follows (and 
removing the implementation in `RowResultExtractor`)?
   
   ```
   default String identifier() {
   return this.getClass().getSimpleName();
   }
   
   ```






Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-01-03 Thread via GitHub


RocMarshal commented on PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-1876318089

   Hi @eskabetxe, thank you very much for the review. I have left some 
discussion items for you to comment on.
   
   Please let me know what you think~ :)





Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-01-03 Thread via GitHub


RocMarshal commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441300360


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source.reader.extractor;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.types.Row;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * The Extractor to extract the data from {@link ResultSet}.
+ *
+ * @param <T> The target data type.
+ */
+@PublicEvolving
+public interface ResultExtractor<T> extends Serializable {
+
+/**
+ * Extract the data from the current point line of the result.
+ *
+ * @param resultSet Result set queried from a sql.
+ * @return The data object filled by the current line of the resultSet.
+ * @throws SQLException SQL exception.
+ */
+T extract(ResultSet resultSet) throws SQLException;
+
+/**
+ * The identifier of the extractor.
+ *
+ * @return identifier in {@link String} type.
+ */
+default String identifier() {

Review Comment:
   What do you think about changing the default implementation as follows?
   
   ```
   default String identifier() {
   return this.getClass().getSimpleName();
   }
   
   ```






Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-01-03 Thread via GitHub


RocMarshal commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441299767


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java:
##
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.sql.ResultSet;
+
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE;
+
+/** A tool is used to build {@link JdbcSource} quickly. */
+@PublicEvolving
+public class JdbcSourceBuilder {
+
+public static final Logger LOG = 
LoggerFactory.getLogger(JdbcSourceBuilder.class);
+
+private final Configuration configuration;
+
+private int splitReaderFetchBatchSize;
+private int resultSetType;
+private int resultSetConcurrency;
+private int resultSetFetchSize;
+// Boolean to distinguish between default value and explicitly set 
autoCommit mode.
+private Boolean autoCommit;
+
+// TODO It would need a builder method to render after introducing 
streaming semantic.
+private DeliveryGuarantee deliveryGuarantee;
+
+private TypeInformation typeInformation;
+
+private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder 
connOptionsBuilder;
+private String sql;
+private JdbcParameterValuesProvider jdbcParameterValuesProvider;
+private ResultExtractor resultExtractor;
+
+private JdbcConnectionProvider connectionProvider;
+
+JdbcSourceBuilder() {
+this.configuration = new Configuration();
+this.connOptionsBuilder = new 
JdbcConnectionOptions.JdbcConnectionOptionsBuilder();
+this.splitReaderFetchBatchSize = 1024;
+this.resultSetType = ResultSet.TYPE_FORWARD_ONLY;
+this.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
+this.deliveryGuarantee = DeliveryGuarantee.NONE;
+// Boolean to distinguish between default value and explicitly set 
autoCommit mode.
+this.autoCommit = true;
+}
+
+public JdbcSourceBuilder setSql(@Nonnull String sql) {
+Preconditions.checkArgument(
+!StringUtils.isNullOrWhitespaceOnly(sql), "It's required to 
set the sql.");
+this.sql = sql;
+return this;
+}
+
+public JdbcSourceBuilder setResultExtractor(ResultExtractor 
resultExtractor) {
+this.resultExtractor =
+Preconditions.checkNotNull(resultExtractor, "resultExtractor 
must not be null.");
+return this;
+}
+
+public JdbcSourceBuilder setUsername(String username) {
+Preconditions.checkArgument(
+!StringUtils.isNullOrWhitespaceOnly(username),
+"It's required to set the 'username'.");
+connOptionsBuilder.withUsername(username);
+return this;
+}
+
+public JdbcSourceBuilder setPassword(String password) {
+

Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-01-03 Thread via GitHub


RocMarshal commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441298453


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java:
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator;
+import 
org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState;
+import 
org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Objects;
+
+/** JDBC source. */
+@PublicEvolving
+public class JdbcSource
+implements Source,
+ResultTypeQueryable {
+
+private final Boundedness boundedness;
+private final TypeInformation typeInformation;
+
+private final Configuration configuration;
+private final JdbcSqlSplitEnumeratorBase.Provider 
sqlSplitEnumeratorProvider;
+
+protected JdbcConnectionProvider connectionProvider;
+private final ResultExtractor resultExtractor;
+private final DeliveryGuarantee deliveryGuarantee;
+
+JdbcSource(
+Configuration configuration,
+JdbcConnectionProvider connectionProvider,
+JdbcSqlSplitEnumeratorBase.Provider 
sqlSplitEnumeratorProvider,
+ResultExtractor resultExtractor,
+@Nullable TypeInformation typeInformation,
+@Nullable DeliveryGuarantee deliveryGuarantee) {

Review Comment:
   Thanks for the comment.

   I have rechecked the logic of this section and, as far as I can tell, the 
conclusions are:
   
   - `typeInformation` must not be null, because when `typeInformation` is 
null we currently cannot derive the actual type information through type inference.
   
   - `deliveryGuarantee` could be null.
   
   
   
   So how about keeping the following two constructors?
   
   
   ```
   
   JdbcSource(
   Configuration configuration,
   JdbcConnectionProvider connectionProvider,
   JdbcSqlSplitEnumeratorBase.Provider 
sqlSplitEnumeratorProvider,
   ResultExtractor resultExtractor,
   @Nonnull TypeInformation typeInformation,
   @Nullable DeliveryGuarantee deliveryGuarantee) {
   this.configuration = Preconditions.checkNotNull(configuration);
   this.connectionProvider = 
Preconditions.checkNotNull(connectionProvider);
   this.sqlSplitEnumeratorProvider = 
Preconditions.checkNotNull(sqlSplitEnumeratorProvider);
   this.resultExtractor = 

Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2024-01-03 Thread via GitHub


eskabetxe commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1440197286


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java:
##
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.sql.ResultSet;
+
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
+import static 
org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE;
+
+/** A tool is used to build {@link JdbcSource} quickly. */
+@PublicEvolving
+public class JdbcSourceBuilder {
+
+public static final Logger LOG = 
LoggerFactory.getLogger(JdbcSourceBuilder.class);
+
+private final Configuration configuration;
+
+private int splitReaderFetchBatchSize;
+private int resultSetType;
+private int resultSetConcurrency;
+private int resultSetFetchSize;
+// Boolean to distinguish between default value and explicitly set 
autoCommit mode.
+private Boolean autoCommit;
+
+// TODO It would need a builder method to render after introducing 
streaming semantic.
+private DeliveryGuarantee deliveryGuarantee;
+
+private TypeInformation typeInformation;
+
+private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder 
connOptionsBuilder;
+private String sql;
+private JdbcParameterValuesProvider jdbcParameterValuesProvider;
+private ResultExtractor resultExtractor;
+
+private JdbcConnectionProvider connectionProvider;
+
+JdbcSourceBuilder() {
+this.configuration = new Configuration();
+this.connOptionsBuilder = new 
JdbcConnectionOptions.JdbcConnectionOptionsBuilder();
+this.splitReaderFetchBatchSize = 1024;
+this.resultSetType = ResultSet.TYPE_FORWARD_ONLY;
+this.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY;
+this.deliveryGuarantee = DeliveryGuarantee.NONE;
+// Boolean to distinguish between default value and explicitly set 
autoCommit mode.
+this.autoCommit = true;
+}
+
+public JdbcSourceBuilder setSql(@Nonnull String sql) {
+Preconditions.checkArgument(
+!StringUtils.isNullOrWhitespaceOnly(sql), "It's required to 
set the sql.");
+this.sql = sql;
+return this;
+}
+
+public JdbcSourceBuilder setResultExtractor(ResultExtractor 
resultExtractor) {
+this.resultExtractor =
+Preconditions.checkNotNull(resultExtractor, "resultExtractor 
must not be null.");
+return this;
+}
+
+public JdbcSourceBuilder setUsername(String username) {
+Preconditions.checkArgument(
+!StringUtils.isNullOrWhitespaceOnly(username),
+"It's required to set the 'username'.");
+connOptionsBuilder.withUsername(username);
+return this;
+}
+
+public JdbcSourceBuilder setPassword(String password) {
+

Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2023-12-28 Thread via GitHub


RocMarshal commented on PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-1870955620

   Hi @Jiabao-Sun, would you mind taking a look at this PR?
   Thank you~





Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2023-11-27 Thread via GitHub


eskabetxe commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1405924661


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java:
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator;
+import 
org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState;
+import 
org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Objects;
+
+/** JDBC source. */
+@Internal

Review Comment:
   Shouldn't this be PublicEvolving?






Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2023-11-27 Thread via GitHub


eskabetxe commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1405924661


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java:
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Objects;
+
+/** JDBC source. */
+@Internal

Review Comment:
   I think this should be PublicEvolving



##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java:
##
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.sql.ResultSet;
+
+import static 

Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2023-11-27 Thread via GitHub


eskabetxe commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1405917676


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/Utils.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.connector.jdbc.source.split.CheckpointedOffset;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/** Utils class to hold common static methods. */
+public class Utils {
+    private Utils() {}
+
+    public static void serializeJdbcSourceSplit(DataOutputStream out, JdbcSourceSplit sourceSplit)

Review Comment:
   I would say this should live in JdbcSourceSplitSerializer.java, and we 
should instantiate that serializer in JdbcSourceEnumStateSerializer.
   Any change to this code affects the state and should therefore be 
versioned, and it is easier to understand if the code lives there.
   
   Another option is to put this on JdbcSourceSplit, but I think the same 
versioning problem would arise.
   
   In any case, we should avoid this kind of utility class.
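   
   To illustrate the suggestion, here is a minimal, self-contained sketch of 
keeping the wire format next to a version number. Everything in it is 
hypothetical: `Split` is a reduced stand-in for JdbcSourceSplit, and 
`VersionedSerializer` is a local interface that only mirrors the shape of 
Flink's SimpleVersionedSerializer (getVersion / serialize / deserialize). It 
is not the code from this PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Sketch only: none of these types are the PR's real classes. */
final class VersionedSplitSerializerSketch {

    /** Hypothetical stand-in for JdbcSourceSplit, reduced to three fields. */
    static final class Split {
        final String id;
        final String sqlTemplate;
        final int offset;

        Split(String id, String sqlTemplate, int offset) {
            this.id = id;
            this.sqlTemplate = sqlTemplate;
            this.offset = offset;
        }
    }

    /** Local interface mirroring the shape of Flink's SimpleVersionedSerializer. */
    interface VersionedSerializer<T> {
        int getVersion();

        byte[] serialize(T obj) throws IOException;

        T deserialize(int version, byte[] serialized) throws IOException;
    }

    /** Owns the split wire format, so any format change bumps the version here. */
    static final class SplitSerializer implements VersionedSerializer<Split> {
        private static final int CURRENT_VERSION = 1;

        @Override
        public int getVersion() {
            return CURRENT_VERSION;
        }

        @Override
        public byte[] serialize(Split split) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                writeSplit(out, split);
            }
            return bytes.toByteArray();
        }

        @Override
        public Split deserialize(int version, byte[] serialized) throws IOException {
            if (version != CURRENT_VERSION) {
                throw new IOException("Unknown split version: " + version);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
                return readSplit(in);
            }
        }

        // Stream-level helpers: an enumerator-state serializer could hold an
        // instance of this class and call these instead of a static utility.
        void writeSplit(DataOutputStream out, Split split) throws IOException {
            out.writeUTF(split.id);
            out.writeUTF(split.sqlTemplate);
            out.writeInt(split.offset);
        }

        Split readSplit(DataInputStream in) throws IOException {
            return new Split(in.readUTF(), in.readUTF(), in.readInt());
        }
    }

    public static void main(String[] args) throws IOException {
        SplitSerializer serializer = new SplitSerializer();
        Split original = new Split("split-0", "SELECT * FROM t WHERE id BETWEEN ? AND ?", 42);
        Split copy = serializer.deserialize(serializer.getVersion(), serializer.serialize(original));
        System.out.println(copy.id + " @ offset " + copy.offset);
    }
}
```

   With this layout the enumerator-state serializer reuses `writeSplit` and 
`readSplit` through a serializer instance, so the split format and its 
version stay in one place.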






Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2023-11-17 Thread via GitHub


davidradl commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1397421543


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java:
##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Objects;
+
+/** JDBC source. */
+@Internal

Review Comment:
   I wonder why this is not tagged with @Experimental, as JdbcSourceBuilder is 
tagged @Experimental while this class is tagged @Internal. I suspect they 
should both be tagged consistently.
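   
   As a rough illustration of what "tagged consistently" could look like, 
the sketch below puts the same stability marker on a source class and its 
builder. The `PublicEvolving` annotation declared here is a local stand-in 
written for this example (the real one lives in org.apache.flink.annotation), 
the two classes are hypothetical placeholders, and which annotation the PR 
should actually use is for the reviewers to decide.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Sketch only: local stand-ins, not Flink's annotations or the PR's classes. */
final class StabilityAnnotationSketch {

    /** Local stand-in for a stability marker such as PublicEvolving. */
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface PublicEvolving {}

    /** Hypothetical placeholder for the source class. */
    @PublicEvolving
    static final class SourceSketch {}

    /** Hypothetical placeholder for the builder; it carries the same marker. */
    @PublicEvolving
    static final class SourceBuilderSketch {}

    /** Returns true when both classes either carry the marker or both lack it. */
    static boolean taggedConsistently(Class<?> a, Class<?> b) {
        return a.isAnnotationPresent(PublicEvolving.class)
                == b.isAnnotationPresent(PublicEvolving.class);
    }

    public static void main(String[] args) {
        System.out.println(taggedConsistently(SourceSketch.class, SourceBuilderSketch.class));
    }
}
```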






Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2023-11-17 Thread via GitHub


RocMarshal commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1397395729


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java:
##
@@ -99,7 +99,11 @@
  * @see JdbcParameterValuesProvider
  * @see PreparedStatement
  * @see DriverManager
+ * @deprecated Please use {@link org.apache.flink.connector.jdbc.source.JdbcSource} instead. The
+ * builder utils and parameters passing could be view {@link

Review Comment:
   Hi @davidradl,
   
   Thank you very much for your comment.
   Your concerns are reasonable.
   There are two points here:
   1. The sentence may have grammatical issues. I only want to state here that 
this class is deprecated and that development should use the new API, with 
the new class named here. I'll update the sentence.
   
   2. A comprehensive document is indeed needed here to guide developers.
   As mentioned in https://issues.apache.org/jira/browse/FLINK-25420, there 
are currently no plans to introduce the relevant documentation.
   
   Of course, if you are willing to write usage documentation for this new API, 
we would greatly appreciate it.
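   
   For reference, a minimal sketch of a deprecation note that simply names 
the replacement, which is the intent described in point 1. The class names 
below are hypothetical placeholders and the wording is only one possibility, 
not the text that ends up in the PR.

```java
/** Sketch only: placeholder classes, not the connector's real types. */
final class DeprecationNoteSketch {

    /** Hypothetical stand-in for the new source class. */
    static final class NewSource {}

    /**
     * Hypothetical stand-in for the old input format.
     *
     * @deprecated Please use {@link NewSource} instead.
     */
    @Deprecated
    static final class OldInputFormat {}

    public static void main(String[] args) {
        // @Deprecated is retained at runtime, so tooling can detect it reflectively.
        System.out.println(OldInputFormat.class.isAnnotationPresent(Deprecated.class));
    }
}
```

   Pairing the Javadoc `@deprecated` tag with the `@Deprecated` annotation 
gives readers the migration hint and lets the compiler warn at call sites.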






Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2023-11-17 Thread via GitHub


davidradl commented on code in PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1397218900


##
flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java:
##
@@ -99,7 +99,11 @@
  * @see JdbcParameterValuesProvider
  * @see PreparedStatement
  * @see DriverManager
+ * @deprecated Please use {@link org.apache.flink.connector.jdbc.source.JdbcSource} instead. The
+ * builder utils and parameters passing could be view {@link

Review Comment:
   I am not sure what this sentence means. Is it something to do with JDBC 
views, or does it relate to viewing the link?






Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2023-11-05 Thread via GitHub


boring-cyborg[bot] commented on PR #78:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-1793683582

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   





[PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]

2023-11-05 Thread via GitHub


RocMarshal opened a new pull request, #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78

   # Main change  
   - [FLINK-33459][Connector/JDBC] Add dependency items about flink-connector-base and testing into the pom.xml
   - [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format
   - [FLINK-33459][Connector/JDBC] Mark the old JdbcInputFormat as Deprecated.
   
   # Testing
   Add some unit test cases and integration test cases.
   

