Dear team,
I am currently using Apache Beam to read messages from Kafka with a
Protobuf schema and a schema registry URL, but I am still encountering
errors. Could you please help check my use of the Beam SDK for this case?
Below attachment is my code.
package org.example;
import com.google.protobuf.Message;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.DeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
//import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.example.Apenpup;
import java.util.HashMap;
import java.util.Map;
public class KafkaPipeline {
private static final Logger LOG = LoggerFactory.getLogger(KafkaPipeline.class);
public static void main(String[] args) {
String schemaRegistryUrl = "http://localhost:8081";
String subject = "my-topic";
String boostrapServer = "xxxx:9092";
String topic = "xxxx";
String GROUP_ID = "my-group-id";
// Create a Pipeline object
Pipeline pipeline = Pipeline.create();
// Create the deserializer provider
ConfluentSchemaRegistryDeserializerProvider<Apenpup> deserializerProvider =
ConfluentSchemaRegistryDeserializerProvider.of(schemaRegistryUrl, subject,5);
// Create Kafka consumer configurations
Map<String, Object> kafkaConfigs = new HashMap<>();
// // Get the deserializer
org.apache.kafka.common.serialization.Deserializer<Apenpup> deserializer =
deserializerProvider.getDeserializer(kafkaConfigs, true );
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put("bootstrap.servers", boostrapServer);
consumerConfig.put("group.id", GROUP_ID);
consumerConfig.put("key.deserializer", StringDeserializer.class.getName());
consumerConfig.put("value.deserializer", KafkaProtobufDeserializer.class.getName());
consumerConfig.put("specific.protobuf.value.type", Apenpup.class.getName());
// Create an instance of KafkaProtobufDeserializer and configure it
// KafkaProtobufDeserializer<Apenpup> valueDeserializer = new KafkaProtobufDeserializer<>();
// valueDeserializer.configure(consumerConfig, false);
// Read from Kafka
PCollection<Apenpup> messages = pipeline.apply("ReadFromKafka", KafkaIO.<String, Apenpup>read()
.withBootstrapServers(boostrapServer)
.withTopic(topic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer((DeserializerProvider<Apenpup>) deserializer)
.updateConsumerProperties(consumerConfig)
.withoutMetadata())
.apply("ExtractValues", ParDo.of(new DoFn<KV<String, Apenpup>, Apenpup>() {
@ProcessElement
public void processElement(@Element KV<String, Apenpup> record, OutputReceiver<Apenpup> out) {
Apenpup value = record.getValue();
LOG.info("Received message: " + value);
out.output(value);
}
}));
// Apply windowing
PCollection<Apenpup> windowedMessages = messages.apply("WindowIntoFixedIntervals",
Window.into(FixedWindows.of(Duration.standardMinutes(1))));
// Apply a simple transformation
PCollection<String> transformedMessages = windowedMessages.apply("TransformMessages", ParDo.of(new DoFn<Apenpup, String>() {
@ProcessElement
public void processElement(@Element Apenpup message, OutputReceiver<String> out) {
String transformedMessage = message.toString().toUpperCase();
LOG.info("Transformed message: " + transformedMessage);
out.output(transformedMessage);
}
}));
// Write the results to text files
transformedMessages.apply("WriteToText", TextIO.write()
.to("/home/thuybui1/apachebeampipeline/output/output")
.withWindowedWrites()
.withNumShards(1)
.withSuffix(".txt"));
// Run the pipeline
pipeline.run().waitUntilFinish();
}
}