I am trying to sink data to Hive via Confluent Kafka -> Flink -> Hive using the following code snippet:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<GenericRecord> stream = readFromKafka(env);

    private static final TypeInformation[] FIELD_TYPES = new TypeInformation[]{
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO
    };

    JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
            .setDrivername("org.apache.hive.jdbc.HiveDriver")
            .setDBUrl("jdbc:hive2://hiveconnstring")
            .setUsername("myuser")
            .setPassword("mypass")
            .setQuery("INSERT INTO testHiveDriverTable (key,value) VALUES (?,?)")
            .setBatchSize(1000)
            .setParameterTypes(FIELD_TYPES)
            .build();

    DataStream<Row> rows = stream.map((MapFunction<GenericRecord, Row>) st1 -> {
        Row row = new Row(2); //
        row.setField(0, st1.get("SOME_ID"));
        row.setField(1, st1.get("SOME_ADDRESS"));
        return row;
    });

    sink.emitDataStream(rows);
    env.execute("Flink101");

But I am getting the following error:

    Caused by: java.lang.RuntimeException: Execution of JDBC statement failed.
        at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:219)
        at org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.snapshotState(JDBCSinkFunction.java:43)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:356)
        ... 12 more
    Caused by: java.sql.SQLException: Method not supported
        at org.apache.hive.jdbc.HiveStatement.executeBatch(HiveStatement.java:381)
        at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:216)
        ... 17 more

I checked the hive-jdbc driver, and it seems that executeBatch() is not supported by it:

    public class HiveStatement implements java.sql.Statement {
        ...
        @Override
        public int[] executeBatch() throws SQLException {
            throw new SQLFeatureNotSupportedException("Method not supported");
        }
        ..
    }

Is there any way to achieve this using the JDBC driver?

Thanks in advance.
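
For reference, here is a minimal sketch of one possible direction: skip `executeBatch()` altogether and run one `executeUpdate()` per row. It is not a drop-in replacement for `JDBCAppendTableSink`. The names `RowByRowWriter`, `writeRows` and `stubWithoutBatchSupport` are hypothetical, the stub is only a stand-in that mimics the `executeBatch()` behaviour quoted above, and it is an assumption that the real Hive driver accepts `setObject` and `executeUpdate` for this INSERT. Row-by-row inserts into Hive are also likely to be slow.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RowByRowWriter {

    /** Binds each row and runs it with executeUpdate(); executeBatch() is never called. */
    static int writeRows(PreparedStatement ps, List<Object[]> rows) throws SQLException {
        int written = 0;
        for (Object[] row : rows) {
            for (int i = 0; i < row.length; i++) {
                ps.setObject(i + 1, row[i]); // JDBC parameter indexes are 1-based
            }
            ps.executeUpdate();
            written++;
        }
        return written;
    }

    /** Hypothetical stand-in for a driver without batch support; records every other call by name. */
    static PreparedStatement stubWithoutBatchSupport(List<String> calls) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("executeBatch")) {
                // Same behaviour as the HiveStatement snippet quoted in the question
                throw new SQLFeatureNotSupportedException("Method not supported");
            }
            calls.add(method.getName());
            // executeUpdate() returns int; the setters used here return void
            return method.getReturnType() == int.class ? (Object) 1 : null;
        };
        return (PreparedStatement) Proxy.newProxyInstance(
                RowByRowWriter.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                handler);
    }

    public static void main(String[] args) throws SQLException {
        List<String> calls = new ArrayList<>();
        PreparedStatement ps = stubWithoutBatchSupport(calls);
        List<Object[]> rows = Arrays.asList(
                new Object[] {1, "first address"},
                new Object[] {2, "second address"});
        int written = writeRows(ps, rows);
        System.out.println(written + " rows written, calls: " + calls);
    }
}
```

With a real connection, the stub would be replaced by `connection.prepareStatement("INSERT INTO testHiveDriverTable (key,value) VALUES (?,?)")`, and the loop would presumably live in a custom sink function instead of `main`.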