[jira] [Commented] (KAFKA-6086) Provide for custom error handling when Kafka Streams fails to produce

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316143#comment-16316143
 ] 

ASF GitHub Bot commented on KAFKA-6086:
---

dguy closed pull request #4395: MINOR: Add documentation for KAFKA-6086 
(ProductionExceptionHandler)
URL: https://github.com/apache/kafka/pull/4395
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/streams/developer-guide/config-streams.html 
b/docs/streams/developer-guide/config-streams.html
index dbac7fba2e0..256cc18b56f 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -69,6 +69,7 @@
   
   Optional configuration 
parameters
 default.deserialization.exception.handler
+default.production.exception.handler
 default.key.serde
 default.value.serde
 num.standby.replicas
@@ -216,77 +217,82 @@ bootstrap.servers
 Exception handling class that implements the DeserializationExceptionHandler interface.
 3 milliseconds
   
-  key.serde
+  default.production.exception.handler
+Medium
+Exception handling class that implements the ProductionExceptionHandler interface.
+DefaultProductionExceptionHandler
+  
+  key.serde
 Medium
 Default serializer/deserializer class for record 
keys, implements the Serde interface (see also value.serde).
 Serdes.ByteArray().getClass().getName()
   
-  metric.reporters
+  metric.reporters
 Low
 A list of classes to use as metrics reporters.
 the empty list
   
-  metrics.num.samples
+  metrics.num.samples
 Low
 The number of samples maintained to compute 
metrics.
 2
   
-  metrics.recording.level
+  metrics.recording.level
 Low
 The highest recording level for metrics.
 INFO
   
-  metrics.sample.window.ms
+  metrics.sample.window.ms
 Low
 The window of time a metrics sample is computed 
over.
 3 milliseconds
   
-  num.standby.replicas
+  num.standby.replicas
 Medium
 The number of standby replicas for each task.
 0
   
-  num.stream.threads
+  num.stream.threads
 Medium
 The number of threads to execute stream 
processing.
 1
   
-  partition.grouper
+  partition.grouper
 Low
 Partition grouper class that implements the PartitionGrouper 
interface.
 See Partition Grouper
   
-  poll.ms
+  poll.ms
 Low
 The amount of time in milliseconds to block 
waiting for input.
 100 milliseconds
   
-  replication.factor
+  replication.factor
 High
 The replication factor for changelog topics and 
repartition topics created by the application.
 1
   
-  state.cleanup.delay.ms
+  state.cleanup.delay.ms
 Low
 The amount of time in milliseconds to wait before 
deleting state when a partition has migrated.
 600 milliseconds
   
-  state.dir
+  state.dir
 High
 Directory location for state stores.
 /var/lib/kafka-streams
   
-  timestamp.extractor
+  timestamp.extractor
 Medium
 Timestamp extractor class that implements the 
TimestampExtractor interface.
 See Timestamp Extractor
   
-  value.serde
+  value.serde
 Medium
 Default serializer/deserializer class for record 
values, implements the Serde interface (see also key.serde).
 Serdes.ByteArray().getClass().getName()
   
-  windowstore.changelog.additional.retention.ms
+  windowstore.changelog.additional.retention.ms
 Low
 Added to a windows maintainMs to ensure data is 
not deleted from the log prematurely. Allows for clock drift.
 8640 milliseconds = 1 day
@@ -309,6 +315,44 @@ bootstrap.servers
+  default.production.exception.handler
+  
+The default production exception handler allows you to 
manage exceptions triggered when trying to interact with a broker
+  such as attempting to produce a record that is too l
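
(For context, a handler plugged in through this new config could look roughly like the
following sketch. It is based on the ProductionExceptionHandler interface added by
KAFKA-6086/KIP-210; the "skip oversized records" policy is only an illustration, not part of
the PR being merged here.)

{code}
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Sketch of a custom handler: keep processing when a record is too large, fail otherwise.
public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE; // drop this record, keep the app running
        }
        return ProductionExceptionHandlerResponse.FAIL;         // any other error stops the Streams client
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no extra configuration needed for this sketch
    }
}
{code}

Such a handler would be registered via the config documented above, e.g.
props.put("default.production.exception.handler", IgnoreRecordTooLargeHandler.class.getName());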

[jira] [Commented] (KAFKA-6260) AbstractCoordinator not clearly handles NULL Exception

2018-01-08 Thread Seweryn Habdank-Wojewodzki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316179#comment-16316179
 ] 

Seweryn Habdank-Wojewodzki commented on KAFKA-6260:
---

Short question: when will those fixes be released? :-)
Unfortunately, the 
[Release|https://issues.apache.org/jira/projects/KAFKA?selectedItem=com.atlassian.jira.jira-projects-plugin:release-page]
 page shows no dates.

> AbstractCoordinator not clearly handles NULL Exception
> --
>
> Key: KAFKA-6260
> URL: https://issues.apache.org/jira/browse/KAFKA-6260
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
> Environment: RedHat Linux
>Reporter: Seweryn Habdank-Wojewodzki
>Assignee: Jason Gustafson
> Fix For: 1.1.0, 1.0.1
>
>
> The error reporting is not clear, but it seems that the Kafka heartbeat shuts 
> down the application due to a NULL exception caused by "fake" disconnections.
> One more comment: we are processing messages in the stream, but sometimes we 
> have to block processing for minutes, as the consumers cannot handle too much 
> load. Is it possible that while the stream is waiting, the heartbeat is 
> blocked as well?
> Can you check that?
> {code}
> 2017-11-23 23:54:47 DEBUG AbstractCoordinator:177 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Received successful Heartbeat response
> 2017-11-23 23:54:50 DEBUG AbstractCoordinator:183 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Sending Heartbeat request to coordinator 
> cljp01.eb.lan.at:9093 (id: 2147483646 rack: null)
> 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Sending HEARTBEAT 
> {group_id=kafka-endpoint,generation_id=3834,member_id=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer-94f18be5-e49a-4817-9e5a-fe82a64e0b08}
>  with correlation id 24 to node 2147483646
> 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Completed receive from node 2147483646 for HEARTBEAT 
> with correlation id 24, received {throttle_time_ms=0,error_code=0}
> 2017-11-23 23:54:50 DEBUG AbstractCoordinator:177 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Received successful Heartbeat response
> 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Disconnecting from node 1 due to request timeout.
> 2017-11-23 23:54:52 TRACE NetworkClient:135 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Cancelled request 
> {replica_id=-1,max_wait_time=6000,min_bytes=1,max_bytes=52428800,isolation_level=0,topics=[{topic=clj_internal_topic,partitions=[{partition=6,fetch_offset=211558395,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=210178209,log_start_offset=-1,max_bytes=1048576},{partition=0,fetch_offset=209353523,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=209291462,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=210728595,log_start_offset=-1,max_bytes=1048576}]}]}
>  with correlation id 21 due to node 1 being disconnected
> 2017-11-23 23:54:52 DEBUG ConsumerNetworkClient:195 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Cancelled FETCH request RequestHeader(apiKey=FETCH, 
> apiVersion=6, 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  correlationId=21) with correlation id 21 due to node 1 being disconnected
> 2017-11-23 23:54:52 DEBUG Fetcher:195 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Fetch request 
> {clj_internal_topic-6=(offset=211558395, logStartOffset=-1, 
> maxBytes=1048576), clj_internal_topic-8=(offset=210178209, logStartOffset=-1, 
> maxBytes=1048576), clj_internal_topic-0=(offset=209353523, logStartOffset=-1, 
> maxBytes=1048576), clj_internal_topic-2=(offset=209291462, logStartOffset=-1, 
> maxBytes=1048576), clj_internal_topic-4=(offset=210728595, logStartOffset=-1, 
> maxBytes=1048576)} to cljp01.eb.lan.at:9093 (id: 1 rack: DC-1) failed 
> org.apache.kafka.common.errors.DisconnectException: null
> 2017-11-23 23:54:52 TRACE NetworkClient:123 - [Consumer 
> clientId=kafk

[jira] [Resolved] (KAFKA-4908) consumer.properties logging warnings

2018-01-08 Thread Seweryn Habdank-Wojewodzki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki resolved KAFKA-4908.
---
Resolution: Done

Not an issue for me anymore.

> consumer.properties logging warnings
> 
>
> Key: KAFKA-4908
> URL: https://issues.apache.org/jira/browse/KAFKA-4908
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Minor
>
> The default consumer.properties file delivered with the Kafka package logs warnings 
> at startup of the console consumer:
> [2017-03-15 16:36:57,439] WARN The configuration 
> 'zookeeper.connection.timeout.ms' was supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2017-03-15 16:36:57,455] WARN The configuration 'zookeeper.connect' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)





[jira] [Closed] (KAFKA-4908) consumer.properties logging warnings

2018-01-08 Thread Seweryn Habdank-Wojewodzki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki closed KAFKA-4908.
-

> consumer.properties logging warnings
> 
>
> Key: KAFKA-4908
> URL: https://issues.apache.org/jira/browse/KAFKA-4908
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Minor
>
> The default consumer.properties file delivered with the Kafka package logs warnings 
> at startup of the console consumer:
> [2017-03-15 16:36:57,439] WARN The configuration 
> 'zookeeper.connection.timeout.ms' was supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2017-03-15 16:36:57,455] WARN The configuration 'zookeeper.connect' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)





[jira] [Resolved] (KAFKA-4315) Kafka Connect documentation problems

2018-01-08 Thread Seweryn Habdank-Wojewodzki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki resolved KAFKA-4315.
---
Resolution: Done

I do not care anymore about this matter.

> Kafka Connect documentation problems
> 
>
> Key: KAFKA-4315
> URL: https://issues.apache.org/jira/browse/KAFKA-4315
> Project: Kafka
>  Issue Type: Bug
>Reporter: Seweryn Habdank-Wojewodzki
>
> Based on the Kafka Connect documentation - 
> http://kafka.apache.org/documentation#connect - I tried to build an example 
> in Java. It was not possible. 
> The code pieces available on the webpage are taken out of context and 
> do not compile. 
> They also appear to be taken from other parts of the code base, so 
> even putting them together does not produce any reasonable 
> example, and they tend to be very complex, whereas I would expect the API 
> examples to be "Hello World"-like code.
> There are also only weak connections between the examples in the Kafka documentation 
> and the Kafka Connect tool code available in the Kafka source.
> Finally, it would be nice to have a statement in the Kafka 
> documentation about which parts of the API are stable and which are unstable or 
> experimental.
> I saw many (~20) such remarks in the Kafka code saying that an API is 
> unstable. This note is very important, as we plan additional effort to 
> prepare facades for unstable code.
> In my opinion there is nothing wrong with an experimental API, but these matters 
> shall be well documented. The current state of the main 
> Kafka documentation gives the impression that Kafka Connect is a well-tested, 
> consistent, and stable feature set, but it is not, which leads to confusion in 
> effort management.





[jira] [Closed] (KAFKA-4315) Kafka Connect documentation problems

2018-01-08 Thread Seweryn Habdank-Wojewodzki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki closed KAFKA-4315.
-

> Kafka Connect documentation problems
> 
>
> Key: KAFKA-4315
> URL: https://issues.apache.org/jira/browse/KAFKA-4315
> Project: Kafka
>  Issue Type: Bug
>Reporter: Seweryn Habdank-Wojewodzki
>
> Based on the Kafka Connect documentation - 
> http://kafka.apache.org/documentation#connect - I tried to build an example 
> in Java. It was not possible. 
> The code pieces available on the webpage are taken out of context and 
> do not compile. 
> They also appear to be taken from other parts of the code base, so 
> even putting them together does not produce any reasonable 
> example, and they tend to be very complex, whereas I would expect the API 
> examples to be "Hello World"-like code.
> There are also only weak connections between the examples in the Kafka documentation 
> and the Kafka Connect tool code available in the Kafka source.
> Finally, it would be nice to have a statement in the Kafka 
> documentation about which parts of the API are stable and which are unstable or 
> experimental.
> I saw many (~20) such remarks in the Kafka code saying that an API is 
> unstable. This note is very important, as we plan additional effort to 
> prepare facades for unstable code.
> In my opinion there is nothing wrong with an experimental API, but these matters 
> shall be well documented. The current state of the main 
> Kafka documentation gives the impression that Kafka Connect is a well-tested, 
> consistent, and stable feature set, but it is not, which leads to confusion in 
> effort management.





[jira] [Commented] (KAFKA-3039) Temporary loss of leader resulted in log being completely truncated

2018-01-08 Thread Vincent Da (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316265#comment-16316265
 ] 

Vincent Da commented on KAFKA-3039:
---

We have also experienced this on a Kafka server for many __consumer_offsets-X 
topic partitions (and also on regular ones) on a new Kafka 0.11.0.2:
{noformat}
/usr/local/kafka/logs/server.log.2018-01-06-13-[2018-01-06 13:27:35,568] WARN 
[ReplicaFetcherThread-0-3]: Based on follower's leader epoch, leader replied 
with an unknown offset in tests-1. High watermark 0 will be used for 
truncation. (kafka.server.ReplicaFetcherThread)
/usr/local/kafka/logs/server.log.2018-01-06-13:[2018-01-06 13:27:35,568] INFO 
Truncating log __consumer_offsets-19 to offset 0. (kafka.log.Log)
{noformat}



> Temporary loss of leader resulted in log being completely truncated
> ---
>
> Key: KAFKA-3039
> URL: https://issues.apache.org/jira/browse/KAFKA-3039
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
> Environment: Debian 3.2.54-2 x86_64 GNU/Linux
>Reporter: Imran Patel
>Priority: Critical
>  Labels: reliability
>
> We had an event recently where the temporary loss of a leader for a 
> partition (during a manual restart) resulted in the leader coming back with 
> no high watermark state and truncating its log to zero. Logs (attached below) 
> indicate that it did have the data but not the commit state. How is this 
> possible?
> Leader (broker 3)
> [2015-12-18 21:19:44,666] INFO Completed load of log messages-14 with log end 
> offset 14175963374 (kafka.log.Log)
> [2015-12-18 21:19:45,170] INFO Partition [messages,14] on broker 3: No 
> checkpointed highwatermark is found for partition [messages,14] 
> (kafka.cluster.Partition)
> [2015-12-18 21:19:45,238] INFO Truncating log messages-14 to offset 0. 
> (kafka.log.Log)
> [2015-12-18 21:20:34,066] INFO Partition [messages,14] on broker 3: Expanding 
> ISR for partition [messages,14] from 3 to 3,10 (kafka.cluster.Partition)
> Replica (broker 10)
> [2015-12-18 21:19:19,525] INFO Partition [messages,14] on broker 10: 
> Shrinking ISR for partition [messages,14] from 3,10,4 to 10,4 
> (kafka.cluster.Partition)
> [2015-12-18 21:20:34,049] ERROR [ReplicaFetcherThread-0-3], Current offset 
> 14175984203 for partition [messages,14] out of range; reset offset to 35977 
> (kafka.server.ReplicaFetcherThread)
> [2015-12-18 21:20:34,033] WARN [ReplicaFetcherThread-0-3], Replica 10 for 
> partition [messages,14] reset its fetch offset from 14175984203 to current 
> leader 3's latest offset 35977 (kafka.server.ReplicaFetcherThread)
> Some relevant config parameters:
> offsets.topic.replication.factor = 3
> offsets.commit.required.acks = -1
> replica.high.watermark.checkpoint.interval.ms = 5000
> unclean.leader.election.enable = false





[jira] [Commented] (KAFKA-6369) General wildcard support for ACL's in kafka

2018-01-08 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316649#comment-16316649
 ] 

Luiz Picanço commented on KAFKA-6369:
-

Seems related to KAFKA-5713

> General wildcard support for ACL's in kafka
> ---
>
> Key: KAFKA-6369
> URL: https://issues.apache.org/jira/browse/KAFKA-6369
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Antony Stubbs
>
> Especially for streams apps where all intermediate topics are prefixed with 
> the application id.
> For example, add read and write access to mystreamsapp.* so any new topics 
> created by the app don't need to have specific permissions applied to them.





[jira] [Commented] (KAFKA-6422) When enable trace level log in mirror maker, it will throw null pointer exception and the mirror maker will shutdown

2018-01-08 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316749#comment-16316749
 ] 

James Cheng commented on KAFKA-6422:


I have no ability to merge PRs, but the change looks good to me.

> When enable trace level log in mirror maker, it will throw null pointer 
> exception and the mirror maker will shutdown
> 
>
> Key: KAFKA-6422
> URL: https://issues.apache.org/jira/browse/KAFKA-6422
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.0.0, 0.10.1.0, 0.10.2.0, 0.11.0.0, 0.11.0.1, 
> 0.11.0.2
>Reporter: Xin Li
>Assignee: Xin Li
>Priority: Minor
>  Labels: easyfix
> Fix For: 0.11.0.0
>
>
> https://github.com/apache/kafka/blob/0.10.0/core/src/main/scala/kafka/tools/MirrorMaker.scala#L414
> When trace-level logging is enabled in MirrorMaker and a message value is null, it 
> throws a NullPointerException, and MirrorMaker shuts down because of 
> that.
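
(A null-safe guard along the lines sketched below would avoid dereferencing a null value in the
trace statement. This is only an illustrative Java sketch; the referenced code and the actual fix
live in MirrorMaker.scala and may differ.)

{code}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative null-safe trace logging: treat a null value as size 0 instead of dereferencing it.
class TraceLogSketch {
    private static final Logger log = LoggerFactory.getLogger(TraceLogSketch.class);

    static void traceRecord(final ConsumerRecord<byte[], byte[]> record) {
        final int valueSize = record.value() == null ? 0 : record.value().length;
        log.trace("Sending message with value size {} and offset {}", valueSize, record.offset());
    }
}
{code}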





[jira] [Commented] (KAFKA-6387) Worker's producer and consumer configs should inherit from worker configs

2018-01-08 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316820#comment-16316820
 ] 

Randall Hauch commented on KAFKA-6387:
--

There are a number of cases where this behavior might actually change how the 
producers and consumers used for connectors are configured. Consider a worker 
configuration that includes the following properties:

{code}
retries=2
max.partition.fetch.bytes=262144
{code}

This configuration file does not define 
"{{consumer.max.partition.fetch.bytes}}" or 
"{{producer.max.partition.fetch.bytes}}", so currently these would default to 
1048576. However, if this proposed change were implemented, the 
"{{consumer.max.partition.fetch.bytes}}" and 
"{{producer.max.partition.fetch.bytes}}" values would be the inherited value of 
262144, not the default 1048576.

In short, implementing this change would break backward compatibility. We could 
implement a configuration switch that controls whether the configurations are 
inherited, but this adds complexity to the already-complex configuration 
mechanism.
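
(To make the inheritance concrete, a rough sketch of the proposed derivation is below. The class
and method names are made up for illustration; this is not Connect's actual configuration code.)

{code}
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed behavior: start from the top-level worker properties, then let
// consumer.-prefixed keys override the inherited values (producers would work symmetrically).
class WorkerConfigInheritanceSketch {

    static Map<String, String> consumerConfigs(final Map<String, String> workerProps) {
        final Map<String, String> configs = new HashMap<>(workerProps);
        // drop the namespaced keys from the inherited copy...
        configs.keySet().removeIf(k -> k.startsWith("producer.") || k.startsWith("consumer."));
        // ...then overlay the consumer.* overrides without their prefix
        for (final Map.Entry<String, String> e : workerProps.entrySet()) {
            if (e.getKey().startsWith("consumer.")) {
                configs.put(e.getKey().substring("consumer.".length()), e.getValue());
            }
        }
        return configs;
    }
}
{code}

With the worker file above, this sketch would yield max.partition.fetch.bytes=262144 for the
consumer even though no consumer.max.partition.fetch.bytes was set, which is exactly the
compatibility concern: today the consumer instead falls back to its own default of 1048576.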



> Worker's producer and consumer configs should inherit from worker configs
> -
>
> Key: KAFKA-6387
> URL: https://issues.apache.org/jira/browse/KAFKA-6387
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>  Labels: needs-kip
>
> Currently, the worker configuration file defines the connection properties 
> for the three separate types of connections being made to the Kafka cluster:
> # the worker group membership,
> # the producers for source connectors,
> # the consumers for sink connectors. 
> The configs are namespaced in order to properly support things like 
> interceptors, where the configs for 2 and 3 would conflict (same config name, 
> different value).
> However, when such control is not required, it would be beneficial for the 
> producers and consumers to inherit the top-level configurations yet be able 
> to override them with the {{producer.}} and {{consumer.}} namespaced 
> configurations. This way the producer- and consumer-specific configurations 
> need only be specified if/when they need to override the top-level 
> configurations. This may be necessary, for example, to have different ACLs 
> for the connector tasks than for the producers and consumers.
> This will require a minimal KIP to explain the new behavior.





[jira] [Resolved] (KAFKA-6387) Worker's producer and consumer configs should inherit from worker configs

2018-01-08 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-6387.
--
Resolution: Won't Fix

Because of the inability to maintain backward compatibility of the behavior, 
I'm withdrawing this request and marking this issue as WONTFIX.

> Worker's producer and consumer configs should inherit from worker configs
> -
>
> Key: KAFKA-6387
> URL: https://issues.apache.org/jira/browse/KAFKA-6387
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>  Labels: needs-kip
>
> Currently, the worker configuration file defines the connection properties 
> for the three separate types of connections being made to the Kafka cluster:
> # the worker group membership,
> # the producers for source connectors,
> # the consumers for sink connectors. 
> The configs are namespaced in order to properly support things like 
> interceptors, where the configs for 2 and 3 would conflict (same config name, 
> different value).
> However, when such control is not required, it would be beneficial for the 
> producers and consumers to inherit the top-level configurations yet be able 
> to override them with the {{producer.}} and {{consumer.}} namespaced 
> configurations. This way the producer- and consumer-specific configurations 
> need only be specified if/when they need to override the top-level 
> configurations. This may be necessary, for example, to have different ACLs 
> for the connector tasks than for the producers and consumers.
> This will require a minimal KIP to explain the new behavior.





[jira] [Comment Edited] (KAFKA-6387) Worker's producer and consumer configs should inherit from worker configs

2018-01-08 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316820#comment-16316820
 ] 

Randall Hauch edited comment on KAFKA-6387 at 1/8/18 7:25 PM:
--

Created 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-246%3A+Connect+producers+and+consumers+should+inherit+worker+configs.
 However, while writing up the KIP I discovered that there are a number of 
cases where this behavior might actually change how the producers and consumers 
used for connectors are configured. Consider a worker configuration that includes 
the following properties:

{code}
retries=2
max.partition.fetch.bytes=262144
{code}

This configuration file does not define 
"{{consumer.max.partition.fetch.bytes}}" or 
"{{producer.max.partition.fetch.bytes}}", so currently these would default to 
1048576. However, if this proposed change were implemented, the 
"{{consumer.max.partition.fetch.bytes}}" and 
"{{producer.max.partition.fetch.bytes}}" values would be the inherited value of 
262144, not the default 1048576.

In short, implementing this change would break backward compatibility. We could 
implement a configuration switch that controls whether the configurations are 
inherited, but this adds complexity to the already-complex configuration 
mechanism.




was (Author: rhauch):
There are a number of cases where this behavior might actually change how the 
producers and consumers used for connectors are configured. Consider a worker 
configuration that includes the following properties:

{code}
retries=2
max.partition.fetch.bytes=262144
{code}

This configuration file does not define 
"{{consumer.max.partition.fetch.bytes}}" or 
"{{producer.max.partition.fetch.bytes}}", so currently these would default to 
1048576. However, if this proposed change were implemented, the 
"{{consumer.max.partition.fetch.bytes}}" and 
"{{producer.max.partition.fetch.bytes}}" values would be the inherited value of 
262144, not the default 1048576.

In short, implementing this change would break backward compatibility. We could 
implement a configuration switch that controls whether the configurations are 
inherited, but this adds complexity to the already-complex configuration 
mechanism.



> Worker's producer and consumer configs should inherit from worker configs
> -
>
> Key: KAFKA-6387
> URL: https://issues.apache.org/jira/browse/KAFKA-6387
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>  Labels: needs-kip
>
> Currently, the worker configuration file defines the connection properties 
> for the three separate types of connections being made to the Kafka cluster:
> # the worker group membership,
> # the producers for source connectors,
> # the consumers for sink connectors. 
> The configs are namespaced in order to properly support things like 
> interceptors, where the configs for 2 and 3 would conflict (same config name, 
> different value).
> However, when such control is not required, it would be beneficial for the 
> producers and consumers to inherit the top-level configurations yet be able 
> to override them with the {{producer.}} and {{consumer.}} namespaced 
> configurations. This way the producer- and consumer-specific configurations 
> need only be specified if/when they need to override the top-level 
> configurations. This may be necessary, for example, to have different ACLs 
> for the connector tasks than for the producers and consumers.
> This will require a minimal KIP to explain the new behavior.





[jira] [Updated] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods

2018-01-08 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-6412:
--
Attachment: 6412-jmh.v1.txt

> Improve synchronization in CachingKeyValueStore methods
> ---
>
> Key: KAFKA-6412
> URL: https://issues.apache.org/jira/browse/KAFKA-6412
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
> Attachments: 6412-jmh.v1.txt, k-6412.v1.txt
>
>
> Currently, CachingKeyValueStore methods are synchronized at the method level.
> It seems we can use a read lock for the getter and a write lock for the put / delete 
> methods.
> For getInternal(), if the underlying thread is the stream thread, 
> getInternal() may trigger eviction. This can be handled by obtaining the write 
> lock at the beginning of the method for the stream thread.
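
(A minimal sketch of the locking scheme being proposed, with a plain map standing in for the
cache; the class, field, and method names are simplified stand-ins, not the actual
CachingKeyValueStore code.)

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockedCacheSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, byte[]> cache = new HashMap<>();

    // Reads normally take the read lock, but a stream thread's read may trigger eviction
    // (a mutation), so it takes the write lock instead.
    byte[] get(final String key, final boolean isStreamThread) {
        final Lock l = isStreamThread ? lock.writeLock() : lock.readLock();
        l.lock();
        try {
            return cache.get(key);
        } finally {
            l.unlock();
        }
    }

    // Mutations always take the write lock.
    void put(final String key, final byte[] value) {
        lock.writeLock().lock();
        try {
            cache.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
{code}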





[jira] [Updated] (KAFKA-6424) QueryableStateIntegrationTest#queryOnRebalance should be accept raw text

2018-01-08 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6424:
-
Component/s: streams

> QueryableStateIntegrationTest#queryOnRebalance should be accept raw text
> 
>
> Key: KAFKA-6424
> URL: https://issues.apache.org/jira/browse/KAFKA-6424
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Priority: Minor
>
> I was using QueryableStateIntegrationTest#queryOnRebalance for some 
> performance testing by adding more sentences to inputValues.
> I found that when a sentence contains an upper-case letter, the test 
> times out.
> I get around this limitation by calling {{sentence.toLowerCase(Locale.ROOT)}} 
> before the split.
> Ideally we could specify the path to a text file containing the text; the 
> test could read the file and generate the input array.
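
(A small sketch of the suggested approach: read arbitrary sentences from a file and lower-case
them before the split. The class name and file-based loading are illustrative, not part of the
existing test.)

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

class InputValuesLoader {
    // Load one sentence per line and normalize the case so word counts are case-insensitive.
    static List<String> load(final String path) throws IOException {
        return Files.readAllLines(Paths.get(path)).stream()
                .map(sentence -> sentence.toLowerCase(Locale.ROOT))
                .collect(Collectors.toList());
    }
}
{code}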





[jira] [Updated] (KAFKA-6424) QueryableStateIntegrationTest#queryOnRebalance should be accept raw text

2018-01-08 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6424:
-
Labels: newbie unit-test  (was: )

> QueryableStateIntegrationTest#queryOnRebalance should be accept raw text
> 
>
> Key: KAFKA-6424
> URL: https://issues.apache.org/jira/browse/KAFKA-6424
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Ted Yu
>Priority: Minor
>  Labels: newbie, unit-test
>
> I was using QueryableStateIntegrationTest#queryOnRebalance for some 
> performance testing by adding more sentences to inputValues.
> I found that when a sentence contains an upper-case letter, the test 
> times out.
> I get around this limitation by calling {{sentence.toLowerCase(Locale.ROOT)}} 
> before the split.
> Ideally we could specify the path to a text file containing the text; the 
> test could read the file and generate the input array.





[jira] [Commented] (KAFKA-6363) Use MockAdminClient for any unit tests that depend on AdminClient

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316912#comment-16316912
 ] 

ASF GitHub Bot commented on KAFKA-6363:
---

guozhangwang closed pull request #4371: KAFKA-6363: Use MockAdminClient for any 
unit tests that depend on Adm…
URL: https://github.com/apache/kafka/pull/4371
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
similarity index 91%
rename from 
clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
rename to 
clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index cca35ac22c9..10281fb6ffa 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -35,21 +35,21 @@
  * 
  * When finished, be sure to {@link #close() close} the environment object.
  */
-public class MockKafkaAdminClientEnv implements AutoCloseable {
+public class AdminClientUnitTestEnv implements AutoCloseable {
 private final Time time;
 private final Cluster cluster;
 private final MockClient mockClient;
 private final KafkaAdminClient adminClient;
 
-public MockKafkaAdminClientEnv(Cluster cluster, String...vals) {
+public AdminClientUnitTestEnv(Cluster cluster, String...vals) {
 this(Time.SYSTEM, cluster, vals);
 }
 
-public MockKafkaAdminClientEnv(Time time, Cluster cluster, String...vals) {
+public AdminClientUnitTestEnv(Time time, Cluster cluster, String...vals) {
 this(time, cluster, newStrMap(vals));
 }
 
-public MockKafkaAdminClientEnv(Time time, Cluster cluster, Map config) {
+public AdminClientUnitTestEnv(Time time, Cluster cluster, Map config) {
 this.time = time;
 this.cluster = cluster;
 AdminClientConfig adminClientConfig = new AdminClientConfig(config);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index c0fe73c36ed..84588a9f3be 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -38,10 +38,10 @@
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsResponse;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
@@ -75,8 +75,8 @@
 import java.util.concurrent.Future;
 
 import static java.util.Arrays.asList;
-import static org.apache.kafka.common.requests.ResourceType.TOPIC;
 import static org.apache.kafka.common.requests.ResourceType.BROKER;
+import static org.apache.kafka.common.requests.ResourceType.TOPIC;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -155,7 +155,7 @@ public void testGenerateClientId() {
 
KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG,
 "myCustomId")));
 }
 
-private static MockKafkaAdminClientEnv mockClientEnv(String... configVals) 
{
+private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
 HashMap nodes = new HashMap<>();
 nodes.put(0, new Node(0, "localhost", 8121));
 nodes.put(1, new Node(1, "localhost", 8122));
@@ -163,12 +163,12 @@ private static MockKafkaAdminClientEnv 
mockClientEnv(String... configVals) {
 Cluster cluster = new Cluster("mockClusterId", nodes.values(),
 Collections.emptySet(), 
Collections.emptySet(),
 Collections.emptySet(), nodes.get(0));
-return new MockKafkaAdminClientEnv(cluster, configVals);
+return new AdminClientUnitTestEnv(cluster, configVals);
 }
 
 @Test
 public void testCloseAdminClient() throws Exception {
-try (MockKafkaAdminClientEnv env = mockCl

[jira] [Updated] (KAFKA-3625) Move kafka-streams test fixtures into a published package

2018-01-08 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3625:
---
Description: 
The KStreamTestDriver and related fixtures defined in 
streams/src/test/java/org/apache/kafka/test would be useful to developers 
building applications on top of Kafka Streams, but they are not currently 
exposed in a package.

I propose moving this directory to live under streams/fixtures/src/main and 
creating a new 'streams:fixtures' project in the gradle configuration to 
publish these as a separate package.

KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams

  was:
The KStreamTestDriver and related fixtures defined in 
streams/src/test/java/org/apache/kafka/test would be useful to developers 
building applications on top of Kafka Streams, but they are not currently 
exposed in a package.

I propose moving this directory to live under streams/fixtures/src/main and 
creating a new 'streams:fixtures' project in the gradle configuration to 
publish these as a separate package.


> Move kafka-streams test fixtures into a published package
> -
>
> Key: KAFKA-3625
> URL: https://issues.apache.org/jira/browse/KAFKA-3625
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip, user-experience
>
> The KStreamTestDriver and related fixtures defined in 
> streams/src/test/java/org/apache/kafka/test would be useful to developers 
> building applications on top of Kafka Streams, but they are not currently 
> exposed in a package.
> I propose moving this directory to live under streams/fixtures/src/main and 
> creating a new 'streams:fixtures' project in the gradle configuration to 
> publish these as a separate package.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams





[jira] [Commented] (KAFKA-3625) Move kafka-streams test fixtures into a published package

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317097#comment-16317097
 ] 

ASF GitHub Bot commented on KAFKA-3625:
---

mjsax opened a new pull request #4402: KAFKA-3625: Add public test utils for 
Kafka Streams [WIP]
URL: https://github.com/apache/kafka/pull/4402
 
 
- add new artifact test-utils
- add TopologyTestDriver
- add MockTime, TestRecord, add TestRecordFactory
   
   This PR requires a KIP and is WIP. DO NOT MERGE.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> Move kafka-streams test fixtures into a published package
> -
>
> Key: KAFKA-3625
> URL: https://issues.apache.org/jira/browse/KAFKA-3625
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip, user-experience
>
> The KStreamTestDriver and related fixtures defined in 
> streams/src/test/java/org/apache/kafka/test would be useful to developers 
> building applications on top of Kafka Streams, but they are not currently 
> exposed in a package.
> I propose moving this directory to live under streams/fixtures/src/main and 
> creating a new 'streams:fixtures' project in the gradle configuration to 
> publish these as a separate package.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams





[jira] [Commented] (KAFKA-6428) Fail builds on findbugs warnings

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317170#comment-16317170
 ] 

ASF GitHub Bot commented on KAFKA-6428:
---

ewencp closed pull request #4398: KAFKA-6428: Generate findbugs output for CI 
and fail builds for 'high' level warnings
URL: https://github.com/apache/kafka/pull/4398
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/README.md b/README.md
index 9371e482169..7db602325c8 100644
--- a/README.md
+++ b/README.md
@@ -182,13 +182,16 @@ The checkstyle warnings will be found in 
`reports/checkstyle/reports/main.html`
 subproject build directories. They are also are printed to the console. The 
build will fail if Checkstyle fails.
 
  Findbugs 
-Findbugs uses static analysis to look for bugs in the code.
+Findbugs uses static analysis to look for bugs in the code. Findbugs is 
executed as part of the normal build process
+and will be included in CI testing. Normally xml reports are generated which 
are not very human readable unless processed
+by Jenkins. If you want to use check Findbugs results during development, run 
it so it generates HTML output:
+
 You can run findbugs using:
 
-./gradlew findbugsMain findbugsTest -x test
+./gradlew findbugsMain findbugsTest -x test -PhtmlFindBugsReport=true
 
 The findbugs warnings will be found in `reports/findbugs/main.html` and 
`reports/findbugs/test.html` files in the subproject build
-directories.  Use -PxmlFindBugsReport=true to generate an XML report instead 
of an HTML one.
+directories.
 
 ### Common build options ###
 
diff --git a/build.gradle b/build.gradle
index 725cf0b8bb2..bdbf0fbe6bb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -371,13 +371,14 @@ subprojects {
   toolVersion = "3.0.1"
   excludeFilter = file("$rootDir/gradle/findbugs-exclude.xml")
   ignoreFailures = false
+  reportLevel = "high"
 }
 test.dependsOn('findbugsMain')
 
 tasks.withType(FindBugs) {
   reports {
-xml.enabled(project.hasProperty('xmlFindBugsReport'))
-html.enabled(!project.hasProperty('xmlFindBugsReport'))
+xml.enabled(!project.hasProperty('htmlFindBugsReport'))
+html.enabled(project.hasProperty('htmlFindBugsReport'))
   }
 }
   }


 




> Fail builds on findbugs warnings
> 
>
> Key: KAFKA-6428
> URL: https://issues.apache.org/jira/browse/KAFKA-6428
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>
> Findbugs spots likely bugs, and especially for warnings at the High level, it 
> actually has pretty good signal for real bugs (or just things that might be 
> risky). We should be failing builds, especially PRs, if any sufficiently high 
> warnings are listed. We should get this enabled for that level and then 
> decide if we want to adjust the level of warnings we want to address.
> This likely relates to KAFKA-5887 since findbugs may not be sufficiently 
> maintained for JDK9 support. In any case, the intent is to fail the build 
> based on whichever tool is used.





[jira] [Resolved] (KAFKA-6428) Fail builds on findbugs warnings

2018-01-08 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-6428.
--
Resolution: Invalid

Seems it was already set up to report and fail; the reporting on Jenkins was just 
missing.

> Fail builds on findbugs warnings
> 
>
> Key: KAFKA-6428
> URL: https://issues.apache.org/jira/browse/KAFKA-6428
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>
> Findbugs spots likely bugs, and especially for warnings at the High level, it 
> actually has pretty good signal for real bugs (or just things that might be 
> risky). We should be failing builds, especially PRs, if any sufficiently high 
> warnings are listed. We should get this enabled for that level and then 
> decide if we want to adjust the level of warnings we want to address.
> This likely relates to KAFKA-5887 since findbugs may not be sufficiently 
> maintained for JDK9 support. In any case, the intent is to fail the build 
> based on whichever tool is used.





[jira] [Updated] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly

2018-01-08 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6398:
-
Summary: Non-aggregation KTable generation operator does not construct 
value getter correctly  (was: Stream-Table join fails, if table is not 
materialized)

> Non-aggregation KTable generation operator does not construct value getter 
> correctly
> 
>
> Key: KAFKA-6398
> URL: https://issues.apache.org/jira/browse/KAFKA-6398
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1, 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>  Labels: bug
>
> Using a non-materialized KTable in a stream-table join fails:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...);
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> fails with
> {noformat}
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
> building: StateStore null is not added yet.
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
> {noformat}
> Adding a store name is not a sufficient workaround; it fails differently:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(..., 
> "STORE-NAME");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> error:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize 
> processor KSTREAM-JOIN-05
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
> Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid 
> topology building: Processor KSTREAM-JOIN-05 has no access to 
> StateStore KTABLE-SOURCE-STATE-STORE-00
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
> {noformat}
> One can work around this by piping the result through a topic:
> {noformat}
> final KTable filteredKTable = 
> builder.table("table-topic").filter(...).through("TOPIC");;
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}





[jira] [Updated] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly

2018-01-08 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6398:
-
Labels: bug  (was: )

> Non-aggregation KTable generation operator does not construct value getter 
> correctly
> 
>
> Key: KAFKA-6398
> URL: https://issues.apache.org/jira/browse/KAFKA-6398
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1, 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>  Labels: bug
>
> Using a non-materialized KTable in a stream-table join fails:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...);
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> fails with
> {noformat}
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
> building: StateStore null is not added yet.
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
> {noformat}
> Adding a store name is not a sufficient workaround; it fails differently:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(..., 
> "STORE-NAME");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> error:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize 
> processor KSTREAM-JOIN-05
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
> Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid 
> topology building: Processor KSTREAM-JOIN-05 has no access to 
> StateStore KTABLE-SOURCE-STATE-STORE-00
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
> {noformat}
> One can work around this by piping the result through a topic:
> {noformat}
> final KTable filteredKTable = 
> builder.table("table-topic").filter(...).through("TOPIC");;
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}





[jira] [Updated] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly

2018-01-08 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6398:
-
Priority: Critical  (was: Major)

> Non-aggregation KTable generation operator does not construct value getter 
> correctly
> 
>
> Key: KAFKA-6398
> URL: https://issues.apache.org/jira/browse/KAFKA-6398
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1, 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bug
>
> Using a non-materialized KTable in a stream-table join fails:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...);
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> fails with
> {noformat}
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
> building: StateStore null is not added yet.
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
> {noformat}
> Adding a store name is not a sufficient workaround; it fails differently:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(..., 
> "STORE-NAME");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> error:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize 
> processor KSTREAM-JOIN-05
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
> Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid 
> topology building: Processor KSTREAM-JOIN-05 has no access to 
> StateStore KTABLE-SOURCE-STATE-STORE-00
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
> {noformat}
> One can work around this by piping the result through a topic:
> {noformat}
> final KTable filteredKTable = 
> builder.table("table-topic").filter(...).through("TOPIC");;
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}





[jira] [Updated] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly

2018-01-08 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6398:
-
Description: 
For any operator that generates a KTable, its {{valueGetterSupplier}} has three 
code paths:

1. If the operator is a KTable source operator, use its materialized state 
store for the value getter (note that currently we always materialize the 
KTable source).
2. If the operator is an aggregation operator, then its generated KTable should 
always be materialized, so we just use its materialized state store.
3. Otherwise, we treat the value getter on a per-operator basis.

For 3) above, what we SHOULD do is the following: if the generated KTable is 
materialized, the value getter just relies on its materialized state store to 
get the value; otherwise we rely on the operator itself to define which 
parent's value getter to inherit and what computational logic to apply on the 
fly to get the value. For example, for {{KTable#filter()}} where the 
{{Materialized}} is not specified, in {{KTableFilterValueGetter}} we just get 
the value from the parent's value getter and then apply the filter on the fly; 
in addition we should let downstream operators access the parent's 
materialized state store via {{connectProcessorAndStateStore}}.
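
To make this concrete, here is a much-simplified, purely hypothetical sketch of 
the intended case-3 logic (this is not Kafka's internal value-getter API; the 
store and predicate types are stand-ins):

{code}
// Hypothetical, simplified illustration of the intended case-3 behavior:
// read from the operator's own materialized store when it exists, otherwise
// delegate to the parent's getter and apply the operator's logic on the fly.
import java.util.Map;
import java.util.function.Predicate;

interface ValueGetter<K, V> {
    V get(K key);
}

final class FilterValueGetter<K, V> implements ValueGetter<K, V> {
    private final Map<K, V> materializedStore; // null when the result KTable is not materialized
    private final ValueGetter<K, V> parentGetter;
    private final Predicate<V> predicate;

    FilterValueGetter(Map<K, V> materializedStore, ValueGetter<K, V> parentGetter, Predicate<V> predicate) {
        this.materializedStore = materializedStore;
        this.parentGetter = parentGetter;
        this.predicate = predicate;
    }

    @Override
    public V get(K key) {
        if (materializedStore != null) {
            return materializedStore.get(key);     // materialized: read the store directly
        }
        V parentValue = parentGetter.get(key);     // not materialized: compute from the parent on the fly
        return (parentValue != null && predicate.test(parentValue)) ? parentValue : null;
    }
}
{code}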

However, the current code does not do this correctly: it 1) does not check 
whether the result KTable is materialized, but always tries to use its parent's 
value getter, and 2) does not try to connect the parent's materialized store to 
the downstream operator. As a result, operators such as {{KTable#filter}}, 
{{KTable#mapValues}}, and {{KTable#join(KTable)}} result in a TopologyException 
when the topology is built. The following is an example:



Using a non-materialized KTable in a stream-table join fails:

{noformat}
final KTable filteredKTable = builder.table("table-topic").filter(...);
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}

fails with
{noformat}
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
building: StateStore null is not added yet.

at 
org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
at 
org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
at 
org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
at 
org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
at 
org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
{noformat}

Adding a store name is not a sufficient workaround; it fails differently:
{noformat}
final KTable filteredKTable = builder.table("table-topic").filter(..., 
"STORE-NAME");
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}

error:
{noformat}
org.apache.kafka.streams.errors.StreamsException: failed to initialize 
processor KSTREAM-JOIN-05

at 
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
at 
org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
at 
org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid 
topology building: Processor KSTREAM-JOIN-05 has no access to 
StateStore KTABLE-SOURCE-STATE-STORE-00
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
at 
org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
at 
org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
at 
org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53)
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111)
{noformat}

One can work around this by piping the result through a topic:
{noformat}
final KTable filteredKTable = 
builder.table("table-topic").filter(...).through("TOPIC");
builder.stream("stream-topic").join(filteredKTable,...);
{noformat}




Note that

[jira] [Commented] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly

2018-01-08 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317240#comment-16317240
 ] 

Guozhang Wang commented on KAFKA-6398:
--

[~bja...@isi.nc] I can reproduce the issue you mentioned, and I have a theory 
on its root cause, which is reflected in the updated description of this JIRA. 
My plan is to merge https://github.com/apache/kafka/pull/4384/files first, 
which fixes a minor issue but not this ticket as a whole, and then I will 
continue to work on this ticket.

> Non-aggregation KTable generation operator does not construct value getter 
> correctly
> 
>
> Key: KAFKA-6398
> URL: https://issues.apache.org/jira/browse/KAFKA-6398
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1, 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bug
>
> For any operator that generates a KTable, its {{valueGetterSupplier}} has 
> three code paths:
> 1. If the operator is a KTable source operator, use its materialized state 
> store for the value getter (note that currently we always materialize the 
> KTable source).
> 2. If the operator is an aggregation operator, then its generated KTable 
> should always be materialized, so we just use its materialized state store.
> 3. Otherwise, we treat the value getter on a per-operator basis.
> For 3) above, what we SHOULD do is the following: if the generated KTable is 
> materialized, the value getter just relies on its materialized state store 
> to get the value; otherwise we rely on the operator itself to define which 
> parent's value getter to inherit and what computational logic to apply on 
> the fly to get the value. For example, for {{KTable#filter()}} where the 
> {{Materialized}} is not specified, in {{KTableFilterValueGetter}} we just 
> get the value from the parent's value getter and then apply the filter on 
> the fly; in addition we should let downstream operators access the parent's 
> materialized state store via {{connectProcessorAndStateStore}}.
> However, the current code does not do this correctly: it 1) does not check 
> whether the result KTable is materialized, but always tries to use its 
> parent's value getter, and 2) does not try to connect the parent's 
> materialized store to the downstream operator. As a result, operators such 
> as {{KTable#filter}}, {{KTable#mapValues}}, and {{KTable#join(KTable)}} 
> result in a TopologyException when the topology is built. The following is 
> an example:
> 
> Using a non-materialized KTable in a stream-table join fails:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...);
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> fails with
> {noformat}
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
> building: StateStore null is not added yet.
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
> {noformat}
> Adding a store name is not a sufficient workaround; it fails differently:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(..., 
> "STORE-NAME");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> error:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize 
> processor KSTREAM-JOIN-05
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
> Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid 
> topology building: Processor KSTREAM-JOIN-05 has no access to 
> StateStore KTABLE-SOURCE-STATE-STORE-00
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
>   at 
> org.apache.kafka.streams.kstream.inte

[jira] [Commented] (KAFKA-6422) When enable trace level log in mirror maker, it will throw null pointer exception and the mirror maker will shutdown

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317292#comment-16317292
 ] 

ASF GitHub Bot commented on KAFKA-6422:
---

hachikuji closed pull request #4387: KAFKA-6422 Mirror maker will throw null 
pointer exception when the message value is null
URL: https://github.com/apache/kafka/pull/4387
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala 
b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 618fd2a95b9..907fe20f414 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -67,7 +67,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
 
   private[tools] var producer: MirrorMakerProducer = null
   private var mirrorMakerThreads: Seq[MirrorMakerThread] = null
-  private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false)
+  private val isShuttingDown: AtomicBoolean = new AtomicBoolean(false)
   // Track the messages not successfully sent by mirror maker.
   private val numDroppedMessages: AtomicInteger = new AtomicInteger(0)
   private var messageHandler: MirrorMakerMessageHandler = null
@@ -384,7 +384,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   }
 
   def cleanShutdown() {
-if (isShuttingdown.compareAndSet(false, true)) {
+if (isShuttingDown.compareAndSet(false, true)) {
   info("Start clean shutdown.")
   // Shutdown consumer threads.
   info("Shutting down consumer threads.")
@@ -426,7 +426,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       try {
         while (!exitingOnSendFailure && !shuttingDown && mirrorMakerConsumer.hasData) {
           val data = mirrorMakerConsumer.receive()
-          trace("Sending message with value size %d and offset %d".format(data.value.length, data.offset))
+          if (data.value != null) {
+            trace("Sending message with value size %d and offset %d.".format(data.value.length, data.offset))
+          } else {
+            trace("Sending message with null value and offset %d.".format(data.offset))
+          }
           val records = messageHandler.handle(data)
           records.asScala.foreach(producer.send)
           maybeFlushAndCommitOffsets()
@@ -459,7 +463,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
       shutdownLatch.countDown()
       info("Mirror maker thread stopped")
       // if it exits accidentally, stop the entire mirror maker
-      if (!isShuttingdown.get()) {
+      if (!isShuttingDown.get()) {
         fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.")
         sys.exit(-1)
       }


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> When enable trace level log in mirror maker, it will throw null pointer 
> exception and the mirror maker will shutdown
> 
>
> Key: KAFKA-6422
> URL: https://issues.apache.org/jira/browse/KAFKA-6422
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.0.0, 0.10.1.0, 0.10.2.0, 0.11.0.0, 0.11.0.1, 
> 0.11.0.2
>Reporter: Xin Li
>Assignee: Xin Li
>Priority: Minor
>  Labels: easyfix
> Fix For: 0.11.0.0
>
>
> https://github.com/apache/kafka/blob/0.10.0/core/src/main/scala/kafka/tools/MirrorMaker.scala#L414
> When trace-level logging is enabled in mirror maker and the message value is 
> null, it will throw a null pointer exception, and the mirror maker will shut 
> down because of that.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread

2018-01-08 Thread Joerg Heinicke (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joerg Heinicke updated KAFKA-6366:
--
Attachment: ConverterProcessor_DEBUG.zip

> StackOverflowError in kafka-coordinator-heartbeat-thread
> 
>
> Key: KAFKA-6366
> URL: https://issues.apache.org/jira/browse/KAFKA-6366
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
>Reporter: Joerg Heinicke
>Assignee: Jason Gustafson
> Attachments: 6366.v1.txt, ConverterProcessor.zip, 
> ConverterProcessor_DEBUG.zip, Screenshot-2017-12-19 21.35-22.10 processing.png
>
>
> With Kafka 1.0 our consumer groups fall into a permanent cycle of rebalancing 
> once a StackOverflowError in the heartbeat thread occurs due to connectivity 
> issues between the consumers and the coordinating broker.
> Immediately before the exception there are hundreds, if not thousands, of log 
> entries of the following type:
> 2017-12-12 16:23:12.361 [kafka-coordinator-heartbeat-thread | 
> my-consumer-group] INFO  - [Consumer clientId=consumer-4, 
> groupId=my-consumer-group] Marking the coordinator : (id: 
> 2147483645 rack: null) dead
> The exceptions always happen somewhere in the DateFormat code, albeit 
> at different lines.
> 2017-12-12 16:23:12.363 [kafka-coordinator-heartbeat-thread | 
> my-consumer-group] ERROR - Uncaught exception in thread 
> 'kafka-coordinator-heartbeat-thread | my-consumer-group':
> java.lang.StackOverflowError
>  at 
> java.text.DateFormatSymbols.getProviderInstance(DateFormatSymbols.java:362)
>  at 
> java.text.DateFormatSymbols.getInstance(DateFormatSymbols.java:340)
>  at java.util.Calendar.getDisplayName(Calendar.java:2110)
>  at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1125)
>  at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966)
>  at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936)
>  at java.text.DateFormat.format(DateFormat.java:345)
>  at 
> org.apache.log4j.helpers.PatternParser$DatePatternConverter.convert(PatternParser.java:443)
>  at 
> org.apache.log4j.helpers.PatternConverter.format(PatternConverter.java:65)
>  at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
>  at 
> org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
>  at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
>  at 
> org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
>  at 
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
>  at org.apache.log4j.Category.callAppenders(Category.java:206)
>  at org.apache.log4j.Category.forcedLog(Category.java:391)
>  at org.apache.log4j.Category.log(Category.java:856)
>  at 
> org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324)
>  at 
> org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
> ...
> the following 9 lines are repeated around a hundred times.
> ...
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(R

[jira] [Created] (KAFKA-6433) Connect distributed workers should fail if their config is "incompatible" with leader's

2018-01-08 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-6433:


 Summary: Connect distributed workers should fail if their config 
is "incompatible" with leader's
 Key: KAFKA-6433
 URL: https://issues.apache.org/jira/browse/KAFKA-6433
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 1.0.0
Reporter: Randall Hauch


Currently, each distributed worker config must have the same `worker.id` and 
must use the same internal topics for configs, offsets, and status. 
Additionally, each worker must be configured to have the same connectors, SMTs, 
and converters; confusing error messages will result when some workers are able 
to deploy connector tasks with SMTs while others fail because they are missing 
plugins that the other workers do have.

Ideally, a Connect worker would only be allowed to join the cluster if it were 
"compatible" with the existing cluster, where "compatible" perhaps includes 
using the same internal topics and having the same set of plugins.
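
For illustration, a sketch of the kind of worker settings that need to line up 
across the cluster today, assuming the shared identifier is the Connect group 
id and using made-up topic names and paths:

{code}
// Illustrative sketch only (assumed values): settings every distributed worker
// in one Connect cluster is expected to share today.
import java.util.Properties;

public class SharedWorkerConfigSketch {
    public static Properties sharedWorkerProps() {
        Properties props = new Properties();
        props.put("group.id", "connect-cluster-1");             // one Connect cluster = one group id
        props.put("config.storage.topic", "connect-configs");   // same internal topics on every worker
        props.put("offset.storage.topic", "connect-offsets");
        props.put("status.storage.topic", "connect-status");
        props.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        props.put("plugin.path", "/opt/connect/plugins");       // same connector/SMT plugins installed
        return props;
    }
}
{code}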



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6433) Connect distributed workers should fail if their config is "incompatible" with leader's

2018-01-08 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317325#comment-16317325
 ] 

Randall Hauch commented on KAFKA-6433:
--

Incidentally, it would not help to change what Connect stores in the status 
topic, because if the workers are using different status topics they would read 
different status information. A better option might be to ship this additional 
information in the metadata used in Connect's rebalance subprotocol. We can't 
do this today, but we're talking about evolving the protocol for incremental 
rebalance, and it'd be great to also add some additional worker metadata during 
that evolution, as well as to tolerate optional metadata, so that more metadata 
fields can be added later even if they are not required across the whole cluster.
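
To make the idea concrete, a purely hypothetical sketch of what such optional 
worker metadata could look like (none of these names exist in Connect today; 
they are illustrative assumptions):

{code}
// Hypothetical illustration only: optional worker metadata that could be carried
// in the rebalance subprotocol so workers can detect "incompatible" peers.
import java.util.Collections;
import java.util.Set;

final class WorkerCompatibilityMetadata {
    final String configTopic;   // internal topics the worker is using
    final String offsetTopic;
    final String statusTopic;
    final Set<String> plugins;  // connector/SMT/converter plugins the worker can load

    WorkerCompatibilityMetadata(String configTopic, String offsetTopic,
                                String statusTopic, Set<String> plugins) {
        this.configTopic = configTopic;
        this.offsetTopic = offsetTopic;
        this.statusTopic = statusTopic;
        this.plugins = Collections.unmodifiableSet(plugins);
    }

    // A worker would be "compatible" with the leader if it shares the internal
    // topics and offers at least the plugins the leader knows about.
    boolean compatibleWith(WorkerCompatibilityMetadata leader) {
        return configTopic.equals(leader.configTopic)
                && offsetTopic.equals(leader.offsetTopic)
                && statusTopic.equals(leader.statusTopic)
                && plugins.containsAll(leader.plugins);
    }
}
{code}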

> Connect distributed workers should fail if their config is "incompatible" 
> with leader's
> ---
>
> Key: KAFKA-6433
> URL: https://issues.apache.org/jira/browse/KAFKA-6433
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>
> Currently, each distributed worker config must have the same `worker.id` and 
> must use the same internal topics for configs, offsets, and status. 
> Additionally, each worker must be configured to have the same connectors, 
> SMTs, and converters; confusing error messages will result when some workers 
> are able to deploy connector tasks with SMTs while others fail because they 
> are missing plugins that the other workers do have.
> Ideally, a Connect worker would only be allowed to join the cluster if it 
> were "compatible" with the existing cluster, where "compatible" perhaps 
> includes using the same internal topics and having the same set of plugins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6433) Connect distributed workers should fail if their config is "incompatible" with leader's

2018-01-08 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6433:
-
Labels: needs-kip  (was: )

> Connect distributed workers should fail if their config is "incompatible" 
> with leader's
> ---
>
> Key: KAFKA-6433
> URL: https://issues.apache.org/jira/browse/KAFKA-6433
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>  Labels: needs-kip
>
> Currently, each distributed worker config must have the same `worker.id` and 
> must use the same internal topics for configs, offsets, and status. 
> Additionally, each worker must be configured to have the same connectors, 
> SMTs, and converters; confusing error messages will result when some workers 
> are able to deploy connector tasks with SMTs while others fail because they 
> are missing plugins that the other workers do have.
> Ideally, a Connect worker would only be allowed to join the cluster if it 
> were "compatible" with the existing cluster, where "compatible" perhaps 
> includes using the same internal topics and having the same set of plugins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6433) Connect distributed workers should fail if their config is "incompatible" with leader's

2018-01-08 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317329#comment-16317329
 ] 

Randall Hauch commented on KAFKA-6433:
--

KAFKA-5505 will likely be implemented as a change/evolution of Connect's 
rebalance subprotocol, and the requirements to solve this issue should be 
considered as part of that effort.

> Connect distributed workers should fail if their config is "incompatible" 
> with leader's
> ---
>
> Key: KAFKA-6433
> URL: https://issues.apache.org/jira/browse/KAFKA-6433
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>  Labels: needs-kip
>
> Currently, each distributed worker config must have the same `worker.id` and 
> must use the same internal topics for configs, offsets, and status. 
> Additionally, each worker must be configured to have the same connectors, 
> SMTs, and converters; confusing error messages will result when some workers 
> are able to deploy connector tasks with SMTs while others fail because they 
> are missing plugins that the other workers do have.
> Ideally, a Connect worker would only be allowed to join the cluster if it 
> were "compatible" with the existing cluster, where "compatible" perhaps 
> includes using the same internal topics and having the same set of plugins.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5505) Connect: Do not restart connector and existing tasks on task-set change

2018-01-08 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317333#comment-16317333
 ] 

Randall Hauch commented on KAFKA-5505:
--

We'll likely change/evolve Connect's rebalance subprotocol to enable 
implementing this improvement. See also KAFKA-6433 for another improvement that 
will require changes to the metadata used in the subprotocol.

> Connect: Do not restart connector and existing tasks on task-set change
> ---
>
> Key: KAFKA-5505
> URL: https://issues.apache.org/jira/browse/KAFKA-5505
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.1
>Reporter: Per Steffensen
>
> I am writing a connector with a frequently changing task-set. It is really 
> not working very well, because the connector and all existing tasks are 
> restarted when the set of tasks changes. E.g. if the connector is running 
> with 10 tasks, and an additional task is needed, the connector itself and all 
> 10 existing tasks are restarted, just to make the 11th task run also. My 
> tasks have a fairly heavy initialization, making it extra annoying. I would 
> like to see a change, introducing a "mode", where only new/deleted tasks are 
> started/stopped when notifying the system that the set of tasks changed 
> (calling context.requestTaskReconfiguration() - or something similar).
> Discussed this issue a little on d...@kafka.apache.org in the thread "Kafka 
> Connect: To much restarting with a SourceConnector with dynamic set of tasks"



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread

2018-01-08 Thread Joerg Heinicke (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317346#comment-16317346
 ] 

Joerg Heinicke commented on KAFKA-6366:
---

Sorry for the delay. Even though I planned to at least try to extract the 
information from the system over the Christmas holidays, I didn't manage to. 
Now that I am back from vacation I have done so. I attached the log file 
[^ConverterProcessor_DEBUG.zip] (which is around 700k lines and 150 MB for 
about 5 minutes!). I don't get any additional hints on the issue from it, and 
I am not sure whether it helps you to confirm the scenario.

We don't have particular test scenarios to test the patch, which means we would 
have to run this directly in production - something I'm not too comfortable 
with as long as you cannot even confirm the scenario yet. Another question that 
comes to my mind is how the consumer will behave if we hit the scenario with 
the patch applied, since apparently all other threads are still able to commit 
while the failing thread (pool-5-thread-5 in the attached log file) marked the 
coordinator dead, i.e. what is the expected and probably originally intended 
behavior? And on the most basic and practical side: how do I get a Kafka 
distribution with the patch applied? Apparently I will have to build it myself. 
Can you give me some kick-off hints? Is the documentation at 
https://github.com/apache/kafka all I need?

> StackOverflowError in kafka-coordinator-heartbeat-thread
> 
>
> Key: KAFKA-6366
> URL: https://issues.apache.org/jira/browse/KAFKA-6366
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
>Reporter: Joerg Heinicke
>Assignee: Jason Gustafson
> Attachments: 6366.v1.txt, ConverterProcessor.zip, 
> ConverterProcessor_DEBUG.zip, Screenshot-2017-12-19 21.35-22.10 processing.png
>
>
> With Kafka 1.0 our consumer groups fall into a permanent cycle of rebalancing 
> once a StackOverflowError in the heartbeat thread occurs due to connectivity 
> issues between the consumers and the coordinating broker.
> Immediately before the exception there are hundreds, if not thousands, of log 
> entries of the following type:
> 2017-12-12 16:23:12.361 [kafka-coordinator-heartbeat-thread | 
> my-consumer-group] INFO  - [Consumer clientId=consumer-4, 
> groupId=my-consumer-group] Marking the coordinator : (id: 
> 2147483645 rack: null) dead
> The exceptions always happen somewhere in the DateFormat code, albeit 
> at different lines.
> 2017-12-12 16:23:12.363 [kafka-coordinator-heartbeat-thread | 
> my-consumer-group] ERROR - Uncaught exception in thread 
> 'kafka-coordinator-heartbeat-thread | my-consumer-group':
> java.lang.StackOverflowError
>  at 
> java.text.DateFormatSymbols.getProviderInstance(DateFormatSymbols.java:362)
>  at 
> java.text.DateFormatSymbols.getInstance(DateFormatSymbols.java:340)
>  at java.util.Calendar.getDisplayName(Calendar.java:2110)
>  at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1125)
>  at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966)
>  at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936)
>  at java.text.DateFormat.format(DateFormat.java:345)
>  at 
> org.apache.log4j.helpers.PatternParser$DatePatternConverter.convert(PatternParser.java:443)
>  at 
> org.apache.log4j.helpers.PatternConverter.format(PatternConverter.java:65)
>  at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
>  at 
> org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
>  at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
>  at 
> org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
>  at 
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
>  at org.apache.log4j.Category.callAppenders(Category.java:206)
>  at org.apache.log4j.Category.forcedLog(Category.java:391)
>  at org.apache.log4j.Category.log(Category.java:856)
>  at 
> org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324)
>  at 
> org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clie

[jira] [Commented] (KAFKA-6096) Add concurrent tests to exercise all paths in group/transaction managers

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317394#comment-16317394
 ] 

ASF GitHub Bot commented on KAFKA-6096:
---

hachikuji closed pull request #4122: KAFKA-6096: Add multi-threaded tests for 
group coordinator, txn manager
URL: https://github.com/apache/kafka/pull/4122
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
new file mode 100644
index 000..0ecc3f538b1
--- /dev/null
+++ 
b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import java.util.{ Collections, Random }
+import java.util.concurrent.{ ConcurrentHashMap, Executors }
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.Lock
+
+import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
+import kafka.log.Log
+import kafka.server._
+import kafka.utils._
+import kafka.utils.timer.MockTimer
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{ MemoryRecords, RecordBatch, RecordsProcessingStats }
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.easymock.EasyMock
+import org.junit.{ After, Before }
+
+import scala.collection._
+import scala.collection.JavaConverters._
+
+abstract class AbstractCoordinatorConcurrencyTest[M <: CoordinatorMember] {
+
+  val nThreads = 5
+
+  val time = new MockTime
+  val timer = new MockTimer
+  val executor = Executors.newFixedThreadPool(nThreads)
+  val scheduler = new MockScheduler(time)
+  var replicaManager: TestReplicaManager = _
+  var zkClient: KafkaZkClient = _
+  val serverProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
+  val random = new Random
+
+  @Before
+  def setUp() {
+
+    replicaManager = EasyMock.partialMockBuilder(classOf[TestReplicaManager]).createMock()
+    replicaManager.createDelayedProducePurgatory(timer)
+
+    zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+  }
+
+  @After
+  def tearDown() {
+    EasyMock.reset(replicaManager)
+    if (executor != null)
+      executor.shutdownNow()
+  }
+
+  /**
+    * Verify that concurrent operations run in the normal sequence produce the expected results.
+    */
+  def verifyConcurrentOperations(createMembers: String => Set[M], operations: Seq[Operation]) {
+    OrderedOperationSequence(createMembers("verifyConcurrentOperations"), operations).run()
+  }
+
+  /**
+    * Verify that arbitrary operations run in some random sequence don't leave the coordinator
+    * in a bad state. Operations in the normal sequence should continue to work as expected.
+    */
+  def verifyConcurrentRandomSequences(createMembers: String => Set[M], operations: Seq[Operation]) {
+    EasyMock.reset(replicaManager)
+    for (i <- 0 to 10) {
+      // Run some random operations
+      RandomOperationSequence(createMembers(s"random$i"), operations).run()
+
+      // Check that proper sequences still work correctly
+      OrderedOperationSequence(createMembers(s"ordered$i"), operations).run()
+    }
+  }
+
+  def verifyConcurrentActions(actions: Set[Action]) {
+    val futures = actions.map(executor.submit)
+    futures.map(_.get)
+    enableCompletion()
+    actions.foreach(_.await())
+  }
+
+  def enableCompletion(): Unit = {
+    replicaManager.tryCompleteDelayedRequests()
+    scheduler.tick()
+  }
+
+  abstract class OperationSequence(members: Set[M], operations: Seq[Operation]) {
+    def actionSequence: Seq[Set[Action]]
+    def run(): Unit = {
+      actionSequence.foreach(verifyConcurrentActions)
+    }
+  }
+

[jira] [Commented] (KAFKA-6384) TransactionsTest#testFencingOnSendOffsets sometimes fails with ProducerFencedException

2018-01-08 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317566#comment-16317566
 ] 

Ted Yu commented on KAFKA-6384:
---

Haven't seen this failure for a while.

> TransactionsTest#testFencingOnSendOffsets sometimes fails with 
> ProducerFencedException
> --
>
> Key: KAFKA-6384
> URL: https://issues.apache.org/jira/browse/KAFKA-6384
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ted Yu
>
> From 
> https://builds.apache.org/job/kafka-trunk-jdk8/2283/testReport/junit/kafka.api/TransactionsTest/testFencingOnSendOffsets/
>  :
> {code}
> org.scalatest.junit.JUnitTestFailedError: Got an unexpected exception from a 
> fenced producer.
>   at 
> org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:100)
>   at 
> org.scalatest.junit.JUnitSuite.newAssertionFailedException(JUnitSuite.scala:71)
>   at org.scalatest.Assertions$class.fail(Assertions.scala:1105)
>   at org.scalatest.junit.JUnitSuite.fail(JUnitSuite.scala:71)
>   at 
> kafka.api.TransactionsTest.testFencingOnSendOffsets(TransactionsTest.scala:357)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy1.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:108)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnecti

[jira] [Updated] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()

2018-01-08 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-5863:
--
Description: 
Here is the call chain:
{code}
RestServer.httpRequest(reconfigUrl, "POST", taskProps, null);
{code}

In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
    InputStream is = connection.getInputStream();
    T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public <T> T readValue(InputStream src, TypeReference valueTypeRef)
    throws IOException, JsonParseException, JsonMappingException
{
    return (T) _readMapAndClose(_jsonFactory.createParser(src), _typeFactory.constructType(valueTypeRef));
{code}
Then there would be NPE in constructType():
{code}
public JavaType constructType(TypeReference typeRef)
{
    // 19-Oct-2015, tatu: Simpler variant like so should work
    return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}

  was:
Here is the call chain:

{code}
RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
{code}

In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
InputStream is = connection.getInputStream();
T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public  T readValue(InputStream src, TypeReference valueTypeRef)
throws IOException, JsonParseException, JsonMappingException
{
return (T) _readMapAndClose(_jsonFactory.createParser(src), 
_typeFactory.constructType(valueTypeRef));
{code}
Then there would be NPE in constructType():
{code}
public JavaType constructType(TypeReference typeRef)
{
// 19-Oct-2015, tatu: Simpler variant like so should work
return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}


> Potential null dereference in DistributedHerder#reconfigureConnector()
> --
>
> Key: KAFKA-5863
> URL: https://issues.apache.org/jira/browse/KAFKA-5863
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the call chain:
> {code}
> RestServer.httpRequest(reconfigUrl, "POST", 
> taskProps, null);
> {code}
> In httpRequest():
> {code}
> } else if (responseCode >= 200 && responseCode < 300) {
> InputStream is = connection.getInputStream();
> T result = JSON_SERDE.readValue(is, responseFormat);
> {code}
> For readValue():
> {code}
> public  T readValue(InputStream src, TypeReference valueTypeRef)
> throws IOException, JsonParseException, JsonMappingException
> {
> return (T) _readMapAndClose(_jsonFactory.createParser(src), 
> _typeFactory.constructType(valueTypeRef));
> {code}
> Then there would be NPE in constructType():
> {code}
> public JavaType constructType(TypeReference typeRef)
> {
> // 19-Oct-2015, tatu: Simpler variant like so should work
> return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
> {code}
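
A minimal sketch of one way to guard against the NPE described in the call 
chain above, assuming Jackson's {{ObjectMapper}} (the helper below is 
hypothetical, not the actual Connect code):

{code}
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.InputStream;

public final class SafeJsonRead {
    private static final ObjectMapper JSON_SERDE = new ObjectMapper();

    // Hypothetical guard: skip deserialization when the caller passed no
    // response format, so readValue never dereferences a null TypeReference.
    static <T> T readOrNull(InputStream is, TypeReference<T> responseFormat) throws IOException {
        if (responseFormat == null) {
            return null;
        }
        return JSON_SERDE.readValue(is, responseFormat);
    }
}
{code}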



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6403) Why kafka sync send message with 10 seconds delay

2018-01-08 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-6403:
-
Fix Version/s: (was: 1.0.0)

[~change] Since you've only provided some configs, it'll probably be difficult 
to track down the issue. You'd probably need to provide at least more 
information about how you are measuring the timing, and even so the issue could 
be anywhere from the client to any of the brokers involved. Probably most 
useful here would be logs -- if this is reproducible, turn the logging on the 
client up to DEBUG level.

I've removed the fix version as it does not make sense for a bug that affects 
1.0.0 to also be fixable in 1.0.0. We can update the fix version if the issue 
is tracked down and we determine whether including it in a bugfix version or 
just the next major/minor release makes sense.

> Why kafka sync send message with  10 seconds delay
> --
>
> Key: KAFKA-6403
> URL: https://issues.apache.org/jira/browse/KAFKA-6403
> Project: Kafka
>  Issue Type: Test
>  Components: producer 
>Affects Versions: 1.0.0
>Reporter: change
>Priority: Blocker
>
> I have a timertask to send a message to kafka  every half an hour, Statistics 
> reports 
> ||send starttime|send successfully time|delay/ms||
> |2017-12-26 15:50:25.413 |2017-12-26 15:50:35,447|10034|
> |2017-12-26 16:20:35.419 |2017-12-26 16:20:45,483|10064|
> |2017-12-26 17:28:20.708|2017-12-26 17:28:25,743|5035 |
> |2017-12-26 18:44:20.447|2017-12-26 18:44:25,516|5069|
> |2017-12-26 19:14:25.518|2017-12-26 19:14:30,547|5029|
>  ProducerConfig values: 
>   acks = 1
>   batch.size = 16384
>   bootstrap.servers = [192.168.0.179:39092]
>   buffer.memory = 33554432
>   client.id = 
>   compression.type = none
>   connections.max.idle.ms = 54
>   enable.idempotence = false
>   interceptor.classes = null
>   key.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer
>   linger.ms = 0
>   max.block.ms = 6
>   max.in.flight.requests.per.connection = 5
>   max.request.size = 1048576
>   metadata.max.age.ms = 30
>   metric.reporters = []
>   metrics.num.samples = 2
>   metrics.recording.level = INFO
>   metrics.sample.window.ms = 3
>   partitioner.class = class 
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>   receive.buffer.bytes = 32768
>   reconnect.backoff.max.ms = 1000
>   reconnect.backoff.ms = 50
>   request.timeout.ms = 3
>   retries = 3
>   retry.backoff.ms = 100
>   sasl.jaas.config = null
>   sasl.kerberos.kinit.cmd = /usr/bin/kinit
>   sasl.kerberos.min.time.before.relogin = 6
>   sasl.kerberos.service.name = null
>   sasl.kerberos.ticket.renew.jitter = 0.05
>   sasl.kerberos.ticket.renew.window.factor = 0.8
>   sasl.mechanism = GSSAPI
>   security.protocol = PLAINTEXT
>   send.buffer.bytes = 131072
>   ssl.cipher.suites = null
>   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>   ssl.endpoint.identification.algorithm = null
>   ssl.key.password = null
>   ssl.keymanager.algorithm = SunX509
>   ssl.keystore.location = null
>   ssl.keystore.password = null
>   ssl.keystore.type = JKS
>   ssl.protocol = TLS
>   ssl.provider = null
>   ssl.secure.random.implementation = null
>   ssl.trustmanager.algorithm = PKIX
>   ssl.truststore.location = null
>   ssl.truststore.password = null
>   ssl.truststore.type = JKS
>   transaction.timeout.ms = 6
>   transactional.id = null
>   value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer
> [2017-12-26 03:30:58,042] INFO KafkaConfig values: 
> advertised.host.name = kafka-1.default.svc.cluster.local
> advertised.listeners = null
> advertised.port = null
> alter.config.policy.class.name = null
> authorizer.class.name = 
> auto.create.topics.enable = true
> auto.leader.rebalance.enable = true
> background.threads = 10
> broker.id = 1
> broker.id.generation.enable = true
> broker.rack = null
> compression.type = producer
> connections.max.idle.ms = 60
> controlled.shutdown.enable = true
> controlled.shutdown.max.retries = 3
> controlled.shutdown.retry.backoff.ms = 5000
> controller.socket.timeout.ms = 3
> create.topic.policy.class.name = null
> default.replication.factor = 3
> delete.records.purgatory.purge.interval.requests = 1
> delete.topic.enable = true
> fetch.purgatory.purge.interval.requests = 1000
> group.initial.rebalance.delay.ms = 3000
>

[jira] [Commented] (KAFKA-4686) Null Message payload is shutting down broker

2018-01-08 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317654#comment-16317654
 ] 

Ewen Cheslack-Postava commented on KAFKA-4686:
--

[~ijuma] This has been marked critical, but it has been getting bumped from 
version to version since May. Is this really something that should be slated 
for 1.0.1 if we keep bumping it down the road? (Admittedly it's "critical" and 
not "blocker", but still...)

> Null Message payload is shutting down broker
> 
>
> Key: KAFKA-4686
> URL: https://issues.apache.org/jira/browse/KAFKA-4686
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
> Environment: Amazon Linux AMI release 2016.03 kernel 
> 4.4.19-29.55.amzn1.x86_64
>Reporter: Rodrigo Queiroz Saramago
>Priority: Critical
> Fix For: 1.0.1
>
> Attachments: KAFKA-4686-NullMessagePayloadError.tar.xz, 
> kafkaServer.out
>
>
> Hello, I have a test environment with 3 brokers and 1 zookeeper node, in 
> which clients connect using two-way ssl authentication. I use kafka version 
> 0.10.1.1. The system works as expected for a while, but if the broker goes 
> down and is then restarted, something gets corrupted and it is not possible 
> to start the broker again; it always fails with the same error. What does 
> this error mean? What can I do in this case? Is this the expected behavior?
> [2017-01-23 07:03:28,927] ERROR There was an error in one of the threads 
> during logs loading: kafka.common.KafkaException: Message payload is null: 
> Message(magic = 0, attributes = 1, crc = 4122289508, key = null, payload = 
> null) (kafka.log.LogManager)
> [2017-01-23 07:03:28,929] FATAL Fatal error during KafkaServer startup. 
> Prepare to shutdown (kafka.server.KafkaServer)
> kafka.common.KafkaException: Message payload is null: Message(magic = 0, 
> attributes = 1, crc = 4122289508, key = null, payload = null)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:90)
> at 
> kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
> at kafka.message.MessageAndOffset.firstOffset(MessageAndOffset.scala:33)
> at kafka.log.LogSegment.recover(LogSegment.scala:223)
> at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218)
> at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179)
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
> at kafka.log.Log.loadSegments(Log.scala:179)
> at kafka.log.Log.<init>(Log.scala:108)
> at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
> at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> [2017-01-23 07:03:28,946] INFO shutting down (kafka.server.KafkaServer)
> [2017-01-23 07:03:28,949] INFO Terminate ZkClient event thread. 
> (org.I0Itec.zkclient.ZkEventThread)
> [2017-01-23 07:03:28,954] INFO EventThread shut down for session: 
> 0x159bd458ae70008 (org.apache.zookeeper.ClientCnxn)
> [2017-01-23 07:03:28,954] INFO Session: 0x159bd458ae70008 closed 
> (org.apache.zookeeper.ZooKeeper)
> [2017-01-23 07:03:28,957] INFO shut down completed (kafka.server.KafkaServer)
> [2017-01-23 07:03:28,959] FATAL Fatal error during KafkaServerStartable 
> startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> kafka.common.KafkaException: Message payload is null: Message(magic = 0, 
> attributes = 1, crc = 4122289508, key = null, payload = null)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:90)
> at 
> kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
> at kafka.message.MessageAndOffset.firstOffset(MessageAndOffset.scala:33)
> at kafka.log.LogSegment.recover(LogSegment.scala:223)
> at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218)
> at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179)
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>

[jira] [Commented] (KAFKA-6396) Possibly kafka-connect converter should be able to stop processing chain

2018-01-08 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317752#comment-16317752
 ] 

Ewen Cheslack-Postava commented on KAFKA-6396:
--

If I'm understanding your request properly, there are a couple of problems with 
what you're proposing.

First, for transformations it would be a fundamental change to how they work. 
Today they work as SMTs: single message transforms, which means you get the 
entire message. For sink connectors we a) convert the entire message, b) 
transform the entire message, c) process the entire message in the sink 
connector. To pass the data to stage (b), it *must* have been fully processed, 
key and value, by (a).

Second, what you want to do by checking the value for `null` doesn't work 
because `null` is a valid value. Transformations only remove a record if 
*the entire record* is returned as `null`. A `null` value is still written, and 
it can act as a tombstone for log compaction as long as the surrounding record 
itself is non-null.
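
As a minimal hypothetical SMT sketch of that distinction (the class name and 
the `tagOf()` helper are illustrative placeholders): returning `null` from 
`apply()` drops the record entirely, while a record whose *value* is `null` is 
still forwarded and may later act as a tombstone on a compacted topic.

{code}
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class DropByKeyTag<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public R apply(R record) {
        // Returning null here removes the whole record from the pipeline.
        if (!"catalog".equals(tagOf(record.key()))) {
            return null;
        }
        // A record whose *value* is null is NOT removed; it is forwarded as-is
        // and may later act as a tombstone on a compacted topic.
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() { }

    // Hypothetical helper: derive a tag from the already-converted key object.
    private String tagOf(Object key) {
        return key == null ? "" : key.toString();
    }
}
{code}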

It would certainly be possible to write a system that worked the way you 
describe, but it requires a much more complicated processing pipeline. You need 
to define the order in which each component of the message is deserialized and 
define different transformations for each (as well as, potentially, a 
transformation for the entire record if you want to support functionality 
Kafka Connect currently offers, such as copying data between key and value). 
This gets even more complicated when you consider all the components, not all 
of which are in Kafka Connect yet: key, value, headers, timestamp, etc.

To me, this smells like trying to fit a highly optimized transformation 
pipeline into Connect simply because most of the building blocks are there to 
do so without coding. (I would consider any case where you're trying to avoid 
deserializing *parts* of a record a pretty optimized use case.) Personally, 
I'd recommend writing a small Kafka Streams app to handle this case, where you 
can carefully select how to deserialize and process the data, and interleave 
the processing of most components of the record carefully to optimize 
performance.
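
A rough sketch of that Streams approach (the topic names, application id, and 
the `KeyMessageParser` stub below are hypothetical placeholders): read both key 
and value as raw bytes, parse only the key to decide whether to keep a record, 
and never touch the value bytes of records that get filtered out.

{code}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class KeyFilterApp {

    // Hypothetical stand-in for whatever parses the key bytes and extracts a tag.
    static class KeyMessageParser {
        static String tagOf(byte[] keyBytes) {
            return keyBytes == null ? "" : new String(keyBytes); // placeholder logic
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "key-filter-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("raw-topic", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
               // Only the key bytes are parsed; the value stays an opaque byte[].
               .filter((keyBytes, valueBytes) -> "catalog".equals(KeyMessageParser.tagOf(keyBytes)))
               // Values of discarded records are never deserialized; survivors are
               // forwarded unchanged (they could also be deserialized via mapValues() here).
               .to("filtered-topic", Produced.with(Serdes.ByteArray(), Serdes.ByteArray()));

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}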

> Possibly kafka-connect converter should be able to stop processing chain
> 
>
> Key: KAFKA-6396
> URL: https://issues.apache.org/jira/browse/KAFKA-6396
> Project: Kafka
>  Issue Type: Wish
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Alexander Koval
>Priority: Minor
>
> At present only transformations can discard records by returning null. But I 
> think sometimes it would be nice to stop the processing chain after converting 
> a message. For example, I have some tags shipped with the message key and I 
> want to stop processing a message after converting its key (there are a lot of 
> messages and I don't want to deserialize message values that I don't need).
> At the moment, to do that I have to disable the converters and move message 
> deserialization into the transformation chain:
> {code}
> key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
> value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
> transforms=proto,catalog
> transforms.proto.type=company.evo.kafka.ProtobufTransformation
> transforms.proto.key.protobuf.class=company.evo.uaprom.indexator.KeyProto$KeyMessage
> transforms.proto.value.protobuf.class=company.evo.uaprom.indexator.catalog.CompanyProto$UniversalCompanyMessage
> transforms.proto.tag=catalog
> {code}
> If 
> [WorkerSinkTask|https://github.com/apache/kafka/blob/1.0.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L453]
>  checked converted values for {{null}}, it would solve my problem more 
> gracefully.





[jira] [Commented] (KAFKA-4674) Frequent ISR shrinking and expanding and disconnects among brokers

2018-01-08 Thread Andrey Falko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317809#comment-16317809
 ] 

Andrey Falko commented on KAFKA-4674:
-

I just reproduced this on a 5-node test cluster by gradually creating 35k 
3x-replicated topics and holding a consumer open for each topic. I'm running 
the latest 1.0.0.

How many partitions do you have and what is their replication factor in your 
setups?
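
A hypothetical sketch of driving that kind of load with the Java AdminClient 
(broker address, topic names, batch size, and pacing are placeholders; the 
per-topic consumers are left out): create the 3x-replicated topics in small 
batches so the load on the controller ramps up gradually.

{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicLoadGenerator {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            for (int i = 0; i < 35_000; i += 100) {
                List<NewTopic> batch = new ArrayList<>();
                for (int j = i; j < i + 100; j++) {
                    // 1 partition per topic, replication factor 3
                    batch.add(new NewTopic("load-test-" + j, 1, (short) 3));
                }
                admin.createTopics(batch).all().get(); // wait for the batch to complete
                Thread.sleep(1_000);                   // pace creation so load ramps up gradually
            }
        }
    }
}
{code}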

> Frequent ISR shrinking and expanding and disconnects among brokers
> --
>
> Key: KAFKA-4674
> URL: https://issues.apache.org/jira/browse/KAFKA-4674
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core
>Affects Versions: 0.10.0.1
> Environment: OS: Redhat Linux 2.6.32-431.el6.x86_64
> JDK: 1.8.0_45
>Reporter: Kaiming Wan
> Attachments: controller.log.rar, kafkabroker.20170221.log.zip, 
> server.log.2017-01-11-14, zookeeper.out.2017-01-11.log
>
>
> We use a Kafka cluster with 3 brokers in our production environment. It worked 
> well for several months. Recently, we got the UnderReplicatedPartitions>0 
> warning mail. When we checked the logs, we found that the partition was 
> constantly experiencing ISR shrinking and expanding, and disconnection 
> exceptions could be found in the controller's log.
> We also found some unusual output in ZooKeeper's log pointing to a 
> consumer (using the old ZooKeeper-based API) that had stopped working and 
> accumulated a large lag.
> Actually, this is not the first time we have encountered this problem. When we 
> first met it, we observed the same phenomenon and the same log output. 
> We solved the problem by deleting the consumer's node info in ZooKeeper, and 
> then everything went back to normal.
> However, this time, after we deleted the consumer that already had a 
> large lag, the frequent ISR shrinking and expanding didn't stop for a very 
> long time (several hours). Although the issue didn't affect our consumers and 
> producers, we think it makes our cluster unstable. So in the end we resolved 
> it by restarting the controller broker.
> Now I wonder what causes this problem. From checking the source code I only 
> know that a poll timeout will cause disconnection and ISR shrinking. Is the 
> issue related to ZooKeeper, because it cannot handle that many metadata 
> modifications and makes the replica fetcher threads take more time?
> I uploaded the log files as attachments.


