Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
RocMarshal commented on PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-2042712930

Thanks to everybody involved in the PR. :)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
boring-cyborg[bot] commented on PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-2042575379

Awesome work, congrats on your first merged pull request!
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
snuyanzin merged PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
snuyanzin commented on PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-2042573923

Thanks @RocMarshal for the hard work, and thanks a lot to everyone for the review.
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
Jiabao-Sun commented on code in PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1448844127

## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceOptions.java: ##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.io.Serializable;
+import java.sql.ResultSet;
+
+/** JDBC source options. */
+public class JdbcSourceOptions implements Serializable {
+
+    private JdbcSourceOptions() {}
+
+    public static final ConfigOption<Integer> READER_FETCH_BATCH_SIZE =
+            ConfigOptions.key("connectors.jdbc.split-reader.fetch-batch-size")
+                    .intType()
+                    .defaultValue(1024);
+    public static final ConfigOption<Integer> RESULTSET_TYPE =
+            ConfigOptions.key("connectors.jdbc.split-reader.resultSetType")
+                    .intType()
+                    .defaultValue(ResultSet.TYPE_FORWARD_ONLY);
+    public static final ConfigOption<Integer> RESULTSET_CONCURRENCY =
+            ConfigOptions.key("connectors.jdbc.split-reader.resultSetConcurrency")
+                    .intType()
+                    .defaultValue(ResultSet.CONCUR_READ_ONLY);
+    public static final ConfigOption<Integer> RESULTSET_FETCH_SIZE =
+            ConfigOptions.key("connectors.jdbc.split-reader.resultSetFetchSize")
+                    .intType()
+                    .defaultValue(0);
+    public static final ConfigOption<Boolean> AUTO_COMMIT =
+            ConfigOptions.key("connectors.jdbc.split-reader.autoCommit")
+                    .booleanType()
+                    .defaultValue(true);

Review Comment:
How about using the lowercase hyphenated form for the configuration names?
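To make the suggestion concrete, here is a minimal sketch of what "lowercase hyphenated" would mean for the keys quoted above. `ConfigKeyStyle` and `toHyphenated` are hypothetical names invented for illustration; they are not part of flink-connector-jdbc, and the names the PR finally adopted are not shown in this thread. The conversion is deliberately naive: it treats every uppercase letter as the start of a new word.

```java
/** Illustrative helper only; not part of flink-connector-jdbc. */
final class ConfigKeyStyle {

    private ConfigKeyStyle() {}

    /** Rewrites camelCase words inside a dotted key as lowercase, hyphen-separated words. */
    static String toHyphenated(String key) {
        StringBuilder out = new StringBuilder(key.length() + 8);
        for (int i = 0; i < key.length(); i++) {
            char c = key.charAt(i);
            if (Character.isUpperCase(c)) {
                // Insert a hyphen unless the letter already starts a segment or a word.
                if (i > 0 && key.charAt(i - 1) != '.' && key.charAt(i - 1) != '-') {
                    out.append('-');
                }
                out.append(Character.toLowerCase(c));
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String[] keys = {
            "connectors.jdbc.split-reader.fetch-batch-size",
            "connectors.jdbc.split-reader.resultSetType",
            "connectors.jdbc.split-reader.resultSetConcurrency",
            "connectors.jdbc.split-reader.resultSetFetchSize",
            "connectors.jdbc.split-reader.autoCommit"
        };
        for (String key : keys) {
            System.out.println(key + " -> " + toHyphenated(key));
        }
    }
}
```

Under this sketch the first key is left unchanged because it is already hyphenated, which matches the observation that only the four camelCase keys are inconsistent with it.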
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
RocMarshal commented on PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-2042297264

Thank you @eskabetxe @caicancai very much for the review. CC @snuyanzin
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
snuyanzin commented on PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-2020733326

@davidradl @eskabetxe could you please have another look here?
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
RocMarshal commented on PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-2016325794

Thank you @snuyanzin for the review. I made some changes based on your comments.
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
RocMarshal commented on code in PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1536549280

## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java: ##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE;
+
+/**
+ * The JDBC source reader to read data from jdbc splits.
+ *
+ * @param The type of the record read from the source.
+ */
+public class JdbcSourceSplitReader
+        implements SplitReader, JdbcSourceSplit>, ResultTypeQueryable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitReader.class);
+
+    private final Configuration config;
+    @Nullable private JdbcSourceSplit currentSplit;
+    private final Queue splits;
+    private final TypeInformation typeInformation;
+    private final JdbcConnectionProvider connectionProvider;
+    private transient Connection connection;
+    private transient PreparedStatement statement;
+    private transient ResultSet resultSet;
+
+    private final ResultExtractor resultExtractor;
+    protected boolean hasNextRecordCurrentSplit;
+    private final DeliveryGuarantee deliveryGuarantee;
+
+    private final int splitReaderFetchBatchSize;
+
+    private final int resultSetType;
+    private final int resultSetConcurrency;
+    private final int resultSetFetchSize;
+    // Boolean to distinguish between default value and explicitly set autoCommit mode.
+    private final Boolean autoCommit;
+    private int currentSplitOffset;
+
+    private final SourceReaderContext context;
+
+    public JdbcSourceSplitReader(
+            SourceReaderContext context,
+            Configuration config,
+            TypeInformation typeInformation,
+            JdbcConnectionProvider connectionProvider,
+            DeliveryGuarantee deliveryGuarantee,
+            ResultExtractor resultExtractor) {
+        this.context = Preconditions.checkNotNull(context);
+        this.config = Preconditions.checkNotNull(config);
+        this.typeInformation = Preconditions.checkNotNull(typeInformation);
+        this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
+        this.resultSetType = config.getInteger(RESULTSET_TYPE);
+
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
snuyanzin commented on code in PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1535501299

## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/JdbcSourceSplitReader.java: ##
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source.reader;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE;
+
+/**
+ * The JDBC source reader to read data from jdbc splits.
+ *
+ * @param The type of the record read from the source.
+ */
+public class JdbcSourceSplitReader
+        implements SplitReader, JdbcSourceSplit>, ResultTypeQueryable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSplitReader.class);
+
+    private final Configuration config;
+    @Nullable private JdbcSourceSplit currentSplit;
+    private final Queue splits;
+    private final TypeInformation typeInformation;
+    private final JdbcConnectionProvider connectionProvider;
+    private transient Connection connection;
+    private transient PreparedStatement statement;
+    private transient ResultSet resultSet;
+
+    private final ResultExtractor resultExtractor;
+    protected boolean hasNextRecordCurrentSplit;
+    private final DeliveryGuarantee deliveryGuarantee;
+
+    private final int splitReaderFetchBatchSize;
+
+    private final int resultSetType;
+    private final int resultSetConcurrency;
+    private final int resultSetFetchSize;
+    // Boolean to distinguish between default value and explicitly set autoCommit mode.
+    private final Boolean autoCommit;
+    private int currentSplitOffset;
+
+    private final SourceReaderContext context;
+
+    public JdbcSourceSplitReader(
+            SourceReaderContext context,
+            Configuration config,
+            TypeInformation typeInformation,
+            JdbcConnectionProvider connectionProvider,
+            DeliveryGuarantee deliveryGuarantee,
+            ResultExtractor resultExtractor) {
+        this.context = Preconditions.checkNotNull(context);
+        this.config = Preconditions.checkNotNull(config);
+        this.typeInformation = Preconditions.checkNotNull(typeInformation);
+        this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
+        this.resultSetType = config.getInteger(RESULTSET_TYPE);
+
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
snuyanzin commented on code in PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1535498978

## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java: ##
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE;
+import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE;
+
+/**
+ * A tool is used to build {@link JdbcSource} quickly.
+ *
+ *
+ * JdbcSource<Row> source = JdbcSource.<Row>builder()
+ *           .setSql(validSql)
+ *           .setResultExtractor(new RowResultExtractor())
+ *           .setDBUrl(dbUrl)
+ *           .setDriverName(driverName)
+ *           .setTypeInformation(new TypeHint<Row>() {}.getTypeInfo())
+ *           .setPassword(password)
+ *           .setUsername(username)
+ *           .build();
+ *
+ *
+ * In order to query the JDBC source in parallel, you need to provide a parameterized query
+ * template (i.e. a valid {@link PreparedStatement}) and a {@link JdbcParameterValuesProvider} which
+ * provides binding values for the query parameters. E.g.:
+ *
+ *
+ *
+ * Serializable[][] queryParameters = new String[2][1];
+ * queryParameters[0] = new String[]{"Kumar"};
+ * queryParameters[1] = new String[]{"Tan Ah Teck"};
+ *
+ * JdbcSource<Row> jdbcSource = JdbcSource.<Row>builder()
+ *          .setResultExtractor(new RowResultExtractor())
+ *          .setTypeInformation(new TypeHint<Row>() {}.getTypeInfo())
+ *          .setPassword(password)
+ *          .setUsername(username)
+ *          .setDriverName("org.apache.derby.jdbc.EmbeddedDriver")
+ *          .setDBUrl("jdbc:derby:memory:ebookshop")
+ *          .setSql("select * from books WHERE author = ?")
+ *          .setJdbcParameterValuesProvider(new JdbcGenericParameterValuesProvider(queryParameters))
+ *          .build();
+ *
+ *
+ * @see Row
+ * @see JdbcParameterValuesProvider
+ * @see PreparedStatement
+ * @see DriverManager
+ * @see JdbcSource
+ */
+@PublicEvolving
+public class JdbcSourceBuilder {
+
+    public static final Logger LOG = LoggerFactory.getLogger(JdbcSourceBuilder.class);
+
+    private final Configuration configuration;
+
+    private int splitReaderFetchBatchSize;
+    private int resultSetType;
+    private int resultSetConcurrency;
+    private int resultSetFetchSize;
+    // Boolean to distinguish between default value and explicitly set autoCommit mode.
+    private Boolean autoCommit;
+
+    // TODO It would need a builder method to render after introducing streaming semantic.

Review Comment:
should we fix this TODO first?
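The Javadoc quoted in this comment describes parallel reads as one parameterized query template plus a provider of binding values. The sketch below illustrates that idea only, under the assumption that each row of the `Serializable[][]` array becomes one independently executable unit of work. `SplitPlanner` and `QuerySplit` are hypothetical names invented here; they are not the connector's `SqlTemplateSplitEnumerator` or `JdbcSourceSplit`, whose actual behavior is not shown in this thread.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical planner: pairs one SQL template with each row of bind values. */
final class SplitPlanner {

    private SplitPlanner() {}

    /** Returns one QuerySplit per row of parameter values, in the order given. */
    static List<QuerySplit> plan(String sqlTemplate, Serializable[][] parameterValues) {
        List<QuerySplit> splits = new ArrayList<>();
        for (Serializable[] row : parameterValues) {
            // Copy the row so later changes to the caller's array do not affect the split.
            splits.add(new QuerySplit(sqlTemplate, row.clone()));
        }
        return splits;
    }

    public static void main(String[] args) {
        // Same values as the Javadoc example quoted above.
        Serializable[][] queryParameters = {{"Kumar"}, {"Tan Ah Teck"}};
        for (QuerySplit split : plan("select * from books WHERE author = ?", queryParameters)) {
            System.out.println(split);
        }
    }
}

/** Hypothetical value class: a query template and the values to bind to its placeholders. */
final class QuerySplit {
    final String sqlTemplate;
    final Serializable[] parameters;

    QuerySplit(String sqlTemplate, Serializable[] parameters) {
        this.sqlTemplate = sqlTemplate;
        this.parameters = parameters;
    }

    @Override
    public String toString() {
        return sqlTemplate + " <- " + Arrays.toString(parameters);
    }
}
```

With two rows of parameters the sketch yields two units of work, which is why the template needs a `?` placeholder for every value that varies between them.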
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
RocMarshal commented on PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-2005697292

Hi @snuyanzin, would you mind taking a look? Thank you very much.
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
RocMarshal commented on code in PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441484531

## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java: ##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Objects;
+
+/** JDBC source. */
+@PublicEvolving
+public class JdbcSource
+        implements Source,
+                ResultTypeQueryable {
+
+    private final Boundedness boundedness;
+    private final TypeInformation typeInformation;
+
+    private final Configuration configuration;
+    private final JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider;
+
+    protected JdbcConnectionProvider connectionProvider;
+    private final ResultExtractor resultExtractor;
+    private final DeliveryGuarantee deliveryGuarantee;
+
+    JdbcSource(
+            Configuration configuration,
+            JdbcConnectionProvider connectionProvider,
+            JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
+            ResultExtractor resultExtractor,
+            @Nullable TypeInformation typeInformation,
+            @Nullable DeliveryGuarantee deliveryGuarantee) {

Review Comment:
Good proposal!
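The proposal being acknowledged in this thread is that the constructor without a `DeliveryGuarantee` should delegate to the full constructor and pass `DeliveryGuarantee.NONE`, so that `@Nullable` can be dropped and the initialization logic lives in one place. A minimal, self-contained sketch of that pattern follows. `ExampleSource` and the `Guarantee` enum are hypothetical stand-ins invented for illustration (the real classes are `JdbcSource` and Flink's `DeliveryGuarantee`), and the null checks are an assumption about how the single constructor might validate its arguments.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for JdbcSource. */
final class ExampleSource {

    private final String sql;
    private final Guarantee guarantee;

    /** Convenience constructor: delegates and supplies the default guarantee. */
    ExampleSource(String sql) {
        this(sql, Guarantee.NONE);
    }

    /** Full constructor: the only place where fields are assigned and validated. */
    ExampleSource(String sql, Guarantee guarantee) {
        this.sql = Objects.requireNonNull(sql, "sql");
        this.guarantee = Objects.requireNonNull(guarantee, "guarantee");
    }

    String sql() {
        return sql;
    }

    Guarantee guarantee() {
        return guarantee;
    }

    public static void main(String[] args) {
        System.out.println(new ExampleSource("select 1").guarantee());
        System.out.println(new ExampleSource("select 1", Guarantee.AT_LEAST_ONCE).guarantee());
    }
}

/** Hypothetical stand-in for Flink's DeliveryGuarantee enum. */
enum Guarantee {
    NONE,
    AT_LEAST_ONCE,
    EXACTLY_ONCE
}
```

Because the default is supplied by the delegating constructor, the field never holds `null`, and callers that do pass a guarantee get an immediate failure if they pass `null` by mistake.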
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
eskabetxe commented on code in PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441475081

## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java: ##
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState;
+import org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader;
+import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Objects;
+
+/** JDBC source. */
+@PublicEvolving
+public class JdbcSource
+        implements Source,
+                ResultTypeQueryable {
+
+    private final Boundedness boundedness;
+    private final TypeInformation typeInformation;
+
+    private final Configuration configuration;
+    private final JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider;
+
+    protected JdbcConnectionProvider connectionProvider;
+    private final ResultExtractor resultExtractor;
+    private final DeliveryGuarantee deliveryGuarantee;
+
+    JdbcSource(
+            Configuration configuration,
+            JdbcConnectionProvider connectionProvider,
+            JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
+            ResultExtractor resultExtractor,
+            @Nullable TypeInformation typeInformation,
+            @Nullable DeliveryGuarantee deliveryGuarantee) {

Review Comment:
The second constructor (without DeliveryGuarantee) could call the first one with DeliveryGuarantee.NONE. This way we could remove the @Nullable and avoid duplicating the constructor logic.
```
JdbcSource(
        Configuration configuration,
        JdbcConnectionProvider connectionProvider,
        JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
        ResultExtractor resultExtractor,
        TypeInformation typeInformation) {
    this(configuration, connectionProvider, sqlSplitEnumeratorProvider, resultExtractor, typeInformation, DeliveryGuarantee.NONE);
}

JdbcSource(
        Configuration configuration,
        JdbcConnectionProvider connectionProvider,
        JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
        ResultExtractor resultExtractor,
        TypeInformation typeInformation,
        DeliveryGuarantee deliveryGuarantee) {
    this.configuration =
```
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
eskabetxe commented on code in PR #78: URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441475081 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java: ## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase; +import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader; +import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader; +import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Objects; + +/** JDBC source. 
*/
+@PublicEvolving
+public class JdbcSource
+        implements Source,
+                ResultTypeQueryable {
+
+    private final Boundedness boundedness;
+    private final TypeInformation typeInformation;
+
+    private final Configuration configuration;
+    private final JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider;
+
+    protected JdbcConnectionProvider connectionProvider;
+    private final ResultExtractor resultExtractor;
+    private final DeliveryGuarantee deliveryGuarantee;
+
+    JdbcSource(
+            Configuration configuration,
+            JdbcConnectionProvider connectionProvider,
+            JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
+            ResultExtractor resultExtractor,
+            @Nullable TypeInformation typeInformation,
+            @Nullable DeliveryGuarantee deliveryGuarantee) {

Review Comment:
The second constructor (without DeliveryGuarantee) could call the first one, passing DeliveryGuarantee.NONE. This way we could remove the @Nullable and avoid duplicating the constructor logic:

    JdbcSource(
            Configuration configuration,
            JdbcConnectionProvider connectionProvider,
            JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
            ResultExtractor resultExtractor,
            @Nonnull TypeInformation typeInformation) {
        this(configuration, connectionProvider, sqlSplitEnumeratorProvider, resultExtractor, typeInformation, DeliveryGuarantee.NONE);
    }

    JdbcSource(
            Configuration configuration,
            JdbcConnectionProvider connectionProvider,
            JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
            ResultExtractor resultExtractor,
            @Nonnull TypeInformation typeInformation,
            DeliveryGuarantee deliveryGuarantee) {
        this.configuration =
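For readers following the thread, the delegation idea in this comment can be tried in isolation. The following is only a minimal sketch under stated assumptions: `Guarantee` and `ChainedSource` are hypothetical stand-ins (not the PR's `DeliveryGuarantee` or `JdbcSource`), and a plain `String` stands in for the type information so the example has no Flink dependency.

```java
import java.util.Objects;

// Hypothetical stand-in for Flink's DeliveryGuarantee enum.
enum Guarantee { NONE, AT_LEAST_ONCE, EXACTLY_ONCE }

// Hypothetical, reduced stand-in for JdbcSource that keeps only two fields.
class ChainedSource {
    private final String typeInformation;
    private final Guarantee guarantee;

    // Convenience constructor: supplies the default guarantee and delegates.
    ChainedSource(String typeInformation) {
        this(typeInformation, Guarantee.NONE);
    }

    // The only constructor that validates arguments and assigns fields.
    ChainedSource(String typeInformation, Guarantee guarantee) {
        this.typeInformation = Objects.requireNonNull(typeInformation, "typeInformation");
        this.guarantee = Objects.requireNonNull(guarantee, "guarantee");
    }

    String typeInformation() {
        return typeInformation;
    }

    Guarantee guarantee() {
        return guarantee;
    }
}

public class ConstructorChainingDemo {
    public static void main(String[] args) {
        // The short constructor falls back to NONE through delegation.
        System.out.println(new ChainedSource("Row").guarantee()); // prints NONE
        // The full constructor keeps the explicitly passed value.
        System.out.println(new ChainedSource("Row", Guarantee.EXACTLY_ONCE).guarantee()); // prints EXACTLY_ONCE
    }
}
```

With this shape, neither parameter needs to be nullable, and the validation logic lives in exactly one constructor.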
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
RocMarshal commented on code in PR #78: URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441300360 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.source.reader.extractor; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.types.Row; + +import java.io.Serializable; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * The Extractor to extract the data from {@link ResultSet}. + * + * @param The target data type. + */ +@PublicEvolving +public interface ResultExtractor extends Serializable { + +/** + * Extract the data from the current point line of the result. + * + * @param resultSet Result set queried from a sql. + * @return The data object filled by the current line of the resultSet. + * @throws SQLException SQL exception. + */ +T extract(ResultSet resultSet) throws SQLException; + +/** + * The identifier of the extractor. + * + * @return identifier in {@link String} type. 
+ */
+default String identifier() {

Review Comment:
What do you think about changing the default implementation as follows (and removing the overriding method in `RowResultExtractor`)?

```
default String identifier() {
    return this.getClass().getSimpleName();
}
```
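To make the effect of this proposal concrete, here is a small self-contained sketch. It is an illustration only: `NamedExtractor` and `RowLikeExtractor` are hypothetical stand-ins for `ResultExtractor` and `RowResultExtractor`, and the `extract(ResultSet)` method is left out so the example needs no JDBC classes.

```java
import java.io.Serializable;

// Hypothetical, reduced stand-in for the ResultExtractor interface under review.
interface NamedExtractor extends Serializable {
    // Proposed default: derive the identifier from the runtime class name.
    default String identifier() {
        return this.getClass().getSimpleName();
    }
}

// Hypothetical implementation that no longer needs its own identifier() override.
class RowLikeExtractor implements NamedExtractor {}

public class IdentifierDemo {
    public static void main(String[] args) {
        // A named class reports its own simple name.
        System.out.println(new RowLikeExtractor().identifier()); // prints RowLikeExtractor

        // An anonymous class has an empty simple name, so the identifier is empty.
        NamedExtractor anonymous = new NamedExtractor() {};
        System.out.println(anonymous.identifier().isEmpty()); // prints true
    }
}
```

One thing worth weighing with this default: `Class.getSimpleName()` returns an empty string for anonymous classes, so extractors defined inline would all share an empty identifier unless they override the method.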
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
RocMarshal commented on code in PR #78: URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441300360 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.source.reader.extractor; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.types.Row; + +import java.io.Serializable; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * The Extractor to extract the data from {@link ResultSet}. + * + * @param The target data type. + */ +@PublicEvolving +public interface ResultExtractor extends Serializable { + +/** + * Extract the data from the current point line of the result. + * + * @param resultSet Result set queried from a sql. + * @return The data object filled by the current line of the resultSet. + * @throws SQLException SQL exception. + */ +T extract(ResultSet resultSet) throws SQLException; + +/** + * The identifier of the extractor. + * + * @return identifier in {@link String} type. 
+ */
+default String identifier() {

Review Comment:
What do you think about changing the default implementation as follows (and removing the implementation in `RowResultExtractor`)?

```
default String identifier() {
    return this.getClass().getSimpleName();
}
```
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
RocMarshal commented on PR #78: URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-1876318089 Hi @eskabetxe, thank you very much for the review. I left some discussion items in reply to your comments. Please let me know what you think~ :)
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
RocMarshal commented on code in PR #78: URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441300360 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/reader/extractor/ResultExtractor.java: ## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.source.reader.extractor; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.types.Row; + +import java.io.Serializable; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * The Extractor to extract the data from {@link ResultSet}. + * + * @param The target data type. + */ +@PublicEvolving +public interface ResultExtractor extends Serializable { + +/** + * Extract the data from the current point line of the result. + * + * @param resultSet Result set queried from a sql. + * @return The data object filled by the current line of the resultSet. + * @throws SQLException SQL exception. + */ +T extract(ResultSet resultSet) throws SQLException; + +/** + * The identifier of the extractor. + * + * @return identifier in {@link String} type. 
+ */
+default String identifier() {

Review Comment:
What do you think about changing the default implementation as follows?

```
default String identifier() {
    return this.getClass().getSimpleName();
}
```
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
RocMarshal commented on code in PR #78: URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441299767 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java: ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.jdbc.JdbcConnectionOptions; +import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider; +import org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator; +import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.sql.ResultSet; + +import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT; +import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE; +import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY; +import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE; +import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE; + +/** A tool is used to build {@link JdbcSource} quickly. */ +@PublicEvolving +public class JdbcSourceBuilder { + +public static final Logger LOG = LoggerFactory.getLogger(JdbcSourceBuilder.class); + +private final Configuration configuration; + +private int splitReaderFetchBatchSize; +private int resultSetType; +private int resultSetConcurrency; +private int resultSetFetchSize; +// Boolean to distinguish between default value and explicitly set autoCommit mode. 
+private Boolean autoCommit; + +// TODO It would need a builder method to render after introducing streaming semantic. +private DeliveryGuarantee deliveryGuarantee; + +private TypeInformation typeInformation; + +private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder; +private String sql; +private JdbcParameterValuesProvider jdbcParameterValuesProvider; +private ResultExtractor resultExtractor; + +private JdbcConnectionProvider connectionProvider; + +JdbcSourceBuilder() { +this.configuration = new Configuration(); +this.connOptionsBuilder = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder(); +this.splitReaderFetchBatchSize = 1024; +this.resultSetType = ResultSet.TYPE_FORWARD_ONLY; +this.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY; +this.deliveryGuarantee = DeliveryGuarantee.NONE; +// Boolean to distinguish between default value and explicitly set autoCommit mode. +this.autoCommit = true; +} + +public JdbcSourceBuilder setSql(@Nonnull String sql) { +Preconditions.checkArgument( +!StringUtils.isNullOrWhitespaceOnly(sql), "It's required to set the sql."); +this.sql = sql; +return this; +} + +public JdbcSourceBuilder setResultExtractor(ResultExtractor resultExtractor) { +this.resultExtractor = +Preconditions.checkNotNull(resultExtractor, "resultExtractor must not be null."); +return this; +} + +public JdbcSourceBuilder setUsername(String username) { +Preconditions.checkArgument( +!StringUtils.isNullOrWhitespaceOnly(username), +"It's required to set the 'username'."); +connOptionsBuilder.withUsername(username); +return this; +} + +public JdbcSourceBuilder setPassword(String password) { +
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
RocMarshal commented on code in PR #78: URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1441298453 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java: ## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase; +import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader; +import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader; +import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Objects; + +/** JDBC source. 
*/
+@PublicEvolving
+public class JdbcSource
+        implements Source,
+                ResultTypeQueryable {
+
+    private final Boundedness boundedness;
+    private final TypeInformation typeInformation;
+
+    private final Configuration configuration;
+    private final JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider;
+
+    protected JdbcConnectionProvider connectionProvider;
+    private final ResultExtractor resultExtractor;
+    private final DeliveryGuarantee deliveryGuarantee;
+
+    JdbcSource(
+            Configuration configuration,
+            JdbcConnectionProvider connectionProvider,
+            JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
+            ResultExtractor resultExtractor,
+            @Nullable TypeInformation typeInformation,
+            @Nullable DeliveryGuarantee deliveryGuarantee) {

Review Comment:
Thanks for the comment. I rechecked the logic of this section and, as far as I can tell:

- `typeInformation` must not be null, because when it is null we currently cannot provide the true type information through type inference.
- `deliveryGuarantee` may be null.

So how about keeping the following two constructors?

```
JdbcSource(
        Configuration configuration,
        JdbcConnectionProvider connectionProvider,
        JdbcSqlSplitEnumeratorBase.Provider sqlSplitEnumeratorProvider,
        ResultExtractor resultExtractor,
        @Nonnull TypeInformation typeInformation,
        @Nullable DeliveryGuarantee deliveryGuarantee) {
    this.configuration = Preconditions.checkNotNull(configuration);
    this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
    this.sqlSplitEnumeratorProvider = Preconditions.checkNotNull(sqlSplitEnumeratorProvider);
    this.resultExtractor =
```
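The nullability contract described in this comment (required type information, optional delivery guarantee) could look roughly like the sketch below. This is an assumption-laden illustration, not the PR's code: `ReadGuarantee` and `NullTolerantSource` are hypothetical names, a `String` stands in for the type information, and falling back to `NONE` for a null guarantee is an assumed policy that the truncated snippet above does not show.

```java
import java.util.Objects;

// Hypothetical stand-in for Flink's DeliveryGuarantee enum.
enum ReadGuarantee { NONE, AT_LEAST_ONCE, EXACTLY_ONCE }

// Hypothetical, reduced stand-in for JdbcSource.
class NullTolerantSource {
    private final String typeInformation;
    private final ReadGuarantee guarantee;

    NullTolerantSource(String typeInformation, ReadGuarantee guaranteeOrNull) {
        // Required argument: fail fast with a clear message.
        this.typeInformation =
                Objects.requireNonNull(typeInformation, "typeInformation must not be null");
        // Optional argument: assumed fallback to NONE when the caller passes null.
        this.guarantee = guaranteeOrNull == null ? ReadGuarantee.NONE : guaranteeOrNull;
    }

    String typeInformation() {
        return typeInformation;
    }

    ReadGuarantee guarantee() {
        return guarantee;
    }
}

public class NullableGuaranteeDemo {
    public static void main(String[] args) {
        // A null guarantee is normalized to NONE.
        System.out.println(new NullTolerantSource("Row", null).guarantee()); // prints NONE

        // A null type information is rejected immediately.
        try {
            new NullTolerantSource(null, ReadGuarantee.NONE);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // prints typeInformation must not be null
        }
    }
}
```

Normalizing the optional argument once in the constructor means the rest of the class never has to check the guarantee for null.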
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
eskabetxe commented on code in PR #78: URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1440197286 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java: ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.jdbc.JdbcConnectionOptions; +import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider; +import org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator; +import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.sql.ResultSet; + +import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.AUTO_COMMIT; +import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.READER_FETCH_BATCH_SIZE; +import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_CONCURRENCY; +import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_FETCH_SIZE; +import static org.apache.flink.connector.jdbc.source.JdbcSourceOptions.RESULTSET_TYPE; + +/** A tool is used to build {@link JdbcSource} quickly. */ +@PublicEvolving +public class JdbcSourceBuilder { + +public static final Logger LOG = LoggerFactory.getLogger(JdbcSourceBuilder.class); + +private final Configuration configuration; + +private int splitReaderFetchBatchSize; +private int resultSetType; +private int resultSetConcurrency; +private int resultSetFetchSize; +// Boolean to distinguish between default value and explicitly set autoCommit mode. 
+private Boolean autoCommit; + +// TODO It would need a builder method to render after introducing streaming semantic. +private DeliveryGuarantee deliveryGuarantee; + +private TypeInformation typeInformation; + +private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder; +private String sql; +private JdbcParameterValuesProvider jdbcParameterValuesProvider; +private ResultExtractor resultExtractor; + +private JdbcConnectionProvider connectionProvider; + +JdbcSourceBuilder() { +this.configuration = new Configuration(); +this.connOptionsBuilder = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder(); +this.splitReaderFetchBatchSize = 1024; +this.resultSetType = ResultSet.TYPE_FORWARD_ONLY; +this.resultSetConcurrency = ResultSet.CONCUR_READ_ONLY; +this.deliveryGuarantee = DeliveryGuarantee.NONE; +// Boolean to distinguish between default value and explicitly set autoCommit mode. +this.autoCommit = true; +} + +public JdbcSourceBuilder setSql(@Nonnull String sql) { +Preconditions.checkArgument( +!StringUtils.isNullOrWhitespaceOnly(sql), "It's required to set the sql."); +this.sql = sql; +return this; +} + +public JdbcSourceBuilder setResultExtractor(ResultExtractor resultExtractor) { +this.resultExtractor = +Preconditions.checkNotNull(resultExtractor, "resultExtractor must not be null."); +return this; +} + +public JdbcSourceBuilder setUsername(String username) { +Preconditions.checkArgument( +!StringUtils.isNullOrWhitespaceOnly(username), +"It's required to set the 'username'."); +connOptionsBuilder.withUsername(username); +return this; +} + +public JdbcSourceBuilder setPassword(String password) { +
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
RocMarshal commented on PR #78: URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-1870955620 Hi @Jiabao-Sun, would you mind taking a look at this PR? Thank you~
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
eskabetxe commented on code in PR #78: URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1405924661 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java: ## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase; +import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader; +import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader; +import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Objects; + +/** JDBC source. */ +@Internal Review Comment: should not be PublicEvolving? 
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
eskabetxe commented on code in PR #78: URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1405924661 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java: ## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase; +import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader; +import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader; +import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Objects; + +/** JDBC source. 
*/ +@Internal Review Comment: I think this should be PublicEvolving ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSourceBuilder.java: ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.source; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.jdbc.JdbcConnectionOptions; +import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider; +import org.apache.flink.connector.jdbc.source.enumerator.SqlTemplateSplitEnumerator; +import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.sql.ResultSet; + +import static
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
eskabetxe commented on code in PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1405917676

## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/Utils.java: ##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.source;
+
+import org.apache.flink.connector.jdbc.source.split.CheckpointedOffset;
+import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/** Utils class to hold common static methods. */
+public class Utils {
+    private Utils() {}
+
+    public static void serializeJdbcSourceSplit(DataOutputStream out, JdbcSourceSplit sourceSplit)

Review Comment:
I would say that this belongs in JdbcSourceSplitSerializer.java, and that we should instantiate that serializer in JdbcSourceEnumStateSerializer. Any change to this code affects the state and should be versioned, and that is easier to understand if the code lives there. Another option is to put this on JdbcSourceSplit, but I think the same versioning problem would occur. Either way, we should avoid this kind of utility class.
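To make the suggestion above concrete, here is a minimal, self-contained sketch of the idea: the split wire format lives in one versioned serializer, and the enumerator-state serializer delegates to it instead of calling a static utility. All names (`DemoSplit`, `DemoSplitSerializer`, `DemoEnumStateSerializer`) and the two-field split layout are hypothetical stand-ins chosen for illustration; this is not the code from the PR, and it deliberately avoids Flink types so that it runs on the JDK alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a source split: an id plus a SQL template. */
final class DemoSplit {
    final String id;
    final String sqlTemplate;

    DemoSplit(String id, String sqlTemplate) {
        this.id = id;
        this.sqlTemplate = sqlTemplate;
    }
}

/** Owns the split wire format; any format change bumps VERSION in this one place. */
final class DemoSplitSerializer {
    static final int VERSION = 1;

    int getVersion() {
        return VERSION;
    }

    /** Stream-level write, reusable by the enumerator-state serializer below. */
    void write(DataOutputStream out, DemoSplit split) throws IOException {
        out.writeUTF(split.id);
        out.writeUTF(split.sqlTemplate);
    }

    /** Stream-level read; rejects versions this class does not know how to decode. */
    DemoSplit read(int version, DataInputStream in) throws IOException {
        if (version != VERSION) {
            throw new IOException("Unsupported split version: " + version);
        }
        return new DemoSplit(in.readUTF(), in.readUTF());
    }
}

/** Serializes a list of pending splits by delegating to DemoSplitSerializer. */
final class DemoEnumStateSerializer {
    private final DemoSplitSerializer splitSerializer = new DemoSplitSerializer();

    byte[] serialize(List<DemoSplit> pending) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(splitSerializer.getVersion()); // record the split format version
            out.writeInt(pending.size());
            for (DemoSplit split : pending) {
                splitSerializer.write(out, split);
            }
        }
        return bytes.toByteArray();
    }

    List<DemoSplit> deserialize(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int splitVersion = in.readInt();
            int count = in.readInt();
            List<DemoSplit> pending = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                pending.add(splitSerializer.read(splitVersion, in));
            }
            return pending;
        }
    }

    /** Convenience for demos and tests: round-trips without checked exceptions. */
    static List<DemoSplit> roundTrip(List<DemoSplit> pending) {
        try {
            DemoEnumStateSerializer serializer = new DemoEnumStateSerializer();
            return serializer.deserialize(serializer.serialize(pending));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

final class SplitSerializerDemo {
    public static void main(String[] args) {
        List<DemoSplit> restored =
                DemoEnumStateSerializer.roundTrip(
                        Arrays.asList(new DemoSplit("split-0", "SELECT 1")));
        System.out.println(restored.get(0).id + " -> " + restored.get(0).sqlTemplate);
    }
}
```

Because the version check sits next to the read and write logic, a reader of the serializer sees immediately that changing either method is a state-format change, which is the maintainability point the comment raises.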
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
davidradl commented on code in PR #78: URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1397421543 ## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/JdbcSource.java: ## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumStateSerializer; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumerator; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSourceEnumeratorState; +import org.apache.flink.connector.jdbc.source.enumerator.JdbcSqlSplitEnumeratorBase; +import org.apache.flink.connector.jdbc.source.reader.JdbcSourceReader; +import org.apache.flink.connector.jdbc.source.reader.JdbcSourceSplitReader; +import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplit; +import org.apache.flink.connector.jdbc.source.split.JdbcSourceSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Objects; + +/** JDBC source. 
*/ +@Internal Review Comment: I wonder why this is not tagged with @Experimental, as JdbcSourceBuilder is experimental. Also, JdbcSourceBuilder is tagged @Internal. I suspect they should both be tagged consistently.
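One way to picture the consistency concern raised above is a small reflective check that a class and its builder carry the same stability annotation. The sketch below is only an illustration under stated assumptions: the `Demo*` annotations are local stand-ins declared with runtime retention, not Flink's own annotation classes, and `DemoSource`, `DemoSourceBuilder`, `AlignedSource`, and `AlignedSourceBuilder` are hypothetical.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.List;

// Local stand-ins for stability annotations. These are NOT Flink's annotation classes.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoInternal {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoExperimental {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoPublicEvolving {}

// A mismatched pair, similar in spirit to what the review comment points out.
@DemoInternal
final class DemoSource {}

@DemoExperimental
final class DemoSourceBuilder {}

// A consistently tagged pair.
@DemoPublicEvolving
final class AlignedSource {}

@DemoPublicEvolving
final class AlignedSourceBuilder {}

final class StabilityCheck {
    private static final List<Class<? extends Annotation>> LEVELS =
            Arrays.asList(DemoInternal.class, DemoExperimental.class, DemoPublicEvolving.class);

    /** Returns the first stability annotation found on the type, or null if there is none. */
    static Class<? extends Annotation> stabilityOf(Class<?> type) {
        for (Class<? extends Annotation> level : LEVELS) {
            if (type.isAnnotationPresent(level)) {
                return level;
            }
        }
        return null;
    }

    /** True only when both types are tagged and carry the same stability annotation. */
    static boolean taggedConsistently(Class<?> api, Class<?> builder) {
        Class<? extends Annotation> level = stabilityOf(api);
        return level != null && level == stabilityOf(builder);
    }

    public static void main(String[] args) {
        System.out.println(taggedConsistently(DemoSource.class, DemoSourceBuilder.class));
        System.out.println(taggedConsistently(AlignedSource.class, AlignedSourceBuilder.class));
    }
}
```

Which stability level is right for the new source is a project decision discussed in this thread; the sketch only shows how a mismatch between a class and its builder could be detected mechanically.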
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
RocMarshal commented on code in PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1397395729

## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java: ##
@@ -99,7 +99,11 @@
  * @see JdbcParameterValuesProvider
  * @see PreparedStatement
  * @see DriverManager
+ * @deprecated Please use {@link org.apache.flink.connector.jdbc.source.JdbcSource} instead. The
+ * builder utils and parameters passing could be view {@link

Review Comment:
Hi @davidradl, thank you very much for your comment. Your concerns are reasonable. There are two points:
1. The sentence may be grammatically wrong. I only want to state here that this API is deprecated and that new development should use the new API, whose class is named here. I'll update the sentence.
2. Comprehensive documentation is indeed needed here to guide developers. As mentioned in https://issues.apache.org/jira/browse/FLINK-25420, there are currently no plans to add the relevant documentation. Of course, if you are willing to write usage documentation for this new API, we would greatly appreciate it.
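For readers unfamiliar with the pattern being discussed, a deprecation notice usually pairs the `@Deprecated` annotation with a Javadoc `@deprecated` tag that links to the replacement. The sketch below shows that general shape only; `LegacyDemoInputFormat` and `NewDemoSource` are hypothetical names, and the Javadoc wording is an assumption, not the sentence that was eventually committed in the PR.

```java
/** Hypothetical replacement API; stands in for the new source class. */
final class NewDemoSource {
    String describe() {
        return "new source";
    }
}

/**
 * Hypothetical legacy reader.
 *
 * @deprecated Please use {@link NewDemoSource} instead.
 */
@Deprecated
final class LegacyDemoInputFormat {
    String describe() {
        return "legacy input format";
    }
}

final class DeprecationDemo {
    /** java.lang.Deprecated has runtime retention, so it is visible via reflection. */
    static boolean isDeprecated(Class<?> type) {
        return type.isAnnotationPresent(Deprecated.class);
    }

    public static void main(String[] args) {
        System.out.println(isDeprecated(LegacyDemoInputFormat.class));
        System.out.println(isDeprecated(NewDemoSource.class));
    }
}
```

Keeping the `@deprecated` text to one plain sentence with a single `{@link}` avoids the ambiguity the reviewer ran into with "could be view".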
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
davidradl commented on code in PR #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78#discussion_r1397218900

## flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcInputFormat.java: ##
@@ -99,7 +99,11 @@
  * @see JdbcParameterValuesProvider
  * @see PreparedStatement
  * @see DriverManager
+ * @deprecated Please use {@link org.apache.flink.connector.jdbc.source.JdbcSource} instead. The
+ * builder utils and parameters passing could be view {@link

Review Comment:
I am not sure what this sentence means. Does it have something to do with JDBC views, or does it refer to viewing the link?
Re: [PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
boring-cyborg[bot] commented on PR #78: URL: https://github.com/apache/flink-connector-jdbc/pull/78#issuecomment-1793683582 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
[PR] [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format [flink-connector-jdbc]
RocMarshal opened a new pull request, #78:
URL: https://github.com/apache/flink-connector-jdbc/pull/78

# Main change
- [FLINK-33459][Connector/JDBC] Add dependency items about flink-connector-base and testing into the pom.xml
- [FLINK-33459][Connector/JDBC] Support the new source that keeps the same functionality as the original JDBC input format
- [FLINK-33459][Connector/JDBC] Mark the old JdbcInputFormat as Deprecated.

# Testing
Add some unit test cases and integration test cases.