It might be easier to use a `Transformer` with a state store. Each time you receive an input record, you first check whether the parent entry is in the store. If it is, you add the new record to the store; if not, you drop it.
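To make the suggestion concrete, here is a minimal, dependency-free sketch of the check-then-insert step that would live in `Transformer.transform`. It rests on several assumptions. A plain `mutable.Map[String, Int]` stands in for the `KeyValueStore[String, Integer]` that a real transformer would fetch with `ProcessorContext.getStateStore` in `init()`. In a real topology, that store would be registered with `StreamsBuilder.addStateStore`, and its name would be passed to `KStream.transform`. The names `ParentCheck`, `parentOf` and `run` are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical, dependency-free model of the parent check a Transformer would do.
// A mutable.Map[String, Int] stands in for a KeyValueStore[String, Integer].
object ParentCheck {

  // Parent of "/0/1/2" is "/0/1"; a top-level path such as "/0" has parent "".
  def parentOf(path: String): String = {
    val i = path.lastIndexOf('/')
    if (i <= 0) "" else path.substring(0, i)
  }

  // Body of a hypothetical transform(key, value): check the parent and write the
  // record to the same store in one step, so the next record already sees it.
  // Some(...) means "forward"; None means "drop" (a real Transformer returns null).
  def transform(store: mutable.Map[String, Int], key: String, value: Int): Option[(String, Int)] = {
    val parent = parentOf(key)
    if (parent.isEmpty || store.contains(parent)) {
      store(key) = value
      Some((key, value))
    } else {
      None
    }
  }

  // Feeds records through transform() one at a time, in arrival order,
  // and returns the resulting store contents.
  def run(records: Seq[(String, Int)]): mutable.Map[String, Int] = {
    val store = mutable.Map.empty[String, Int]
    records.foreach { case (k, v) => transform(store, k, v) }
    store
  }

  def main(args: Array[String]): Unit = {
    // Same keys and values as the producer loop in the quoted message.
    val sent = (1 until 10).map(j => ((0 until j).map(i => "/" + i).mkString, j))
    run(sent).keys.toSeq.sorted.foreach(println)
  }
}
```

In send order, all nine paths end up in the store. If you feed in `/0`, `/0/1/2`, `/0/1` instead, `/0/1/2` is dropped. So the sketch removes the race between the `filter` and the store that `reduce` writes to, but it still assumes that a parent arrives before its children. Kafka only preserves order within a partition, and each task has its own local store. The sketch therefore also assumes that all paths of one tree land in the same partition, for example when the input topic has a single partition.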
-Matthias

On 1/28/19 2:37 PM, Nan Xu wrote:
> Hi,
> I was writing a simple streams app. All it does is this: a producer sends
> a sequence of paths and values, for example
> path /0, value 1
> path /0/1, value 2
> path /0/1/2, value 3
> and Kafka Streams takes that input and produces a KTable store.
>
> There is a rule: if the parent path does not exist, the child cannot be
> inserted. So if /0/1 is not there, an insert of /0/1/2 should be filtered out.
>
> I use the following program to process it. Paths are sent as /0, /0/1,
> /0/1/2, ....., /0/1/../9.
>
> The filter depends on the KTable store, which is built after (downstream
> of) the filter. When the filter checks whether a path's parent exists, the
> parent may already have passed the filter but not yet reached the store, so
> the filter concludes that the parent does not exist. This is more of an
> async processing problem: the parent is not fully done (written to the
> store) when the next element starts processing (the filter).
>
> Another problem: because the parent key and the child key are different,
> the arrival order of the paths can differ from the producer's send order,
> which also causes the child to be filtered out. The producer sends /0, /0/1,
> /0/1/2, ... but the broker gets them as /0, /0/1/2, /0/1, ..., and then all
> the following paths are filtered out, because /0/1/2 never gets a chance to
> be created.
>
> Any thoughts on how to solve this?
>
> Thanks,
> Nan
>
>
> val streamProperties = new Properties()
> streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application1")
> streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> streamProperties.put(StreamsConfig.CLIENT_ID_CONFIG, "important-test-client")
> streamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[StringSerde].getName)
> streamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[IntegerSerde].getName)
>
> val streamBuilder = new StreamsBuilder()
> val topic = "input"
>
> val inputStream = streamBuilder.stream[String, Integer](topic)
>
> val materialized = Materialized.as[String, Integer, KeyValueStore[Bytes, Array[Byte]]](topic + "_store")
>   .withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer())
>
> val reducer = new Reducer[Integer]() {
>   override def apply(value1: Integer, value2: Integer): Integer = {
>     value2
>   }
> }
>
> // The value is not important; only the key matters.
> val ktable = inputStream.filter(filter).groupByKey().reduce(reducer, materialized)
>
> // Make sure the parent exists.
> def filter(key: String, value: Integer): Boolean = {
>   println("===current store===, checking key " + key + " value: " + value)
>   store.all().forEachRemaining(x => println(x.key))
>   val parent = key.trim().substring(0, key.lastIndexOf("/"))
>   if (parent == "") {
>     true
>   } else {
>     if (store.get(parent) == null) {
>       println("not found parent" + parent)
>       false
>     } else {
>       true
>     }
>   }
> }
>
> val topology = streamBuilder.build()
> val streams = new KafkaStreams(topology, streamProperties)
> streams.start()
>
> Thread.sleep(6000)
> val storeName = ktable.queryableStoreName()
> val store = streams.store(storeName, QueryableStoreTypes.keyValueStore[String, Integer])
>
>
> val senderProperties = new Properties
> senderProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> senderProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer")
> senderProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
> senderProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[IntegerSerializer].getName)
> val producer = new KafkaProducer[String, Integer](senderProperties)
>
>
> for (j <- 1 until 10) {
>   val path = for (i <- 0 until j) yield {
>     "/" + i
>   }
>   producer.send(new ProducerRecord(topic, path.mkString(""), j))
> }
>
> Thread.sleep(3000)
> println("====show final store state====")
> store.all().forEachRemaining(x => println(x.key))
>
> Thread.sleep(10000000)
>
>
> output:
> ===current store===, checking key /0 value: 1
> ===current store===, checking key /0/1 value: 2
> /0
> ===current store===, checking key /0/1/2/3 value: 4
> /0/1
> /0
> not found parent/0/1/2
> ===current store===, checking key /0/1/2 value: 3
> /0/1
> /0
> ===current store===, checking key /0/1/2/3/4/5 value: 6
> /0/1
> /0
> /0/1/2
> not found parent/0/1/2/3/4
> ===current store===, checking key /0/1/2/3/4 value: 5
> /0/1
> /0
> /0/1/2
> not found parent/0/1/2/3
> ===current store===, checking key /0/1/2/3/4/5/6 value: 7
> /0/1
> /0
> /0/1/2
> not found parent/0/1/2/3/4/5
> ===current store===, checking key /0/1/2/3/4/5/6/7/8 value: 9
> /0/1
> /0
> /0/1/2
> not found parent/0/1/2/3/4/5/6/7
> ===current store===, checking key /0/1/2/3/4/5/6/7 value: 8
> /0/1
> /0
> /0/1/2
> not found parent/0/1/2/3/4/5/6
> ====show final store state====
> /0/1
> /0
> /0/1/2