[
https://issues.apache.org/jira/browse/BEAM-12005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17549235#comment-17549235
]
Danny McCormick commented on BEAM-12005:
----------------------------------------
This issue has been migrated to https://github.com/apache/beam/issues/20842
> getting issue to load file into database (java.lang.ClassCastException:
> java.lang.String cannot be cast to org.apache.beam.sdk.values.KV)
> -----------------------------------------------------------------------------------------------------------------------------------------
>
> Key: BEAM-12005
> URL: https://issues.apache.org/jira/browse/BEAM-12005
> Project: Beam
> Issue Type: Bug
> Components: io-java-jdbc
> Affects Versions: 2.28.0
> Reporter: Gaurav Khandelwal
> Priority: P3
> Labels: ClassCastException, JdbcIO, MySQL, apache-beam
>
> Hi Team,
> We are getting the error below:
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.ClassCastException: java.lang.String cannot be cast to
> org.apache.beam.sdk.values.KV
>
> Our goal is to load a file into a database. We tried the following approach:
> {code:java}
> @SuppressWarnings("unchecked")
> public static void main(String[] args) {
>     PCSI02AOptions options =
>         PipelineOptionsFactory.fromArgs(args).withoutStrictParsing().as(PCSI02AOptions.class);
>     Pipeline p = Pipeline.create(options);
>     PCollection data1 = p.apply("Reading Text", TextIO.read().from(options.getInputFile()))
>         .apply(ParDo.of(new GetRatePlanID()))
>         .apply("Format Result",
>             MapElements.into(TypeDescriptors.strings())
>                 .via((KV<String, Integer> ABC) -> ABC.getKey() + "," + +ABC.getValue()));
>     data1.apply(JdbcIO.<KV<String, Iterable<Integer>>, String>readAll()
>         .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
>             .create("com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/ABC")
>             .withUsername("abc")
>             .withPassword("abc123"))
>         .withCoder(StringUtf8Coder.of())
>         .withParameterSetter(new JdbcIO.PreparedStatementSetter<KV<String, Iterable<Integer>>>() {
>             @Override
>             public void setParameters(KV<String, Iterable<Integer>> element,
>                     PreparedStatement preparedStatement) throws Exception {
>                 String[] range = element.getKey().split(",");
>                 preparedStatement.setInt(1, Integer.parseInt(range[0]));
>             }
>         }).withQuery("select * from ABC.PAY_PLAN_INFO where plan_key = ?")
>         .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
>             ObjectMapper mapper = new ObjectMapper();
>             ArrayNode arrayNode = mapper.createArrayNode();
>             for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
>                 try {
>                     ObjectNode objectNode = mapper.createObjectNode();
>                     objectNode.put("column_name",resultSet.getMetaData().getColumnName(i));
>                     objectNode.put("value",resultSet.getString(i));
>                     arrayNode.add(objectNode);
>                 } catch (Exception e) {
>                     throw e;
>                 }
>             }
>             return mapper.writeValueAsString(arrayNode);
>         })
>     );
>     State result = p.run().waitUntilFinish();
>     System.out.println(result);
> }
> private static class GetPlanID extends DoFn<String, KV<String, Integer>> {
>     @ProcessElement
>     public void processElement(ProcessContext c)
>     {
>         String[] data = c.element().split(",");
>         Integer plankey = Integer.parseInt(data[0]);
>         String planid = data[1];
>         c.output(KV.of(planid, plankey));
>     }
> }{code}
>
> Error:
> {code:java}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
>     at com.loblaw.pcinsiders.jobflow.FiletoDB.main(FiletoDB.java:120)
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.beam.sdk.values.KV
>     at com.loblaw.pcinsiders.jobflow.FiletoDB$1.setParameters(FiletoDB.java:1)
>     at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadFn.processElement(JdbcIO.java:910){code}
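
The trace points at setParameters, which suggests that the element reaching JdbcIO.readAll() is the String produced by the "Format Result" step rather than a KV. Because data1 is declared as a raw PCollection, the compiler cannot flag the mismatch, so it only shows up at runtime. Below is a minimal plain-Java sketch of the same effect, with no Beam dependency. Map.Entry stands in for KV, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical demo: a raw-typed collection hides an element type mismatch until runtime.
class RawTypeDemo {

    // Stand-in for the parameter setter: it expects a key/value element.
    static String keyOf(Map.Entry<String, Integer> element) {
        return element.getKey();
    }

    // Returns true if reading the element as a key/value pair fails with ClassCastException.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean failsAtUse() {
        List raw = new ArrayList();   // raw type, comparable to "PCollection data1"
        raw.add("GOLD,101");          // the element is really a String
        List<Map.Entry<String, Integer>> typed = raw; // unchecked conversion, compiles with a warning
        try {
            keyOf(typed.get(0));      // the cast to Map.Entry is checked here, not earlier
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsAtUse());
    }
}
```

Running main prints true: the String is accepted silently when added and only rejected where it is first used as a key/value pair, which matches where the trace above reports the failure.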
>
> Could you suggest how we can resolve this? If there is a reference for this
> use case, kindly share a link or snippets.
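
One way to surface this kind of mismatch at compile time is to keep the element type explicit at every stage. The sketch below uses plain Java functions in place of Beam transforms, so it is an illustration of the typing idea and not Beam code. PARSE mirrors the DoFn in the report, FORMAT mirrors the "Format Result" step, and PLAN_KEY is a hypothetical stand-in for whatever consumes the key/value pair. All names are assumptions made for the example.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: each stage declares its input and output types.
class TypedStages {

    // Splits "plankey,planid" into (planid, plankey), as the DoFn in the report does.
    static final Function<String, Map.Entry<String, Integer>> PARSE = line -> {
        String[] data = line.split(",");
        return new SimpleImmutableEntry<>(data[1], Integer.parseInt(data[0]));
    };

    // Turns the pair back into text; after this stage the element is a String again.
    static final Function<Map.Entry<String, Integer>, String> FORMAT =
        kv -> kv.getKey() + "," + kv.getValue();

    // A consumer that needs the pair itself, for example to bind a query parameter.
    static final Function<Map.Entry<String, Integer>, Integer> PLAN_KEY = Map.Entry::getValue;

    public static void main(String[] args) {
        // The pair consumer is attached to the output of PARSE.
        System.out.println(PARSE.andThen(PLAN_KEY).apply("101,GOLD"));
        System.out.println(PARSE.andThen(FORMAT).apply("101,GOLD"));
        // PARSE.andThen(FORMAT).andThen(PLAN_KEY) does not compile:
        // FORMAT yields a String, while PLAN_KEY expects a Map.Entry.
    }
}
```

In Beam terms, this corresponds to declaring the collection with its element type instead of a raw PCollection, so that a transform expecting KV cannot be applied to a collection of String. Separately, if the goal is to insert rows rather than query them, JdbcIO.write() may be the transform to look at instead of readAll().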
--
This message was sent by Atlassian Jira
(v8.20.7#820007)