Should I change the query, e.g. add a limit as below? If there is no limit, does that mean Flink will read the whole huge table into memory and then fetch and return 20 records each time?
val query = String.format("SELECT * FROM %s limit 1000", tableName)

On Tue, Jan 18, 2022 at 11:56 PM Qihua Yang <yang...@gmail.com> wrote:

> Hi Caizhi,
>
> Thank you for your reply. The heap size is 512m. Fetching from the DB
> table is the only costly operation. After fetching from the DB, I simply
> ingest into a Kafka topic. That should not be the bottleneck.
> Here is the JDBC configuration. Is that config correct?
>
> val query = String.format("SELECT * FROM %s", tableName)
>
> val options = JdbcOptions.builder()
>     .setDBUrl(url)
>     .setTableName(tableName)
>     .setDriverName(DRIVER_NAME)
>     .setUsername(userName)
>     .setPassword(password)
>     .build()
> val readOptions = JdbcReadOptions.builder()
>     .setQuery(query)
>     .setPartitionColumnName(PARTITION_KEY)
>     .setPartitionLowerBound(dbLowerBound)
>     .setPartitionUpperBound(dbUpperBound)
>     .setNumPartitions(50)
>     .setFetchSize(20)
>     .build()
> val lookupOptions = JdbcLookupOptions.builder()
>     .setCacheMaxSize(-1)
>     .setCacheExpireMs(1000)
>     .setMaxRetryTimes(2)
>     .build()
> val rawSource = JdbcTableSource.builder()
>     .setOptions(options)
>     .setReadOptions(readOptions)
>     .setLookupOptions(lookupOptions)
>     .setSchema(schema)
>     .build().getDataStream(env)
>
> On Tue, Jan 18, 2022 at 11:48 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> This is not the desired behavior. As you have set fetchSize to 20, there
>> should be only 20 records in each parallelism of the source. How large is
>> your heap size? Does your job have any other operations that consume a
>> lot of heap memory?
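(For context on the partitioning question above: when a partition column, lower/upper bounds, and a partition count are configured, the connector does not load the whole table into one reader; it splits the bound range and each partition scans only its slice, typically via a `WHERE partition_key BETWEEN ? AND ?` predicate. The sketch below is illustrative of that range splitting under those assumptions, not the connector's actual internals; `splitRange` is a hypothetical helper.)

```scala
// Sketch: split an inclusive [lower, upper] key range into N sub-ranges,
// one per scan partition (illustrative, not Flink's exact implementation).
def splitRange(lower: Long, upper: Long, numPartitions: Int): Seq[(Long, Long)] = {
  val total = upper - lower + 1
  val base  = total / numPartitions   // minimum key-space per partition
  val rem   = total % numPartitions   // first `rem` partitions get one extra key
  var start = lower
  (0 until numPartitions).map { i =>
    val size  = base + (if (i < rem) 1 else 0)
    val range = (start, start + size - 1)
    start += size
    range
  }
}

// Each partition then effectively runs:
//   SELECT ... WHERE partition_key BETWEEN range._1 AND range._2
val parts = splitRange(1L, 100L, 3)
println(parts) // e.g. Vector((1,34), (35,67), (68,100))
```

So the `fetchSize` of 20 only controls how many rows the JDBC driver pulls per round trip within a partition's scan; it does not cap the partition's total row count.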
>>
>> Qihua Yang <yang...@gmail.com> wrote on Wed, Jan 19, 2022 at 15:27:
>>
>>> Here are the errors:
>>>
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "server-timer"
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "I/O dispatcher 16"
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "HTTP-Dispatcher"
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "I/O dispatcher 11"
>>> Exception: java.lang.OutOfMemoryError thrown from the
>>> UncaughtExceptionHandler in thread "I/O dispatcher 9"
>>>
>>> On Tue, Jan 18, 2022 at 11:25 PM Qihua Yang <yang...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a Flink cluster (50 hosts; each host runs a task manager).
>>>> I am using Flink JDBC to consume data from a database. The DB table is
>>>> pretty large, around 187,340,000 rows. I configured the JDBC number of
>>>> partitions to 50, and fetchSize is 20. The Flink application has 50
>>>> task managers. Does anyone know why I got an OutOfMemoryError? How
>>>> should I configure it?
>>>>
>>>> Thanks,
>>>> Qihua
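(A back-of-the-envelope check on the numbers quoted in this thread: with 187,340,000 rows split across 50 partitions, each partition still scans roughly 3.7 million rows. Note that some JDBC drivers, e.g. MySQL Connector/J with default settings, buffer the entire ResultSet in memory regardless of `fetchSize`; if that happens here, the math below suggests a 512m heap cannot hold even one partition's data. The bytes-per-row figure is an assumed average, not measured.)

```scala
// Back-of-the-envelope memory estimate for the setup in this thread.
val totalRows     = 187340000L   // table size stated above
val numPartitions = 50           // setNumPartitions(50)
val rowsPerPart   = totalRows / numPartitions
println(s"rows per partition: $rowsPerPart")

// If the driver materializes a whole partition's ResultSet, even a modest
// average row size exceeds a 512m heap:
val assumedBytesPerRow = 150L    // hypothetical average, for illustration only
val bytesPerPartition  = rowsPerPart * assumedBytesPerRow
println(f"approx heap needed per partition: ${bytesPerPartition / (1024.0 * 1024)}%.0f MiB")
```

If the driver does buffer full results, enabling true cursor/streaming fetch on the driver side (or raising the heap) would matter more than the `fetchSize` setting itself.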