invoke FlinkKafkaProducer011 constructor in scala: val producer = new FlinkKafkaProducer011[PVEvent.Entity](appConf.getPvEventTopic, new PvEventSerializeSchema, producerProps, Optional.of(FlinkRebalancePartitioner[PVEvent.Entity]))
and the constructor is : /** * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}. * * <p>If a partitioner is not provided, written records will be partitioned by the attached key of each * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they * will be distributed to Kafka partitions in a round-robin fashion. * * @param defaultTopicId The default topic to write data to * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. * If a partitioner is not provided, records will be partitioned by the key of each record * (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys * are {@code null}, then records will be distributed to Kafka partitions in a * round-robin fashion. */ public FlinkKafkaProducer011( String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner) { this( defaultTopicId, serializationSchema, producerConfig, customPartitioner, Semantic.AT_LEAST_ONCE, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); } but cannot complie pass, and IDEA show ''cannot resolve constructor" error. and i invoke other constructor that without java8 Optional params, it will no error。 because of java8 Optional param?what should i do?