Should I change the query to something like the one below, adding a limit?
Without a limit, does that mean Flink will read the whole huge table into
memory and then fetch and return 20 records at a time?

val query = String.format("SELECT * FROM %s limit 1000", tableName)
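The question mixes two different knobs. The model below is a minimal in-memory sketch (plain Scala, no database; `SimulatedCursor` and `LimitVsFetchSize` are hypothetical names). It assumes a driver that honors the fetch size, which is not guaranteed for every JDBC driver and is not modeled here. In the model, `LIMIT` changes what the query returns (at most 1000 rows in total, not 1000 rows per batch), while the fetch size only changes how rows travel to the client (in batches of 20).

```scala
// Hypothetical in-memory model of a forward-only cursor; not a real JDBC driver.
final class SimulatedCursor(totalRows: Long, limit: Option[Long], fetchSize: Int) {
  require(fetchSize > 0, "fetchSize must be positive")

  // LIMIT truncates the result set itself, before any batching happens.
  private val resultSize: Long = limit.fold(totalRows)(l => math.min(l, totalRows))

  private var delivered = 0L
  var roundTrips = 0  // batches pulled from the simulated server
  var maxBuffered = 0 // largest batch held on the client at once

  // Pull the next batch of at most fetchSize rows; empty when exhausted.
  def nextBatch(): Seq[Long] = {
    val remaining = resultSize - delivered
    if (remaining <= 0) Seq.empty
    else {
      val n = math.min(fetchSize.toLong, remaining).toInt
      val batch = (0 until n).map(i => delivered + i)
      delivered += n
      roundTrips += 1
      maxBuffered = math.max(maxBuffered, batch.size)
      batch
    }
  }
}

object LimitVsFetchSize {
  // Drain the cursor one batch at a time; returns the number of rows seen.
  def drain(cursor: SimulatedCursor): Long = {
    var seen = 0L
    var batch = cursor.nextBatch()
    while (batch.nonEmpty) {
      seen += batch.size // a real source would emit each row downstream here
      batch = cursor.nextBatch()
    }
    seen
  }

  def main(args: Array[String]): Unit = {
    val withLimit = new SimulatedCursor(100000L, Some(1000L), 20)
    val noLimit = new SimulatedCursor(100000L, None, 20)
    println(s"LIMIT 1000: rows=${drain(withLimit)}, max buffered=${withLimit.maxBuffered}")
    println(s"no LIMIT:   rows=${drain(noLimit)}, max buffered=${noLimit.maxBuffered}")
  }
}
```

Under this model neither setting loads the whole table at once; whether a real driver behaves this way depends on the driver and its connection settings.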


On Tue, Jan 18, 2022 at 11:56 PM Qihua Yang <yang...@gmail.com> wrote:

> Hi Caizhi,
>
> Thank you for your reply. The heap size is 512m. Fetching from the DB
> table is the only costly operation. After fetching from the DB, I simply
> ingest the records into a Kafka topic, which should not be the bottleneck.
> Here is the JDBC configuration. Is this the correct config?
>
> val query = String.format("SELECT * FROM %s", tableName)
>
> val options = JdbcOptions.builder()
>     .setDBUrl(url)
>     .setTableName(tableName)
>     .setDriverName(DRIVER_NAME)
>     .setUsername(userName)
>     .setPassword(password)
>     .build()
> val readOptions = JdbcReadOptions.builder()
>     .setQuery(query)
>     .setPartitionColumnName(PARTITION_KEY)
>     .setPartitionLowerBound(dbLowerBound)
>     .setPartitionUpperBound(dbUpperBound)
>     .setNumPartitions(50)
>     .setFetchSize(20)
>     .build()
> val lookupOptions = JdbcLookupOptions.builder()
>     .setCacheMaxSize(-1)
>     .setCacheExpireMs(1000)
>     .setMaxRetryTimes(2)
>     .build()
> val rawSource = JdbcTableSource.builder()
>     .setOptions(options)
>     .setReadOptions(readOptions)
>     .setLookupOptions(lookupOptions)
>     .setSchema(schema)
>     .build().getDataStream(env)
>
>
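For the partitioning options in this config (`setPartitionColumnName`, the lower/upper bounds, `setNumPartitions(50)`), the idea is roughly that the key range is cut into 50 sub-ranges and each split reads one of them with a `BETWEEN` condition. Below is a simplified sketch of such an even split. `splitRange` is a hypothetical helper, the bounds in `main` are made up, and Flink's own split logic may distribute the remainder differently. Rows per split only match key values per split if the keys are densely and evenly distributed between the bounds.

```scala
object RangeSplitSketch {
  // Split the inclusive key range [lower, upper] into at most numPartitions
  // contiguous, non-overlapping inclusive ranges whose sizes differ by at most one.
  // (Does not guard against Long overflow for extreme bounds.)
  def splitRange(lower: Long, upper: Long, numPartitions: Int): Seq[(Long, Long)] = {
    require(lower <= upper, "lower must not exceed upper")
    require(numPartitions > 0, "numPartitions must be positive")
    val total = upper - lower + 1                           // key values in the range
    val parts = math.min(numPartitions.toLong, total).toInt
    val base = total / parts                                // minimum range size
    val extra = total % parts                               // first `extra` ranges get one more
    (0 until parts).map { i =>
      val start = lower + i * base + math.min(i.toLong, extra)
      val size = base + (if (i < extra) 1L else 0L)
      (start, start + size - 1)
    }
  }

  def main(args: Array[String]): Unit = {
    // HYPOTHETICAL bounds: the real dbLowerBound/dbUpperBound are not given in the thread.
    splitRange(1L, 187340000L, 50).take(3).foreach { case (lo, hi) =>
      println(s"split: partition_key BETWEEN $lo AND $hi")
    }
  }
}
```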
> On Tue, Jan 18, 2022 at 11:48 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>
>> Hi!
>>
>> This is not the expected behavior. Since you have set fetchSize to 20,
>> there will be only 20 records at a time in each parallel instance of the
>> source. How large is your heap? Does your job have any other operations
>> that consume a lot of heap memory?
>>
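One way to answer the heap-size question from inside the process that runs the source is to log what the JVM itself reports. This is a minimal sketch using only the standard `Runtime` API (`HeapReport` is a hypothetical name). In Flink's memory model the JVM heap typically covers both framework and task heap, so `maxMemory` here is the overall `-Xmx` limit rather than a per-slot budget.

```scala
object HeapReport {
  private val MiB = 1024L * 1024L

  // Snapshot of the current JVM heap, in MiB.
  final case class HeapStats(maxMiB: Long, totalMiB: Long, usedMiB: Long)

  def snapshot(): HeapStats = {
    val rt = Runtime.getRuntime
    val total = rt.totalMemory()        // heap currently committed
    val used = total - rt.freeMemory()  // committed heap in use
    HeapStats(rt.maxMemory() / MiB, total / MiB, used / MiB)
  }

  def main(args: Array[String]): Unit = {
    val s = snapshot()
    // maxMiB reflects -Xmx, or the JVM default if -Xmx is not set.
    println(s"heap max=${s.maxMiB} MiB, committed=${s.totalMiB} MiB, used=${s.usedMiB} MiB")
  }
}
```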
>> On Wed, Jan 19, 2022 at 15:27, Qihua Yang <yang...@gmail.com> wrote:
>>
>>> Here are the errors:
>>> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "server-timer"
>>> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "I/O dispatcher 16"
>>> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "HTTP-Dispatcher"
>>> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "I/O dispatcher 11"
>>> Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "I/O dispatcher 9"
>>>
>>> On Tue, Jan 18, 2022 at 11:25 PM Qihua Yang <yang...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a Flink cluster (50 hosts, each running one task manager).
>>>> I am using the Flink JDBC connector to consume data from a database. The
>>>> DB table is pretty large, around 187,340,000 rows. I set the JDBC number
>>>> of partitions to 50 and fetchSize to 20. The Flink application has 50
>>>> task managers. Does anyone know why I got an OutOfMemoryError? How should
>>>> I configure it?
>>>>
>>>> Thanks,
>>>> Qihua
>>>>
>>>>
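Putting the numbers from this message together: 187,340,000 rows split into 50 partitions is 3,746,800 rows per split (assuming the partition keys are spread evenly between the bounds). If the fetch size is honored, a source instance holds only about 20 of those rows at a time; if the whole split were buffered instead, it would be millions of rows against the 512m heap mentioned later in the thread. The sketch below only does that arithmetic. `MemoryEstimate` is a hypothetical name, and the 200-byte average row size is a made-up figure since the thread does not give one (real JVM object overhead would make the in-memory size larger still).

```scala
object MemoryEstimate {
  // Rows each split covers, assuming keys are spread evenly across splits.
  def rowsPerSplit(totalRows: Long, numPartitions: Int): Long =
    (totalRows + numPartitions - 1) / numPartitions // ceiling division

  // Rough bytes held for a given number of buffered rows.
  def bufferedBytes(rows: Long, avgRowBytes: Long): Long = rows * avgRowBytes

  def main(args: Array[String]): Unit = {
    val totalRows = 187340000L // from the thread
    val numPartitions = 50     // from the thread
    val fetchSize = 20L        // from the thread
    val avgRowBytes = 200L     // HYPOTHETICAL: not stated in the thread
    val MiB = 1024L * 1024L
    val perSplit = rowsPerSplit(totalRows, numPartitions)
    println(s"rows per split: $perSplit")
    println(s"fetch size honored: ~${bufferedBytes(fetchSize, avgRowBytes)} bytes per source instance")
    println(s"whole split buffered: ~${bufferedBytes(perSplit, avgRowBytes) / MiB} MiB per source instance")
  }
}
```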
