Hi,
I'm trying to send data to a Kafka topic using PyFlink (DataStream API),
while setting a key on each Kafka record. The key is a simple string; the
value is a JSON string. What I have so far basically works, except that the
whole record is sent as both the key and the value. How do I specify that
only record["id"] should be used as the key? Unfortunately, the serializer docs
<https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#serializer>
don't indicate how to specify a key field.
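
For context, each record (after parsing the JSON) is a plain dict along
these lines (field names other than "id" are made up here):

record = {
    "id": "some-string-id",   # this is what I want as the Kafka message key
    "other_field": "...",     # rest of the event payload
}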
Here's a simplified version of my code:
kafka_sink = (
    KafkaSink.builder()
    .set_bootstrap_servers(broker_url)
    .set_property("security.protocol", "SASL_SSL")
    .set_property("sasl.mechanism", "PLAIN")
    .set_property(
        "sasl.jaas.config",
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        f'username="{my_username}" password="{my_password}";',
    )
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("myTopic")
        .set_value_serialization_schema(SimpleStringSchema())
        # this sees the same string element as the value schema,
        # which is why key == value in the output
        .set_key_serialization_schema(SimpleStringSchema())
        .build()
    )
    .build()
)
stream = (
    env.add_source(kinesis_source)
    .map(lambda event: json.loads(event))
    .map(add_event_timestamps)
    .assign_timestamps_and_watermarks(get_watermark_strategy())
    .key_by(lambda record: record["id"], key_type=Types.STRING())
    .window(EventTimeSessionWindows.with_gap(Time.seconds(30)))
    .reduce(reduce_events)
    .map(lambda e: json.dumps(e), output_type=Types.STRING())
)
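
The stream is then handed to the sink roughly like this (the job name is
just a placeholder):

stream.sink_to(kafka_sink)
env.execute("kinesis-to-kafka")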
One thing I've tried is changing the final .map in the pipeline above to:
.map(lambda e: (e["id"], json.dumps(e)))
That is, have the stream be a (key, value) tuple. When I do that, though, I
get an exception:
RuntimeError: java.lang.UnsupportedOperationException: A serializer has
already been registered for the state; re-registration is not allowed.
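
For what it's worth, I also considered declaring the tuple type explicitly,
along the lines of:

.map(lambda e: (e["id"], json.dumps(e)),
     output_type=Types.TUPLE([Types.STRING(), Types.STRING()]))

but even if that avoids the serializer error, I don't see how the record
serializer builder would know to treat the first element as the key and the
second as the value, so I suspect I'm missing the intended mechanism.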
Thanks,
Andrew