Re: select records using JDBC with parameters

2021-11-10 Thread Sigalit Eliazov
Thanks a lot, it was indeed related to mismatched versions.
I have one more question about this solution. The select statement
returns a list of results, but I see that when retrieving data we go
through the row mapper, which handles only one row at a time and emits
a PCollection element per row. Is there a way to aggregate those
results and return an object with the list? I need to write to Kafka a
single object that holds the list of results.

Thanks again
S
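
One possible approach, sketched below under assumptions not in the
original thread: since JdbcIO's RowMapper emits one element per row,
you can re-key the mapped rows by the id they were queried with, group
them, and wrap each group in one holder object before writing to Kafka.
AnalyticsResultList and AnalyticsResult.getId() are hypothetical names;
the sketch assumes the id column is part of the select and is read by
the row mapper.

import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

// Re-key each mapped row by its query id, group the rows per id, and
// wrap each group in a single holder object that can go to Kafka.
PCollection<AnalyticsResultList> aggregated = output
    .apply("keyById", WithKeys.of((AnalyticsResult r) -> r.getId())
        .withKeyType(TypeDescriptor.of(String.class)))
    .apply("groupById", GroupByKey.<String, AnalyticsResult>create())
    .apply("wrapAsList", MapElements
        .into(TypeDescriptor.of(AnalyticsResultList.class))
        .via(kv -> new AnalyticsResultList(kv.getKey(), kv.getValue())));

Note that the upstream global-window triggering carries over, so each
firing may group only the rows that have arrived since the last pane.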



Re: select records using JDBC with parameters

2021-11-09 Thread Caizhi Weng
Hi!

It is very likely that the versions of your Flink client and your Flink
standalone cluster do not match. SubtaskStateMapper.DISCARD_EXTRA_STATE
was removed in Flink 1.14, so please make sure that your Flink client
version is also 1.14.
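
If it helps, one quick way to see which Flink version the client
classpath actually carries (a sketch, not from the original thread;
it assumes flink-runtime is on the classpath) is EnvironmentInformation:

import org.apache.flink.runtime.util.EnvironmentInformation;

public class PrintFlinkVersion {
    public static void main(String[] args) {
        // Should print the same version the standalone cluster reports
        // (e.g. in its web UI); a mismatch reproduces this kind of error.
        System.out.println("Flink client version: "
            + EnvironmentInformation.getVersion());
    }
}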



select records using JDBC with parameters

2021-11-09 Thread Sigalit Eliazov
Hello

I am creating a new pipeline which:

   1. receives info from Kafka (mainly the key)

   2. with this key selects information from a DB

   3. writes the results back to Kafka

Flink is running as a standalone cluster.

I am failing on pipeline deployment when activating step 2, with the
following error:

org.apache.flink.runtime.rest.handler.RestHandlerException: Failed to
deserialize JobGraph

Caused by: java.lang.IllegalArgumentException: No enum constant
org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.DISCARD_EXTRA_STATE

the code:

PCollection<KV<String, Iterable<String>>> input = pipeline
    .apply("readFromKafka",
        KafkaTransform.readStrFromKafka(
            pipelineUtil.getBootstrapServers(), topic))
    // Everything into a single global window, firing after each element.
    .apply("window", Window.<KV<String, String>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .discardingFiredPanes())
    .apply("S", GroupByKey.create());

PCollection<AnalyticsResult> output = input.apply("read from db",
    JdbcIO.<KV<String, Iterable<String>>, AnalyticsResult>readAll()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "org.postgresql.Driver", PipelineUtil.getDbInfo())
            .withUsername("123")
            .withPassword(PipelineUtil.readCredentials()))
        .withQuery("select * from a where id = ? order by insert_timestamp limit 5")
        .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<String>>>() {
            @Override
            public void setParameters(KV<String, Iterable<String>> element,
                                      PreparedStatement preparedStatement) throws Exception {
                // Bind the Kafka key as the query parameter.
                String nfcId = element.getKey();
                preparedStatement.setString(1, nfcId);
            }
        })
        .withRowMapper(new JdbcIO.RowMapper<AnalyticsResult>() {
            @Override
            public AnalyticsResult mapRow(ResultSet resultSet) throws Exception {
                // One AnalyticsResult per row of the result set.
                return new AnalyticsResult(resultSet.getString("name"));
            }
        })
        .withCoder(SerializableCoder.of(AnalyticsResult.class)));


Any ideas?

Thanks a lot

S