It might be easier to use a `Transformer` with a state store. Each time
you receive an input record, first check whether its parent entry is
already in the store: if it is, put the new record into the store and
forward it; otherwise drop it. Because the check and the write happen
synchronously inside the same processor, there is no window in which a
parent has passed the check but is not yet visible in the store.
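
Something along these lines might work (untested sketch against the
current API; the store name "paths-store" is a placeholder, and
`streamBuilder`/`inputStream` refer to the variables in your code):

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.{KeyValueStore, Stores}

// Register a store the Transformer can read and write synchronously.
val storeBuilder = Stores.keyValueStoreBuilder(
  Stores.persistentKeyValueStore("paths-store"),
  Serdes.String(), Serdes.Integer())
streamBuilder.addStateStore(storeBuilder)

val validated = inputStream.transform(
  new TransformerSupplier[String, Integer, KeyValue[String, Integer]] {
    override def get() = new Transformer[String, Integer, KeyValue[String, Integer]] {
      private var store: KeyValueStore[String, Integer] = _

      override def init(context: ProcessorContext): Unit =
        store = context.getStateStore("paths-store")
          .asInstanceOf[KeyValueStore[String, Integer]]

      override def transform(key: String, value: Integer): KeyValue[String, Integer] = {
        val parent = key.substring(0, key.lastIndexOf("/"))
        if (parent.isEmpty || store.get(parent) != null) {
          // Parent exists (or key is a root): record the path before the
          // next record is processed, then forward it downstream.
          store.put(key, value)
          KeyValue.pair(key, value)
        } else {
          null // returning null drops the record
        }
      }

      override def close(): Unit = {}
    }
  },
  "paths-store")

Since the put() happens inside transform() itself, a child processed
right after its parent will already see the parent in the store.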

-Matthias

On 1/28/19 2:37 PM, Nan Xu wrote:
> hi,
> I was writing a simple streams app. All it does is have a producer send a
> sequence of paths and values, for example
> path /0,     value 1
> path /0/1,   value 2
> path /0/1/2, value 3
> and Kafka Streams takes that input and produces a KTable store.
> 
> There is a rule: if the parent path does not exist, the child cannot be
> inserted. So if /0/1 is not there, inserting /0/1/2 should be filtered out.
> 
> I use the following program to process it. The paths are sent as /0, /0/1,
> /0/1/2, ....., /0/1/../9.
> 
> The filter depends on the KTable store, which is built downstream of the
> filtered stream. When the filter checks whether a path's parent exists, the
> parent may already have passed the filter but not yet reached the store, so
> the filter concludes the parent does not exist. This is really a problem of
> async processing: the parent is not fully done (written to the store) before
> the next element starts being processed (filtered).
> 
> Another problem is that the parent key and child key are different, so the
> arrival order of the paths can differ from the order in which the producer
> sent them, which also causes children to be filtered out. The producer sends
> /0, /0/1, /0/1/2, ... but the broker receives them as /0, /0/1/2, /0/1, ...;
> then all the following paths are filtered out, because /0/1/2 never gets a
> chance to be created.
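> 
> As far as I understand, this happens because the default partitioner hashes
> each key independently, so parent and child keys can land on different
> partitions, and Kafka only guarantees ordering within a single partition. A
> sketch of my understanding (the partition count 4 is just an example):
> 
>   import org.apache.kafka.common.utils.Utils
> 
>   // Default partitioner logic for keyed records: murmur2-hash the
>   // serialized key and map it onto the partition count.
>   def defaultPartition(key: String, numPartitions: Int): Int =
>     Utils.toPositive(Utils.murmur2(key.getBytes("UTF-8"))) % numPartitions
> 
>   // e.g. defaultPartition("/0/1", 4) and defaultPartition("/0/1/2", 4) can
>   // differ, so the two records may arrive interleaved across partitions.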
> 
> Any thoughts on how to solve this?
> 
> Thanks,
> Nan
> 
> 
> import java.util.Properties
> 
> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
> import org.apache.kafka.common.serialization.Serdes.{IntegerSerde, StringSerde}
> import org.apache.kafka.common.serialization.{IntegerSerializer, Serdes, StringSerializer}
> import org.apache.kafka.common.utils.Bytes
> import org.apache.kafka.streams.kstream.{Materialized, Reducer}
> import org.apache.kafka.streams.state.{KeyValueStore, QueryableStoreTypes}
> import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
> 
> val streamProperties = new Properties()
> streamProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application1")
> streamProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> streamProperties.put(StreamsConfig.CLIENT_ID_CONFIG, "important-test-client")
> streamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[StringSerde].getName)
> streamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[IntegerSerde].getName)
> 
> val streamBuilder = new StreamsBuilder()
> val topic = "input"
> 
> val inputStream = streamBuilder.stream[String, Integer](topic)
> 
> val materialized = Materialized.as[String, Integer, KeyValueStore[Bytes, Array[Byte]]](topic + "_store")
>   .withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer())
> 
> // The value is not important, only the key; the reducer just keeps the
> // latest value.
> val reducer = new Reducer[Integer]() {
>   override def apply(value1: Integer, value2: Integer): Integer = value2
> }
> 
> val ktable = inputStream.filter(filter).groupByKey().reduce(reducer, materialized)
> 
> // Make sure the parent exists; `store` (assigned below, after the streams
> // instance starts) is the interactive-query handle on the KTable store.
> def filter(key: String, value: Integer): Boolean = {
>   println("===current store===, checking key " + key + " value: " + value)
>   store.all().forEachRemaining(x => println(x.key))
>   val parent = key.trim().substring(0, key.lastIndexOf("/"))
>   if (parent == "") {
>     true
>   } else if (store.get(parent) == null) {
>     println("not found parent" + parent)
>     false
>   } else {
>     true
>   }
> }
> 
> val topology = streamBuilder.build()
> val streams = new KafkaStreams(topology, streamProperties)
> streams.start()
> 
> Thread.sleep(6000)
> val storeName = ktable.queryableStoreName()
> val store = streams.store(storeName, QueryableStoreTypes.keyValueStore[String, Integer])
> 
> val senderProperties = new Properties
> senderProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
> senderProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer")
> senderProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
> senderProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[IntegerSerializer].getName)
> val producer = new KafkaProducer[String, Integer](senderProperties)
> 
> // Send /0, /0/1, ..., /0/1/2/3/4/5/6/7/8, one level deeper each time,
> // with the depth as the value.
> for (j <- 1 until 10) {
>   val path = for (i <- 0 until j) yield {
>     "/" + i
>   }
>   producer.send(new ProducerRecord(topic, path.mkString(""), j))
> }
> 
> Thread.sleep(3000)
> println("====show final store state====")
> store.all().forEachRemaining(x => println(x.key))
> 
> Thread.sleep(10000000)
> 
> 
> output:
> ===current store===, checking key /0 value: 1
> ===current store===, checking key /0/1 value: 2
> /0
> ===current store===, checking key /0/1/2/3 value: 4
> /0/1
> /0
> not found parent/0/1/2
> ===current store===, checking key /0/1/2 value: 3
> /0/1
> /0
> ===current store===, checking key /0/1/2/3/4/5 value: 6
> /0/1
> /0
> /0/1/2
> not found parent/0/1/2/3/4
> ===current store===, checking key /0/1/2/3/4 value: 5
> /0/1
> /0
> /0/1/2
> not found parent/0/1/2/3
> ===current store===, checking key /0/1/2/3/4/5/6 value: 7
> /0/1
> /0
> /0/1/2
> not found parent/0/1/2/3/4/5
> ===current store===, checking key /0/1/2/3/4/5/6/7/8 value: 9
> /0/1
> /0
> /0/1/2
> not found parent/0/1/2/3/4/5/6/7
> ===current store===, checking key /0/1/2/3/4/5/6/7 value: 8
> /0/1
> /0
> /0/1/2
> not found parent/0/1/2/3/4/5/6
> ====show final store state====
> /0/1
> /0
> /0/1/2
> 
