[jira] [Created] (KAFKA-9390) Non-key joining of KTable not compatible with confluent avro serdes

2020-01-08 Thread Andy Bryant (Jira)
Andy Bryant created KAFKA-9390:
--

 Summary: Non-key joining of KTable not compatible with confluent 
avro serdes
 Key: KAFKA-9390
 URL: https://issues.apache.org/jira/browse/KAFKA-9390
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.4.0
Reporter: Andy Bryant


I was trying out the new one-to-many KTable joins against some CDC data in Avro 
format and kept getting serialisation errors.

 
{code:java}
org.apache.kafka.common.errors.SerializationException: Error registering Avro 
schema: 
{"type":"record","name":"Key","namespace":"dbserver1.inventory.orders","fields":[

{"name":"order_number","type":"int"}

],"connect.name":"dbserver1.inventory.orders.Key"}
 Caused by: 
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 
Schema being registered is incompatible with an earlier schema; error code: 409
  
{code}
Both tables have avro keys of different types (one is an order key, the other a 
customer key).

This looks like it will cause issues.

[https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java#L57-L60]

 They will both attempt to register schemas with the same subject to the schema 
registry which will fail a backward compatibility check.
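As a minimal illustration of why the registrations collide (plain Java, not Kafka code, and assuming the serde uses Confluent's default TopicNameStrategy): the subject is derived from the topic name alone, so two different key schemas written to the same internal topic both target the same `<topic>-key` subject, and the second registration fails the compatibility check.

```java
// Sketch of Confluent's default TopicNameStrategy (simplified):
// the subject depends only on the topic name, not on the schema.
public class SubjectCollision {
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        // Hypothetical internal topic name used by the FK join.
        String topic = "some-fk-join-repartition-topic";
        // The order key schema and the customer key schema would both be
        // registered under this one subject, so the second registration
        // fails the backward-compatibility check (HTTP 409).
        System.out.println(subjectFor(topic, true));
    }
}
```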

I also noticed in the schema registry there were some subjects that didn't have 
the application id prefix. This is probably caused by this...

 
[https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java#L88]

Where here {{repartitionTopicName}} doesn't have the application id prefix.
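As a rough sketch of the expected decoration (plain Java with hypothetical names, not the actual Streams code): internal topic names are normally prefixed with the application id before use, and the schema registry subjects derived from an undecorated name would likewise lack the prefix.

```java
// Sketch: Kafka Streams normally decorates internal topic names with
// the application id. If a repartition topic name is used without this
// decoration, subjects derived from it also lack the prefix.
public class TopicDecoration {
    static String decorate(String applicationId, String topicName) {
        return applicationId + "-" + topicName;
    }

    public static void main(String[] args) {
        String undecorated = "subscription-registration-topic"; // hypothetical
        String decorated = decorate("my-app", undecorated);
        System.out.println(decorated); // my-app-subscription-registration-topic
    }
}
```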

--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9390) Non-key joining of KTable not compatible with confluent avro serdes

2020-01-08 Thread Andy Bryant (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011426#comment-17011426
 ] 

Andy Bryant edited comment on KAFKA-9390 at 1/9/20 5:25 AM:


Here's the code snippet. I'm joining the orders table to the customers table 
using the customer key as the foreign key.
{code:java}
final KTable customersTable =
    CdcHelpers.valueTable(streams.customers(), "customersTable");
final KTable ordersTable =
    CdcHelpers.valueTable(streams.orders(), "ordersTable");
final KTable enrichedOrders =
    ordersTable.join(
        customersTable,
        order -> new dbserver1.inventory.customers.Key(order.getPurchaser()),
        (order, customer) ->
            OrderView.newBuilder()
                .setCustomerName(customer.getFirstName() + ' ' + customer.getLastName())
                .setOrderId(order.getOrderNumber())
                .setOrderDate(LocalDate.fromDateFields(Date.valueOf(java.time.LocalDate.ofEpochDay(order.getOrderDate()))))
                .setOrderQuantity(order.getQuantity())
                .setProductName("unknown")
                .setProductWeight(0.0d)
                .build(),
        Named.as("wtf"),
        AvroSerdes.materializedAs("ordersWithCustomerxx"));
{code}



> Non-key joining of KTable not compatible with confluent avro serdes
> ---
>
> Key: KAFKA-9390
> URL: https://issues.apache.org/jira/browse/KAFKA-9390
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Andy Bryant
>Priority: Major
>





[jira] [Commented] (KAFKA-9390) Non-key joining of KTable not compatible with confluent avro serdes

2020-01-08 Thread Andy Bryant (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011426#comment-17011426
 ] 

Andy Bryant commented on KAFKA-9390:


Here's the code snippet. I'm joining the orders table to the customers table 
using the customer key as the foreign key.
{code:java}
final KTable customersTable =
    CdcHelpers.valueTable(streams.customers(), "customersTable");
final KTable ordersTable =
    CdcHelpers.valueTable(streams.orders(), "ordersTable");
final KTable enrichedOrders =
    ordersTable.join(
        customersTable,
        order -> new dbserver1.inventory.customers.Key(order.getPurchaser()),
        (order, customer) ->
            OrderView.newBuilder()
                .setCustomerName(customer.getFirstName() + ' ' + customer.getLastName())
                .setOrderId(order.getOrderNumber())
                .setOrderDate(LocalDate.fromDateFields(Date.valueOf(java.time.LocalDate.ofEpochDay(order.getOrderDate()))))
                .setOrderQuantity(order.getQuantity())
                .setProductName("unknown")
                .setProductWeight(0.0d)
                .build(),
        Named.as("wtf"),
        AvroSerdes.materializedAs("ordersWithCustomerxx"));
{code}

> Non-key joining of KTable not compatible with confluent avro serdes
> ---
>
> Key: KAFKA-9390
> URL: https://issues.apache.org/jira/browse/KAFKA-9390
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Andy Bryant
>Priority: Major
>





[jira] [Commented] (KAFKA-9390) Non-key joining of KTable not compatible with confluent avro serdes

2020-01-08 Thread Andy Bryant (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17011427#comment-17011427
 ] 

Andy Bryant commented on KAFKA-9390:


Also, I'm using Kafka Streams 2.4 with version 5.3.2 of the Confluent serdes 
(since 5.4 isn't out yet).

> Non-key joining of KTable not compatible with confluent avro serdes
> ---
>
> Key: KAFKA-9390
> URL: https://issues.apache.org/jira/browse/KAFKA-9390
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Andy Bryant
>Priority: Major
>





[jira] [Created] (KAFKA-7608) A Kafka Streams DSL transform or processor call should trigger a repartition like a join

2018-11-08 Thread Andy Bryant (JIRA)
Andy Bryant created KAFKA-7608:
--

 Summary: A Kafka Streams DSL transform or processor call should 
trigger a repartition like a join
 Key: KAFKA-7608
 URL: https://issues.apache.org/jira/browse/KAFKA-7608
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.0.0
Reporter: Andy Bryant


Currently in Kafka Streams, if any DSL operation occurs that may modify the 
keys of the record stream, the stream is flagged for repartitioning. Currently 
this flag is checked prior to a stream join or an aggregation and if set the 
stream is piped through a transient repartition topic. This ensures messages 
with the same key are always co-located in the same partition and hence same 
stream task and state store.

The same mechanism should be used to trigger repartitioning prior to stream 
{{transform}}, {{transformValues}} and {{process}} calls that specify one or 
more state stores.

Currently without the forced repartitioning, for streams where the key has been 
modified, there is no guarantee the same keys will be processed by the same 
task which would be what you expect when using a state store. Given that 
aggregations and joins already automatically make this guarantee it seems 
inconsistent that {{transform}} and {{process}} do not provide the same 
guarantees.

To achieve the same guarantees currently, developers must manually pipe the 
stream through a topic to force the repartitioning. This works, but is 
sub-optimal since you don't get the handy optimisation where the repartition 
topic contents is purged after use.
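The guarantee a repartition restores can be sketched in plain Java (illustrative only; Kafka's default partitioner actually uses murmur2 on the serialised key): equal keys always map to the same partition, hence the same stream task and state store.

```java
public class Copartition {
    // Simplified stand-in for the producer's key-based partitioner.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // After repartitioning on the (possibly re-mapped) key, every
        // record for "customer-42" lands on the same partition and task.
        int p1 = partitionFor("customer-42", 8);
        int p2 = partitionFor("customer-42", 8);
        if (p1 != p2) throw new AssertionError("same key must co-locate");
    }
}
```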



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6641) Consider auto repartitioning for Stream.transform() API

2018-11-08 Thread Andy Bryant (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16679272#comment-16679272
 ] 

Andy Bryant commented on KAFKA-6641:


This is done now by the looks. In {{KStreamImpl.transform}} it generates a new 
{{KStreamImpl}} with the {{repartitionRequired}} flag set to {{true}}. 
Similarly, {{transformValues}} just propagates the current flag value, as 
expected.

 

> Consider auto repartitioning for Stream.transform() API
> ---
>
> Key: KAFKA-6641
> URL: https://issues.apache.org/jira/browse/KAFKA-6641
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: api
>
> Today with map / mapValues of Streams DSL, we will set a flag / not set a 
> flag for the underlying topology builder; but for transform / 
> transformValues, we do not make such marking choices. Maybe the topology 
> builder can still make such flagging for transform() to indicate that since 
> the key maybe changed, we should issue a repartition for the downstream 
> stateful operators when necessary.





[jira] [Updated] (KAFKA-7608) A Kafka Streams DSL transform or process call should potentially trigger a repartition

2018-11-08 Thread Andy Bryant (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Bryant updated KAFKA-7608:
---
Summary: A Kafka Streams DSL transform or process call should potentially 
trigger a repartition  (was: A Kafka Streams DSL transform or process call 
should trigger a repartition, like a join)

> A Kafka Streams DSL transform or process call should potentially trigger a 
> repartition
> --
>
> Key: KAFKA-7608
> URL: https://issues.apache.org/jira/browse/KAFKA-7608
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Andy Bryant
>Priority: Major
>





[jira] [Updated] (KAFKA-7608) A Kafka Streams DSL transform or process call should trigger a repartition, like a join

2018-11-08 Thread Andy Bryant (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Bryant updated KAFKA-7608:
---
Summary: A Kafka Streams DSL transform or process call should trigger a 
repartition, like a join  (was: A Kafka Streams DSL transform or processor call 
should trigger a repartition like a join)

> A Kafka Streams DSL transform or process call should trigger a repartition, 
> like a join
> ---
>
> Key: KAFKA-7608
> URL: https://issues.apache.org/jira/browse/KAFKA-7608
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Andy Bryant
>Priority: Major
>





[jira] [Comment Edited] (KAFKA-6641) Consider auto repartitioning for Stream.transform() API

2018-11-08 Thread Andy Bryant (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16679272#comment-16679272
 ] 

Andy Bryant edited comment on KAFKA-6641 at 11/8/18 4:22 AM:
-

This is done now by the looks. In {{KStreamImpl.transform}} it generates a new 
{{KStreamImpl}} with the {{repartitionRequired}} flag set to {{true}}. 
Similarly, {{transformValues}} just propagates the current flag value, as 
expected.

 



> Consider auto repartitioning for Stream.transform() API
> ---
>
> Key: KAFKA-6641
> URL: https://issues.apache.org/jira/browse/KAFKA-6641
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: api
>





[jira] [Commented] (KAFKA-7608) A Kafka Streams DSL transform or process call should potentially trigger a repartition

2018-11-15 Thread Andy Bryant (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688729#comment-16688729
 ] 

Andy Bryant commented on KAFKA-7608:


Hi Bill

That sounds like a better solution. I see your point about not assuming how the 
state store is used, especially given these are lower-level APIs. I like the 
idea of giving hints that a topic can be internally managed, as this may be 
generally useful in other patterns as well.

As part of the KIP it would be worth calling out in the docs that a repartition 
is not triggered for these calls, and providing the above suggestion if callers 
do indeed need a repartition.

Cheers

Andy 

> A Kafka Streams DSL transform or process call should potentially trigger a 
> repartition
> --
>
> Key: KAFKA-7608
> URL: https://issues.apache.org/jira/browse/KAFKA-7608
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Andy Bryant
>Priority: Major
>





[jira] [Created] (KAFKA-7671) A KStream/GlobalKTable join shouldn't reset the repartition flag

2018-11-25 Thread Andy Bryant (JIRA)
Andy Bryant created KAFKA-7671:
--

 Summary: A KStream/GlobalKTable join shouldn't reset the 
repartition flag
 Key: KAFKA-7671
 URL: https://issues.apache.org/jira/browse/KAFKA-7671
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.0.1
Reporter: Andy Bryant


Currently a KStream/GlobalKTable join resets the repartition required flag to 
false.

I have a topology where I map a stream, join with a GlobalKTable, then 
groupByKey then aggregate.

The aggregate wasn't behaving correctly because it didn't force a repartition 
as I expected. The KStream/GlobalKTable join had reset the flag and hence I was 
getting the same keys in different partitions.

Since a KStream/GlobalKTable join does not itself force a repartition, it 
should simply propagate the flag down to the resultant KStream the same way 
most other operators work.
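The flag behaviour described above can be modelled with a toy sketch (this is not the real {{KStreamImpl}}; names are invented for illustration): a key-changing map marks the stream, and the GlobalKTable join should propagate that mark rather than reset it.

```java
public class RepartitionFlagModel {
    final boolean repartitionRequired;
    RepartitionFlagModel(boolean required) { this.repartitionRequired = required; }

    // A key-changing operation always marks the stream for repartitioning.
    RepartitionFlagModel map() { return new RepartitionFlagModel(true); }

    // Proposed behaviour: the GlobalKTable join propagates the flag...
    RepartitionFlagModel joinGlobalPropagating() { return new RepartitionFlagModel(repartitionRequired); }

    // ...whereas the reported behaviour resets it to false.
    RepartitionFlagModel joinGlobalResetting() { return new RepartitionFlagModel(false); }

    public static void main(String[] args) {
        RepartitionFlagModel mapped = new RepartitionFlagModel(false).map();
        if (!mapped.joinGlobalPropagating().repartitionRequired) throw new AssertionError();
        // With the reset, a later groupByKey/aggregate sees no need to
        // repartition, so equal keys can end up on different partitions.
        if (mapped.joinGlobalResetting().repartitionRequired) throw new AssertionError();
    }
}
```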

 





[jira] [Commented] (KAFKA-7671) A KStream/GlobalKTable join shouldn't reset the repartition flag

2018-11-27 Thread Andy Bryant (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701203#comment-16701203
 ] 

Andy Bryant commented on KAFKA-7671:


Hi [~mjsax],

I just checked the old versions (1.1, 1.0, 0.11, 0.10.2). Looks like the 
{{repartitionRequired}} was set to {{false}} for globalKTable joins all the way 
back to when the GlobalKTable was introduced in 0.10.2 with KAFKA-4490.

See 
[https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L596]

Cheers
Andy

> A KStream/GlobalKTable join shouldn't reset the repartition flag
> 
>
> Key: KAFKA-7671
> URL: https://issues.apache.org/jira/browse/KAFKA-7671
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Andy Bryant
>Priority: Major
>





[jira] [Created] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2017-12-17 Thread Andy Bryant (JIRA)
Andy Bryant created KAFKA-6378:
--

 Summary: NullPointerException on KStream-GlobalKTable leftJoin 
when KeyValueMapper returns null
 Key: KAFKA-6378
 URL: https://issues.apache.org/jira/browse/KAFKA-6378
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
Reporter: Andy Bryant


On a Stream->GlobalKTable leftJoin if the KeyValueMapper returns null, the 
stream fails with a NullPointerException (see stacktrace below). On Kafka 
0.11.0.0 the stream processes this successfully, calling the ValueJoiner with 
the table value set to null.

The use-case for this is joining a stream to a table containing reference data 
where the stream foreign key may be null. There is no straightforward 
workaround in this case with Kafka 1.0.0; you must resort to either generating 
a key that will never match or branching the stream for records that don't 
have the foreign key.
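The behavioural difference can be sketched with a plain map standing in for the GlobalKTable store (illustrative only): 1.0.0's store lookup does a {{requireNonNull}} on the mapped key, while the 0.11.0.0 behaviour treated a null key as "no match" and invoked the ValueJoiner with a null table value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class NullForeignKey {
    static final Map<String, String> table = new HashMap<>();
    static { table.put("NZ", "New Zealand"); }

    // 1.0.0-style: the state store rejects a null key outright.
    static String lookupStrict(String foreignKey) {
        return table.get(Objects.requireNonNull(foreignKey));
    }

    // 0.11-style: a null foreign key simply means "no match",
    // so a leftJoin would see a null table value.
    static String lookupLenient(String foreignKey) {
        return foreignKey == null ? null : table.get(foreignKey);
    }

    public static void main(String[] args) {
        if (lookupLenient(null) != null) throw new AssertionError();
        try {
            lookupStrict(null);
            throw new AssertionError("expected NullPointerException");
        } catch (NullPointerException expected) {
            // matches the stacktrace below
        }
    }
}
```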


Exception in thread "workshop-simple-example-client-StreamThread-1" 
java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:221)
at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:136)
at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:35)
at 
org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:184)
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:116)
at 
org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.get(KTableSourceValueGetterSupplier.java:49)
at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:56)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
at 
org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
at 
org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2017-12-23 Thread Andy Bryant (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16302605#comment-16302605
 ] 

Andy Bryant commented on KAFKA-6378:


I'm using {{null}} here to indicate there is no possible match for this record. 
Another option would be to make this more explicit, say by returning an 
{{Optional}} from the {{KeyValueMapper}}. However, if {{null}} is never a valid 
value for the key of a table or globalKTable, you could explicitly support it 
as a return value that indicates no match, rather than crashing the stream.
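A hypothetical sketch of that {{Optional}} variant (this API does not exist in Kafka Streams; the names and shape are invented for illustration, with a plain map standing in for the table): an empty mapped key means "no match", and the join still runs with a null table value instead of crashing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

public class OptionalKeyMapper {
    static final Map<String, String> table = new HashMap<>();
    static { table.put("NZ", "New Zealand"); }

    // Hypothetical left join: an empty mapped key skips the lookup,
    // and the joiner (here, string concatenation) still fires with null.
    static String leftJoin(String record,
                           Function<String, Optional<String>> keyMapper) {
        String tableValue = keyMapper.apply(record).map(table::get).orElse(null);
        return record + "/" + tableValue;
    }

    public static void main(String[] args) {
        Function<String, Optional<String>> mapper =
            r -> r.isEmpty() ? Optional.empty() : Optional.of(r);
        System.out.println(leftJoin("NZ", mapper)); // NZ/New Zealand
        System.out.println(leftJoin("", mapper));   // /null
    }
}
```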

Not having the ability to simply indicate there is no match is a real issue for 
us, as this pattern has come up at several clients who wish to replicate 
SQL-view-like behaviour in streams, joining on a nullable foreign key.

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper 
> returns null
> --
>
> Key: KAFKA-6378
> URL: https://issues.apache.org/jira/browse/KAFKA-6378
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andy Bryant
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2017-12-23 Thread Andy Bryant (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16302605#comment-16302605
 ] 

Andy Bryant edited comment on KAFKA-6378 at 12/23/17 10:52 PM:
---

I'm using {{null}} here to indicate there is no possible match for the stream 
record in the globalKTable. Another option would be to make this more explicit, 
say by returning an {{Optional}} from the {{KeyValueMapper}}. However, if 
{{null}} is never a valid value for the key of a table or globalKTable, you 
could explicitly support it as a return value that indicates no match, rather 
than crashing the stream.

Mapping the stream records to replace {{null}} references with a dummy 
'sentinel' value smells too, especially when you don't control the values in 
the key and you can't be sure you have picked an 'invalid' key.

Not having the ability to simply indicate there is no match is a real issue for 
us, as this pattern has come up at several clients who wish to replicate 
SQL-view-like behaviour by joining streams on a nullable foreign key.


was (Author: kiwiandy):
I'm using {{null}} here to indicate there is no possible match for this record. 
Another option would be to make the more explicit, say by return an 
{{Optional}} in the {{KeyValueMapper}}. However if {{null}} is never a valid 
value for the key of a table or globalKTable, you could explicitly have this as 
a supported return value that indicates no match and not crash the stream.

Not having the ability to simply indicate there is no match is a real issue for 
us, as this pattern has come up at several clients where they wish to replicate 
SQL view like behaviour in streams joining on a nullable foreign key.
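The behaviour being argued for can be sketched with plain Java collections standing in for the stream and globalKTable (all class and method names here are illustrative, not the Streams internals): a {{null}} result from the key mapper simply means the joiner is invoked with a {{null}} table value instead of the lookup failing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

public class NullKeyJoinSketch {
    // Hypothetical left join: when keyMapper yields null, treat it as "no match"
    // and call the joiner with a null table value rather than throwing.
    static <K, V, GK, GV, R> R leftJoin(K key, V value,
                                        Function<V, GK> keyMapper,
                                        Map<GK, GV> table,
                                        BiFunction<V, GV, R> joiner) {
        GK tableKey = keyMapper.apply(value);
        GV tableValue = (tableKey == null) ? null : table.get(tableKey);
        return joiner.apply(value, tableValue);
    }

    public static void main(String[] args) {
        Map<String, String> countries = new HashMap<>();
        countries.put("NZ", "New Zealand");

        // Record with a resolvable foreign key joins normally.
        String matched = leftJoin("order-1", "NZ", fk -> fk, countries,
                (v, c) -> v + ":" + c);
        if (!"NZ:New Zealand".equals(matched)) throw new AssertionError(matched);

        // Record with a null foreign key still flows through, joined against null.
        String noFk = null;
        String unmatched = leftJoin("order-2", noFk, fk -> fk, countries,
                (v, c) -> v + ":" + c);
        if (!"null:null".equals(unmatched)) throw new AssertionError(unmatched);
    }
}
```

The second call is the case that throws the NullPointerException in 1.0.0; under this reading it degrades cleanly to an unmatched left join.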


[jira] [Commented] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2017-12-25 Thread Andy Bryant (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16303186#comment-16303186
 ] 

Andy Bryant commented on KAFKA-6378:


I can definitely see how having {{null}} keys is problematic, but in this case 
neither the records on the stream nor those on the table have {{null}} keys - 
{{null}} is just being used in the {{KeyValueMapper}} function to indicate there 
can be no match. Stream->GlobalKTable joins are different from the other joins: 
they do not need to be co-partitioned, and you have the additional 
{{KeyValueMapper}} function to derive the table key to match on. The joined 
output record will retain the non-null key from the stream regardless of the 
{{KeyValueMapper}} result and whether or not there was a match.

With regard to the ambiguity of {{null}} values passed to the {{ValueJoiner}}, 
once you're able to move to Java 8+ in your APIs you could use {{Optional}} to 
differentiate between a matching {{null}} value and no match. For 
KStream-GlobalKTable joins there shouldn't be any ambiguity, since {{null}} 
stream values are dropped and {{null}} table values indicate deleted entries, so 
a {{null}} {{value2}} argument can only indicate the absence of a matching table 
entry.
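The {{Optional}} idea can be sketched in plain Java (the names below are made up for illustration; this is not a proposed Streams API): wrapping the table-side argument lets the joiner distinguish "no matching table entry" from a real table value without overloading {{null}}.

```java
import java.util.Optional;

public class OptionalJoinerSketch {
    // A joiner whose table-side argument is Optional.empty() when there was
    // no matching table entry, rather than an ambiguous null.
    static String join(String streamValue, Optional<String> tableValue) {
        return tableValue.map(t -> streamValue + ":" + t)
                         .orElse(streamValue + ":<no-match>");
    }

    public static void main(String[] args) {
        // A match joins the two values.
        if (!"a:ref".equals(join("a", Optional.of("ref")))) throw new AssertionError();
        // No match is explicit instead of being signalled with null.
        if (!"b:<no-match>".equals(join("b", Optional.empty()))) throw new AssertionError();
    }
}
```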



[jira] [Commented] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2018-01-10 Thread Andy Bryant (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321374#comment-16321374
 ] 

Andy Bryant commented on KAFKA-6378:


A sentinel value is OK where only a subset of the available values for a type 
are valid, but it does seem messy to have to convert {{null}} values to the 
sentinel before the join and then back to {{null}} again in the merge function 
afterwards.

It also doesn't cater for the case where you can't pick a sentinel because 
every value of the type is valid.

Since, as Matthias pointed out, {{null}} can never be a valid key, explicitly 
documenting it as indicating no match and updating the code so it doesn't crash 
(a two-line change by the looks of it) seems like a nice clean approach to me.
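The sentinel round-trip being criticised looks roughly like this (the SENTINEL value and variable names are invented for illustration): every record pays a mapping step in and out, and the scheme breaks down entirely if no key value can be reserved.

```java
import java.util.HashMap;
import java.util.Map;

public class SentinelRoundTrip {
    // A reserved key that must never occur in real data: the weak point.
    static final String SENTINEL = "__NO_FK__";

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        table.put("NZ", "New Zealand");

        String fk = null;                              // incoming nullable foreign key
        String mapped = (fk == null) ? SENTINEL : fk;  // map null -> sentinel before the join
        String joined = table.get(mapped);             // sentinel never matches, so null here
        String restored = SENTINEL.equals(mapped) ? null : mapped; // and back to null afterwards

        if (joined != null) throw new AssertionError("sentinel unexpectedly matched");
        if (restored != null) throw new AssertionError("round-trip lost the null");
    }
}
```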




[jira] [Commented] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2018-01-11 Thread Andy Bryant (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16322091#comment-16322091
 ] 

Andy Bryant commented on KAFKA-6378:


Sure thing Matthias. Will try and get that out tomorrow



[jira] [Commented] (KAFKA-6378) NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper returns null

2018-02-01 Thread Andy Bryant (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349439#comment-16349439
 ] 

Andy Bryant commented on KAFKA-6378:


Thanks [~guozhang] - much appreciated!

> NullPointerException on KStream-GlobalKTable leftJoin when KeyValueMapper 
> returns null
> --
>
> Key: KAFKA-6378
> URL: https://issues.apache.org/jira/browse/KAFKA-6378
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Andy Bryant
>Assignee: Andy Bryant
>Priority: Major
> Fix For: 1.1.0, 1.0.1, 1.2.0
>
>


[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2018-07-26 Thread Andy Bryant (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16559225#comment-16559225
 ] 

Andy Bryant commented on KAFKA-2260:


👍🏽 Would prove very handy in event-sourced designs

> Allow specifying expected offset on produce
> ---
>
> Key: KAFKA-2260
> URL: https://issues.apache.org/jira/browse/KAFKA-2260
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Ben Kirwin
>Priority: Minor
> Attachments: KAFKA-2260.patch, expected-offsets.patch
>
>
> I'd like to propose a change that adds a simple CAS-like mechanism to the 
> Kafka producer. This update has a small footprint, but enables a bunch of 
> interesting uses in stream processing or as a commit log for process state.
> h4. Proposed Change
> In short:
> - Allow the user to attach a specific offset to each message produced.
> - The server assigns offsets to messages in the usual way. However, if the 
> expected offset doesn't match the actual offset, the server should fail the 
> produce request instead of completing the write.
> This is a form of optimistic concurrency control, like the ubiquitous 
> check-and-set -- but instead of checking the current value of some state, it 
> checks the current offset of the log.
> h4. Motivation
> Much like check-and-set, this feature is only useful when there's very low 
> contention. Happily, when Kafka is used as a commit log or as a 
> stream-processing transport, it's common to have just one producer (or a 
> small number) for a given partition -- and in many of these cases, predicting 
> offsets turns out to be quite useful.
> - We get the same benefits as the 'idempotent producer' proposal: a producer 
> can retry a write indefinitely and be sure that at most one of those attempts 
> will succeed; and if two producers accidentally write to the end of the 
> partition at once, we can be certain that at least one of them will fail.
> - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
> messages consecutively to a partition, even if the list is much larger than 
> the buffer size or the producer has to be restarted.
> - If a process is using Kafka as a commit log -- reading from a partition to 
> bootstrap, then writing any updates to that same partition -- it can be sure 
> that it's seen all of the messages in that partition at the moment it does 
> its first (successful) write.
> There's a bunch of other similar use-cases here, but they all have roughly 
> the same flavour.
> h4. Implementation
> The major advantage of this proposal over other suggested transaction / 
> idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
> currently-unused field, adds no new APIs, and requires very little new code 
> or additional work from the server.
> - Produced messages already carry an offset field, which is currently ignored 
> by the server. This field could be used for the 'expected offset', with a 
> sigil value for the current behaviour. (-1 is a natural choice, since it's 
> already used to mean 'next available offset'.)
> - We'd need a new error and error code for a 'CAS failure'.
> - The server assigns offsets to produced messages in 
> {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
> changed, this method would assign offsets in the same way -- but if they 
> don't match the offset in the message, we'd return an error instead of 
> completing the write.
> - To avoid breaking existing clients, this behaviour would need to live 
> behind some config flag. (Possibly global, but probably more useful 
> per-topic?)
> I understand all this is unsolicited and possibly strange: happy to answer 
> questions, and if this seems interesting, I'd be glad to flesh this out into 
> a full KIP or patch. (And apologies if this is the wrong venue for this sort 
> of thing!)
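The check-and-set semantics described above can be modelled with a tiny in-memory log (a sketch of the idea only, with made-up names, not the broker implementation): an append carries an expected offset, -1 preserves today's unconditional behaviour, and any mismatch rejects the write.

```java
import java.util.ArrayList;
import java.util.List;

public class CasLogSketch {
    private final List<String> log = new ArrayList<>();

    // Append with optimistic concurrency: expectedOffset == -1 keeps the current
    // behaviour; otherwise the write only succeeds at exactly that offset.
    synchronized boolean append(String message, long expectedOffset) {
        if (expectedOffset != -1 && expectedOffset != log.size()) {
            return false; // "CAS failure": the partition moved underneath us
        }
        log.add(message);
        return true;
    }

    public static void main(String[] args) {
        CasLogSketch partition = new CasLogSketch();
        if (!partition.append("a", -1)) throw new AssertionError();   // unconditional write
        if (!partition.append("b", 1)) throw new AssertionError();    // expected offset matches
        if (partition.append("stale", 1)) throw new AssertionError(); // offset 1 taken: rejected
        if (!partition.append("c", -1)) throw new AssertionError();   // unconditional still works
    }
}
```

A producer retrying the rejected write would first re-read the log tail, exactly as with any other check-and-set loop.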





[jira] [Commented] (KAFKA-7051) Improve the efficiency of the ReplicaManager when there are many partitions

2018-08-16 Thread Andy Bryant (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583248#comment-16583248
 ] 

Andy Bryant commented on KAFKA-7051:


We've seen this codeblock taking considerable CPU and time when profiling the 
broker pushing data into a replicated topic where the cluster had a few 
thousand partitions. Very keen to see this PR make 2.1.0

> Improve the efficiency of the ReplicaManager when there are many partitions
> ---
>
> Key: KAFKA-7051
> URL: https://issues.apache.org/jira/browse/KAFKA-7051
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8.0
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
> Fix For: 2.1.0
>
>






[jira] [Comment Edited] (KAFKA-7051) Improve the efficiency of the ReplicaManager when there are many partitions

2018-08-16 Thread Andy Bryant (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583248#comment-16583248
 ] 

Andy Bryant edited comment on KAFKA-7051 at 8/17/18 1:23 AM:
-

We've seen the {{fetchInfos.collectFirst}} code block taking considerable CPU 
and time when profiling the broker pushing data into a replicated topic where 
the cluster had a few thousand partitions. Very keen to see this PR make 2.1.0


was (Author: kiwiandy):
We've seen this codeblock taking considerable CPU and time when profiling the 
broker pushing data into a replicated topic where the cluster had a few 
thousand partitions. Very keen to see this PR make 2.1.0
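As a rough illustration of why that code path hurts at a few thousand partitions (plain Java standing in for the Scala {{collectFirst}}; the figures are purely illustrative): a linear scan per fetched partition does quadratic total work, while a keyed map keeps it linear.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CollectFirstCost {
    public static void main(String[] args) {
        int partitions = 2000;
        List<String> fetchInfos = new ArrayList<>();
        for (int i = 0; i < partitions; i++) fetchInfos.add("topic-" + i);

        // Linear scan per partition (the collectFirst pattern): ~n/2 probes each,
        // so roughly n^2/2 probes overall.
        long probes = 0;
        for (int i = 0; i < partitions; i++) {
            String wanted = "topic-" + i;
            for (String info : fetchInfos) {
                probes++;
                if (info.equals(wanted)) break;
            }
        }

        // Keyed lookup: one probe per partition.
        Map<String, String> byPartition = new HashMap<>();
        for (String info : fetchInfos) byPartition.put(info, info);
        long mapProbes = 0;
        for (int i = 0; i < partitions; i++) {
            byPartition.get("topic-" + i);
            mapProbes++;
        }

        if (probes <= (long) partitions * 10) throw new AssertionError("scan should dominate");
        if (mapProbes != partitions) throw new AssertionError();
    }
}
```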



[jira] [Created] (KAFKA-7373) GetOffsetShell doesn't work when SSL authentication is enabled

2018-09-02 Thread Andy Bryant (JIRA)
Andy Bryant created KAFKA-7373:
--

 Summary: GetOffsetShell doesn't work when SSL authentication is 
enabled
 Key: KAFKA-7373
 URL: https://issues.apache.org/jira/browse/KAFKA-7373
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.0
Reporter: Andy Bryant


GetOffsetShell doesn't offer a way to pass additional configuration to the 
underlying KafkaConsumer, as {{ConsoleConsumer}} does. Passing SSL config as 
system properties doesn't propagate to the consumer either.
{code:java}
10:47 $ ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
${BROKER_LIST} --topic cld-dev-sor-crods-crodsdba_contact

Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: 
Timeout expired while fetching topic metadata{code}
Editing {{GetOffsetShell.scala}} to include the SSL properties in the 
KafkaConsumer configuration resolved the issue.

Providing {{consumer-property}} and {{consumer-config}} configuration options 
for {{kafka-run-class.sh}}, or creating a separate run script for offsets and 
using these properties in {{GetOffsetShell.scala}}, seems like a good solution.
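For reference, a minimal SSL client configuration file of the kind a {{consumer-config}} option would load might look like the following. This is only a sketch: all paths and passwords below are placeholders, though the property names are the standard Kafka SSL client settings.

```properties
security.protocol=SSL
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=changeit
ssl.keystore.location=/path/to/client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
```

The same file works for {{ConsoleConsumer}} today via its {{consumer.config}} option, which is what makes the asymmetry with GetOffsetShell surprising.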



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5235) GetOffsetShell: retrieve offsets for all given topics and partitions with single request to the broker

2018-09-06 Thread Andy Bryant (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16606488#comment-16606488
 ] 

Andy Bryant commented on KAFKA-5235:


Looking forward to this KIP being implemented. We currently can't use 
GetOffsetShell as it stands with our cluster, as it uses SSL authentication (see 
KAFKA-7373). Having consistent parameter values will be a welcome change too.
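For context, the single-request pattern this ticket proposes can be sketched directly against the consumer API. This is an illustrative sketch, not the tool's actual implementation: it assumes a broker reachable at {{localhost:9092}}, a topic named {{my-topic}}, and the kafka-clients jar on the classpath.

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetFetchSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Partition list comes from broker metadata, not Zookeeper.
            List<TopicPartition> partitions = consumer.partitionsFor("my-topic").stream()
                    .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
                    .collect(Collectors.toList());
            // One endOffsets() call covers every partition, instead of one
            // SimpleConsumer connection per partition leader.
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            end.forEach((tp, offset) -> System.out.println(tp + ":" + offset));
        }
    }
}
```

Replacing the per-leader loop with this one {{endOffsets}} round trip is exactly the efficiency win the ticket describes.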

> GetOffsetShell: retrieve offsets for all given topics and partitions with 
> single request to the broker
> --
>
> Key: KAFKA-5235
> URL: https://issues.apache.org/jira/browse/KAFKA-5235
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Arseniy Tashoyan
>Priority: Major
>  Labels: kip, tool
> Fix For: 2.1.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> GetOffsetShell is implemented on old SimpleConsumer. It needs Zookeeper to 
> retrieve metadata about topics and partitions. At present, GetOffsetShell 
> does the following:
> - get metadata from Zookeeper
> - iterate over partitions
> - for each partition, connect to its leader broker and request offsets
> Instead, GetOffsetShell can use new KafkaConsumer and retrieve offsets by 
> means of endOffsets(), beginningOffsets() and offsetsForTimes() methods. One 
> request is sufficient for all topics and partitions.
> Once GetOffsetShell is re-implemented with the new KafkaConsumer API, it 
> will no longer depend on obsolete APIs: SimpleConsumer and the old producer API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7373) GetOffsetShell doesn't work when SSL authentication is enabled

2018-09-06 Thread Andy Bryant (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16606492#comment-16606492
 ] 

Andy Bryant commented on KAFKA-7373:


Thanks Stanislav - good spot on the KIP. Hopefully that one makes it into 2.1.0.

Regarding the OutOfMemoryError, try setting the following environment variable 
before invoking the tool to increase the default heap allocation:
{code:bash}
export KAFKA_HEAP_OPTS="-Xms512m -Xmx1g"{code}

> GetOffsetShell doesn't work when SSL authentication is enabled
> --
>
> Key: KAFKA-7373
> URL: https://issues.apache.org/jira/browse/KAFKA-7373
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Andy Bryant
>Priority: Major
>
> GetOffsetShell doesn't offer a way to pass additional configuration to the 
> underlying KafkaConsumer, as {{ConsoleConsumer}} does. Passing SSL config as 
> system properties doesn't propagate to the consumer either.
> {code:java}
> 10:47 $ ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> ${BROKER_LIST} --topic cld-dev-sor-crods-crodsdba_contact
> Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: 
> Timeout expired while fetching topic metadata{code}
> Editing {{GetOffsetShell.scala}} to include the SSL properties in the 
> KafkaConsumer configuration resolved the issue.
> Providing {{consumer-property}} and {{consumer-config}} configuration options 
> for {{kafka-run-class.sh}}, or creating a separate run script for offsets and 
> using these properties in {{GetOffsetShell.scala}}, seems like a good solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)