Andriy Redko created FLINK-30718: ------------------------------------ Summary: Cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.connector.opensearch.sink.OpensearchSink.emitter Key: FLINK-30718 URL: https://issues.apache.org/jira/browse/FLINK-30718 Project: Flink Issue Type: Bug Components: Connectors / Opensearch Affects Versions: 1.16.0 Reporter: Andriy Redko
When using OpenSearchSink programmatically {noformat} final StreamExecutionEnvironment env = StreamExecutionEnvironment .createRemoteEnvironment("localhost", 8081); final Collection<Tuple4<String, String, Long, Long>> users = new ArrayList<>(); users.add(Tuple4.of("u1", "admin", 100L, 200L)); final DataStream<Tuple4<String, String, Long, Long>> source = env.fromCollection(users); final OpensearchSink<Tuple4<String, String, Long, Long>> sink = new OpensearchSinkBuilder<Tuple4<String, String, Long, Long>>() .setHosts(new HttpHost("localhost", 9200, "https")) .setEmitter( (element, ctx, indexer) -> { indexer.add( Requests .indexRequest() .index("users") .id(element.f0) .source(Map.ofEntries( Map.entry("user_id", element.f0), Map.entry("user_name", element.f1), Map.entry("uv", element.f2), Map.entry("pv", element.f3) ))); }) .setConnectionUsername("admin") .setConnectionPassword("admin") .setAllowInsecure(true) .setBulkFlushMaxActions(1) .build(); source.sinkTo(sink); env.execute("Opensearch end to end sink test example"); {noformat} the stream processing fails with the exception{color:#000000} {color} {noformat} Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:399) at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:162) at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:681) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.connector.opensearch.sink.OpensearchSink.emitter of type org.apache.flink.connector.opensearch.sink.OpensearchEmitter in instance of org.apache.flink.connector.opensearch.sink.OpensearchSink at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076) at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039) at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293) at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512) at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419) at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228) at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687) at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496) at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390) at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228) at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687) at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489) at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589) at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543) at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:383) ... 9 more {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)