[
https://issues.apache.org/jira/browse/KAFKA-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bill Bejeck resolved KAFKA-8689.
--------------------------------
Resolution: Duplicate
Duplicate of https://issues.apache.org/jira/browse/KAFKA-8558
> Cannot Name Join State Store Topics
> -----------------------------------
>
> Key: KAFKA-8689
> URL: https://issues.apache.org/jira/browse/KAFKA-8689
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.3.0
> Reporter: Simon Dean
> Priority: Major
>
> Performing a join on two KStreams produces two state store topics.
> Currently the names of the state store topics are auto-generated and cannot be
> overridden.
> Example code:
>
> {code:java}
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.LongSerializer;
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.JoinWindows;
> import org.apache.kafka.streams.kstream.Joined;
> import org.apache.kafka.streams.kstream.KStream;
> import java.time.Duration;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
> public class JoinTopicNamesExample {
> public static void main(final String[] args) throws InterruptedException {
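> // Use start() so each task runs on its own thread; run() would execute it on the calling thread.
> new Thread(() -> {
> produce(args);
> }).start();
> new Thread(() -> {
> try {
> streams(args);
> } catch (InterruptedException e) {
> e.printStackTrace();
> }
> }).start();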
> }
> private static void produce(String[] args) {
> Map<String, Object> props = new HashMap<>();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(ProducerConfig.RETRIES_CONFIG, 0);
> props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
> props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
> props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> StringSerializer.class);
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> LongSerializer.class);
> KafkaProducer<String, Long> producer = new KafkaProducer<>(props);
> for (long i = 0; i < 10; i++) {
> producer.send(new ProducerRecord<>("left", Long.toString(i), i));
> }
> for (long i = 0; i < 10; i++) {
> producer.send(new ProducerRecord<>("right", Long.toString(i), i));
> }
> }
> private static void streams(String[] args) throws InterruptedException {
> final String bootstrapServers = args.length > 0 ? args[0] :
> "localhost:9092";
> final Properties streamsConfiguration = new Properties();
> // Give the Streams application a unique name. The name must be
> // unique in the Kafka cluster against which the application is run.
> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
> "join-topic-names-example");
> streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG,
> "user-region-lambda-example-client");
> // Where to find Kafka broker(s).
> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> bootstrapServers);
> // Specify default (de)serializers for record keys and for record values.
>
> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> Serdes.String().getClass().getName());
>
> streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> Serdes.Long().getClass().getName());
> // Records should be flushed every 10 seconds. This is less than the default
> // in order to keep this example interactive.
> streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10
> * 1000);
> final Serde<String> stringSerde = Serdes.String();
> final Serde<Long> longSerde = Serdes.Long();
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<String, Long> left = builder.stream("left",
> Consumed.with(stringSerde, longSerde));
> final KStream<String, Long> right = builder.stream("right",
> Consumed.with(stringSerde, longSerde));
> left.join(
> right,
> (value1, value2) -> value1 + value2,
> JoinWindows.of(Duration.ofHours(1)),
> Joined.as("sum"));
> final KafkaStreams streams = new KafkaStreams(builder.build(),
> streamsConfiguration);
> streams.start();
> Thread.sleep(TimeUnit.MINUTES.toMillis(1));
> // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
> Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
> }
> }
> {code}
>
>
> Here are the topics produced by the example code:
> * join-topic-names-example-KSTREAM-JOINOTHER-0000000005-store-changelog
> * join-topic-names-example-KSTREAM-JOINTHIS-0000000004-store-changelog
> * left
> * right
> In the example code above, a name is passed into the join with
> Joined.as("sum"). The "sum" name is ignored when Kafka Streams decides on
> the state store topic names.
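
The two changelog topic names listed above appear to follow the pattern <application.id>-<store name>-changelog, where the store name (for example KSTREAM-JOINTHIS-0000000004-store) is generated by Kafka Streams. The following is a minimal sketch of that pattern in plain Java with no Kafka dependency. The class name ChangelogTopicNames and the method changelogTopic are hypothetical, introduced only for illustration; they are not part of the Kafka Streams API.

```java
public class ChangelogTopicNames {

    // Hypothetical helper: composes a changelog topic name using the pattern
    // seen in the topic list above, <application.id>-<store name>-changelog.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Application ID from the example code in the report.
        String applicationId = "join-topic-names-example";
        // Auto-generated store names, taken from the reported topic list.
        String[] storeNames = {
            "KSTREAM-JOINTHIS-0000000004-store",
            "KSTREAM-JOINOTHER-0000000005-store"
        };
        for (String storeName : storeNames) {
            System.out.println(changelogTopic(applicationId, storeName));
        }
    }
}
```

Under this reading, the report amounts to the store-name part of the topic name being generated rather than taken from the "sum" name supplied by the caller.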
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)