Re: Kafka Streams And Partitioning

2021-03-17 Thread Gareth Collins
Hi Sophie,

Thanks very much for the response!

So if I understand correctly, it will be impossible to avoid the repartition
topic?

e.g., my original message may have key = A, and will be partitioned on A.

But in my Kafka Streams app I will want to aggregate on A:B, A:C, or A:D
(B, C, and D come from extra key values in the data), while continuing to
partition on A, and then later read all values for A via REST. So to make
this work I have to have a repartition topic, even though I am not really
repartitioning (i.e., all records for A should still be processed together).
Is my understanding correct?

So WindowedStreamPartitioner is a special case for avoiding the repartition
topic?

thanks in advance,
Gareth

On Wed, Mar 17, 2021 at 7:59 PM Sophie Blee-Goldman wrote:

> Hey Gareth,
>
> Kafka Streams state store partitioning is based on the partitioning of the
> upstream input topics.
> If you want your RocksDB stores to be partitioned based on the prefix of a
> key, then you should
> make sure the input topic feeding into it uses whatever partitioning
> strategy you had in mind.
>
> If the source topics are user input topics and you have control over the
> production to these topics,
> then just use a custom partitioner to produce to them. If you don't have
> control over them, you can
> insert an intermediate/repartition topic between the input topics and the
> subtopology with the RocksDB.
> Check out the KStream#repartition operator: it accepts a Repartitioned,
> which itself accepts a
> StreamPartitioner that you can use to control the partitioning.
>
> You can check out the class WindowedStreamPartitioner for an example: this
> is how we handle the
> WindowStore case that you pointed out.
>
>
>
> On Mon, Mar 15, 2021 at 11:45 AM Gareth Collins <gareth.o.coll...@gmail.com>
> wrote:
>
> > Hi,
> >
> > This may be a newbie question but is it possible to control the
> > partitioning of a RocksDB KeyValueStore in Kafka Streams?
> >
> > For example, I perhaps only want to partition based on a prefix of a key
> > rather than the full key. I assume something similar must be done for the
> > WindowStore to partition without the window start time and sequence
> number
> > (otherwise window entries could be spread across partitions)?
> >
> > Sort of like the window store, I am wanting to be able to retrieve all
> > values with a certain key prefix from the KeyValueStore with one read
> > operation.
> >
> > Is this possible?
> >
> > thanks in advance,
> > Gareth Collins
> >
>


Re: Kafka Streams And Partitioning

2021-03-17 Thread Sophie Blee-Goldman
Hey Gareth,

Kafka Streams state store partitioning is based on the partitioning of the
upstream input topics.
If you want your RocksDB stores to be partitioned based on the prefix of a
key, then you should
make sure the input topic feeding into it uses whatever partitioning
strategy you had in mind.

If the source topics are user input topics and you have control over the
production to these topics,
then just use a custom partitioner to produce to them. If you don't have
control over them, you can
insert an intermediate/repartition topic between the input topics and the
subtopology with the RocksDB.
Check out the KStream#repartition operator: it accepts a Repartitioned,
which itself accepts a
StreamPartitioner that you can use to control the partitioning.

You can check out the class WindowedStreamPartitioner for an example: this
is how we handle the
WindowStore case that you pointed out.





Re: MirrorMaker 2 and Negative Lag

2021-03-17 Thread Alan Ning
OK. I follow now. Let me try to re-test to see if it makes a difference.

Thanks.

... Alan

On Wed, Mar 17, 2021 at 5:46 PM Samuel Cantero  wrote:

> I've found that bug the hard way. FWIW, I've migrated several clusters from
> Kafka 0.10 to Kafka 2.x using MM2, so offset syncing works fine for Kafka
> 0.10.
>
> Best,
>
> On Wed, Mar 17, 2021 at 6:43 PM Samuel Cantero 
> wrote:
>
> > No, what I meant is that offset syncing won't work with
> > `consumer.auto.offset.reset:latest` (I was not talking about that
> > particular bug). Try setting `consumer.auto.offset.reset:earliest` and
> > verify whether offsets are synced correctly.
> >
> > Best,
> >
> > On Wed, Mar 17, 2021 at 6:42 PM Alan Ning  wrote:
> >
> >> Hey Samuel,
> >>
> >> I am aware of that `consumer.auto.offset.reset:latest` problem. It was
> >> because this PR never made it to trunk. I patched MM2 locally for 2.7 so
> >> that `latest` offset will work.
> >>
> >> ... Alan
> >>
> >> On Wed, Mar 17, 2021 at 4:50 PM Samuel Cantero 
> >> wrote:
> >>
> >> > I've seen this before. I've found that consumer offset syncing does not
> >> > work with `consumer.auto.offset.reset:latest`. If you set this to
> >> > earliest, then it should work. One way to work around the need to start
> >> > from earliest is to start with latest and, once mirroring is ongoing,
> >> > swap to earliest. This won't affect mirroring, as the MM2 consumers will
> >> > resume from the last committed offsets.
> >> >
> >> > Best,
> >> >
> >> > On Wed, Mar 17, 2021 at 5:27 PM Ning Zhang 
> >> wrote:
> >> >
> >> > > Hello Alan,
> >> > >
> >> > > I have probably seen a similar case. One quick validation you could
> >> > > run is to test with a source cluster on a higher Kafka version. If it
> >> > > still doesn't work, please email me and I can introduce you to someone
> >> > > who has seen a similar case before.
> >> > >
> >> > > On 2021/03/15 21:59:03, Alan Ning  wrote:
> >> > > > I am running MirrorMaker 2 (Kafka 2.7), trying to migrate all topics
> >> > > > from one cluster to another while preserving consumer group offsets
> >> > > > through `sync.group.offsets.enabled=true`. My source cluster is
> >> > > > running Kafka 0.10, while the target cluster is running 2.6.1.
> >> > > >
> >> > > > While I can see data being replicated, the data on the replicated
> >> > > > Consumer Group in the target cluster looks wrong. The lag values of
> >> > > > the replicated Consumer Group are large negative values, and the
> >> > > > LOG-END-OFFSET values are mostly 0. I determined this information
> >> > > > from kafka-consumer-groups.sh.
> >> > > >
> >> > > > I checked the
> >> > > > kafka_consumer_consumer_fetch_manager_metrics_records_lag JMX
> >> > > > metrics in MM2 and the reported lag is zero for all partitions.
> >> > > >
> >> > > > By using `sync.group.offsets.enabled=true`, I envisioned that MM2
> >> > > > will automatically replicate and sync all Consumer Groups with a
> >> > > > meaningful offset in the target cluster. Am I misunderstanding how
> >> > > > MM2 is supposed to work?
> >> > > >
> >> > > > Here is my mm2.properties and the CG details.
> >> > > >
> >> > > > # mm2.properties
> >> > > > ```
> >> > > > clusters = src, dst
> >> > > > src.bootstrap.servers = 10.0.0.1:9092
> >> > > > dst.bootstrap.servers = 10.0.0.2:9092
> >> > > > src->dst.enabled = true
> >> > > > src->dst.topics = compute.*
> >> > > > src->dst.offset.flush.timeout.ms=6
> >> > > > src->dst.buffer.memory=1
> >> > > > dst->src.enabled = true
> >> > > > dst->src.topics = .*
> >> > > > replication.factor=3
> >> > > > src->dst.sync.group.offsets.enabled = true
> >> > > > src->dst.emit.checkpoints.enabled = true
> >> > > > src->dst.consumer.auto.offset.reset=latest
> >> > > > consumer.auto.offset.reset = latest
> >> > > > auto.offset.reset = latest
> >> > > > replication.policy.class =
> >> > > > com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
> >> > > > checkpoints.topic.replication.factor=3
> >> > > > heartbeats.topic.replication.factor=3
> >> > > > offset-syncs.topic.replication.factor=3
> >> > > > offset.storage.replication.factor=3
> >> > > > status.storage.replication.factor=3
> >> > > > config.storage.replication.factor=3
> >> > > > sync.topic.acls.enabled = false
> >> > > > sync.group.offsets.enabled = true
> >> > > > emit.checkpoints.enabled = true
> >> > > > tasks.max = 8
> >> > > > dst.producer.offset.flush.timeout.ms = 6
> >> > > > dst.offset.flush.timeout.ms = 6
> >> > > > ```
> >> > > >
> >> > > > Consumer Group details
> >> > > > ```
> >> > > > GROUP                        TOPIC            PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG      CONSUMER-ID HOST CLIENT-ID
> >> > > > kafka-group-Compute-Requests Compute-Requests 57        5305947        0              -5305947 -           -    -
> >> > > > kafka-group-Compute-Requests Compute-Requests 20        5164205        0              -5164205 -           -    -
> >> > > > kafka-group-Compute-Requests Compute-Requests 53        4208527        0              -4208527 -           -    -
> >> > > > kafka-group-Compute-Requests Compute-Requests 82        5247928        0              -5247928 -           -    -
> >> > > > kafka-group-Compute-Requests Compute-Requests 65        5574520        0              -5574520 -           -    -
> >> > > > kafka-group-Compute-Requests Compute-Requests 11        5190708        209            -5190499 -           -    -
> >> > > > ```
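For readers puzzled by the quoted table: the tool computes LAG as LOG-END-OFFSET minus CURRENT-OFFSET, so a synced committed offset of 5305947 against a reported end offset of 0 necessarily shows as -5305947. A trivial sketch of that arithmetic:

```java
// kafka-consumer-groups.sh derives lag from the two offsets it prints;
// a committed offset larger than the reported end offset goes negative.
public class LagCheck {
    static long lag(long logEndOffset, long currentOffset) {
        return logEndOffset - currentOffset;
    }
}
```

So the negative lag reflects the mismatch between the synced offsets and the target partitions' reported end offsets.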

Re: MirrorMaker 2 and Negative Lag

2021-03-17 Thread Samuel Cantero
I've found that bug the hard way. FWIW, I've migrated several clusters from
Kafka 0.10 to Kafka 2.x using MM2, so offset syncing works fine for Kafka 0.10.

Best,

Re: MirrorMaker 2 and Negative Lag

2021-03-17 Thread Samuel Cantero
No, what I meant is that offset syncing won't work with
`consumer.auto.offset.reset:latest` (I was not talking about that
particular bug). Try setting `consumer.auto.offset.reset:earliest` and
verify whether offsets are synced correctly.

Best,

Re: MirrorMaker 2 and Negative Lag

2021-03-17 Thread Alan Ning
Hey Ning,

My source cluster is a very old cluster running 0.10. I don't think I can
change the version. In fact, my whole effort is to migrate out of the
legacy cluster to 2.6+. Any insight would be greatly appreciated.

Thank you.

... Alan


Re: MirrorMaker 2 and Negative Lag

2021-03-17 Thread Alan Ning
Hey Samuel,

I am aware of that `consumer.auto.offset.reset:latest` problem. It was
because this PR never made it to trunk. I patched MM2 locally for 2.7 so
that `latest` offset will work.

... Alan



Re: Right Use Case For Kafka Streams?

2021-03-17 Thread Guozhang Wang
Hello Gareth,

A common practice for rolling up aggregations with Kafka Streams is to
materialize only the finest granularity at the processor (5 days in your
case), and to do the coarse-grained rollup at query-serving time through the
interactive query API -- i.e., whenever a query is issued for a 30-day
aggregate, you do a range scan on the 5-day-aggregate store and compute the
rollup on the fly.

If you'd prefer to still materialize all of the granularities, say because
their query frequency is high enough, then just go with three stores, but
as three concatenated aggregations (i.e., a stream aggregation into 5-day
buckets, the 5-day table aggregated to 10 days, and the 10-day table
aggregated to 30 days).

Guozhang
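A minimal, hypothetical sketch of the first option (query-time rollup): the TreeMap below stands in for a range scan over the materialized 5-day store, mapping each bucket's start day to that bucket's aggregate. In a real app the scan would be an interactive query against the store (e.g. ReadOnlyWindowStore#fetch), and all names here are illustrative.

```java
import java.util.Map;
import java.util.TreeMap;

// Materialize only 5-day buckets; compute a coarser aggregate on the fly
// by summing the buckets that fall inside the queried range.
public class RollupQuery {

    // fiveDayBuckets: bucket start day -> aggregate for that 5-day bucket
    // (a stand-in for a windowed store's range scan result)
    static long rollup(TreeMap<Long, Long> fiveDayBuckets,
                       long fromDay, long toDay) {
        long total = 0;
        for (Map.Entry<Long, Long> e :
                fiveDayBuckets.subMap(fromDay, true, toDay, true).entrySet()) {
            total += e.getValue();
        }
        return total;
    }
}
```

A 30-day query then scans six buckets and sums them: one slightly larger read per query instead of three materialized stores.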

On Mon, Mar 15, 2021 at 6:11 PM Gareth Collins wrote:

> Hi,
>
> We have a requirement to calculate metrics on a huge number of keys (could
> be hundreds of millions, perhaps billions of keys; attempting caching on
> individual keys in many cases will have almost a 0% cache hit rate). Is
> Kafka Streams with RocksDB and compacted topics the right tool for a task
> like that?
>
> As well, just from playing with Kafka Streams for a week, it feels like it
> wants to create a lot of separate stores by default (if I want to calculate
> five-, ten-, and 30-day aggregates, I will get three separate stores for
> this state data by default). Coming from a different distributed storage
> solution, I feel like I want to put them together in one store, as I/O has
> always been my bottleneck (one big read and one big write are better than
> three small separate reads and three small separate writes).
>
> But am I perhaps missing something here? I don't want to avoid the DSL that
> Kafka Streams provides if I don't have to. Will the Kafka Streams RocksDB
> solution be so much faster than a distributed read that it won't be the
> bottleneck even with huge amounts of data?
>
> Any info/opinions would be greatly appreciated.
>
> thanks in advance,
> Gareth Collins
>


-- 
-- Guozhang


Re: SAP PI/PO -> Kafka integration options

2021-03-17 Thread M. Manna
Kafka itself doesn't have a REST proxy; Confluent does. Instaclustr also
offers a Kafka REST proxy.

Also, SAP has hundreds of products, including SAP Cloud Platform, so I'm not
sure what PI/PO means for your case.

Unless there's something I'm unaware of, you're referring to a non-Apache
offering here. You might want to browse Confluent's documentation to
understand what they have for a proxy.

Thanks,




On Wed, 17 Mar 2021 at 20:57, Blakaj Arian  wrote:

> Hi,
>
> My name is Arian. I work at Scania as an architect.
>
> We have SAP in our landscape and are using PI/PO as our integration
> platform. We are interested to know what type of integration options there
> are to integrate with Kafka from a SAP PI/PO perspective. We know that Kafka
> has an API proxy which we can utilize with our REST adapter, but we are
> concerned as to whether that will be sufficient for all of our use cases.
> There are adapters available for purchase (for instance the Advantco Kafka
> adapter for PI/PO), but we would like to know if there are other options
> before we take a look into that.
>
> Please also provide useful links or whitepapers regarding this (SAP PI/PO ->
> Kafka). Perhaps a meeting/demo session can be arranged.
>
> BR
> Arian
>


SAP PI/PO -> Kafka integration options

2021-03-17 Thread Blakaj Arian
Hi,

My name is Arian. I work at Scania as an architect.

We have SAP in our landscape and are using PI/PO as our integration platform.
We are interested to know what type of integration options there are to
integrate with Kafka from a SAP PI/PO perspective. We know that Kafka has an
API proxy which we can utilize with our REST adapter, but we are concerned as
to whether that will be sufficient for all of our use cases. There are
adapters available for purchase (for instance the Advantco Kafka adapter for
PI/PO), but we would like to know if there are other options before we take a
look into that.

Please also provide useful links or whitepapers regarding this (SAP PI/PO ->
Kafka). Perhaps a meeting/demo session can be arranged.

BR
Arian


Re: MirrorMaker 2 and Negative Lag

2021-03-17 Thread Samuel Cantero
I've seen this before. I've found that consumer offset syncing does not work
with `consumer.auto.offset.reset:latest`. If you set this to earliest, then
it should work. One way to work around the need to start from earliest is to
start with latest and, once mirroring is ongoing, swap to earliest. This
won't affect mirroring, as the MM2 consumers will resume from the last
committed offsets.
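As a concrete illustration of that workaround, the two phases could look like this in mm2.properties (the property name is the one from the config quoted earlier in the thread; this shows two states of the same line over time, not one file, and assumes the MM2 connectors are restarted between phases):

```properties
# Phase 1: bootstrap mirroring without replaying the full source history
src->dst.consumer.auto.offset.reset = latest

# Phase 2, once mirroring is ongoing: swap to earliest so consumer group
# offset syncing works; the MM2 consumers resume from committed offsets,
# so already-mirrored data is not re-read
src->dst.consumer.auto.offset.reset = earliest
```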

Best,

On Wed, Mar 17, 2021 at 5:27 PM Ning Zhang  wrote:

> Hello Alan,
>
> I may probably see the similar case. One quick validation that could be
> run is to test on the source cluster with higher Kafka version. If still
> not working, please email me and I could introduce you to person who may
> have similar case before.
>
> On 2021/03/15 21:59:03, Alan Ning  wrote:
> > I am running MirrorMaker 2 (Kafka 2.7), trying to migrate all topics from
> > one cluster to another while preserving through
> > `sync.group.offsets.enabled=true`. My source cluster is running Kafka
> 0.10,
> > while the target cluster is running 2.6.1.
> >
> > While I can see data being replicated, the data on the replicated
> Consumer
> > Group in the target cluster looks wrong. The lag values of the replicated
> > Consumer Group are large negative values, and the LOG-END-OFFSET are
> mostly
> > 0. I determined this information from kafka-consumer-groups.sh.
> >
> > I checked the kafka_consumer_consumer_fetch_manager_metrics_records_lag
> JMX
> > metrics in MM2 and the reported lag is zero for all partitions.
> >
> > By using `sync.group.offsets.enabled=true`, I expected that MM2 would
> > automatically replicate and sync all Consumer Groups with meaningful
> > offsets in the target cluster. Am I misunderstanding how MM2 is
> > supposed to work?
> >
> > Here is my mm2.properties and the CG details.
> >
> > # mm2.properties
> > ```
> > clusters = src, dst
> > src.bootstrap.servers = 10.0.0.1:9092
> > dst.bootstrap.servers = 10.0.0.2:9092
> > src->dst.enabled = true
> > src->dst.topics = compute.*
> > src->dst.offset.flush.timeout.ms=6
> > src->dst.buffer.memory=1
> > dst->src.enabled = true
> > dst->src.topics = .*
> > replication.factor=3
> > src->dst.sync.group.offsets.enabled = true
> > src->dst.emit.checkpoints.enabled = true
> > src->dst.consumer.auto.offset.reset=latest
> > consumer.auto.offset.reset = latest
> > auto.offset.reset = latest
> > replication.policy.class =
> > com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
> > checkpoints.topic.replication.factor=3
> > heartbeats.topic.replication.factor=3
> > offset-syncs.topic.replication.factor=3
> > offset.storage.replication.factor=3
> > status.storage.replication.factor=3
> > config.storage.replication.factor=3
> > sync.topic.acls.enabled = false
> > sync.group.offsets.enabled = true
> > emit.checkpoints.enabled = true
> > tasks.max = 8
> > dst.producer.offset.flush.timeout.ms = 6
> > dst.offset.flush.timeout.ms = 6
> > ```
> >
> > Consumer Group details
> > ```
> > GROUP                        TOPIC            PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG      CONSUMER-ID HOST CLIENT-ID
> > kafka-group-Compute-Requests Compute-Requests 57        5305947        0              -5305947 -           -    -
> > kafka-group-Compute-Requests Compute-Requests 20        5164205        0              -5164205 -           -    -
> > kafka-group-Compute-Requests Compute-Requests 53        4208527        0              -4208527 -           -    -
> > kafka-group-Compute-Requests Compute-Requests 82        5247928        0              -5247928 -           -    -
> > kafka-group-Compute-Requests Compute-Requests 65        5574520        0              -5574520 -           -    -
> > kafka-group-Compute-Requests Compute-Requests 11        5190708        209            -5190499 -           -    -
> >
> > Thanks
> >
> > ... Alan
> >
>


Re: Mirrormaker 2.0 - duplicates with idempotence enabled

2021-03-17 Thread Ning Zhang
Hello Vangelis,

By default, current MM 2.0 provides an "at-least-once" delivery guarantee,
meaning there will be duplicate messages under some failure scenarios.

If you prefer a stronger delivery guarantee, there is a pending PR for MM 2.0:

https://issues.apache.org/jira/browse/KAFKA-10339
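Until that lands, a common downstream mitigation for at-least-once duplicates is consumer-side deduplication on a stable message identifier (for instance, source partition and offset carried in headers). A minimal, hypothetical sketch in plain Python, not part of MM2; the `Deduper` name and bounded eviction policy are illustrative:

```python
from collections import OrderedDict

class Deduper:
    """Tracks a bounded set of recently seen message ids (insertion-ordered)."""

    def __init__(self, capacity=100_000):
        self.seen = OrderedDict()
        self.capacity = capacity

    def is_duplicate(self, msg_id):
        if msg_id in self.seen:
            return True
        self.seen[msg_id] = None
        if len(self.seen) > self.capacity:
            self.seen.popitem(last=False)  # evict the oldest id once full
        return False

d = Deduper(capacity=3)
results = [d.is_duplicate(i) for i in ("a", "b", "a", "c", "d", "a")]
# the third "a" is flagged; after "a" is evicted, the last "a" is seen as new
```

Note the trade-off: once an id has been evicted from the bounded window, a very late duplicate will slip through, so the capacity must cover the realistic redelivery window.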

On 2021/03/10 07:45:17, Vangelis Typaldos  wrote: 
> Hi,
> 
> I have set up MirrorMaker 2 (Kafka v2.6.0) on 2 clusters (CL1, CL2) and the
> mirroring seems to work properly, except for an issue with duplicates in
> the following scenario:
> While both clusters are up and running, I simulate an incident by stopping
> the brokers of the CL2 cluster one by one. Stopping the first two brokers
> does not cause any issue: all messages of my test topic are mirrored
> without problems to the CL1.topic on the CL2 cluster. After stopping the
> last broker, mirroring obviously stops, as all CL2 brokers are down.
> Throughout the test, a producer continuously feeds messages to the topic
> on CL1.
> The problem starts when restarting the brokers. After starting the first
> broker, I notice that some messages (about 5%) are duplicated. I have
> connected a client to CL1.topic and can confirm that there are indeed
> duplicated messages in my mirrored topic.
> Kindly suggest how I could avoid these duplicates. Could it be that
> idempotence does not work correctly during a broker shutdown?
> In the following you can find my MM2 relative config
> clusters = CL1, CL2
> CL1.bootstrap.servers = broker1CL1:9092, broker2CL1:9092, broker3CL1:9092
> CL2.bootstrap.servers = broker1CL2:9092, broker2CL2:9092, broker3CL2:9092
> 
> PRIM->DSTR.enabled = true
> DSTR->PRIM.enabled = true
> 
> CL1.producer.enable.idempotence = true
> CL1.producer.acks=all
> CL1.producer.max.in.flight.requests.per.connection=5
> CL1.producer.retries=2147483647
> CL1.consumer.isolation.level=read_committed
> CL2.producer.enable.idempotence = true
> CL2.producer.acks=all
> CL2.producer.max.in.flight.requests.per.connection=5
> CL2.producer.retries=2147483647
> CL2.consumer.isolation.level=read_committed
> 
> 
> Best Regards,
> 


Re: MirrorMaker 2 and Negative Lag

2021-03-17 Thread Ning Zhang
Hello Alan,

I have probably seen a similar case. One quick validation you could run is to 
test with the source cluster on a higher Kafka version. If it still does not 
work, please email me and I can introduce you to someone who has seen a 
similar case before.

On 2021/03/15 21:59:03, Alan Ning  wrote: 
> I am running MirrorMaker 2 (Kafka 2.7), trying to migrate all topics from
> one cluster to another while preserving consumer group offsets through
> `sync.group.offsets.enabled=true`. My source cluster is running Kafka 0.10,
> while the target cluster is running 2.6.1.
> 
> While I can see data being replicated, the data on the replicated Consumer
> Group in the target cluster looks wrong. The lag values of the replicated
> Consumer Group are large negative values, and the LOG-END-OFFSET values are
> mostly 0. I determined this from kafka-consumer-groups.sh.
> 
> I checked the kafka_consumer_consumer_fetch_manager_metrics_records_lag JMX
> metrics in MM2 and the reported lag is zero for all partitions.
> 
> By using `sync.group.offsets.enabled=true`, I expected that MM2 would
> automatically replicate and sync all Consumer Groups with meaningful
> offsets in the target cluster. Am I misunderstanding how MM2 is supposed to
> work?
> 
> Here is my mm2.properties and the CG details.
> 
> # mm2.properties
> ```
> clusters = src, dst
> src.bootstrap.servers = 10.0.0.1:9092
> dst.bootstrap.servers = 10.0.0.2:9092
> src->dst.enabled = true
> src->dst.topics = compute.*
> src->dst.offset.flush.timeout.ms=6
> src->dst.buffer.memory=1
> dst->src.enabled = true
> dst->src.topics = .*
> replication.factor=3
> src->dst.sync.group.offsets.enabled = true
> src->dst.emit.checkpoints.enabled = true
> src->dst.consumer.auto.offset.reset=latest
> consumer.auto.offset.reset = latest
> auto.offset.reset = latest
> replication.policy.class =
> com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
> checkpoints.topic.replication.factor=3
> heartbeats.topic.replication.factor=3
> offset-syncs.topic.replication.factor=3
> offset.storage.replication.factor=3
> status.storage.replication.factor=3
> config.storage.replication.factor=3
> sync.topic.acls.enabled = false
> sync.group.offsets.enabled = true
> emit.checkpoints.enabled = true
> tasks.max = 8
> dst.producer.offset.flush.timeout.ms = 6
> dst.offset.flush.timeout.ms = 6
> ```
> 
> Consumer Group details
> ```
> GROUP                        TOPIC            PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG      CONSUMER-ID HOST CLIENT-ID
> kafka-group-Compute-Requests Compute-Requests 57        5305947        0              -5305947 -           -    -
> kafka-group-Compute-Requests Compute-Requests 20        5164205        0              -5164205 -           -    -
> kafka-group-Compute-Requests Compute-Requests 53        4208527        0              -4208527 -           -    -
> kafka-group-Compute-Requests Compute-Requests 82        5247928        0              -5247928 -           -    -
> kafka-group-Compute-Requests Compute-Requests 65        5574520        0              -5574520 -           -    -
> kafka-group-Compute-Requests Compute-Requests 11        5190708        209            -5190499 -           -    -
> ```
> 
> Thanks
> 
> ... Alan
> 


Re: Issue writing to Google BigQuery

2021-03-17 Thread Mich Talebzadeh
As a side note, I also validated the JSON format:

{"schema": { "type": "struct", "fields": [ { "field": "rowkey", "type":
"string", "optional": true}],"optional": false,"name": "BQ"}, "payload":
{"rowkey": "c0224abd-a6c4-4743-ac01-55e7b6980062"}}
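Not from the thread itself, but for what it's worth, the envelope shape that the JsonConverter expects with `value.converter.schemas.enable=true` (a top-level `schema` plus `payload`, with the schema's top-level type being `struct`) can be sanity-checked with a short script. A minimal sketch, checking structure only, not full schema validation:

```python
import json

# The envelope from this thread, verbatim (line breaks are harmless to json.loads)
msg = '''{"schema": { "type": "struct", "fields": [ { "field": "rowkey", "type":
"string", "optional": true}],"optional": false,"name": "BQ"}, "payload":
{"rowkey": "c0224abd-a6c4-4743-ac01-55e7b6980062"}}'''

doc = json.loads(msg)

# Exactly two top-level keys, and the declared schema type must be "struct",
# which is what the "Top-level Kafka Connect schema must be of type 'struct'"
# error is complaining about when it is violated
assert set(doc) == {"schema", "payload"}
assert doc["schema"]["type"] == "struct"
print("envelope looks structurally valid")
```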


Thanks


LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 17 Mar 2021 at 09:56, Mich Talebzadeh 
wrote:

>
> This is what is termed as fun and games.
>
> Trying to write a single column (for the sake of test) in this case to
> BigQuery from Kafka. I am sending the schema and payload as per docs
>
> This is the message sent, as read back from the console consumer:
>
> $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server rhes75:9092 
> --from-beginning --topic md --property print.key=true
>
> Note that it also prints the Kafka key.
>
>
> 9485818a-e6c5-434d-9096-29c6e3f55148{"schema": { "type": "struct", 
> "fields": [ { "field": "rowkey", "type": "string", "optional": 
> true}],"optional": false,"name": "BQ"}, "payload": {"rowkey": 
> "9485818a-e6c5-434d-9096-29c6e3f55148"}}
>
> The error thrown is
>
>
> [2021-03-17 09:29:16,655] ERROR WorkerSinkTask{id=bigquery-sink-0} Task threw 
> an uncaught and unrecoverable exception. Task is being killed and will not 
> recover until manually restarted. Error: Top-level Kafka Connect schema must 
> be of type 'struct' (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
>
> This is the standalone properties file:
>
>
> bootstrap.servers=rhes75:9092
> key.converter=org.apache.kafka.connect.storage.StringConverter
> #key.converter=org.apache.kafka.connect.json.JsonConverter
> key.converter.schemas.enable=true
> value.converter=org.apache.kafka.connect.json.JsonConverter
> value.converter.schemas.enable=true
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter=org.apache.kafka.connect.storage.StringConverter
> ##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
> internal.key.converter.schemas.enable=false
> internal.value.converter.schemas.enable=false
> offset.storage.file.filename=/tmp/connect_bq.offsetsoffset.flush.interval.ms=1
>
> and this is the sink properties file
>
>
> name=bigquery-sink
> connector.type=bigquery-connector
> connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
> defaultDataset=test
> project=project_name
> topics=md
> autoCreateTables=false
> gcsBucketName=tmp_storage_bucket
> queueSize=-1
> bigQueryRetry=0
> bigQueryRetryWait=1000
> bigQueryMessageTimePartitioning=false
> bigQueryPartitionDecorator=true
> timePartitioningType=DAY
> keySource=FILE
> keyfile=xxx.json
> sanitizeTopics=false
> schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
> threadPoolSize=10
> allBQFieldsNullable=false
> avroDataCacheSize=100
> batchLoadIntervalSec=120
> convertDoubleSpecialValues=false
> enableBatchLoad=false
> upsertEnabled=false
> deleteEnabled=false
> mergeIntervalMs=6
> mergeRecordsThreshold=-1
> autoCreateBucket=true
> allowNewBigQueryFields=false
> allowBigQueryRequiredFieldRelaxation=false
> allowSchemaUnionization=false
> kafkaDataFieldName=null
> kafkaKeyFieldName=null
>
> I am sure someone should be able to spot the error here.
>
>
> Many thanks
>
>
>
>
>


SAP PI/PO -> Kafka integration options

2021-03-17 Thread Blakaj Arian
Hi,

My name is Arian. I work at Scania as an architect.

We have SAP in our landscape and use PI/PO as our integration platform. We 
are interested to know what integration options exist for connecting to 
Kafka from an SAP PI/PO perspective. We know that Kafka has an API proxy 
which we can utilize with our REST adapter, but we are concerned whether 
that will be sufficient for all of our use cases. There are adapters 
available for purchase (for instance, the Advantco Kafka adapter for PI/PO), 
but we would like to know whether there are other options before we look 
into that.
Please also provide useful links or whitepapers regarding this (SAP PI/PO -> 
Kafka). Perhaps a meeting/demo session can be arranged.

BR
Arian



Issue writing to Google BigQuery

2021-03-17 Thread Mich Talebzadeh
This is what is termed as fun and games.

Trying to write a single column (for the sake of test) in this case to
BigQuery from Kafka. I am sending the schema and payload as per docs

This is the message sent, as read back from the console consumer:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server
rhes75:9092 --from-beginning --topic md --property print.key=true

Note that it also prints the Kafka key.


9485818a-e6c5-434d-9096-29c6e3f55148{"schema": { "type": "struct",
"fields": [ { "field": "rowkey", "type": "string", "optional":
true}],"optional": false,"name": "BQ"}, "payload": {"rowkey":
"9485818a-e6c5-434d-9096-29c6e3f55148"}}

The error thrown is


[2021-03-17 09:29:16,655] ERROR WorkerSinkTask{id=bigquery-sink-0}
Task threw an uncaught and unrecoverable exception. Task is being
killed and will not recover until manually restarted. Error: Top-level
Kafka Connect schema must be of type 'struct'
(org.apache.kafka.connect.runtime.WorkerSinkTask:612)

This is the standalone properties file:


bootstrap.servers=rhes75:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
#key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect_bq.offsetsoffset.flush.interval.ms=1

and this is the sink properties file


name=bigquery-sink
connector.type=bigquery-connector
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
defaultDataset=test
project=project_name
topics=md
autoCreateTables=false
gcsBucketName=tmp_storage_bucket
queueSize=-1
bigQueryRetry=0
bigQueryRetryWait=1000
bigQueryMessageTimePartitioning=false
bigQueryPartitionDecorator=true
timePartitioningType=DAY
keySource=FILE
keyfile=xxx.json
sanitizeTopics=false
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
threadPoolSize=10
allBQFieldsNullable=false
avroDataCacheSize=100
batchLoadIntervalSec=120
convertDoubleSpecialValues=false
enableBatchLoad=false
upsertEnabled=false
deleteEnabled=false
mergeIntervalMs=6
mergeRecordsThreshold=-1
autoCreateBucket=true
allowNewBigQueryFields=false
allowBigQueryRequiredFieldRelaxation=false
allowSchemaUnionization=false
kafkaDataFieldName=null
kafkaKeyFieldName=null

I am sure someone should be able to spot the error here.


Many thanks




Re: Emit events that are NOT joined

2021-03-17 Thread Rubén Terceño
Maybe it’s not what you are looking for, but I think that functionality can
be implemented in two steps.

First, you perform a LEFT join, and then you filter by “null” on the joined
field to identify those whose joins didn’t succeed.
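The two-step pattern can be illustrated with plain Python stand-ins for the two streams (a sketch of the logic only, not the Kafka Streams API; `orders` and `payments` are invented example data):

```python
def left_join(left, right):
    """LEFT join two dicts keyed by record key; a missing right side becomes None."""
    return {k: (v, right.get(k)) for k, v in left.items()}

orders = {"o1": "alice", "o2": "bob", "o3": "carol"}
payments = {"o1": 100, "o3": 250}

joined = left_join(orders, payments)

# Step two: filter on the null right side to find the records that failed to join
unjoined = {k: v for k, (v, r) in joined.items() if r is None}
# unjoined == {"o2": "bob"}
```

In Streams terms the same shape would be a `leftJoin` followed by a `filter` on a null joined value; the caveat (which the original question is really about) is choosing a join window long enough to cover late-arriving primaries.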

HTH,

Rubén

El El mar, 16 mar 2021 a las 5:34, Ross Black 
escribió:

> Hi,
>
> I am trying to find the best pattern to solve a specific problem using
> Kafka streaming.  All of our current processing uses the Kafka streaming
> API (using multiple joins, windows, repartitions etc) so I already think I
> have a decent grasp of the fundamentals.
>
> We have 2 streams of events:
> - primary events (P), which indicate some key event in the system and carry
> a large amount of data
> - secondary events (S), which should *always* occur as a follow-on to the
> primary event and only contain a reference to the single associated primary
> event.
>
> I want to join secondary events to primary events (the simple part) BUT I
> also want to find out when secondary events have been *unable* to be
> joined.
> A secondary is unable to be joined:
> - when primary event delivery has been delayed (so that secondary events
> are received before the associated primary event)
> - when primary events go missing (the event collection system is noisy, so
> we do lose a small but significant number of primary events)
> - due to coding errors in the collectors, where an incorrect reference has
> been inserted into the secondary event
>
> Currently this functionality is implemented using a database:
> - primary events are inserted into the database and then secondary events
> lookup the primary by-reference.  If the primary is found the secondary is
> sent to a "JOINED" topic.
> - if the primary is not found, the secondary event is buffered in the
> database until the primary is received and then joined+emitted (and the
> secondary event is removed from the DB)
> - after some arbitrary time period, the database is queried for outstanding
> not-joined secondary events and they are emitted to an "UNJOINED" topic.
> This allows alerting on unmatched secondary events to drive quality
> measures, and allows offline analysis (to understand why)
>
> Some options:
> 1. Implement the same strategy as existing except using Kafka state stores
> instead of the DB.  With this approach I am concerned about atomic
> correctness - i.e. that state in the Kafka store can be managed so that the
> event is never sent to both JOINED and UNJOINED.
>
>
> 2. Continually emit key-values to a "PENDING" topic for the secondary join.
> An example sequence could be something like ...(where primary events = P,
> secondary events = S) :
> a) receive S with no matching P => emit {S, false}
> b) receive matching P for S => emit {S, null} (to effectively delete it
> from the topic)
> c) receive S with matching P => do not emit anything
>
> Now the problem becomes more like building a time-window of events from
> PENDING, to eventually emit the events to UNJOINED.  I am also uncertain as
> how to ensure events can never end up in both JOINED and UNJOINED.
>
> My apologies for the wall of text .. I find it a difficult problem to
> explain. 😏
>
>
> Is there some pattern I am missing that will help solve this problem?
> Any help / other suggestions would be appreciated.
>
> Thanks,
> Ross
>
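For what it's worth, the buffer-and-timeout logic from option 1 can be sketched in plain Python (illustrative only, not the Streams API or the existing DB implementation; event shapes and the timeout are invented). The key property is that each secondary is emitted exactly once, which is what guards against an event landing in both JOINED and UNJOINED:

```python
def process(events, timeout=10):
    """events: (timestamp, kind, key) tuples, kind "P" (primary) or "S" (secondary)."""
    primaries, pending, joined, unjoined = {}, {}, [], []
    for ts, kind, key in events:
        if kind == "P":
            primaries[key] = ts
            if key in pending:          # late primary: join the buffered secondary
                joined.append(key)
                del pending[key]
        elif kind == "S":
            if key in primaries:        # primary already seen: join immediately
                joined.append(key)
            else:                       # buffer until the primary shows up
                pending[key] = ts
        # sweep: expire secondaries whose primary never arrived within the timeout
        for k, t0 in list(pending.items()):
            if ts - t0 > timeout:
                unjoined.append(k)
                del pending[k]
    return joined, unjoined

events = [(0, "S", "x"), (1, "P", "x"), (2, "S", "y"), (20, "P", "z"), (21, "S", "z")]
joined, unjoined = process(events)
# "x" joins when its late primary arrives, "y" expires, "z" joins immediately
```

Translated to Streams, the pending buffer becomes a state store and the sweep a punctuator; because a key is deleted from the buffer at the moment it is emitted, and both the emit and the delete happen in the same processing step, the exactly-one-topic guarantee holds under Kafka's atomic task commits.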
-- 
Rubén Terceño Rodríguez
Director, Solutions Engineering, EMEA
Confluent | Data in Motion
+34 661 42 42 28