[jira] [Commented] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-16 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807520#comment-17807520
 ] 

Almog Gavra commented on KAFKA-16141:
-

OK, after doing some more digging I don't think it's related to my change. 
Instead, I believe it was caused by [https://github.com/apache/kafka/pull/14570] 
- that PR changed
{code:java}
KeyValueToTimestampedKeyValueByteStoreAdapter implements ...{code}
to
{code:java}
KeyValueToTimestampedKeyValueByteStoreAdapter implements ..., TimestampedBytesStore {code}
This caused WrappedStateStore#isTimestamped to return true where it previously 
returned false. That, in turn, caused us to initialize the store with 
RecordConverters.RAW_TO_TIMESTAMPED_INSTANCE as the converter (see 
StateManagerUtil#converterForStore). After a restore, the converter prepends the 
record timestamp when it shouldn't: an additional timestamp is then prepended 
when the value goes through the adapter, leaving two timestamps on the value.
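
For context, a minimal sketch of what a raw-to-timestamped conversion does to 
each restored value (the byte layout, an 8-byte timestamp followed by the raw 
value, matches the timestamped-store format; the method name here is 
illustrative, not the actual Streams internals):
{code:java}
import java.nio.ByteBuffer;

// Restore-time conversion: prepend the record timestamp to the raw value.
// If the store adapter then prepends a timestamp of its own, the restored
// value carries two timestamp headers and readers see garbage.
static byte[] rawToTimestamped(final long timestamp, final byte[] rawValue) {
    return ByteBuffer.allocate(8 + rawValue.length)
        .putLong(timestamp)   // 8-byte timestamp header
        .put(rawValue)        // original changelog value bytes
        .array();
}
{code}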

Related: https://issues.apache.org/jira/browse/KAFKA-15629

cc [~mjsax] [~hanyuzheng] 

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Almog Gavra
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16141) StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails consistently in 3.7

2024-01-16 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17807434#comment-17807434
 ] 

Almog Gavra commented on KAFKA-16141:
-

I confirmed it's most likely related to my change - changing it from:
{code:java}
final KeyValueBytesStoreSupplier persistentStoreSupplier =
    Stores.persistentKeyValueStore(persistentMemoryStoreName);
{code}
to
{code:java}
final KeyValueBytesStoreSupplier persistentStoreSupplier =
    Stores.persistentTimestampedKeyValueStore(persistentMemoryStoreName);
{code}
makes the test pass, so it's almost certainly the same bug as in 
https://issues.apache.org/jira/browse/KAFKA-16046 

> StreamsStandbyTask##test_standby_tasks_rebalanceArguments:{ 
> “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false} fails 
> consistently in 3.7
> 
>
> Key: KAFKA-16141
> URL: https://issues.apache.org/jira/browse/KAFKA-16141
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Almog Gavra
>Priority: Blocker
>
> {code:java}
> kafkatest.tests.streams.streams_standby_replica_test.StreamsStandbyTask#test_standby_tasks_rebalanceArguments:{
>  “metadata_quorum”: “ISOLATED_KRAFT”, “use_new_coordinator”: false}
> TimeoutError("Did expect to read 'ACTIVE_TASKS:2 STANDBY_TASKS:[1-3]' from 
> ubuntu@worker26")
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/mark/_mark.py",
>  line 433, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_standby_replica_test.py",
>  line 79, in test_standby_tasks_rebalance
> self.wait_for_verification(processor_1, "ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]", processor_1.STDOUT_FILE)
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/base_streams_test.py",
>  line 96, in wait_for_verification
> err_msg="Did expect to read '%s' from %s" % (message, 
> processor.node.account))
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/utils/util.py",
>  line 58, in wait_until
> raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: Did expect to read 'ACTIVE_TASKS:2 
> STANDBY_TASKS:[1-3]' from ubuntu@worker26
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2024-01-02 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801904#comment-17801904
 ] 

Almog Gavra commented on KAFKA-16046:
-

https://github.com/apache/kafka/pull/15073

> Stream Stream Joins fail after restoration with deserialization exceptions
> --
>
> Key: KAFKA-16046
> URL: https://issues.apache.org/jira/browse/KAFKA-16046
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Blocker
>  Labels: streams
> Fix For: 3.7.0
>
>
> Before KIP-954, the `KStreamImplJoin` class would always create 
> non-timestamped persistent windowed stores. After that KIP, the default was 
> changed to create timestamped stores. This wasn't compatible because, during 
> restoration, timestamped stores have their changelog values transformed to 
> prepend the timestamp to the value. This caused serialization errors when 
> trying to read from the store because the deserializers did not expect the 
> timestamp to be prepended.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2024-01-02 Thread Almog Gavra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra reopened KAFKA-16046:
-

> Stream Stream Joins fail after restoration with deserialization exceptions
> --
>
> Key: KAFKA-16046
> URL: https://issues.apache.org/jira/browse/KAFKA-16046
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Blocker
>  Labels: streams
> Fix For: 3.7.0
>
>
> Before KIP-954, the `KStreamImplJoin` class would always create 
> non-timestamped persistent windowed stores. After that KIP, the default was 
> changed to create timestamped stores. This wasn't compatible because, during 
> restoration, timestamped stores have their changelog values transformed to 
> prepend the timestamp to the value. This caused serialization errors when 
> trying to read from the store because the deserializers did not expect the 
> timestamp to be prepended.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2023-12-22 Thread Almog Gavra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra resolved KAFKA-16046.
-
Resolution: Fixed

> Stream Stream Joins fail after restoration with deserialization exceptions
> --
>
> Key: KAFKA-16046
> URL: https://issues.apache.org/jira/browse/KAFKA-16046
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Blocker
>  Labels: streams
>
> Before KIP-954, the `KStreamImplJoin` class would always create 
> non-timestamped persistent windowed stores. After that KIP, the default was 
> changed to create timestamped stores. This wasn't compatible because, during 
> restoration, timestamped stores have their changelog values transformed to 
> prepend the timestamp to the value. This caused serialization errors when 
> trying to read from the store because the deserializers did not expect the 
> timestamp to be prepended.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2023-12-21 Thread Almog Gavra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra updated KAFKA-16046:

Labels: streams  (was: )

> Stream Stream Joins fail after restoration with deserialization exceptions
> --
>
> Key: KAFKA-16046
> URL: https://issues.apache.org/jira/browse/KAFKA-16046
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Blocker
>  Labels: streams
>
> Before KIP-954, the `KStreamImplJoin` class would always create 
> non-timestamped persistent windowed stores. After that KIP, the default was 
> changed to create timestamped stores. This wasn't compatible because, during 
> restoration, timestamped stores have their changelog values transformed to 
> prepend the timestamp to the value. This caused serialization errors when 
> trying to read from the store because the deserializers did not expect the 
> timestamp to be prepended.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2023-12-21 Thread Almog Gavra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra updated KAFKA-16046:

Component/s: streams

> Stream Stream Joins fail after restoration with deserialization exceptions
> --
>
> Key: KAFKA-16046
> URL: https://issues.apache.org/jira/browse/KAFKA-16046
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Blocker
>  Labels: streams
>
> Before KIP-954, the `KStreamImplJoin` class would always create 
> non-timestamped persistent windowed stores. After that KIP, the default was 
> changed to create timestamped stores. This wasn't compatible because, during 
> restoration, timestamped stores have their changelog values transformed to 
> prepend the timestamp to the value. This caused serialization errors when 
> trying to read from the store because the deserializers did not expect the 
> timestamp to be prepended.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2023-12-21 Thread Almog Gavra (Jira)
Almog Gavra created KAFKA-16046:
---

 Summary: Stream Stream Joins fail after restoration with 
deserialization exceptions
 Key: KAFKA-16046
 URL: https://issues.apache.org/jira/browse/KAFKA-16046
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Almog Gavra
Assignee: Almog Gavra


Before KIP-954, the `KStreamImplJoin` class would always create non-timestamped 
persistent windowed stores. After that KIP, the default was changed to create 
timestamped stores. This wasn't compatible because, during restoration, 
timestamped stores have their changelog values transformed to prepend the 
timestamp to the value. This caused serialization errors when trying to read 
from the store because the deserializers did not expect the timestamp to be 
prepended.
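
As a hedged illustration of the failure mode (names and values here are 
illustrative, not the actual Streams internals): a changelog record written by a 
non-timestamped store holds just the serialized value, but restoration into a 
timestamped store prepends the 8-byte record timestamp, so a deserializer 
written for the plain value sees 8 extra bytes it cannot parse:
{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RestoreMismatchSketch {
    public static void main(String[] args) {
        // What the non-timestamped store wrote to the changelog.
        final byte[] changelogValue = "join-value".getBytes(StandardCharsets.UTF_8);

        // What restoration into a timestamped store produces: the record
        // timestamp is prepended to the raw changelog value.
        final byte[] restored = ByteBuffer.allocate(8 + changelogValue.length)
            .putLong(1_700_000_000_000L)
            .put(changelogValue)
            .array();

        // A deserializer expecting the plain value now reads 8 timestamp
        // bytes as part of the payload and fails (or returns garbage).
        System.out.println(new String(restored, StandardCharsets.UTF_8));
    }
}
{code}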



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16046) Stream Stream Joins fail after restoration with deserialization exceptions

2023-12-21 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799623#comment-17799623
 ] 

Almog Gavra commented on KAFKA-16046:
-

https://github.com/apache/kafka/pull/15061

> Stream Stream Joins fail after restoration with deserialization exceptions
> --
>
> Key: KAFKA-16046
> URL: https://issues.apache.org/jira/browse/KAFKA-16046
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Blocker
>
> Before KIP-954, the `KStreamImplJoin` class would always create 
> non-timestamped persistent windowed stores. After that KIP, the default was 
> changed to create timestamped stores. This wasn't compatible because, during 
> restoration, timestamped stores have their changelog values transformed to 
> prepend the timestamp to the value. This caused serialization errors when 
> trying to read from the store because the deserializers did not expect the 
> timestamp to be prepended.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16038) Periodic Logging of Current Assignment

2023-12-20 Thread Almog Gavra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra updated KAFKA-16038:

Summary: Periodic Logging of Current Assignment  (was: Periodic Logging fo 
Current Assignment)

> Periodic Logging of Current Assignment
> --
>
> Key: KAFKA-16038
> URL: https://issues.apache.org/jira/browse/KAFKA-16038
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Major
>
> Currently, assignment is only logged when a rebalance happens:
> {code:java}
> 15:14:56.263 
> [metrics-backend-8-a90ee750-e4bb-46d5-bb7d-32411a75dd26-StreamThread-1] INFO  
> org.apache.kafka.streams.processor.internals.TaskManager {} - stream-thread 
> [metrics-backend-8-a90ee750-e4bb-46d5-bb7d-32411a75dd26-StreamThread-1] 
> Handle new assignment with:
>     New active tasks: [0_7, 0_6, 0_5, 0_4, 0_3, 0_2, 0_1, 0_0]
>     New standby tasks: []
>     Existing active tasks: []
>     Existing standby tasks: [] {code}
> It would be helpful to periodically log the current assignment at a 
> configurable interval, in a machine-friendly manner. This can help in 
> situations where a rebalance hasn't happened in a long while.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16038) Periodic Logging fo Current Assignment

2023-12-20 Thread Almog Gavra (Jira)
Almog Gavra created KAFKA-16038:
---

 Summary: Periodic Logging fo Current Assignment
 Key: KAFKA-16038
 URL: https://issues.apache.org/jira/browse/KAFKA-16038
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Almog Gavra
Assignee: Almog Gavra


Currently, assignment is only logged when a rebalance happens:
{code:java}
15:14:56.263 
[metrics-backend-8-a90ee750-e4bb-46d5-bb7d-32411a75dd26-StreamThread-1] INFO  
org.apache.kafka.streams.processor.internals.TaskManager {} - stream-thread 
[metrics-backend-8-a90ee750-e4bb-46d5-bb7d-32411a75dd26-StreamThread-1] Handle 
new assignment with:
    New active tasks: [0_7, 0_6, 0_5, 0_4, 0_3, 0_2, 0_1, 0_0]
    New standby tasks: []
    Existing active tasks: []
    Existing standby tasks: [] {code}
It would be helpful to periodically log the current assignment at a 
configurable interval, in a machine-friendly manner. This can help in 
situations where a rebalance hasn't happened in a long while.
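
Until something like this exists in Streams itself, a minimal application-side 
sketch of the idea (this uses the public KafkaStreams#metadataForLocalThreads() 
API; the interval, executor, and log format are illustrative choices):
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams;

// Periodically log each stream thread's active/standby tasks in a
// grep-friendly, single-line format.
static ScheduledExecutorService logAssignmentPeriodically(
    final KafkaStreams streams,
    final long intervalSeconds
) {
    final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(
        () -> streams.metadataForLocalThreads().forEach(thread ->
            System.out.printf("assignment thread=%s active=%s standby=%s%n",
                thread.threadName(), thread.activeTasks(), thread.standbyTasks())),
        intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    return scheduler;
}
{code}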



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (KAFKA-15215) The default.dsl.store config is not compatible with custom state stores

2023-11-21 Thread Almog Gavra (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-15215 ]


Almog Gavra deleted comment on KAFKA-15215:
-

was (Author: agavra):
Note that, for backwards compatibility, we decided not to have default.dsl.store 
take effect when it is only passed to the main KafkaStreams constructor. 
Instead, you should use the new dsl.store.suppliers configuration.

> The default.dsl.store config is not compatible with custom state stores
> ---
>
> Key: KAFKA-15215
> URL: https://issues.apache.org/jira/browse/KAFKA-15215
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Almog Gavra
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.7.0
>
>
> Sort of a bug, sort of a new/missing feature. When we added the long-awaited 
> default.dsl.store config, it was decided to scope the initial KIP to just the 
> two out-of-the-box state stores types offered by Streams, rocksdb and 
> in-memory. The reason being that this would address a large number of the 
> relevant use cases, and could always be followed up with another KIP for 
> custom state stores if/when the demand arose.
> Of course, since rocksdb is the default anyways, the only beneficiaries of 
> this KIP right now are the people who specifically want only in-memory stores 
> – yet custom state stores users are probably by far the ones with the 
> greatest need for an easier way to configure the store type across an entire 
> application. And unfortunately, because the config currently relies on enum 
> definitions for the known OOTB store types, there's not really any way to 
> extend this feature as it is to work with custom implementations.
> I think this is a great feature, which is why I hope to see it extended to 
> the broader user base. Most likely we'll want to introduce a new config for 
> this, though whether it replaces the old default.dsl.store config or 
> complements it will have to be decided during the KIP discussion



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15774) Respect default.dsl.store Configuration Without Passing it to StreamsBuilder

2023-11-21 Thread Almog Gavra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra resolved KAFKA-15774.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

Note that, for backwards compatibility, we decided not to have default.dsl.store 
take effect when it is only passed to the main KafkaStreams constructor. 
Instead, you should use the new dsl.store.suppliers configuration.
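
A hedged sketch of the replacement configuration path (this assumes the KIP-954 
config key dsl.store.suppliers.class and the built-in suppliers class shipped 
with Streams; check StreamsConfig in your version for the exact names):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;

// Configure the DSL's default store type via dsl.store.suppliers.class
// (KIP-954) rather than default.dsl.store.
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG,
    BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class);
{code}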

> Respect default.dsl.store Configuration Without Passing it to StreamsBuilder
> 
>
> Key: KAFKA-15774
> URL: https://issues.apache.org/jira/browse/KAFKA-15774
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Major
> Fix For: 3.7.0
>
>
> Currently, if you only configure `default.dsl.store` as `in_memory` in your 
> `StreamsConfig`, it is silently ignored unless it is also passed into 
> `StreamsBuilder#new(TopologyConfig)`. We should improve this behavior to 
> properly respect the config in either case.
> This will become more important with the introduction of KIP-954.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15215) The default.dsl.store config is not compatible with custom state stores

2023-11-21 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17788559#comment-17788559
 ] 

Almog Gavra commented on KAFKA-15215:
-

Note that, for backwards compatibility, we decided not to have default.dsl.store 
take effect when it is only passed to the main KafkaStreams constructor. 
Instead, you should use the new dsl.store.suppliers configuration.

> The default.dsl.store config is not compatible with custom state stores
> ---
>
> Key: KAFKA-15215
> URL: https://issues.apache.org/jira/browse/KAFKA-15215
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Almog Gavra
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.7.0
>
>
> Sort of a bug, sort of a new/missing feature. When we added the long-awaited 
> default.dsl.store config, it was decided to scope the initial KIP to just the 
> two out-of-the-box state stores types offered by Streams, rocksdb and 
> in-memory. The reason being that this would address a large number of the 
> relevant use cases, and could always be followed up with another KIP for 
> custom state stores if/when the demand arose.
> Of course, since rocksdb is the default anyways, the only beneficiaries of 
> this KIP right now are the people who specifically want only in-memory stores 
> – yet custom state stores users are probably by far the ones with the 
> greatest need for an easier way to configure the store type across an entire 
> application. And unfortunately, because the config currently relies on enum 
> definitions for the known OOTB store types, there's not really any way to 
> extend this feature as it is to work with custom implementations.
> I think this is a great feature, which is why I hope to see it extended to 
> the broader user base. Most likely we'll want to introduce a new config for 
> this, though whether it replaces the old default.dsl.store config or 
> complements it will have to be decided during the KIP discussion



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15215) The default.dsl.store config is not compatible with custom state stores

2023-11-21 Thread Almog Gavra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra resolved KAFKA-15215.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> The default.dsl.store config is not compatible with custom state stores
> ---
>
> Key: KAFKA-15215
> URL: https://issues.apache.org/jira/browse/KAFKA-15215
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Almog Gavra
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.7.0
>
>
> Sort of a bug, sort of a new/missing feature. When we added the long-awaited 
> default.dsl.store config, it was decided to scope the initial KIP to just the 
> two out-of-the-box state stores types offered by Streams, rocksdb and 
> in-memory. The reason being that this would address a large number of the 
> relevant use cases, and could always be followed up with another KIP for 
> custom state stores if/when the demand arose.
> Of course, since rocksdb is the default anyways, the only beneficiaries of 
> this KIP right now are the people who specifically want only in-memory stores 
> – yet custom state stores users are probably by far the ones with the 
> greatest need for an easier way to configure the store type across an entire 
> application. And unfortunately, because the config currently relies on enum 
> definitions for the known OOTB store types, there's not really any way to 
> extend this feature as it is to work with custom implementations.
> I think this is a great feature, which is why I hope to see it extended to 
> the broader user base. Most likely we'll want to introduce a new config for 
> this, though whether it replaces the old default.dsl.store config or 
> complements it will have to be decided during the KIP discussion



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15774) Respect default.dsl.store Configuration Without Passing it to StreamsBuilder

2023-11-02 Thread Almog Gavra (Jira)
Almog Gavra created KAFKA-15774:
---

 Summary: Respect default.dsl.store Configuration Without Passing 
it to StreamsBuilder
 Key: KAFKA-15774
 URL: https://issues.apache.org/jira/browse/KAFKA-15774
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Almog Gavra
Assignee: Almog Gavra


Currently, if you only configure `default.dsl.store` as `in_memory` in your 
`StreamsConfig`, it is silently ignored unless it is also passed into 
`StreamsBuilder#new(TopologyConfig)`. We should improve this behavior to 
properly respect the config in either case.

This will become more important with the introduction of KIP-954.
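
A hedged sketch of the two construction paths (API names as in recent Streams 
releases; the topology itself is omitted for brevity):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, StreamsConfig.IN_MEMORY);

// Pitfall: default.dsl.store in the config passed only to KafkaStreams is
// silently ignored when the topology was built by a plain StreamsBuilder.
final StreamsBuilder ignoresConfig = new StreamsBuilder();

// Workaround: hand the config to the builder as well, via TopologyConfig.
final StreamsBuilder respectsConfig =
    new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
{code}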



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j

2021-05-27 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352499#comment-17352499
 ] 

Almog Gavra commented on KAFKA-12774:
-

I went back to triage the severity of this, because if things aren't getting 
logged via Log4j then our applications would be missing log shipping, which is 
critical in our production environments.

To confirm nothing was being logged outside of Log4j, I hacked together 
something to try to reproduce this, and I also added the following line to 
RecordCollectorImpl wherever a record was about to be sent, to make sure I hit 
exactly the same type of error that you encountered:
{code:java}
recordSendError(topic, new InvalidPidMappingException("foo"), serializedRecord);
{code}
I could not reproduce it. The only things that appeared (note that all Log4j 
loggers are turned OFF, so only direct writes to stdout show up) were what I 
specifically printed to stdout. This was the stdout output (I printed to stdout 
in the uncaught handler instead of logging):
{code:java}
SEEN: 0,0
(HERE) Uncaught exception handled - replacing thread Error encountered sending 
record to topic bar for task 0_0 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: foo
Exception handler choose to FAIL the processing, no more records would be sent.
{code}
We can leave this ticket open in case anyone else has any ideas.

Here is the app:
{code:java}
public static void main(String[] args) throws InterruptedException, IOException 
{
  // Silence all Log4j loggers so anything still printed must bypass Log4j.
  LogManager.getRootLogger().setLevel(Level.OFF);
  @SuppressWarnings("unchecked")
  final Enumeration<Logger> loggers = (Enumeration<Logger>) LogManager.getCurrentLoggers();
  while (loggers.hasMoreElements()) {
    loggers.nextElement().setLevel(Level.OFF);
  }

  final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(1);
  cluster.start();
  cluster.createTopic("foo");

  final Properties config = new Properties();
  config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
  config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + 123);
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
  config.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
  config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
  config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
  config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
  config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
  config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
  config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
  config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

  final CountDownLatch sawKey = new CountDownLatch(1);
  final StreamsBuilder builder = new StreamsBuilder();
  builder.stream("foo")
      .filter((k, v) -> k != null)
      .peek((k, v) -> System.out.println("SEEN: " + k + "," + v))
      .peek((k, v) -> {
        if ((int) k == 0) sawKey.countDown();
      })
      .to("bar");

  final Topology build = builder.build(config);
  final KafkaStreams app = new KafkaStreams(build, config);

  app.setUncaughtExceptionHandler(exception -> {
    System.out.println("(HERE) Uncaught exception handled - replacing thread "
        + exception.getMessage());
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
  });

  final CountDownLatch startLatch = new CountDownLatch(1);
  app.setStateListener((newState, oldState) -> {
if (newState == State.RUNNING) {
  startLatch.countDown();
}
  });
  app.start();
  startLatch.await();

  final Properties producerProps = new Properties();
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
  producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);
  producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);

  IntegrationTestUtils.produceKeyValuesSynchronously(
  "foo",
  IntStream.range(0, 1)
  .mapToObj(i -> KeyValue.pair(0, i))
  .collect(Collectors.toList()),
  producerProps,
  Time.SYSTEM);


  sawKey.await();
  app.close();
  app.cleanUp();
  cluster.after();
}{code}
 

> kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through 
> log4j
> 
>
> Key: KAFKA-12774
> URL: https://issues.apache.org/jira/browse/KAFKA-12774
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Jørgen
>Priority: Minor
> Fix For: 3.0.0, 2.8.1
>
>
> When exceptions is handled in the uncaught-exception handler introduced in 
> KS2.8,

[jira] [Updated] (KAFKA-12192) Add Configuration to Selectively Disable Topology Optimizations

2021-03-22 Thread Almog Gavra (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra updated KAFKA-12192:

Fix Version/s: (was: 2.8.0)

> Add Configuration to Selectively Disable Topology Optimizations
> ---
>
> Key: KAFKA-12192
> URL: https://issues.apache.org/jira/browse/KAFKA-12192
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Almog Gavra
>Assignee: Almog Gavra
>Priority: Major
>
> There are several reasons users may want to selectively disable certain 
> topology optimizations, specifically the source-changelog optimization. We 
> should introduce a configuration that allows users to disable such 
> optimizations.
>  
> KIP incoming with more details :)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12192) Add Configuration to Selectively Disable Topology Optimizations

2021-01-13 Thread Almog Gavra (Jira)
Almog Gavra created KAFKA-12192:
---

 Summary: Add Configuration to Selectively Disable Topology 
Optimizations
 Key: KAFKA-12192
 URL: https://issues.apache.org/jira/browse/KAFKA-12192
 Project: Kafka
  Issue Type: Improvement
Reporter: Almog Gavra
Assignee: Almog Gavra
 Fix For: 2.8.0


There are several reasons users may want to selectively disable certain 
topology optimizations, specifically the source-changelog optimization. We 
should introduce a configuration that allows users to disable such 
optimizations.

 

KIP incoming with more details :)
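
A hedged sketch of how such selective disabling can be expressed with the 
topology.optimization config in later Streams releases (it grew to accept 
"all", "none", or a comma-separated subset; the StreamsConfig constant below 
postdates this ticket, so verify it against your version):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Enable only repartition-topic merging, leaving the source-changelog
// (KTable source-topic reuse) optimization disabled.
final Properties props = new Properties();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
    StreamsConfig.MERGE_REPARTITION_TOPICS);
{code}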



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10569) Running aggregate queries on KSQL client side is getting to ERROR Shutdown broker because all log dirs in ...

2020-10-02 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17206551#comment-17206551
 ] 

Almog Gavra commented on KAFKA-10569:
-

Hello [~petregor...@gmail.com] - I took a look through the ticket and I think 
this probably belongs as an issue on 
[confluentinc/ksql|https://github.com/confluentinc/ksql], though I can chime in 
and say that if you expect a query to keep running, you need to use `CREATE 
STREAM <name> AS SELECT`. I recommend you take a look at 
[https://docs.ksqldb.io/en/latest/developer-guide/create-a-stream/#create-a-stream-backed-by-a-new-kafka-topic]
 and see if that answers your question.

[~mjsax] I think we can close this ticket as "invalid"

> Running aggregate queries on KSQL client side is getting to ERROR Shutdown 
> broker because all log dirs in ...
> -
>
> Key: KAFKA-10569
> URL: https://issues.apache.org/jira/browse/KAFKA-10569
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
> Environment: local
>Reporter: Petre Gordan
>Priority: Critical
> Attachments: KSQLDBServerSideErrors.txt, KafkaClusterLogs.txt, 
> ProductsOrders.txt, ZiikeeperSideLog.txt, kafka-server-start.bat, ksql, 
> ksql-server-start, ksql-server.properties, schema-registry.properties, 
> server.properties, zookeeper-server-start.bat, zookeeper.properties
>
>
> I'm working on Windows 10 with confluent-5.5.0-2.12.zip and kafka_2.12-2.5.0, 
> running everything locally:
>  * in powershell, zookeeper with: *bin\windows\zookeeper-server-start.bat 
> config\zookeeper.properties*
>  * in powershell, kafka-server
> *bin\windows\kafka-server-start.bat config\server.properties*
>  * in bash (with ubuntu) ksqldb server
> sudo bin/ksql-server-start etc/ksqldb/ksql-server.properties
>  * in bash (with ubuntu) ksql client
> sudo bin/ksql [http://0.0.0.0:8088|http://0.0.0.0:8088/]
> Once all of these are up, I start practicing with Kafka: I create tables and 
> streams, make inserts, and all is good. I can run small queries like:
> select * from products emit changes;
> All good up to this point.
> Whenever I run any kind of aggregate query, it shows results after a while, 
> but in the end, after I press Ctrl+C to terminate it and run another query, 
> everything goes down.
> For example, after running the attached .sql script, the products table and 
> orders stream are created and populated successfully.
> After that, if I run this query:
> select ProductRowKey, count(ProductRowKey) from orders group by ProductRowKey 
> emit changes;
> I can see the results, all good, but in the end if I press Ctrl+C, everything 
> goes down.
>  
> Going through the logs in time order, the main warnings and errors raised 
> are:
>  * first, this is raised:
> Query terminated due to exception:org.eclipse.jetty.io.EofException 
> (io.confluent.ksql.rest.server.resources.streaming.QueryStreamWriter:95)
>  * then this: INFO stream-client [_confluent-ksql-default_transient State 
> transition from RUNNING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.KafkaStreams:285)
>  * then these:
> INFO stream-thread [_confluent-ksql-default_transient Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread:1116)
>  State transition from RUNNING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread:221)
>  Shutdown complete 
> (org.apache.kafka.streams.processor.internals.StreamThread:1150)
>  * then this:
> INFO stream-thread [qtp2032891036-47] Deleting obsolete state directory 0_0 
> for task 0_0 as 1ms has elapsed
>  * then this:
> WARN Could not clean up the schema registry for query: 
> _confluent-ksql-default_transient_
>  * then this:
> WARN [Producer clientId=producer-1] Connection to node 0 
> (localhost/127.0.0.1:9092) could not be established. Broker may not be 
> available. (org.apache.kafka.clients.NetworkClient:763)
> (all of the above are from the KSQLDB server-side logs)
>  * finally, this on the Kafka cluster side:
> ERROR Shutdown broker because all log dirs in  have failed 
> (kafka.log.LogManager)
>  
> And after that everything is down. Please see the attached files for the full 
> details.
>  
> Please help me with this.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8037) KTable restore may load bad data

2020-07-22 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163135#comment-17163135
 ] 

Almog Gavra commented on KAFKA-8037:


> That said...in my (admittedly anecdotal) experience, the creation of extra 
> topics and extra load on the brokers, etc. is a major pain point for users of 
> Streams. I'm pretty sure I've seen it quoted in a "why we decided against 
> Kafka Streams" type article. Compare this with the problem of asymmetric 
> serdes, for which we have received exactly zero complaints as far as I am 
> aware.

And now for a 180: I think this is pretty convincing; I just hope we can figure 
out a way that doesn't cause the problems we've run into. I see one way to do 
that: when this optimization is enabled, we should have a byte pass-through into 
the state store during normal operation. That guarantees that if there are any 
bugs, at least they happen both in normal operation and in recovery. It also 
guarantees that we don't run into side effects with serializers (still possible 
for deserializers, but I'm not aware of any deserializers that have side 
effects).

> KTable restore may load bad data
> 
>
> Key: KAFKA-8037
> URL: https://issues.apache.org/jira/browse/KAFKA-8037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be by-passed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8037) KTable restore may load bad data

2020-07-22 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163135#comment-17163135
 ] 

Almog Gavra edited comment on KAFKA-8037 at 7/22/20, 11:27 PM:
---

> That said...in my (admittedly anecdotal) experience, the creation of extra 
> topics and extra load on the brokers, etc. is a major pain point for users of 
> Streams. I'm pretty sure I've seen it quoted in a "why we decided against 
> Kafka Streams" type article. Compare this with the problem of asymmetric 
> serdes, for which we have received exactly zero complaints as far as I am 
> aware.

And now for a 180: I think this is pretty convincing; I just hope we can figure 
out a way that doesn't cause the problems we've run into. I see one way to do 
that: when this optimization is enabled, we should have a byte pass-through into 
the state store during normal operation (I realize this might not be trivial to 
implement, after a discussion with Matthias). That guarantees that if there are 
any bugs, at least they happen both in normal operation and in recovery. It also 
guarantees that we don't run into side effects with serializers (still possible 
for deserializers, but I'm not aware of any deserializers that have side 
effects).


was (Author: agavra):
> That said...in my (admittedly anecdotal) experience, the creation of extra 
> topics and extra load on the brokers, etc. is a major pain point for users of 
> Streams. I'm pretty sure I've seen it quoted in a "why we decided against 
> Kafka Streams" type article. Compare this with the problem of asymmetric 
> serdes, for which we have received exactly zero complaints as far as I am 
> aware.

And now for a 180: I think this is pretty convincing; I just hope we can figure 
out a way that doesn't cause the problems we've run into. I see one way to do 
that: when this optimization is enabled, we should have a byte pass-through into 
the state store during normal operation. That guarantees that if there are any 
bugs, at least they happen both in normal operation and in recovery. It also 
guarantees that we don't run into side effects with serializers (still possible 
for deserializers, but I'm not aware of any deserializers that have side 
effects).

> KTable restore may load bad data
> 
>
> Key: KAFKA-8037
> URL: https://issues.apache.org/jira/browse/KAFKA-8037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be by-passed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8037) KTable restore may load bad data

2020-07-22 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163133#comment-17163133
 ] 

Almog Gavra commented on KAFKA-8037:


> Can you give a specific example of how things would break due to the 
> asymmetric JSON/AVRO serdes, and/or the schema registry side effects?

This depends on what you mean by "how things would break". Is it a matter of 
correctness that the state stores are identical before and after recovery? If I 
have a serde which only serializes half the fields (something ksqlDB supports 
today, and a feature that is useful for people who don't control the input 
data), then after I recover, my local disk usage might be twice as much and 
lookups into the state store cost more to deserialize. Nothing "breaks" in the 
sense that you'll be able to deserialize all of that data and continue 
processing, but the efficiency of your system changes.

Things get a little worse from a side-effect perspective, specifically with 
Confluent Schema Registry serdes. If we're reusing the source topic, then 
naturally we should pass the source topic as the name to the serde (or else 
face KAFKA-10179). If I write to the state store with a serializer that is not 
identical to the one that wrote to the topic, I will register schemas under the 
source topic's subject - this might break other producers (something that 
ksqlDB users complain about: 
[https://github.com/confluentinc/ksql/issues/5553]).

FWIW, the way we're working around this in ksql is to always serialize using 
the "phantom" changelog subject, and deserialize using both.

> KTable restore may load bad data
> 
>
> Key: KAFKA-8037
> URL: https://issues.apache.org/jira/browse/KAFKA-8037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be by-passed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8037) KTable restore may load bad data

2020-07-22 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163021#comment-17163021
 ] 

Almog Gavra commented on KAFKA-8037:


There are lots of threads going on in this discussion, but re: whether the 
optimization should be opt-in or opt-out:

[~ableegoldman] [~mjsax] - while I agree with you in theory that these serdes 
(ones that have side effects and/or are asymmetric) should be discouraged, I 
don't think that's a realistic expectation. Some of the most popular serdes 
have these properties:
 * All Confluent Schema Registry serdes have side effects on serialization
 * AVRO reader/writer schemas are built to be asymmetric (and that's how they 
handle schema evolution)
 * JSON serdes are asymmetric if you allow "additional properties"

Moreover, many users might not even know whether their serde is symmetric or 
has side effects, which makes it very difficult to require users to opt out, as 
opposed to allowing them to opt in.

> KTable restore may load bad data
> 
>
> Key: KAFKA-8037
> URL: https://issues.apache.org/jira/browse/KAFKA-8037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be by-passed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8037) KTable restore may load bad data

2020-07-20 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161440#comment-17161440
 ] 

Almog Gavra commented on KAFKA-8037:


I'm not totally sure I'm understanding the solutions, but figured I should 
throw this out there anyway for the record.  [~ableegoldman] wrote a good 
summary of the goals:
 # reuse the input topic as the changelog to avoid replicating all that data
 # make restoration as fast as possible by copying plain bytes _without_ 
deserializing
 # don't load bad data, or copy bytes during restoration that weren't copied 
during normal processing

I think 1 and 3 are mutually exclusive, independently of 2. If I understand 
correctly, Kafka doesn't care about "good" and "bad" data - so if there are two 
events for the same key and the second event is "bad", eventually the topic 
will be compacted and the good data won't even exist anymore. Worded 
differently, even if we passed the "bad" record into the deserializer and 
identified it as a bad record (which the user's handler decides to drop), we 
would have no way to update the state store to the "good" value.

Riffing off Sophie's idea of an inverse changelog topic, I think it would need 
to include the bad contents and the previously-known good contents. Then you 
could do a table scan of the materialized inverse changelog topic at the end 
and compare any keys that are present in the materialized source topic - if 
those keys have the same byte-for-byte data in them as the "bad" contents in 
the inverse-changelog topic, you would restore it to the "last known good" 
content. Otherwise (the data is not the "bad" data) you know a new event came 
in that was good and you can send a tombstone to the inverse-changelog topic.

> KTable restore may load bad data
> 
>
> Key: KAFKA-8037
> URL: https://issues.apache.org/jira/browse/KAFKA-8037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be by-passed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-26 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17146707#comment-17146707
 ] 

Almog Gavra commented on KAFKA-10179:
-

[~ableegoldman] I confirmed locally that nothing "breaks" if we use a 
deserializer that projects a subset of the fields in the record, as you 
suspected, but consider the following points:
 # Some of the most popular serdes are asymmetric (e.g. avro builds in the 
concept of reader/writer schema into their APIs)
 # It may be impossible to determine, for a given serde, whether it is symmetric
 # State after recovery should be identical to before recovery for predictable 
operations (especially in cloud environments)
 # Some of the most popular serdes have side effects (e.g. Confluent schema 
registry serdes will create subjects on your behalf)

In practice, the first three points in conjunction with what [~mjsax] said (the 
source-topic-changelog optimization really only applies, if the data in the 
input topic is exactly the same as in the changelog topic and thus, we avoid 
creating the changelog topic), means that we can't safely turn on the 
source-topic-changelog optimization unless the user indicates either (a) they 
are using a symmetrical serde or (b) they are willing to waive 3 in order to 
speed up recovery ([~cadonna] if we consider 3 a matter of correctness, we 
can't sacrifice correctness for performance without the user's consent).

Even if the user indicates (a) or (b) above, I still don't think we can 
implement the fix described here, because of the fourth point. It may be 
possible that the user is using a symmetric serde but their schema is not 
identical to the one that wrote to the Kafka topic (ksql, for example, 
generates a new schema where all the fields are the same but the schema has a 
different name; I can also easily imagine a schema with _more_ fields that 
would write the same value as it read from an event with fewer fields).

I'm not sure I understand this comment: "The data would be deserialized and 
re-serialized using the same Serde (this is inefficiency we pay, as we also 
need to send the de-serialized data downstream for further processing)." Why 
can't we just always pass-through the data into the state store if the 
optimization is enabled?

 

> State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables
> -
>
> Key: KAFKA-10179
> URL: https://issues.apache.org/jira/browse/KAFKA-10179
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.7.0
>
>
> {{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
> store to the state store serdes. Currently, it always passes {{<application 
> ID>-<store name>-changelog}} as the changelog topic name. However, for 
> optimized source tables the changelog topic is the source topic. 
> Most serdes do not use the topic name passed to them. However, if the serdes 
> actually use the topic name for (de)serialization, e.g., when Kafka Streams 
> is used with Confluent's Schema Registry, a 
> {{org.apache.kafka.common.errors.SerializationException}} is thrown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-8595) Support SerDe of Decimals in JSON that are not HEX encoded

2019-06-24 Thread Almog Gavra (JIRA)
Almog Gavra created KAFKA-8595:
--

 Summary: Support SerDe of Decimals in JSON that are not HEX encoded
 Key: KAFKA-8595
 URL: https://issues.apache.org/jira/browse/KAFKA-8595
 Project: Kafka
  Issue Type: Improvement
Reporter: Almog Gavra
Assignee: Almog Gavra


Most JSON data that utilizes precise decimal data represents it as a decimal 
string. Kafka Connect, on the other hand, only supports a binary HEX string 
encoding (see example below). We should support deserialization and 
serialization for any of the following representations:
{code:java}
{
  "asHex": "D3J5",
  "asString": "10.12345",
  "asNumber": 10.2345
}{code}
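
A hedged sketch of what the deserialization side could look like (a hypothetical 
helper, not the Connect converter API; the scale is assumed to come from the 
logical type's schema parameters, and the binary branch follows the ticket's 
description of a hex-encoded unscaled value):
{code:java}
import java.math.BigDecimal;
import java.math.BigInteger;

// Interpret a JSON decimal field that may arrive in any of the three encodings.
static BigDecimal toDecimal(final Object jsonValue, final int scale) {
    if (jsonValue instanceof Number) {                    // "asNumber"
        return BigDecimal.valueOf(((Number) jsonValue).doubleValue());
    }
    final String s = (String) jsonValue;
    if (s.matches("-?\\d+(\\.\\d+)?")) {                  // "asString"
        return new BigDecimal(s);
    }
    // "asHex": treat the string as the hex-encoded unscaled value
    return new BigDecimal(new BigInteger(s, 16), scale);
}
{code}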



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8514) Kafka clients should not include Scala's Java 8 compatibility lib

2019-06-10 Thread Almog Gavra (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16860241#comment-16860241
 ] 

Almog Gavra commented on KAFKA-8514:


[https://github.com/apache/kafka/pull/6910] - everything compiles and passes 
locally

> Kafka clients should not include Scala's Java 8 compatibility lib
> -
>
> Key: KAFKA-8514
> URL: https://issues.apache.org/jira/browse/KAFKA-8514
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Enno Runne
>Assignee: Almog Gavra
>Priority: Major
>
> The work with KAFKA-8305 brought in 
> "org.scala-lang.modules:scala-java8-compat_2.12" 
> as a dependency of the client lib. This will give Scala users an extra 
> headache, as it would need to be excluded when working with another Scala 
> version.
> Instead, it should be moved to be a dependency of *"core"*, for the 
> convenience of converting Java and Scala Option instances.
> See 
> [https://github.com/apache/kafka/commit/8e161580b859b2fcd54c59625e232b99f3bb48d0#diff-c197962302397baf3a4cc36463dce5eaR942]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8514) Kafka clients should not include Scala's Java 8 compatibility lib

2019-06-10 Thread Almog Gavra (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16860099#comment-16860099
 ] 

Almog Gavra commented on KAFKA-8514:


Ack, thanks for pointing this out [~ennru]! I'll change this and have a PR out 
soon, after making sure everything compiles.

> Kafka clients should not include Scala's Java 8 compatibility lib
> -
>
> Key: KAFKA-8514
> URL: https://issues.apache.org/jira/browse/KAFKA-8514
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Enno Runne
>Assignee: Almog Gavra
>Priority: Major
>
> The work with KAFKA-8305 brought in 
> "org.scala-lang.modules:scala-java8-compat_2.12" 
> as a dependency of the client lib. This will give Scala users an extra 
> headache, as it would need to be excluded when working with another Scala 
> version.
> Instead, it should be moved to be a dependency of *"core"*, for the 
> convenience of converting Java and Scala Option instances.
> See 
> [https://github.com/apache/kafka/commit/8e161580b859b2fcd54c59625e232b99f3bb48d0#diff-c197962302397baf3a4cc36463dce5eaR942]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8514) Kafka clients should not include Scala's Java 8 compatibility lib

2019-06-10 Thread Almog Gavra (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra reassigned KAFKA-8514:
--

Assignee: Almog Gavra

> Kafka clients should not include Scala's Java 8 compatibility lib
> -
>
> Key: KAFKA-8514
> URL: https://issues.apache.org/jira/browse/KAFKA-8514
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Enno Runne
>Assignee: Almog Gavra
>Priority: Major
>
> The work with KAFKA-8305 brought in 
> "org.scala-lang.modules:scala-java8-compat_2.12" 
> as a dependency of the client lib. This will give Scala users an extra 
> headache, as it would need to be excluded when working with another Scala 
> version.
> Instead, it should be moved to be a dependency of *"core"*, for the 
> convenience of converting Java and Scala Option instances.
> See 
> [https://github.com/apache/kafka/commit/8e161580b859b2fcd54c59625e232b99f3bb48d0#diff-c197962302397baf3a4cc36463dce5eaR942]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8305) AdminClient should support creating topics with default partitions and replication factor

2019-05-10 Thread Almog Gavra (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra updated KAFKA-8305:
---
Summary: AdminClient should support creating topics with default partitions 
and replication factor  (was: AdminClient should support creating topics with 
`default.replication.factor`)

> AdminClient should support creating topics with default partitions and 
> replication factor
> -
>
> Key: KAFKA-8305
> URL: https://issues.apache.org/jira/browse/KAFKA-8305
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Almog Gavra
>Priority: Major
>
> Today, the AdminClient creates topics by requiring a `NewTopic` object, which 
> must contain either partitions and replicas or an exact broker mapping (which 
> then infers partitions and replicas). Some users, however, could benefit from 
> just using the cluster default for replication factor but may not want to use 
> auto topic creation.
> NOTE: I am planning on working on this, but I do not have permissions to 
> assign this ticket to myself.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8305) AdminClient should support creating topics with default partitions and replication factor

2019-05-10 Thread Almog Gavra (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16837631#comment-16837631
 ] 

Almog Gavra commented on KAFKA-8305:


Approved: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-464%3A+Defaults+for+AdminClient%23createTopic]
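For reference, a sketch of the API shape KIP-464 proposes (assuming a broker 
at localhost:9092; the topic name is illustrative), where empty Optionals 
defer to the broker's {{num.partitions}} and {{default.replication.factor}}:
{code:java}
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicWithDefaults {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Empty Optionals let the broker fill in partition count and
            // replication factor from its configured defaults.
            NewTopic topic = new NewTopic(
                    "example-topic", Optional.empty(), Optional.empty());
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
{code}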

> AdminClient should support creating topics with default partitions and 
> replication factor
> -
>
> Key: KAFKA-8305
> URL: https://issues.apache.org/jira/browse/KAFKA-8305
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Almog Gavra
>Priority: Major
>
> Today, the AdminClient creates topics by requiring a `NewTopic` object, which 
> must contain either partitions and replicas or an exact broker mapping (which 
> then infers partitions and replicas). Some users, however, could benefit from 
> just using the cluster default for replication factor but may not want to use 
> auto topic creation.
> NOTE: I am planning on working on this, but I do not have permissions to 
> assign this ticket to myself.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8305) AdminClient should support creating topics with `default.replication.factor`

2019-04-29 Thread Almog Gavra (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra updated KAFKA-8305:
---
Description: 
Today, the AdminClient creates topics by requiring a `NewTopic` object, which 
must contain either partitions and replicas or an exact broker mapping (which 
then infers partitions and replicas). Some users, however, could benefit from 
just using the cluster default for replication factor but may not want to use 
auto topic creation.

NOTE: I am planning on working on this, but I do not have permissions to assign 
this ticket to myself.

  was:
Today, the AdminClient creates topics by requiring a `NewTopic` object, which 
must contain either partitions and replicas or an exact broker mapping (which 
then infers partitions and replicas). Some users, however, could benefit from 
just using the cluster default for replication factor but may not want to use 
auto topic creation.

 

NOTE: I am planning on working on this, but I do not have permissions to assign 
this ticket to myself.


> AdminClient should support creating topics with `default.replication.factor`
> 
>
> Key: KAFKA-8305
> URL: https://issues.apache.org/jira/browse/KAFKA-8305
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Almog Gavra
>Priority: Major
>
> Today, the AdminClient creates topics by requiring a `NewTopic` object, which 
> must contain either partitions and replicas or an exact broker mapping (which 
> then infers partitions and replicas). Some users, however, could benefit from 
> just using the cluster default for replication factor but may not want to use 
> auto topic creation.
> NOTE: I am planning on working on this, but I do not have permissions to 
> assign this ticket to myself.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8305) AdminClient should support creating topics with `default.replication.factor`

2019-04-29 Thread Almog Gavra (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Almog Gavra updated KAFKA-8305:
---
Description: 
Today, the AdminClient creates topics by requiring a `NewTopic` object, which 
must contain either partitions and replicas or an exact broker mapping (which 
then infers partitions and replicas). Some users, however, could benefit from 
just using the cluster default for replication factor but may not want to use 
auto topic creation.

 

NOTE: I am planning on working on this, but I do not have permissions to assign 
this ticket to myself.

  was:Today, the AdminClient creates topics by requiring a `NewTopic` object, 
which must contain either partitions and replicas or an exact broker mapping 
(which then infers partitions and replicas). Some users, however, could benefit 
from just using the cluster default for replication factor but may not want to 
use auto topic creation.


> AdminClient should support creating topics with `default.replication.factor`
> 
>
> Key: KAFKA-8305
> URL: https://issues.apache.org/jira/browse/KAFKA-8305
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Almog Gavra
>Priority: Major
>
> Today, the AdminClient creates topics by requiring a `NewTopic` object, which 
> must contain either partitions and replicas or an exact broker mapping (which 
> then infers partitions and replicas). Some users, however, could benefit from 
> just using the cluster default for replication factor but may not want to use 
> auto topic creation.
>  
> NOTE: I am planning on working on this, but I do not have permissions to 
> assign this ticket to myself.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8305) AdminClient should support creating topics with `default.replication.factor`

2019-04-29 Thread Almog Gavra (JIRA)
Almog Gavra created KAFKA-8305:
--

 Summary: AdminClient should support creating topics with 
`default.replication.factor`
 Key: KAFKA-8305
 URL: https://issues.apache.org/jira/browse/KAFKA-8305
 Project: Kafka
  Issue Type: Improvement
  Components: admin
Reporter: Almog Gavra


Today, the AdminClient creates topics by requiring a `NewTopic` object, which 
must contain either partitions and replicas or an exact broker mapping (which 
then infers partitions and replicas). Some users, however, could benefit from 
just using the cluster default for replication factor but may not want to use 
auto topic creation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)