[ 
https://issues.apache.org/jira/browse/KAFKA-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-8689.
--------------------------------
    Resolution: Duplicate

Duplicate of https://issues.apache.org/jira/browse/KAFKA-8558

> Cannot Name Join State Store Topics
> -----------------------------------
>
>                 Key: KAFKA-8689
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8689
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Simon Dean
>            Priority: Major
>
> Performing a join on two KStreams produces two state store changelog topics.
> Currently the names of these state store topics are auto-generated and cannot be
> overridden.
> Example code:
>  
> {code:java}
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.LongSerializer;
> import org.apache.kafka.common.serialization.Serde;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.JoinWindows;
> import org.apache.kafka.streams.kstream.Joined;
> import org.apache.kafka.streams.kstream.KStream;
> import java.time.Duration;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
> public class JoinTopicNamesExample {
>     public static void main(final String[] args) throws InterruptedException {
>         // start() runs each task on its own thread; run() would execute it on the calling thread.
>         new Thread(() -> produce(args)).start();
>         new Thread(() -> {
>             try {
>                 streams(args);
>             } catch (InterruptedException e) {
>                 e.printStackTrace();
>             }
>         }).start();
>     }
>     private static void produce(String[] args) {
>         Map<String, Object> props = new HashMap<>();
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ProducerConfig.RETRIES_CONFIG, 0);
>         props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
>         props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
>         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
>         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
>         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
>         // try-with-resources closes the producer, which flushes any buffered records.
>         try (KafkaProducer<String, Long> producer = new KafkaProducer<>(props)) {
>             for (long i = 0; i < 10; i++) {
>                 producer.send(new ProducerRecord<>("left", Long.toString(i), i));
>             }
>             for (long i = 0; i < 10; i++) {
>                 producer.send(new ProducerRecord<>("right", Long.toString(i), i));
>             }
>         }
>     }
>     private static void streams(String[] args) throws InterruptedException {
>         final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
>         final Properties streamsConfiguration = new Properties();
>         // Give the Streams application a unique name. The name must be unique in the Kafka cluster
>         // against which the application is run.
>         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-topic-names-example");
>         streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "user-region-lambda-example-client");
>         // Where to find Kafka broker(s).
>         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>         // Specify default (de)serializers for record keys and for record values.
>         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
>         // Records should be flushed every 10 seconds. This is less than the default
>         // in order to keep this example interactive.
>         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
>         final Serde<String> stringSerde = Serdes.String();
>         final Serde<Long> longSerde = Serdes.Long();
>         final StreamsBuilder builder = new StreamsBuilder();
>         final KStream<String, Long> left = builder.stream("left", Consumed.with(stringSerde, longSerde));
>         final KStream<String, Long> right = builder.stream("right", Consumed.with(stringSerde, longSerde));
>         left.join(
>                 right,
>                 (value1, value2) -> value1 + value2,
>                 JoinWindows.of(Duration.ofHours(1)),
>                 Joined.as("sum"));
>         final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
>         // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams.
>         // It is registered before start() so it is in place while the application runs.
>         Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>         streams.start();
>         Thread.sleep(TimeUnit.MINUTES.toMillis(1));
>     }
> }
> {code}
>  
>  
> Here are the topics produced by the example code:
>  * join-topic-names-example-KSTREAM-JOINOTHER-0000000005-store-changelog
>  * join-topic-names-example-KSTREAM-JOINTHIS-0000000004-store-changelog
>  * left
>  * right
> In the example code above, a name is passed into the join with
> Joined.as("sum").  The "sum" name is ignored when Kafka Streams decides on
> the state store topic names.
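
The generated topic names listed in the report appear to follow the pattern <application.id>-<store name>-changelog, so the part that cannot be overridden is the store name (for example KSTREAM-JOINTHIS-0000000004-store). A minimal sketch of that pattern in plain Java, with no Kafka dependency; the class and method names are hypothetical and exist only for illustration:

```java
// Hypothetical helper: not part of Kafka Streams, it only illustrates the naming pattern.
final class ChangelogTopicNames {
    private ChangelogTopicNames() {}

    // Assumed pattern: <application.id>-<state store name>-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String applicationId = "join-topic-names-example";
        // Store names taken from the topics listed in the report.
        String[] storeNames = {
            "KSTREAM-JOINTHIS-0000000004-store",
            "KSTREAM-JOINOTHER-0000000005-store"
        };
        for (String storeName : storeNames) {
            System.out.println(changelogTopic(applicationId, storeName));
        }
    }
}
```

Under this pattern, a stable topic name requires a stable store name, which is what the report asks to be able to set.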



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
