Re: database as stream source issue

2021-10-28 Thread Leonard Xu
Hi, Qihua

The JDBC connector supports the Postgres dialect, but it is implemented as a
bounded source: it captures only the snapshot data (the existing records) and
then finishes its work. Transaction log records written afterwards (the
Postgres counterpart of MySQL's binlog) are not captured. You should receive
all the snapshot data if your program works fine.
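For comparison, the same bounded behavior shows up when the JDBC connector is
declared through the SQL API. A minimal sketch (the URL and credentials are
placeholders; the columns match the source schema in your log):

```sql
-- Bounded snapshot read: the job scans test_store once, then FINISHES.
CREATE TABLE test_store (
  id INT,
  name STRING,
  address STRING
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:postgresql://localhost:5432/mydb',  -- placeholder
  'table-name' = 'test_store',
  'username'   = 'dbUser',
  'password'   = '123'
);
```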

BTW, if you want to capture both the snapshot and the transaction log events,
you can try the `postgres-cdc` connector[1]. It offers both a SQL API and a
DataStream API; you can refer to the documentation[2] for a quick start.
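With the SQL API, a postgres-cdc source declaration looks roughly like this
(a sketch based on the documentation[2]; hostname, port, database, and schema
are placeholders you would adapt to your setup):

```sql
-- Unbounded source: reads the snapshot first, then streams WAL changes.
CREATE TABLE test_store (
  id INT,
  name STRING,
  address STRING
) WITH (
  'connector'     = 'postgres-cdc',
  'hostname'      = 'localhost',   -- placeholder
  'port'          = '5432',
  'username'      = 'dbUser',
  'password'      = '123',
  'database-name' = 'mydb',        -- placeholder
  'schema-name'   = 'public',
  'table-name'    = 'test_store'
);
```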


Best,
Leonard
[1] https://github.com/ververica/flink-cdc-connectors
[2] 
https://ververica.github.io/flink-cdc-connectors/release-2.0/content/connectors/postgres-cdc.html







database as stream source issue

2021-10-27 Thread Qihua Yang
Hi,

I am trying to use a Postgres DB as a stream data source and push the data to
a Kafka topic. Below is how I configured the database source. It looks like it
didn't read any data, but I didn't see any error in the Flink log.
As a test, I inserted bad data into the database and saw Flink throw the error
below, so Flink does seem to try reading from the database.
*java.lang.ClassCastException: class java.lang.Long cannot be cast to class
java.lang.Integer (java.lang.Long and java.lang.Integer are in module
java.base of loader 'bootstrap')*

The job manager shows the sink switched from DEPLOYING to RUNNING, and the
source switched from RUNNING to FINISHED immediately.
Can anyone help me understand why?
Did I configure anything wrong, or did I miss anything?



*2021-10-28 02:49:52.777 [flink-akka.actor.default-dispatcher-3] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Sink: test-sink
(2/2) (7aa24e97a11bbd831941d636910fe84f) switched from DEPLOYING to RUNNING.*

*2021-10-28 02:49:53.245 [flink-akka.actor.default-dispatcher-15] INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source:
JdbcTableSource(id, name, address) -> Map -> Map -> Filter (1/2)
(558afdc9dd911684833d0e51943eef92) switched from RUNNING to FINISHED.*

val options = JdbcOptions.builder()
    // .setDBUrl("jdbc:derby:memory:mydb")
    .setDBUrl("")
    .setTableName("test_store")
    .setDriverName("org.postgresql.Driver")
    .setUsername("dbUser")
    .setPassword("123")
    .build()

val readOptions = JdbcReadOptions.builder()
    .setPartitionColumnName("id")
    .setPartitionLowerBound(-1)
    .setPartitionUpperBound(DB_SIZE)
    .setNumPartitions(PARTITIONS)
    // .setFetchSize(0)
    .build()

val lookupOptions = JdbcLookupOptions.builder()
    .setCacheMaxSize(-1)
    .setCacheExpireMs(CACHE_SIZE)
    .setMaxRetryTimes(2)
    .build()

val dataSource = JdbcTableSource.builder()
    .setOptions(options)
    .setReadOptions(readOptions)
    .setLookupOptions(lookupOptions)
    .setSchema(storeSchema)
    .build().getDataStream(env)