Hi!

It is very likely that versions of your Flink client and Flink standalone
cluster do not match. SubtaskStateMapper.DISCARD_EXTRA_STATE is removed
since Flink 1.14 so please make sure that your Flink client version is also
1.14.

Sigalit Eliazov <e.siga...@gmail.com> 于2021年11月10日周三 上午5:46写道:

> Hello
>
> i am creating new pipeline which
>
>    1. receives info from kafka (mainly the key)
>
>    2. with this key select information from a D
>
>    3. writes to kafka the results
>
> Flink is running has a standalone cluster
>
> I am failing on the pipeline deployment when activating step 2 with the 
> following error
>
> [org.apache.flink.runtime.rest.handler.RestHandlerException: Failed to 
> deserialize JobGraph
>
> Caused by: java.lang.IllegalArgumentException: No enum constant 
> org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.DISCARD_EXTRA_STATE
>
> the code:
> PCollection<KV<String, Iterable<String>>> input = 
> pipeline.apply("readFromKafka",
>                 KafkaTransform.readStrFromKafka(
>                         pipelineUtil.getBootstrapServers(), topic))
>         .apply("window", Window.<KV<String, String>>into(new GlobalWindows()) 
> // Everything into global window.
>                 
> .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>                 .discardingFiredPanes())
>         .apply("S", GroupByKey.create());
> PCollection<MyObject> output = input.apply("read from db", JdbcIO.<KV<String, 
> Iterable<String>>, AnalyticsResult>readAll()
>         .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
>                         "org.postgresql.Driver", PipelineUtil.getDbInfo())
>                 .withUsername("123")
>                 .withPassword(PipelineUtil.readCredentials()))
>         .withQuery("select *  from a where id = ? order by insert_timestamp 
> limit 5")
>         .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, 
> Iterable<String>>>() {
>             @Override
>             public void setParameters(KV<String, Iterable<String>> element,
>                                       PreparedStatement preparedStatement) 
> throws Exception {
>                 String nfcId = element.getKey();
>                 preparedStatement.setString(1, nfcId);
>             }
>         })
>         .withRowMapper(new JdbcIO.RowMapper<MyObject>() {
>             public AnalyticsResult mapRow(ResultSet resultSet) throws 
> Exception {
>                 MyObject obj = new MyObject(
>                         resultSet.getString("name"),
>                 );
>
>                 return obj;
>             }
>         }).withCoder(SerializableCoder.of(AnalyticsResult.class)));
>
>
> any ideas?
>
> Thanks a lot
>
> S
>
>

Reply via email to