Andriy Redko created FLINK-30718:
------------------------------------

             Summary: Cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.connector.opensearch.sink.OpensearchSink.emitter
                 Key: FLINK-30718
                 URL: https://issues.apache.org/jira/browse/FLINK-30718
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Opensearch
    Affects Versions: 1.16.0
            Reporter: Andriy Redko


When an OpensearchSink is built programmatically with a lambda emitter and the job is submitted to a remote cluster
{noformat}
 
final StreamExecutionEnvironment env = StreamExecutionEnvironment
    .createRemoteEnvironment("localhost", 8081);

final Collection<Tuple4<String, String, Long, Long>> users = new ArrayList<>();
users.add(Tuple4.of("u1", "admin", 100L, 200L));

final DataStream<Tuple4<String, String, Long, Long>> source = env.fromCollection(users);
final OpensearchSink<Tuple4<String, String, Long, Long>> sink =
    new OpensearchSinkBuilder<Tuple4<String, String, Long, Long>>()
        .setHosts(new HttpHost("localhost", 9200, "https"))
        .setEmitter((element, ctx, indexer) -> {
            indexer.add(
                Requests
                    .indexRequest()
                    .index("users")
                    .id(element.f0)
                    .source(Map.ofEntries(
                        Map.entry("user_id", element.f0),
                        Map.entry("user_name", element.f1),
                        Map.entry("uv", element.f2),
                        Map.entry("pv", element.f3)
                    )));
        })
        .setConnectionUsername("admin")
        .setConnectionPassword("admin")
        .setAllowInsecure(true)
        .setBulkFlushMaxActions(1)
        .build();

source.sinkTo(sink);
env.execute("Opensearch end to end sink test example");

{noformat}
the stream processing fails with the following exception:
{noformat}
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:399)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:162)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:681)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.connector.opensearch.sink.OpensearchSink.emitter of type org.apache.flink.connector.opensearch.sink.OpensearchEmitter in instance of org.apache.flink.connector.opensearch.sink.OpensearchSink
        at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2076)
        at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2039)
        at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1293)
        at java.base/java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2512)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2419)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:383)
        ... 9 more


 {noformat}
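
For context on the error message, here is a minimal plain-Java sketch (no Flink involved) of how a serializable lambda passes through Java serialization: it is written to the stream as a java.lang.invoke.SerializedLambda and is expected to be resolved back into the functional interface before it is assigned to the typed field. The names LambdaRoundTrip, Emitter and Holder are hypothetical stand-ins, not Flink classes. The sketch only shows the working case inside a single JVM and class loader; it does not reproduce the failure, and whether a class-loading difference on the cluster is the cause here is an assumption, not a confirmed diagnosis.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

final class LambdaRoundTrip {

    /** Hypothetical serializable functional interface (stand-in for an emitter). */
    @FunctionalInterface
    interface Emitter<T> extends Serializable {
        String emit(T element);
    }

    /** Hypothetical holder with a typed field, like a sink that stores its emitter. */
    static final class Holder<T> implements Serializable {
        private static final long serialVersionUID = 1L;
        final Emitter<T> emitter;

        Holder(Emitter<T> emitter) {
            this.emitter = emitter;
        }
    }

    /** Serializes an object with plain Java serialization. */
    static byte[] serialize(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Deserializes an object using the default class loader resolution. */
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Builds a holder whose emitter is a lambda defined in this class. */
    static Holder<String> newHolder() {
        return new Holder<>(element -> "indexed:" + element);
    }

    /** Checks that the lambda is written to the stream as a SerializedLambda. */
    static boolean streamMentionsSerializedLambda() {
        String raw = new String(serialize(newHolder()), StandardCharsets.ISO_8859_1);
        return raw.contains("java.lang.invoke.SerializedLambda");
    }

    /** Round-trips the holder and invokes the restored lambda. */
    static String roundTripAndEmit(String element) {
        @SuppressWarnings("unchecked")
        Holder<String> restored = (Holder<String>) deserialize(serialize(newHolder()));
        return restored.emitter.emit(element);
    }

    public static void main(String[] args) {
        System.out.println(streamMentionsSerializedLambda());
        System.out.println(roundTripAndEmit("u1"));
    }
}
```

Run locally, this prints true and then indexed:u1, since the class that defines the lambda is available when the stream is read back. The exception above is raised at the step where the SerializedLambda is still unresolved when the field is set.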



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
