Re: select records using JDBC with parameters
Thanks a lot, it was indeed related to the different versions. I have one more question about this solution: the select statement returns a list of results, and I see that when retrieving data the row mapper handles only one row at a time, so we get a PCollection of individual rows. Is there a way to aggregate those results and return a single object holding the list? I need to write to Kafka an object that holds the list of results.

Thanks again,
S

On Wed, Nov 10, 2021 at 3:28 AM Caizhi Weng wrote:
> Hi!
>
> It is very likely that the versions of your Flink client and Flink
> standalone cluster do not match. SubtaskStateMapper.DISCARD_EXTRA_STATE
> was removed in Flink 1.14, so please make sure that your Flink client
> version is also 1.14.
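On the aggregation question: since the row mapper is invoked once per result row, one option is to re-key the per-row results downstream and group them into a list per key. A minimal plain-Java sketch of that grouping step (RowResult and groupById are illustrative stand-ins, not Beam API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupRows {
    // Hypothetical per-row result, standing in for AnalyticsResult.
    static final class RowResult {
        final String id;
        final String name;
        RowResult(String id, String name) { this.id = id; this.name = name; }
    }

    // Aggregate single-row results into one list per key -- the same shape
    // a GroupByKey (or Combine) step after JdbcIO.readAll() would produce.
    static Map<String, List<RowResult>> groupById(List<RowResult> rows) {
        Map<String, List<RowResult>> grouped = new LinkedHashMap<>();
        for (RowResult r : rows) {
            grouped.computeIfAbsent(r.id, k -> new ArrayList<>()).add(r);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<RowResult> rows = List.of(
            new RowResult("a", "n1"),
            new RowResult("a", "n2"),
            new RowResult("b", "n3"));
        Map<String, List<RowResult>> grouped = groupById(rows);
        System.out.println(grouped.get("a").size()); // 2 rows under key "a"
        System.out.println(grouped.get("b").size()); // 1 row under key "b"
    }
}
```

In the pipeline itself, the Beam-native equivalent would be emitting a KV of id and result from the read step, applying GroupByKey, and then wrapping each grouped Iterable in the object that is written to Kafka.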
Re: select records using JDBC with parameters
Hi!

It is very likely that the versions of your Flink client and Flink standalone cluster do not match. SubtaskStateMapper.DISCARD_EXTRA_STATE was removed in Flink 1.14, so please make sure that your Flink client version is also 1.14.

Sigalit Eliazov wrote on Wed, Nov 10, 2021 at 5:46 AM:
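Since the JobGraph is serialized by the client and deserialized by the cluster, every Flink artifact on the client side should be pinned to the cluster's version. A sketch of how that pinning might look in a Maven build (the property name, artifact, and exact version are illustrative; align them with your actual cluster):

```xml
<!-- Keep every Flink artifact on the same version as the cluster. -->
<properties>
  <flink.version>1.14.0</flink.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
```

Using a single shared property makes it much harder for one transitive Flink dependency to drift to a different release than the cluster.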
select records using JDBC with parameters
Hello,

I am creating a new pipeline which:

1. receives info from Kafka (mainly the key)
2. with this key, selects information from a DB
3. writes the results to Kafka

Flink is running as a standalone cluster.

The pipeline deployment fails when activating step 2 with the following error:

org.apache.flink.runtime.rest.handler.RestHandlerException: Failed to deserialize JobGraph
Caused by: java.lang.IllegalArgumentException: No enum constant org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.DISCARD_EXTRA_STATE

the code:

    PCollection<KV<String, Iterable<String>>> input =
        pipeline.apply("readFromKafka",
                KafkaTransform.readStrFromKafka(
                    pipelineUtil.getBootstrapServers(), topic))
            .apply("window", Window.<KV<String, String>>into(new GlobalWindows())
                // Everything into the global window.
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .discardingFiredPanes())
            .apply("S", GroupByKey.create());

    PCollection<AnalyticsResult> output = input.apply("read from db",
        JdbcIO.<KV<String, Iterable<String>>, AnalyticsResult>readAll()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                    "org.postgresql.Driver", PipelineUtil.getDbInfo())
                .withUsername("123")
                .withPassword(PipelineUtil.readCredentials()))
            .withQuery("select * from a where id = ? order by insert_timestamp limit 5")
            .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<String>>>() {
                @Override
                public void setParameters(KV<String, Iterable<String>> element,
                        PreparedStatement preparedStatement) throws Exception {
                    String nfcId = element.getKey();
                    preparedStatement.setString(1, nfcId);
                }
            })
            .withRowMapper(new JdbcIO.RowMapper<AnalyticsResult>() {
                public AnalyticsResult mapRow(ResultSet resultSet) throws Exception {
                    MyObject obj = new MyObject(resultSet.getString("name"));
                    return obj;
                }
            })
            .withCoder(SerializableCoder.of(AnalyticsResult.class)));

Any ideas?

Thanks a lot,
S
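For context on the shape of the read step above: readAll() issues one query per input element (via the parameter setter) and invokes the row mapper once per result row, so it emits one output element per row. A plain-Java sketch of that contract, with simplified stand-ins for the Beam interfaces and an in-memory map standing in for the database:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ReadAllSketch {
    // Simplified stand-ins for JdbcIO's PreparedStatementSetter and RowMapper.
    static <InT, RowT, OutT> List<OutT> readAll(
            List<InT> inputs,
            Function<InT, String> parameterSetter,  // derives the query parameter
            Map<String, List<RowT>> fakeTable,      // stands in for the database
            Function<RowT, OutT> rowMapper) {       // maps ONE row to ONE output
        List<OutT> out = new ArrayList<>();
        for (InT element : inputs) {                // one query per input element
            String key = parameterSetter.apply(element);
            for (RowT row : fakeTable.getOrDefault(key, List.of())) {
                out.add(rowMapper.apply(row));      // one output per result row
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> table = Map.of("id1", List.of("row1", "row2"));
        List<String> results = readAll(
            List.of("id1"),
            element -> element,                 // like preparedStatement.setString(1, key)
            table,
            row -> "mapped:" + row);            // like mapRow(resultSet)
        System.out.println(results.size());     // 2 -- one element per row, not one list
        System.out.println(results.get(0));     // mapped:row1
    }
}
```

This is why the output above is a flat PCollection of AnalyticsResult rather than a list per key; any per-key list has to be built by a later grouping step.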