[jira] [Commented] (KAFKA-7640) Kafka stream interactive query not returning data when state is backed by rocksdb
[ https://issues.apache.org/jira/browse/KAFKA-7640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16719527#comment-16719527 ]

hitesh gollahalli bachanna commented on KAFKA-7640:
---

[~vvcephei] Sure, I have attached some screenshots that show how the data and the request end up on different hosts. I will work on gathering actual debug logs from the Kafka internal classes.

There are 3 files:

1. intital_data_host.jpeg – shows which consumer received the data; the last column (host) identifies it. Note that I used the IP addresses, which are routable, to make the actual REST call.
2. rest_api_call_host.jpeg – after I send the message, I make a REST call. The calls land on some random node; the host names are in the last column.
3. rest_redirect_host.jpeg – the code checks whether the host is localSelf; if not, it sends the request to that IP/host to fetch the result. That host is different from the one shown in intital_data_host.jpeg.

Here is the REST call code for reference:

{code:java}
// Ask Kafka Streams which instance should hold the key.
HostStoreInfo hostStoreInfo = metadataService.streamsMetadataForStoreAndKey(
        config.getMOVE_DATA_AGGS_STORE_NAME(), storeDpci, new JsonSerializer<>());
System.out.println("host_info :" + hostStoreInfo + " for_key :" + storeDpci);

MoveDataVo moveDataVo = null;
System.out.println("localSelf :" + localSelf.host() + " remote_host:" + hostStoreInfo.getHost());

if (localSelf.host().equals(hostStoreInfo.getHost())) {
    // The key should be on this instance: read it from the local store.
    ReadOnlyKeyValueStore store = kafkaStreams.store(
            config.getMOVE_DATA_AGGS_STORE_NAME(), QueryableStoreTypes.keyValueStore());
    if (store != null) {
        moveDataVo = store.get(storeDpci);
    }
} else {
    // The key should be on another instance: forward the request to it.
    String fooResourceUrl = "http://" + hostStoreInfo.getHost() + ":" + 8080
            + "/dataaggs/v1/get_state?storedpci=" + storeDpci;
    ResponseEntity response = restTemplate.getForEntity(fooResourceUrl, MoveDataVo.class);
    moveDataVo = response.getBody();
    System.out.println("response_from_other: " + moveDataVo);
}
{code}

> Kafka stream interactive query not returning data when state is backed by rocksdb
> -
>
>                 Key: KAFKA-7640
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7640
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: hitesh gollahalli bachanna
>            Priority: Major
>         Attachments: intital_data_host.jpeg, rest_api_call_host.jpeg, rest_redirect_host.jpeg
>
> I have a Kafka Streams app running with 36 different instances (one for each partition). Each instance comes up one after the other. I am building a REST service on top of the state to access the data.
> Here is some code that I use:
> {code:java}
> StreamsMetadata metadata = streams.metadataForKey(store, key, serializer);
> // --> call this to find out which host has the key
> if (localSelf.host().equals(hostStoreInfo.getHost())) {
>     // get the key from the local store
> } else {
>     // call the remote host using restTemplate
> }
> {code}
> The problem now is that the returned `metadata` object has one host/IP, but the data is on a different node. I was able to see this using some application logs I printed. This happens every time I start my application.
> The `allMetadata` method in the `KafkaStreams` class says the value will be updated as the partitions get reassigned. But it's not happening in this case.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
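One thing that may be worth ruling out with the REST call code above: `metadataForKey` locates the partition from the serialized key bytes (its Javadoc notes that it uses the default Kafka Streams partitioner), so a serializer that produces different bytes from the one used when the records were written could point at a different instance. The sketch below only illustrates that effect and is not a confirmed cause of this issue. It assumes a string key and a JSON serializer that writes strings as quoted literals; `asPlainString`, `asJsonString`, and `partitionFor` are hypothetical helpers, and the hash is a stand-in, not Kafka's murmur2.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class KeyBytesSketch {
    // Hypothetical stand-in for a plain string serializer: raw UTF-8 bytes.
    static byte[] asPlainString(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical stand-in for a JSON serializer: the key as a quoted JSON string.
    // Escaping is omitted, so this assumes the key has no quotes or backslashes.
    static byte[] asJsonString(String key) {
        return ("\"" + key + "\"").getBytes(StandardCharsets.UTF_8);
    }

    // Stand-in partitioner: hash of the bytes modulo the partition count.
    // This is NOT Kafka's murmur2-based default partitioner.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        String key = "1234-56";   // hypothetical key value
        int numPartitions = 36;   // partition count reported in the issue
        byte[] plain = asPlainString(key);
        byte[] json = asJsonString(key);
        System.out.println("same bytes: " + Arrays.equals(plain, json));
        System.out.println("plain -> partition " + partitionFor(plain, numPartitions));
        System.out.println("json  -> partition " + partitionFor(json, numPartitions));
    }
}
```

If the two byte arrays differ, nothing guarantees that they map to the same partition, so comparing the serializer passed to `metadataForKey` with the key serde used by the topology could be a quick check.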
[jira] [Updated] (KAFKA-7640) Kafka stream interactive query not returning data when state is backed by rocksdb
[ https://issues.apache.org/jira/browse/KAFKA-7640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

hitesh gollahalli bachanna updated KAFKA-7640:
--
    Attachment: intital_data_host.jpeg
                rest_api_call_host.jpeg
                rest_redirect_host.jpeg
[jira] [Commented] (KAFKA-7640) Kafka stream interactive query not returning data when state is backed by rocksdb
[ https://issues.apache.org/jira/browse/KAFKA-7640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718018#comment-16718018 ]

hitesh gollahalli bachanna commented on KAFKA-7640:
---

Hello [~vvcephei],

The REST API returns a 200 status code, but no data.

1. The client is on version 2.0.0, but the server is on 1.1.1.
2. We use a virtual IP (to load balance) as the Kafka broker address in the client code.
3. I have 36 partitions in the topic, for which there are 36 consumers on different machines.
4. I have about 360 million messages on the topic.

I see rebalances happening, because I see a bunch of `revoked at the beginning of consumer rebalance` messages.

Things I see in the logs when DEBUG is enabled:

Using older server API v7 to send FETCH
Using older server API v5 to send METADATA {topics=[],allow_auto_topic_creation=false}
Using older server API v1 to send HEARTBEAT

Is there anything in particular I need to look for in the logs?
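A 200 response with an empty body can mean either that the key is not in the queried instance's store or that the request was routed to the wrong instance, so it may help to make a miss explicit. The following is a minimal sketch, assuming the endpoint is free to return 404 for a missing key; `LookupStatusSketch`, `lookup`, and `statusFor` are hypothetical names, and the `Map` merely stands in for the local state store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class LookupStatusSketch {
    // Wrap the store read so that "no value for this key" is explicit, not a bare null.
    static Optional<String> lookup(Map<String, String> localStore, String key) {
        return Optional.ofNullable(localStore.get(key));
    }

    // Map the lookup result to an HTTP status: 200 when a value exists, 404 otherwise.
    static int statusFor(Optional<String> value) {
        return value.isPresent() ? 200 : 404;
    }

    public static void main(String[] args) {
        Map<String, String> localStore = new HashMap<>(); // stand-in for the state store
        localStore.put("known-key", "some-value");        // hypothetical entry
        System.out.println(statusFor(lookup(localStore, "known-key")));
        System.out.println(statusFor(lookup(localStore, "missing-key")));
    }
}
```

With a distinct status for a miss, a log line on the serving instance shows directly whether it was asked for a key it does not hold.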
[jira] [Created] (KAFKA-7640) Kafka stream interactive query not returning data when state is backed by rocksdb
hitesh gollahalli bachanna created KAFKA-7640:
-

             Summary: Kafka stream interactive query not returning data when state is backed by rocksdb
                 Key: KAFKA-7640
                 URL: https://issues.apache.org/jira/browse/KAFKA-7640
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.0.0
            Reporter: hitesh gollahalli bachanna

I have a Kafka Streams app running with 36 different instances (one for each partition). Each instance comes up one after the other. I am building a REST service on top of the state to access the data.

Here is some code that I use:

{code:java}
StreamsMetadata metadata = streams.metadataForKey(store, key, serializer);
// --> call this to find out which host has the key
if (localSelf.host().equals(hostStoreInfo.getHost())) {
    // get the key from the local store
} else {
    // call the remote host using restTemplate
}
{code}

The problem now is that the returned `metadata` object has one host/IP, but the data is on a different node. I was able to see this using some application logs I printed. This happens every time I start my application.

The `allMetadata` method in the `KafkaStreams` class says the value will be updated as the partitions get reassigned. But it's not happening in this case.
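The local-versus-remote decision described in the report can be sketched in isolation. The example below is a hedged illustration, not the reporter's code. It compares both host and port (Kafka's `HostInfo` carries both), which matters if two instances ever share a host. `RoutingSketch`, `Endpoint`, `isLocal`, and `remoteUrl` are hypothetical names, as are the `/state` path and the `key` query parameter, and the key is assumed to be URL-safe.

```java
import java.util.Objects;

final class RoutingSketch {
    /** Hypothetical host-and-port pair, standing in for the metadata returned for a key. */
    static final class Endpoint {
        final String host;
        final int port;

        Endpoint(String host, int port) {
            this.host = Objects.requireNonNull(host);
            this.port = port;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Endpoint)) return false;
            Endpoint other = (Endpoint) o;
            return port == other.port && host.equals(other.host);
        }

        @Override
        public int hashCode() {
            return Objects.hash(host, port);
        }
    }

    // The key is served locally only when both host and port match this instance.
    static boolean isLocal(Endpoint self, Endpoint owner) {
        return self.equals(owner);
    }

    // Build the URL used to forward the query to the owning instance.
    // The key is assumed to be URL-safe; no encoding is applied here.
    static String remoteUrl(Endpoint owner, String path, String key) {
        return "http://" + owner.host + ":" + owner.port + path + "?key=" + key;
    }

    public static void main(String[] args) {
        Endpoint self = new Endpoint("10.0.0.1", 8080);  // hypothetical addresses
        Endpoint owner = new Endpoint("10.0.0.2", 8080);
        if (isLocal(self, owner)) {
            System.out.println("read from the local store");
        } else {
            System.out.println("forward to " + remoteUrl(owner, "/state", "k1"));
        }
    }
}
```

Logging the value this instance advertises next to the owner returned by the metadata lookup would show whether the two sides disagree on host names versus IP addresses.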