More details would be useful. It would help to include pseudocode for your
job (or the actual code, with any specific sections removed).

How are you making sure you read exactly the same set of records from Kafka
in your reader? I don't see that you are configuring the Kafka reader for
that in the code below.

On Wed, Jul 13, 2016 at 12:30 PM, amir bahmanyari <[email protected]>
wrote:

> try{
> PCollection<KV<String, String>>
> kafkarecords=p.apply(KafkaIO.read().withBootstrapServers("kafkaprt:9092").withTopics(topics).withValueCoder(StringUtf8Coder.of())
> .withoutMetadata()).apply(ParDo.named("startBundle").of(
> new DoFn<KV<byte[], String>, KV<String, String>>() {
> .....etc.
>
