More details would be useful. Pseudocode for your job would help (or the actual code with any specific sections removed).
How are you making sure you read exactly the same set of records from Kafka in your reader? I don't see you configuring the Kafka reader for that in the code below.

On Wed, Jul 13, 2016 at 12:30 PM, amir bahmanyari <[email protected]> wrote:
> try{
> PCollection<KV<String, String>>
> kafkarecords=p.apply(KafkaIO.read().withBootstrapServers("kafkaprt:9092").withTopics(topics).withValueCoder(StringUtf8Coder.of())
> .withoutMetadata()).apply(ParDo.named("startBundle").of(
> new DoFn<KV<byte[], String>, KV<String, String>>() {
> .....etc.
>
