[jira] [Commented] (KAFKA-6161) Introduce new serdes interfaces with empty configure() and close()

2017-11-03 Thread Kamal Chandraprakash (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238790#comment-16238790
 ] 

Kamal Chandraprakash commented on KAFKA-6161:
-

My opinion is that we can do these cleanups when the Kafka source code migrates to 
Java 8. In Java 8, we can define `default` methods in interfaces. Now, with 
your current patch, implementations of `Serde` / `Serializer` and `Deserializer` 
cannot extend any other classes.
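
For illustration, a minimal sketch (assuming Java 8; the interface and method names here 
are hypothetical, not the actual Kafka API) of the default-method shape being suggested:

{code}
// Hedged sketch only: a Serializer-like interface whose configure()/close() have
// empty default bodies, so implementations override only serialize() and stay
// free to extend any other class they need.
public interface DefaultingSerializer<T> {
    default void configure(java.util.Map<String, ?> configs, boolean isKey) {
        // nothing to do by default
    }

    byte[] serialize(String topic, T data);

    default void close() {
        // nothing to do by default
    }
}
{code}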

> Introduce new serdes interfaces with empty configure() and close()
> --
>
> Key: KAFKA-6161
> URL: https://issues.apache.org/jira/browse/KAFKA-6161
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Evgeny Veretennikov
>Assignee: Evgeny Veretennikov
>Priority: Normal
>
> {{Serializer}}, {{Deserializer}} and {{Serde}} interfaces have methods 
> {{configure()}} and {{close()}}. Pretty often one wants to leave these methods 
> empty. For example, a lot of serializers inside 
> {{org.apache.kafka.common.serialization}} package have these methods empty:
> {code}
> @Override
> public void configure(Map<String, ?> configs, boolean isKey) {
> // nothing to do
> }
> @Override
> public void close() {
> // nothing to do
> }
> {code}
> To avoid such boilerplate, we may create new interfaces (like 
> {{UnconfiguredSerializer}}), in which these methods are defined with empty bodies.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()

2017-11-03 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-5863:
--
Description: 
Here is the call chain:
{code}
RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
{code}
In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
InputStream is = connection.getInputStream();
T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public <T> T readValue(InputStream src, TypeReference valueTypeRef)
throws IOException, JsonParseException, JsonMappingException
{
return (T) _readMapAndClose(_jsonFactory.createParser(src), 
_typeFactory.constructType(valueTypeRef));
{code}
Then there would be an NPE in constructType():
{code}
public JavaType constructType(TypeReference typeRef)
{
// 19-Oct-2015, tatu: Simpler variant like so should work
return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}
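
For reference, a minimal standalone sketch (class name is hypothetical; assumes Jackson 2.x 
on the classpath) that reproduces the NPE when a null TypeReference reaches readValue(), 
mirroring the httpRequest(..., null) call above:
{code}
import java.io.ByteArrayInputStream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NullTypeRefRepro {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        TypeReference<Object> typeRef = null;  // mirrors passing null as responseFormat
        // Throws NullPointerException: constructType(typeRef) calls typeRef.getType()
        mapper.readValue(new ByteArrayInputStream("{}".getBytes("UTF-8")), typeRef);
    }
}
{code}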

  was:
Here is the call chain:

{code}
RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
{code}
In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
InputStream is = connection.getInputStream();
T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public <T> T readValue(InputStream src, TypeReference valueTypeRef)
throws IOException, JsonParseException, JsonMappingException
{
return (T) _readMapAndClose(_jsonFactory.createParser(src), 
_typeFactory.constructType(valueTypeRef));
{code}
Then there would be an NPE in constructType():
{code}
public JavaType constructType(TypeReference typeRef)
{
// 19-Oct-2015, tatu: Simpler variant like so should work
return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}


> Potential null dereference in DistributedHerder#reconfigureConnector()
> --
>
> Key: KAFKA-5863
> URL: https://issues.apache.org/jira/browse/KAFKA-5863
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the call chain:
> {code}
> RestServer.httpRequest(reconfigUrl, "POST", 
> taskProps, null);
> {code}
> In httpRequest():
> {code}
> } else if (responseCode >= 200 && responseCode < 300) {
> InputStream is = connection.getInputStream();
> T result = JSON_SERDE.readValue(is, responseFormat);
> {code}
> For readValue():
> {code}
> public <T> T readValue(InputStream src, TypeReference valueTypeRef)
> throws IOException, JsonParseException, JsonMappingException
> {
> return (T) _readMapAndClose(_jsonFactory.createParser(src), 
> _typeFactory.constructType(valueTypeRef));
> {code}
> Then there would be an NPE in constructType():
> {code}
> public JavaType constructType(TypeReference typeRef)
> {
> // 19-Oct-2015, tatu: Simpler variant like so should work
> return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6074) Use ZookeeperClient in ReplicaManager and Partition

2017-11-03 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-6074:
--
Attachment: 6074.v10.txt

> Use ZookeeperClient in ReplicaManager and Partition
> ---
>
> Key: KAFKA-6074
> URL: https://issues.apache.org/jira/browse/KAFKA-6074
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Jun Rao
>Assignee: Ted Yu
>Priority: Major
> Fix For: 1.1.0
>
> Attachments: 6074.v1.txt, 6074.v10.txt
>
>
> We want to replace the usage of ZkUtils with ZookeeperClient in 
> ReplicaManager and Partition.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6168) Connect Schema comparison is slow for large schemas

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238704#comment-16238704
 ] 

ASF GitHub Bot commented on KAFKA-6168:
---

GitHub user tedyu opened a pull request:

https://github.com/apache/kafka/pull/4176

KAFKA-6168 Connect Schema comparison is slow for large schemas

Re-arrange order of comparisons in equals() to evaluate non-composite 
fields first
Cache hash code

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tedyu/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4176.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4176


commit 4bae62c8ad64408b20e4925977beadaa42b54619
Author: tedyu 
Date:   2017-11-04T02:03:59Z

KAFKA-6168 Connect Schema comparison is slow for large schemas




> Connect Schema comparison is slow for large schemas
> ---
>
> Key: KAFKA-6168
> URL: https://issues.apache.org/jira/browse/KAFKA-6168
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>Assignee: Ted Yu
>Priority: Critical
> Attachments: 6168.v1.txt
>
>
> The {{ConnectSchema}} implementation computes the hash code every time it's 
> needed, and {{equals(Object)}} is a deep equality check. This extra work can 
> be expensive for large schemas, especially in code like the {{AvroConverter}} 
> (or rather {{AvroData}} in the converter) that uses instances as keys in a 
> hash map that then requires significant use of {{hashCode}} and {{equals}}.
> The {{ConnectSchema}} is an immutable object and should at a minimum 
> precompute the hash code. Also, the order that the fields are compared in 
> {{equals(...)}} should use the cheapest comparisons first (e.g., the {{name}} 
> field is one of the _last_ fields to be checked). Finally, it might be worth 
> considering having each instance precompute and cache a string or byte[] 
> representation of all fields that can be used for faster equality checking.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6168) Connect Schema comparison is slow for large schemas

2017-11-03 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238698#comment-16238698
 ] 

Randall Hauch commented on KAFKA-6168:
--

[~yuzhih...@gmail.com], thanks for offering to work on this. I think the first 
step would be to create a PR with a commit that has the changes you attached as 
a patch, and then we can look into possibly doing something about improving the 
performance of {{equals}}. 

> Connect Schema comparison is slow for large schemas
> ---
>
> Key: KAFKA-6168
> URL: https://issues.apache.org/jira/browse/KAFKA-6168
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>Assignee: Ted Yu
>Priority: Critical
> Attachments: 6168.v1.txt
>
>
> The {{ConnectSchema}} implementation computes the hash code every time it's 
> needed, and {{equals(Object)}} is a deep equality check. This extra work can 
> be expensive for large schemas, especially in code like the {{AvroConverter}} 
> (or rather {{AvroData}} in the converter) that uses instances as keys in a 
> hash map that then requires significant use of {{hashCode}} and {{equals}}.
> The {{ConnectSchema}} is an immutable object and should at a minimum 
> precompute the hash code. Also, the order that the fields are compared in 
> {{equals(...)}} should use the cheapest comparisons first (e.g., the {{name}} 
> field is one of the _last_ fields to be checked). Finally, it might be worth 
> considering having each instance precompute and cache a string or byte[] 
> representation of all fields that can be used for faster equality checking.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-6168) Connect Schema comparison is slow for large schemas

2017-11-03 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch reassigned KAFKA-6168:


Assignee: Ted Yu

> Connect Schema comparison is slow for large schemas
> ---
>
> Key: KAFKA-6168
> URL: https://issues.apache.org/jira/browse/KAFKA-6168
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>Assignee: Ted Yu
>Priority: Critical
> Attachments: 6168.v1.txt
>
>
> The {{ConnectSchema}} implementation computes the hash code every time it's 
> needed, and {{equals(Object)}} is a deep equality check. This extra work can 
> be expensive for large schemas, especially in code like the {{AvroConverter}} 
> (or rather {{AvroData}} in the converter) that uses instances as keys in a 
> hash map that then requires significant use of {{hashCode}} and {{equals}}.
> The {{ConnectSchema}} is an immutable object and should at a minimum 
> precompute the hash code. Also, the order that the fields are compared in 
> {{equals(...)}} should use the cheapest comparisons first (e.g., the {{name}} 
> field is one of the _last_ fields to be checked). Finally, it might be worth 
> considering having each instance precompute and cache a string or byte[] 
> representation of all fields that can be used for faster equality checking.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6166) Streams configuration requires consumer. and producer. in order to be read

2017-11-03 Thread James Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238673#comment-16238673
 ] 

James Cheng commented on KAFKA-6166:


Thanks [~guozhang]. Just to make sure I understand, currently, if I want to 
apply my metric reporter to both the producer and consumer, what I need to do 
is:

{code}
Properties config = new Properties(); 
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); 
config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
"com.mycompany.MetricReporter"); 
config.put("consumer.custom-key-for-metric-reporter", "value");
config.put("producer.custom-key-for-metric-reporter", "value");
{code}

And if I want to apply it to Kafka Streams itself as well as the producer and the 
consumer, I need to do:

{code}
Properties config = new Properties(); 
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); 
config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
"com.mycompany.MetricReporter"); 
config.put("consumer.custom-key-for-metric-reporter", "value");
config.put("producer.custom-key-for-metric-reporter", "value");
config.put("custom-key-for-metric-reporter", "value");
{code}

Is that right?

And in the future, if this JIRA is fixed, I will simply need to do:
{code}
Properties config = new Properties(); 
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); 
config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
"com.mycompany.MetricReporter"); 
config.put("custom-key-for-metric-reporter", "value");
{code}

Is that right?

> Streams configuration requires consumer. and producer. in order to be read
> --
>
> Key: KAFKA-6166
> URL: https://issues.apache.org/jira/browse/KAFKA-6166
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
> Environment: Kafka 0.11.0.0
> JDK 1.8
> CoreOS
>Reporter: Justin Manchester
>Priority: Minor
>
> Problem:
> In previous releases you could specify a custom metrics reporter like so:
> Properties config = new Properties(); 
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); 
> config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
> "com.mycompany.MetricReporter"); 
> config.put("custom-key-for-metric-reporter", "value");
> From 0.11.0.0 onwards this is no longer possible, as you have to specify 
> consumer.custom-key-for-metric-reporter or 
> producer.custom-key-for-metric-reporter otherwise it's stripped out of the 
> configuration.
> So, if you wish to use a metrics reporter and to collect producer and 
> consumer metrics, as well as kafka-streams metrics, you would need to 
> specify 3 distinct configs:
> 1) consumer.custom-key-for-metric-reporter 
> 2) producer.custom-key-for-metric-reporter 
> 3) custom-key-for-metric-reporter
> This appears to be a regression.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6148) ClassCastException in BigQuery connector

2017-11-03 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238621#comment-16238621
 ] 

Ewen Cheslack-Postava commented on KAFKA-6148:
--

[~burd0047] Can you clarify which version of the BigQuery connector you are 
using? I don't see a branch or tag that was using 0.11.0.0 Kafka; they all 
still look like they are on the 0.10.2 series.

> ClassCastException in BigQuery connector
> 
>
> Key: KAFKA-6148
> URL: https://issues.apache.org/jira/browse/KAFKA-6148
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Eugene Burd
>Assignee: Konstantine Karantasis
>Priority: Major
>
> I am trying to run a com.wepay.kafka.connect.bigquery.BigQuerySinkConnector 
> connector, but am getting the following exception.  
> [2017-10-30 21:48:49,007] ERROR WorkerSinkTask{id=bigquery-connector-log-0} 
> Offset commit failed, rewinding to last committed offsets 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:311)
> java.lang.ClassCastException: 
> org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to 
> org.apache.kafka.clients.consumer.OffsetAndMetadata
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107)
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96)
>   at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> [2017-10-30 21:48:49,012] ERROR Commit of 
> WorkerSinkTask{id=bigquery-connector-log-0} offsets threw an unexpected 
> exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:205)
> java.lang.ClassCastException: 
> org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to 
> org.apache.kafka.clients.consumer.OffsetAndMetadata
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107)
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96)
>   at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> I have checked the version number of the Kafka client in the plug-in and Kafka 
> Connect itself and they are the same.  
> - kafka-clients-0.11.0.0.jar matches
> I still suspect some kind of versioning issue.  Do you have any advice? 
> Thanks. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6167) Timestamp on streams directory contains a colon, which is an illegal character

2017-11-03 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6167:
-
Labels: user-experience  (was: )

> Timestamp on streams directory contains a colon, which is an illegal character
> --
>
> Key: KAFKA-6167
> URL: https://issues.apache.org/jira/browse/KAFKA-6167
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: AK 1.0.0
> Kubernetes
> CoreOS
> JDK 1.8
> Windows
>Reporter: Justin Manchester
>Priority: Normal
>  Labels: user-experience
>
> Problem:
> Development is on Windows, which is not fully supported; however, this is still 
> a bug that should be corrected.
> It looks like a timestamp was added to the streams directory using a colon as 
> a separator. I believe this is an illegal character on Windows and potentially the cause 
> for the exception below.
> Error Stack:
> 2017-11-02 16:06:41 ERROR 
> [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1]
>  org.apache.kafka.streams.processor.internals.AssignedTasks:301 - 
> stream-thread 
> [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1]
>  Failed to process stream task 0_0 due to the following error: 
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=input-a_1, 
> partition=0, offset=0 
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
>  [kafka-streams-1.0.0.jar:?] 
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error 
> opening store KSTREAM-JOINTHIS-04-store:150962400 at location 
> C:\Users\ADRIAN~1.MCC\AppData\Local\Temp\kafka3548813472740086814\StreamDeduplicatorAcceptanceTest1\0_0\KSTREAM-JOINTHIS-04-store\KSTREAM-JOINTHIS-04-store:150962400
>  
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174)
>  ~[kafka-streams-1.0.0.jar:?] 
> at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40) 
> ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:67)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:96)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:89)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
>  ~[kafka-

[jira] [Assigned] (KAFKA-6148) ClassCastException in BigQuery connector

2017-11-03 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava reassigned KAFKA-6148:


Assignee: Konstantine Karantasis

> ClassCastException in BigQuery connector
> 
>
> Key: KAFKA-6148
> URL: https://issues.apache.org/jira/browse/KAFKA-6148
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Eugene Burd
>Assignee: Konstantine Karantasis
>Priority: Major
>
> I am trying to run a com.wepay.kafka.connect.bigquery.BigQuerySinkConnector 
> connector, but am getting the following exception.  
> [2017-10-30 21:48:49,007] ERROR WorkerSinkTask{id=bigquery-connector-log-0} 
> Offset commit failed, rewinding to last committed offsets 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:311)
> java.lang.ClassCastException: 
> org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to 
> org.apache.kafka.clients.consumer.OffsetAndMetadata
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107)
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96)
>   at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> [2017-10-30 21:48:49,012] ERROR Commit of 
> WorkerSinkTask{id=bigquery-connector-log-0} offsets threw an unexpected 
> exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:205)
> java.lang.ClassCastException: 
> org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to 
> org.apache.kafka.clients.consumer.OffsetAndMetadata
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107)
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96)
>   at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> I have checked the version number of the Kafka client in the plug-in and Kafka 
> Connect itself and they are the same.  
> - kafka-clients-0.11.0.0.jar matches
> I still suspect some kind of versioning issue.  Do you have any advice? 
> Thanks. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6148) ClassCastException in BigQuery connector

2017-11-03 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238589#comment-16238589
 ] 

Ewen Cheslack-Postava commented on KAFKA-6148:
--

I think this is due to the new classpath isolation. I see from 
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java#L120-L121
 that we covered excluding org.apache.kafka.common and org.apache.kafka.connect 
and thought that covered the classes we needed to ensure were loaded from the 
system classloader, but SinkTask also uses a class from 
org.apache.kafka.clients.

[~kkonstantine] does that seem like an accurate assessment? If so, any connector that 
overrides flush() or preCommit() could potentially run into issues (though I'm 
not sure if the issue is ultimately caused by having a separate method call 
there that uses the type). I think the code in the BigQuery connector is 
actually unnecessary -- just flushing the data is sufficient, since the same 
offsets will be used and do not need to be set on the context object explicitly 
-- but this is also a bug in Connect itself. Looks like the fix is easy and 
should probably be backported to the first release branch with classloader 
isolation.
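
As an aside, here is a minimal standalone sketch (jar path and class name are hypothetical) 
of why a cast between two copies of the "same" class fails when the copies come from 
unrelated classloaders, which is the shape of the ClassCastException reported above:
{code}
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderCastDemo {
    public static void main(String[] args) throws Exception {
        URL jar = new URL("file:///tmp/example.jar");               // hypothetical jar containing com.example.Foo
        ClassLoader a = new URLClassLoader(new URL[]{jar}, null);   // parent = null: no delegation
        ClassLoader b = new URLClassLoader(new URL[]{jar}, null);
        Class<?> fooA = Class.forName("com.example.Foo", true, a);
        Class<?> fooB = Class.forName("com.example.Foo", true, b);
        Object instance = fooA.getDeclaredConstructor().newInstance();
        // Fails with a ClassCastException even though both classes are named com.example.Foo,
        // because each classloader defines a distinct Class object.
        fooB.cast(instance);
    }
}
{code}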

> ClassCastException in BigQuery connector
> 
>
> Key: KAFKA-6148
> URL: https://issues.apache.org/jira/browse/KAFKA-6148
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Eugene Burd
>Priority: Major
>
> I am trying to run a com.wepay.kafka.connect.bigquery.BigQuerySinkConnector 
> connector, but am getting the following exception.  
> [2017-10-30 21:48:49,007] ERROR WorkerSinkTask{id=bigquery-connector-log-0} 
> Offset commit failed, rewinding to last committed offsets 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:311)
> java.lang.ClassCastException: 
> org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to 
> org.apache.kafka.clients.consumer.OffsetAndMetadata
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107)
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96)
>   at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> [2017-10-30 21:48:49,012] ERROR Commit of 
> WorkerSinkTask{id=bigquery-connector-log-0} offsets threw an unexpected 
> exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:205)
> java.lang.ClassCastException: 
> org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to 
> org.apache.kafka.clients.consumer.OffsetAndMetadata
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107)
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96)
>   at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> I have checked the version number of the Kafka client in the plug-in and Kafka 
> Connect itself and they are the same.  
> - kafka-clients-0.11.0.0.jar matches
> I am still suspecting a type of vers

[jira] [Created] (KAFKA-6170) Add the AdminClient in Streams' KafkaClientSupplier

2017-11-03 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6170:


 Summary: Add the AdminClient in Streams' KafkaClientSupplier
 Key: KAFKA-6170
 URL: https://issues.apache.org/jira/browse/KAFKA-6170
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Guozhang Wang
Assignee: Guozhang Wang
Priority: Major


We will add Java AdminClient to Kafka Streams, in order to replace the internal 
StreamsKafkaClient. More details can be found in KIP-220 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier)
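
For context, a hedged sketch (the method name and signature below are an assumption based 
on the KIP discussion, not the final API) of how the client supplier could be extended:
{code}
// Hypothetical sketch only: the real KafkaClientSupplier lives in org.apache.kafka.streams
// and already exposes getProducer / getConsumer / getRestoreConsumer.
public interface AdminCapableClientSupplier extends org.apache.kafka.streams.KafkaClientSupplier {
    // Assumed addition per KIP-220: create an AdminClient from the given configuration.
    org.apache.kafka.clients.admin.AdminClient getAdminClient(java.util.Map<String, Object> config);
}
{code}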



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-6168) Connect Schema comparison is slow for large schemas

2017-11-03 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238561#comment-16238561
 ] 

Ted Yu edited comment on KAFKA-6168 at 11/3/17 11:35 PM:
-

bq. considering having each instance precompute and cache a string or byte[] 
representation of all fields

Should this be done for all fields (quite a few) ?
Or just selected fields such as keySchema ?


was (Author: yuzhih...@gmail.com):
Can I work on this ?

> Connect Schema comparison is slow for large schemas
> ---
>
> Key: KAFKA-6168
> URL: https://issues.apache.org/jira/browse/KAFKA-6168
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>Priority: Critical
> Attachments: 6168.v1.txt
>
>
> The {{ConnectSchema}} implementation computes the hash code every time it's 
> needed, and {{equals(Object)}} is a deep equality check. This extra work can 
> be expensive for large schemas, especially in code like the {{AvroConverter}} 
> (or rather {{AvroData}} in the converter) that uses instances as keys in a 
> hash map that then requires significant use of {{hashCode}} and {{equals}}.
> The {{ConnectSchema}} is an immutable object and should at a minimum 
> precompute the hash code. Also, the order that the fields are compared in 
> {{equals(...)}} should use the cheapest comparisons first (e.g., the {{name}} 
> field is one of the _last_ fields to be checked). Finally, it might be worth 
> considering having each instance precompute and cache a string or byte[] 
> representation of all fields that can be used for faster equality checking.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6168) Connect Schema comparison is slow for large schemas

2017-11-03 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-6168:
--
Attachment: 6168.v1.txt

> Connect Schema comparison is slow for large schemas
> ---
>
> Key: KAFKA-6168
> URL: https://issues.apache.org/jira/browse/KAFKA-6168
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>Priority: Critical
> Attachments: 6168.v1.txt
>
>
> The {{ConnectSchema}} implementation computes the hash code every time it's 
> needed, and {{equals(Object)}} is a deep equality check. This extra work can 
> be expensive for large schemas, especially in code like the {{AvroConverter}} 
> (or rather {{AvroData}} in the converter) that uses instances as keys in a 
> hash map that then requires significant use of {{hashCode}} and {{equals}}.
> The {{ConnectSchema}} is an immutable object and should at a minimum 
> precompute the hash code. Also, the order that the fields are compared in 
> {{equals(...)}} should use the cheapest comparisons first (e.g., the {{name}} 
> field is one of the _last_ fields to be checked). Finally, it might be worth 
> considering having each instance precompute and cache a string or byte[] 
> representation of all fields that can be used for faster equality checking.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6148) ClassCastException in BigQuery connector

2017-11-03 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-6148:
-
Summary: ClassCastException in BigQuery connector  (was: ERROR Commit of 
WorkerSinkTask{id=bigquery-connector-log-0} offsets threw an unexpected 
exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:205) 
java.lang.ClassCastException: 
org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast)

> ClassCastException in BigQuery connector
> 
>
> Key: KAFKA-6148
> URL: https://issues.apache.org/jira/browse/KAFKA-6148
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Eugene Burd
>Priority: Major
>
> I am trying to run a com.wepay.kafka.connect.bigquery.BigQuerySinkConnector 
> connector, but am getting the following exception.  
> [2017-10-30 21:48:49,007] ERROR WorkerSinkTask{id=bigquery-connector-log-0} 
> Offset commit failed, rewinding to last committed offsets 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:311)
> java.lang.ClassCastException: 
> org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to 
> org.apache.kafka.clients.consumer.OffsetAndMetadata
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107)
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96)
>   at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> [2017-10-30 21:48:49,012] ERROR Commit of 
> WorkerSinkTask{id=bigquery-connector-log-0} offsets threw an unexpected 
> exception:  (org.apache.kafka.connect.runtime.WorkerSinkTask:205)
> java.lang.ClassCastException: 
> org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to 
> org.apache.kafka.clients.consumer.OffsetAndMetadata
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107)
>   at 
> com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96)
>   at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> I have checked the version number of the Kafka client in the plug-in and Kafka 
> Connect itself and they are the same.  
> - kafka-clients-0.11.0.0.jar matches
> I still suspect some kind of versioning issue.  Do you have any advice? 
> Thanks. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6168) Connect Schema comparison is slow for large schemas

2017-11-03 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238561#comment-16238561
 ] 

Ted Yu commented on KAFKA-6168:
---

Can I work on this ?

> Connect Schema comparison is slow for large schemas
> ---
>
> Key: KAFKA-6168
> URL: https://issues.apache.org/jira/browse/KAFKA-6168
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>Priority: Critical
>
> The {{ConnectSchema}} implementation computes the hash code every time it's 
> needed, and {{equals(Object)}} is a deep equality check. This extra work can 
> be expensive for large schemas, especially in code like the {{AvroConverter}} 
> (or rather {{AvroData}} in the converter) that uses instances as keys in a 
> hash map that then requires significant use of {{hashCode}} and {{equals}}.
> The {{ConnectSchema}} is an immutable object and should at a minimum 
> precompute the hash code. Also, the order that the fields are compared in 
> {{equals(...)}} should use the cheapest comparisons first (e.g., the {{name}} 
> field is one of the _last_ fields to be checked). Finally, it might be worth 
> considering having each instance precompute and cache a string or byte[] 
> representation of all fields that can be used for faster equality checking.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6166) Streams configuration requires consumer. and producer. in order to be read

2017-11-03 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238531#comment-16238531
 ] 

Guozhang Wang commented on KAFKA-6166:
--

[~wushujames] asked about this issue as well, so just copy-pasting my replies 
here:

{code}
Thanks for bringing this up.

The original design is to let users apply the prefix ONLY if they want to 
override for the specific client. For example, for `metric.reporters` or 
`interceptor.classes` which are defined for both producer and consumer (and 
moving forward we are adding admin client as well), without the prefix they 
will be applied to all embedded clients; and if users want to have different 
config value overrides for different clients, they can use the prefix.

So if you define `metric.reporters` it should be applied to both producer 
and consumer (is that not the case for you? Please confirm).

For custom config k-v pairs, I think we did not discuss this in depth. 
But our current implementation is, indeed, restricted to producer / consumer 
client property definitions only, for example:

getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
props.putAll(originalsWithPrefix(prefix));

so if you do not add the prefix to your custom k-v pairs, they will be skipped. 
Thinking about it a bit, I think we should probably do what you were inclined 
to, i.e. define custom k-v pairs globally as well, unless overridden by a prefix. 
{code}
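
For what it's worth, a small illustrative sketch of how the existing prefix helpers are 
meant to be used today (the reporter class and the custom key below are placeholders):
{code}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");                 // placeholder app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
// Unprefixed: intended to apply to all embedded clients.
props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.mycompany.MetricReporter");
// Prefixed via the helper that prepends "consumer.": a consumer-only override.
props.put(StreamsConfig.consumerPrefix("custom-key-for-metric-reporter"), "value");
{code}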


> Streams configuration requires consumer. and producer. in order to be read
> --
>
> Key: KAFKA-6166
> URL: https://issues.apache.org/jira/browse/KAFKA-6166
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
> Environment: Kafka 0.11.0.0
> JDK 1.8
> CoreOS
>Reporter: Justin Manchester
>Priority: Minor
>
> Problem:
> In previous releases you could specify a custom metrics reporter like so:
> Properties config = new Properties(); 
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); 
> config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
> "com.mycompany.MetricReporter"); 
> config.put("custom-key-for-metric-reporter", "value");
> From 0.11.0.0 onwards this is no longer possible, as you have to specify 
> consumer.custom-key-for-metric-reporter or 
> producer.custom-key-for-metric-reporter otherwise it's stripped out of the 
> configuration.
> So, if you wish to use a metrics reporter and to collect producer and 
> consumer metrics, as well as kafka-streams metrics, you would need to 
> specify 3 distinct configs:
> 1) consumer.custom-key-for-metric-reporter 
> 2) producer.custom-key-for-metric-reporter 
> 3) custom-key-for-metric-reporter
> This appears to be a regression.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6169) Kafka Log should reject negative timestamps

2017-11-03 Thread Ryan P (JIRA)
Ryan P created KAFKA-6169:
-

 Summary: Kafka Log should reject negative timestamps 
 Key: KAFKA-6169
 URL: https://issues.apache.org/jira/browse/KAFKA-6169
 Project: Kafka
  Issue Type: Bug
Reporter: Ryan P
Priority: Minor


Currently it would appear we rely on the Java Producer to prevent negative 
timestamps from being appended to the log. The broker should also validate 
timestamps, so that poorly written third-party clients cannot append negative 
timestamps either. 
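
A minimal validation sketch of what "reject negative timestamps" could mean on the broker 
side (the helper name is hypothetical; the real check would live in the log append path):
{code}
// Hypothetical helper: -1 is the sentinel clients already use for "no timestamp set",
// so only values below the sentinel are rejected here.
static void validateTimestamp(long timestampMs) {
    final long NO_TIMESTAMP = -1L;
    if (timestampMs < NO_TIMESTAMP) {
        throw new org.apache.kafka.common.errors.InvalidTimestampException(
                "Timestamp " + timestampMs + " is negative and therefore invalid");
    }
}
{code}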



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6168) Connect Schema comparison is slow for large schemas

2017-11-03 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-6168:


 Summary: Connect Schema comparison is slow for large schemas
 Key: KAFKA-6168
 URL: https://issues.apache.org/jira/browse/KAFKA-6168
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 1.0.0
Reporter: Randall Hauch
Priority: Critical


The {{ConnectSchema}} implementation computes the hash code every time it's 
needed, and {{equals(Object)}} is a deep equality check. This extra work can be 
expensive for large schemas, especially in code like the {{AvroConverter}} (or 
rather {{AvroData}} in the converter) that uses instances as keys in a hash map 
that then requires significant use of {{hashCode}} and {{equals}}.

The {{ConnectSchema}} is an immutable object and should at a minimum precompute 
the hash code. Also, the order that the fields are compared in {{equals(...)}} 
should use the cheapest comparisons first (e.g., the {{name}} field is one of 
the _last_ fields to be checked). Finally, it might be worth considering having 
each instance precompute and cache a string or byte[] representation of all 
fields that can be used for faster equality checking.
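
A minimal sketch (field names simplified; this is not the actual {{ConnectSchema}} code) of 
the precomputed hash code and cheap-first equals() being suggested:
{code}
import java.util.List;
import java.util.Objects;

// Hedged illustration of the pattern: hash once for an immutable object, and compare
// cheap scalar fields in equals() before the expensive composite ones.
public final class CachedHashSchema {
    private final String name;
    private final int version;
    private final List<String> fields;   // stands in for the expensive composite state
    private final int hash;              // precomputed because the object is immutable

    public CachedHashSchema(String name, int version, List<String> fields) {
        this.name = name;
        this.version = version;
        this.fields = fields;
        this.hash = Objects.hash(name, version, fields);
    }

    @Override
    public int hashCode() {
        return hash;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CachedHashSchema)) return false;
        CachedHashSchema other = (CachedHashSchema) o;
        // cheapest comparisons first; the deep comparison of fields runs last
        return version == other.version
                && Objects.equals(name, other.name)
                && Objects.equals(fields, other.fields);
    }
}
{code}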



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active (KIP-211)

2017-11-03 Thread Drew Kutcharian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238126#comment-16238126
 ] 

Drew Kutcharian commented on KAFKA-4682:


This just happened to us and I just stumbled upon this JIRA while trying to 
figure out the cause. A few questions:

1. Aren't consumer offset topics compacted? Shouldn't at least the last entry 
stay on disk after cleanup?

2.  Considering that they are compacted, what is the real concern with 
workaround 2 in the description: "2. Turn the value of 
offsets.retention.minutes up really really high"?

3. As a workaround, would it make sense to set {{offsets.retention.minutes}} to 
cover the same duration as {{log.retention.ms}}, and {{auto.offset.reset}} to {{earliest}}? 
That way consumers and logs would "reset" at the same time?

4. Is there a timeline for the release of KIP-211?


> Committed offsets should not be deleted if a consumer is still active 
> (KIP-211)
> ---
>
> Key: KAFKA-4682
> URL: https://issues.apache.org/jira/browse/KAFKA-4682
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: kip
>
> Kafka will delete committed offsets that are older than 
> offsets.retention.minutes
> If there is an active consumer on a low traffic partition, it is possible 
> that Kafka will delete the committed offset for that consumer. Once the 
> offset is deleted, a restart or a rebalance of that consumer will cause the 
> consumer to not find any committed offset and start consuming from 
> earliest/latest (depending on auto.offset.reset). I'm not sure, but a broker 
> failover might also cause you to start reading from auto.offset.reset (due to 
> broker restart, or coordinator failover).
> I think that Kafka should only delete offsets for inactive consumers. The 
> timer should only start after a consumer group goes inactive. For example, if 
> a consumer group goes inactive, then after 1 week, delete the offsets for 
> that consumer group. This is a solution that [~junrao] mentioned in 
> https://issues.apache.org/jira/browse/KAFKA-3806?focusedCommentId=15323521&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15323521
> The current workarounds are to:
> # Commit an offset on every partition you own on a regular basis, making sure 
> that it is more frequent than offsets.retention.minutes (a broker-side 
> setting that a consumer might not be aware of)
> or
> # Turn the value of offsets.retention.minutes up really really high. You have 
> to make sure it is higher than any valid low-traffic rate that you want to 
> support. For example, if you want to support a topic where someone produces 
> once a month, you would have to set offsets.retention.minutes to 1 month. 
> or
> # Turn on enable.auto.commit (this is essentially #1, but easier to 
> implement).
> None of these are ideal. 
> #1 can be spammy. It requires your consumers know something about how the 
> brokers are configured. Sometimes it is out of your control. Mirrormaker, for 
> example, only commits offsets on partitions where it receives data. And it is 
> duplication that you need to put into all of your consumers.
> #2 has disk-space impact on the broker (in __consumer_offsets) as well as 
> memory-size on the broker (to answer OffsetFetch).
> #3 I think has the potential for message loss (the consumer might commit on 
> messages that are not yet fully processed)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6163) Broker should fail fast on startup if an error occurs while loading logs

2017-11-03 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237984#comment-16237984
 ] 

Xavier Léauté commented on KAFKA-6163:
--

[~ijuma] in this case it was a different fatal error {{java.lang.InternalError: 
a fault occurred in an unsafe memory access operation}}
You probably don't want to try to recover from that.

> Broker should fail fast on startup if an error occurs while loading logs
> 
>
> Key: KAFKA-6163
> URL: https://issues.apache.org/jira/browse/KAFKA-6163
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Xavier Léauté
>Priority: Normal
>
> If the broker fails to load one of the logs during startup, we currently 
> don't fail fast. The {{LogManager}} will log an error and initiate the 
> shutdown sequence, but continue loading all the remaining logs before 
> shutting down.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6164) ClientQuotaManager threads prevent shutdown when encountering an error loading logs

2017-11-03 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237977#comment-16237977
 ] 

Xavier Léauté commented on KAFKA-6164:
--

[~ted_yu] that's certainly an option assuming we don't care about orderly 
shutdown of the quota manager. It would probably be nicer though to properly 
manage the quota manager lifecycle.

> ClientQuotaManager threads prevent shutdown when encountering an error 
> loading logs
> ---
>
> Key: KAFKA-6164
> URL: https://issues.apache.org/jira/browse/KAFKA-6164
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Xavier Léauté
>Priority: Major
>
> While diagnosing KAFKA-6163, we noticed that when the broker initiates a 
> shutdown sequence in response to an error loading the logs, the process never 
> exits. The JVM appears to be waiting indefinitely for several non-daemon 
> threads to terminate.
> The threads in question are {{ThrottledRequestReaper-Request}}, 
> {{ThrottledRequestReaper-Produce}}, and {{ThrottledRequestReaper-Fetch}}, so 
> it appears we don't properly shut down {{ClientQuotaManager}} in this 
> situation.
> QuotaManager shutdown is currently handled by KafkaApis; however, KafkaApis 
> will never be instantiated in the first place if we encounter an error 
> loading the logs, so quota managers are left dangling in that case.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6167) Timestamp on streams directory contains a colon, which is an illegal character

2017-11-03 Thread Adrian McCague (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237875#comment-16237875
 ] 

Adrian McCague commented on KAFKA-6167:
---

`.` feels quite standard here

> Timestamp on streams directory contains a colon, which is an illegal character
> --
>
> Key: KAFKA-6167
> URL: https://issues.apache.org/jira/browse/KAFKA-6167
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: AK 1.0.0
> Kubernetes
> CoreOS
> JDK 1.8
> Windows
>Reporter: Justin Manchester
>Priority: Normal
>
> Problem:
> Development is on Windows, which is not fully supported; however, this is still 
> a bug that should be corrected.
> It looks like a timestamp was added to the streams directory using a colon as 
> a separator. I believe this is an illegal character on Windows and potentially the cause 
> for the exception below.
> Error Stack:
> 2017-11-02 16:06:41 ERROR 
> [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1]
>  org.apache.kafka.streams.processor.internals.AssignedTasks:301 - 
> stream-thread 
> [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1]
>  Failed to process stream task 0_0 due to the following error: 
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=input-a_1, 
> partition=0, offset=0 
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
>  [kafka-streams-1.0.0.jar:?] 
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error 
> opening store KSTREAM-JOINTHIS-04-store:150962400 at location 
> C:\Users\ADRIAN~1.MCC\AppData\Local\Temp\kafka3548813472740086814\StreamDeduplicatorAcceptanceTest1\0_0\KSTREAM-JOINTHIS-04-store\KSTREAM-JOINTHIS-04-store:150962400
>  
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174)
>  ~[kafka-streams-1.0.0.jar:?] 
> at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40) 
> ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:67)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:96)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:89)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:12

[jira] [Comment Edited] (KAFKA-6167) Timestamp on streams directory contains a colon, which is an illegal character

2017-11-03 Thread Adrian McCague (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237875#comment-16237875
 ] 

Adrian McCague edited comment on KAFKA-6167 at 11/3/17 4:39 PM:


{code:java}.{code} feels quite standard here


was (Author: amccague):
`.` feels quite standard here

> Timestamp on streams directory contains a colon, which is an illegal character
> --
>
> Key: KAFKA-6167
> URL: https://issues.apache.org/jira/browse/KAFKA-6167
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: AK 1.0.0
> Kubernetes
> CoreOS
> JDK 1.8
> Windows
>Reporter: Justin Manchester
>Priority: Normal
>
> Problem:
> Development on Windows is not fully supported, but this is still a bug 
> that should be corrected.
> It looks like a timestamp was added to the streams directory name using a 
> colon as the separator. A colon is an illegal character in Windows paths and 
> is likely the cause of the exception below.
> Error Stack:
> 2017-11-02 16:06:41 ERROR 
> [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1]
>  org.apache.kafka.streams.processor.internals.AssignedTasks:301 - 
> stream-thread 
> [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1]
>  Failed to process stream task 0_0 due to the following error: 
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=input-a_1, 
> partition=0, offset=0 
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
>  [kafka-streams-1.0.0.jar:?] 
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error 
> opening store KSTREAM-JOINTHIS-04-store:150962400 at location 
> C:\Users\ADRIAN~1.MCC\AppData\Local\Temp\kafka3548813472740086814\StreamDeduplicatorAcceptanceTest1\0_0\KSTREAM-JOINTHIS-04-store\KSTREAM-JOINTHIS-04-store:150962400
>  
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174)
>  ~[kafka-streams-1.0.0.jar:?] 
> at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40) 
> ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:67)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:96)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:89)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>  ~[kafk

[jira] [Commented] (KAFKA-6167) Timestamp on streams directory contains a colon, which is an illegal character

2017-11-03 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237856#comment-16237856
 ] 

Ted Yu commented on KAFKA-6167:
---

Any suggestion for a replacement? A semicolon?
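
For reference, the characters reserved in Windows file names are < > : " / \ | ? 
and *; a semicolon is technically legal, but '.' or '_' are the more 
conventional choices. A tiny check, as a sketch:

{code:java}
public final class SeparatorCheck {

    // Characters that are not allowed in Windows file names.
    private static final String WINDOWS_RESERVED = "<>:\"/\\|?*";

    public static boolean isWindowsSafe(final char separator) {
        return WINDOWS_RESERVED.indexOf(separator) < 0;
    }

    public static void main(final String[] args) {
        System.out.println(isWindowsSafe(':'));  // false
        System.out.println(isWindowsSafe(';'));  // true
        System.out.println(isWindowsSafe('.'));  // true
    }
}
{code}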

> Timestamp on streams directory contains a colon, which is an illegal character
> --
>
> Key: KAFKA-6167
> URL: https://issues.apache.org/jira/browse/KAFKA-6167
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: AK 1.0.0
> Kubernetes
> CoreOS
> JDK 1.8
> Windows
>Reporter: Justin Manchester
>Priority: Normal
>
> Problem:
> Development on Windows is not fully supported, but this is still a bug 
> that should be corrected.
> It looks like a timestamp was added to the streams directory name using a 
> colon as the separator. A colon is an illegal character in Windows paths and 
> is likely the cause of the exception below.
> Error Stack:
> 2017-11-02 16:06:41 ERROR 
> [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1]
>  org.apache.kafka.streams.processor.internals.AssignedTasks:301 - 
> stream-thread 
> [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1]
>  Failed to process stream task 0_0 due to the following error: 
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=input-a_1, 
> partition=0, offset=0 
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>  [kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
>  [kafka-streams-1.0.0.jar:?] 
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error 
> opening store KSTREAM-JOINTHIS-04-store:150962400 at location 
> C:\Users\ADRIAN~1.MCC\AppData\Local\Temp\kafka3548813472740086814\StreamDeduplicatorAcceptanceTest1\0_0\KSTREAM-JOINTHIS-04-store\KSTREAM-JOINTHIS-04-store:150962400
>  
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174)
>  ~[kafka-streams-1.0.0.jar:?] 
> at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40) 
> ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:67)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:96)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:89)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>  ~[kafka-streams-1.0.0.jar:?] 
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:1

[jira] [Commented] (KAFKA-6166) Streams configuration requires consumer. and producer. in order to be read

2017-11-03 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237846#comment-16237846
 ] 

Ted Yu commented on KAFKA-6166:
---

How about populating the first two configs with the value of 
custom-key-for-metric-reporter by default?
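
A rough sketch of that idea (the PrefixDefaulting class below is hypothetical; 
StreamsConfig.consumerPrefix/producerPrefix are the existing prefix helpers), 
assuming the unprefixed key should only be copied when no explicit prefixed 
override exists:

{code:java}
import java.util.Map;

import org.apache.kafka.streams.StreamsConfig;

public final class PrefixDefaulting {

    // Sketch: default the consumer./producer. variants of a custom key to the
    // unprefixed value unless the user set them explicitly.
    public static void defaultPrefixedKey(final Map<String, Object> configs, final String key) {
        final Object value = configs.get(key);
        if (value != null) {
            configs.putIfAbsent(StreamsConfig.consumerPrefix(key), value);
            configs.putIfAbsent(StreamsConfig.producerPrefix(key), value);
        }
    }
}
{code}

Whether such defaulting belongs inside StreamsConfig itself or in the caller is 
the open question here.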

> Streams configuration requires consumer. and producer. in order to be read
> --
>
> Key: KAFKA-6166
> URL: https://issues.apache.org/jira/browse/KAFKA-6166
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
> Environment: Kafka 0.11.0.0
> JDK 1.8
> CoreOS
>Reporter: Justin Manchester
>Priority: Minor
>
> Problem:
> In previous releases you could specify a custom metrics reporter like so:
> Properties config = new Properties(); 
> config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); 
> config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
> "com.mycompany.MetricReporter"); 
> config.put("custom-key-for-metric-reporter", "value");
> From 0.11.0.0 onwards this is no longer possible, as you have to specify 
> consumer.custom-key-for-metric-reporter or 
> producer.custom-key-for-metric-reporter otherwise it's stripped out of the 
> configuration.
> So, if you wish to use a metrics reporter and collect producer and consumer 
> metrics, as well as kafka-streams metrics, you would need to specify 3 
> distinct configs:
> 1) consumer.custom-key-for-metric-reporter 
> 2) producer.custom-key-for-metric-reporter 
> 3) custom-key-for-metric-reporter
> This appears to be a regression.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6167) Timestamp on streams directory contains a colon, which is an illegal character

2017-11-03 Thread Justin Manchester (JIRA)
Justin Manchester created KAFKA-6167:


 Summary: Timestamp on streams directory contains a colon, which is 
an illegal character
 Key: KAFKA-6167
 URL: https://issues.apache.org/jira/browse/KAFKA-6167
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
 Environment: AK 1.0.0
Kubernetes
CoreOS
JDK 1.8
Windows
Reporter: Justin Manchester
Priority: Normal


Problem:

Development on Windows is not fully supported, but this is still a bug that 
should be corrected.

It looks like a timestamp was added to the streams directory name using a colon 
as the separator. A colon is an illegal character in Windows paths and is 
likely the cause of the exception below.

Error Stack:

2017-11-02 16:06:41 ERROR 
[StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1]
 org.apache.kafka.streams.processor.internals.AssignedTasks:301 - stream-thread 
[StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1]
 Failed to process stream task 0_0 due to the following error: 
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=0_0, processor=KSTREAM-SOURCE-00, topic=input-a_1, partition=0, 
offset=0 
at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
 [kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
 [kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
 [kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
 [kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
 [kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
 [kafka-streams-1.0.0.jar:?] 
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error 
opening store KSTREAM-JOINTHIS-04-store:150962400 at location 
C:\Users\ADRIAN~1.MCC\AppData\Local\Temp\kafka3548813472740086814\StreamDeduplicatorAcceptanceTest1\0_0\KSTREAM-JOINTHIS-04-store\KSTREAM-JOINTHIS-04-store:150962400
 
at 
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174)
 ~[kafka-streams-1.0.0.jar:?] 
at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40) 
~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:67)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:96)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:89)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
 ~[kafka-streams-1.0.0.jar:?] 
at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
 ~[kafka-streams-1.0.0.jar:?] 
... 6 more 
Caused by: org.rocksdb.Rock

[jira] [Commented] (KAFKA-6161) Introduce new serdes interfaces with empty configure() and close()

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237785#comment-16237785
 ] 

ASF GitHub Bot commented on KAFKA-6161:
---

GitHub user evis opened a pull request:

https://github.com/apache/kafka/pull/4175

KAFKA-6161 add base classes for (De)Serializers with empty conf methods

All (de)serializers that have empty configure() and/or close() methods 
now inherit NoConf(De)Serializer. Such classes are also created for the 
extended (de)serializers.
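
For context, a rough sketch of what such a base class could look like (the 
body below is a guess at the approach, not the actual patch):

{code:java}
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

// Sketch only: a base class providing empty configure()/close(), so concrete
// deserializers only need to implement deserialize().
public abstract class NoConfDeserializer<T> implements Deserializer<T> {

    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        // nothing to do
    }

    @Override
    public void close() {
        // nothing to do
    }
}
{code}

A concrete deserializer would then extend this and implement only 
deserialize(String topic, byte[] data).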

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/evis/kafka 
KAFKA-6161-serde-empty-conf-close-methods

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4175.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4175


commit 297115a53f6ea6c1bfd2383c0f73f011cb94c505
Author: Evgeny Veretennikov 
Date:   2017-11-03T15:38:18Z

KAFKA-6161 add base classes for (De)Serializers with empty conf methods

All (de)serializers that have empty configure() and/or close()
methods now inherit NoConf(De)Serializer. Such classes are also
created for the extended (de)serializers.




> Introduce new serdes interfaces with empty configure() and close()
> --
>
> Key: KAFKA-6161
> URL: https://issues.apache.org/jira/browse/KAFKA-6161
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Evgeny Veretennikov
>Assignee: Evgeny Veretennikov
>Priority: Normal
>
> {{Serializer}}, {{Deserializer}} and {{Serde}} interfaces have methods 
> {{configure()}} and {{close()}}. Pretty often one wants to leave these methods 
> empty. For example, a lot of serializers inside 
> {{org.apache.kafka.common.serialization}} package have these methods empty:
> {code}
> @Override
> public void configure(Map<String, ?> configs, boolean isKey) {
> // nothing to do
> }
> @Override
> public void close() {
> // nothing to do
> }
> {code}
> To avoid such boilerplate, we may create new interfaces (like 
> {{UnconfiguredSerializer}}), in which we will define these methods empty.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-6159) Link to upgrade docs in 1.0.0 release notes is broken

2017-11-03 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6159.
--
Resolution: Fixed

> Link to upgrade docs in 1.0.0 release notes is broken
> -
>
> Key: KAFKA-6159
> URL: https://issues.apache.org/jira/browse/KAFKA-6159
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 1.0.0
>Reporter: Martin Schröder
>Assignee: Guozhang Wang
>  Labels: release-notes
>
> The release notes for 1.0.0 
> (https://dist.apache.org/repos/dist/release/kafka/1.0.0/RELEASE_NOTES.html) 
> point to http://kafka.apache.org/100/documentation.html#upgrade for "upgrade 
> documentation", but that gives a 404.
> Maybe you mean http://kafka.apache.org/documentation.html#upgrade ?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-03 Thread kaushik srinivas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237567#comment-16237567
 ] 

kaushik srinivas commented on KAFKA-6165:
-

Update to my previous comment:

Once the brokers went down and recovered,
I see the below exception in 2 of the brokers:

[2017-11-03 08:28:00,772] WARN [ReplicaFetcherThread-0-2], Error in fetch 
kafka.server.ReplicaFetcherThread$FetchRequest@7da5bee5 
(kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 2 was disconnected before the response was 
read
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115)
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
at scala.Option.foreach(Option.scala:257)
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
at 
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
at 
kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137)
at 
kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
at 
kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
at 
kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
at 
kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

> Kafka Brokers goes down with outOfMemoryError.
> --
>
> Key: KAFKA-6165
> URL: https://issues.apache.org/jira/browse/KAFKA-6165
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
> Environment: DCOS cluster with 4 agent nodes and 3 masters.
> agent machine config :
> RAM : 384 GB
> DISK : 4TB
>Reporter: kaushik srinivas
>Priority: Major
> Attachments: kafka_config.txt, stderr_broker1.txt, 
> stderr_broker2.txt, stdout_broker1.txt, stdout_broker2.txt
>
>
> Performance testing Kafka with the following end-to-end pipelines:
> Kafka Data Producer -> kafka -> spark streaming -> hdfs -- stream1
> Kafka Data Producer -> kafka -> flume -> hdfs -- stream2
> stream1 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> stream2 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> Some important Kafka Configuration :
> "BROKER_MEM": "32768"(32GB)
> "BROKER_JAVA_HEAP": "16384"(16GB)
> "BROKER_COUNT": "3"
> "KAFKA_MESSAGE_MAX_BYTES": "112"(1MB)
> "KAFKA_REPLICA_FETCH_MAX_BYTES": "1048576"(1MB)
> "KAFKA_NUM_PARTITIONS": "20"
> "BROKER_DISK_SIZE": "5000" (5GB)
> "KAFKA_LOG_SEGMENT_BYTES": "5000",(50MB)
> "KAFKA_LOG_RETENTION_BYTES": "50"(5GB)
> Data Producer to kafka Throughput:
> message rate : 5 lakhs messages/sec approx across all the 3 brokers and 
> topics/partitions.
> message size : approx 300 to 400 bytes.
> Issues observed with this configs:
> Issue 1:
> stack trace:
> [2017-11-03 00:56:28,484] FATAL [Replica Manager on Broker 0]: Halting due to 
> unrecoverable I/O error while handling produce request:  
> (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log 
> 'store_sales-16'
>   at kafka.log.Log.append(Log.scala:349)
>   at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
>   at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
>   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
>   at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
>   at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> s

[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-03 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237563#comment-16237563
 ] 

Ismael Juma commented on KAFKA-6165:


The memory map is failing. Maybe you're running into a file descriptor limit?

> Kafka Brokers goes down with outOfMemoryError.
> --
>
> Key: KAFKA-6165
> URL: https://issues.apache.org/jira/browse/KAFKA-6165
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
> Environment: DCOS cluster with 4 agent nodes and 3 masters.
> agent machine config :
> RAM : 384 GB
> DISK : 4TB
>Reporter: kaushik srinivas
>Priority: Major
> Attachments: kafka_config.txt, stderr_broker1.txt, 
> stderr_broker2.txt, stdout_broker1.txt, stdout_broker2.txt
>
>
> Performance testing Kafka with the following end-to-end pipelines:
> Kafka Data Producer -> kafka -> spark streaming -> hdfs -- stream1
> Kafka Data Producer -> kafka -> flume -> hdfs -- stream2
> stream1 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> stream2 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> Some important Kafka Configuration :
> "BROKER_MEM": "32768"(32GB)
> "BROKER_JAVA_HEAP": "16384"(16GB)
> "BROKER_COUNT": "3"
> "KAFKA_MESSAGE_MAX_BYTES": "112"(1MB)
> "KAFKA_REPLICA_FETCH_MAX_BYTES": "1048576"(1MB)
> "KAFKA_NUM_PARTITIONS": "20"
> "BROKER_DISK_SIZE": "5000" (5GB)
> "KAFKA_LOG_SEGMENT_BYTES": "5000",(50MB)
> "KAFKA_LOG_RETENTION_BYTES": "50"(5GB)
> Data Producer to kafka Throughput:
> message rate : 5 lakhs messages/sec approx across all the 3 brokers and 
> topics/partitions.
> message size : approx 300 to 400 bytes.
> Issues observed with this configs:
> Issue 1:
> stack trace:
> [2017-11-03 00:56:28,484] FATAL [Replica Manager on Broker 0]: Halting due to 
> unrecoverable I/O error while handling produce request:  
> (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log 
> 'store_sales-16'
>   at kafka.log.Log.append(Log.scala:349)
>   at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
>   at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
>   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
>   at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
>   at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
>   at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
>   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:425)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:78)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Map failed
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
>   at 
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116)
>   at 
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:106)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.log.AbstractIndex.resize(AbstractIndex.scala:106)
>   at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:160)
>   at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>   at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:159)
>   at kafka.log.Log.roll(Log.scala:771)
>   at kafka.log.Log.maybeRoll(Log.scala:742)
>   at kafka.log.Log.append(Log.scala:405)
>   ... 22 more
>

[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-03 Thread kaushik srinivas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237555#comment-16237555
 ] 

kaushik srinivas commented on KAFKA-6165:
-

Tried with 12GB of heap space.
Observed Kafka brokers crashing again with:

[2017-11-03 08:02:12,424] FATAL [ReplicaFetcherThread-0-0], Disk error while 
replicating data for store_sales-15 (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaStorageException: I/O exception in append to log 
'store_sales-15'
at kafka.log.Log.append(Log.scala:349)
at 
kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130)
at 
kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:159)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:141)
at scala.Option.foreach(Option.scala:257)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:141)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:138)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:138)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:136)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.io.IOException: Map failed
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
at kafka.log.AbstractIndex.<init>(AbstractIndex.scala:61)
at kafka.log.TimeIndex.<init>(TimeIndex.scala:55)
at kafka.log.LogSegment.<init>(LogSegment.scala:68)
at kafka.log.Log.roll(Log.scala:776)
at kafka.log.Log.maybeRoll(Log.scala:742)
at kafka.log.Log.append(Log.scala:405)
... 16 more
Caused by: java.lang.OutOfMemoryError: Map failed
at sun.nio.ch.FileChannelImpl.map0(Native Method)
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:937)

Observing 300k messages/sec on each broker (3 brokers) at the time of broker 
crash.

> Kafka Brokers goes down with outOfMemoryError.
> --
>
> Key: KAFKA-6165
> URL: https://issues.apache.org/jira/browse/KAFKA-6165
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
> Environment: DCOS cluster with 4 agent nodes and 3 masters.
> agent machine config :
> RAM : 384 GB
> DISK : 4TB
>Reporter: kaushik srinivas
>Priority: Major
> Attachments: kafka_config.txt, stderr_broker1.txt, 
> stderr_broker2.txt, stdout_broker1.txt, stdout_broker2.txt
>
>
> Performance testing Kafka with the following end-to-end pipelines:
> Kafka Data Producer -> kafka -> spark streaming -> hdfs -- stream1
> Kafka Data Producer -> kafka -> flume -> hdfs -- stream2
> stream1 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> stream2 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> Some important Kafka Configuration :
> "BROKER_MEM": "32768"(32GB)
> "BROKER_JAVA_HEAP": "16384"(16GB)
> "BROKER_COUNT": "3"
> "KAFKA_MESSAGE_MAX_BYTES": "112"(1MB)
> "KAFKA_REPLICA_FETCH_MAX_BYTES": "1048576"(1MB)
> "KAFKA_NUM_PARTITIONS": "20"
> "BROKER_DISK_SIZE": "5000" (5GB)
> "KAFKA_LOG_SEGMENT_BYTES": "5000",(50MB)
> "KAFKA_LOG_RETENTION_BYTES": "50"(5GB)
> Data Producer to kafka Throughput:
> message rate : 5 lakhs messages/sec approx across all the 3 brokers and 
> topics/partitions.
> message size : approx 300 to 400 bytes.
> Issues observed with this configs:
> Issue 1:
> stack trace:
> [2017-11-03 00:56:28,484] FATAL [Replica Manager on Broker 0]: Halting due to 
> unrecoverable I/O error while handling produce request:  
> (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log 
> 'store_sales-16'
>   at kafka.log.Log.append(Log.scala:349)
>   at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
>

[jira] [Created] (KAFKA-6166) Streams configuration requires consumer. and producer. in order to be read

2017-11-03 Thread Justin Manchester (JIRA)
Justin Manchester created KAFKA-6166:


 Summary: Streams configuration requires consumer. and producer. in 
order to be read
 Key: KAFKA-6166
 URL: https://issues.apache.org/jira/browse/KAFKA-6166
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.11.0.0
 Environment: Kafka 0.11.0.0
JDK 1.8
CoreOS
Reporter: Justin Manchester
Priority: Minor


Problem:

In previous releases you could specify a custom metrics reporter like so:

Properties config = new Properties(); 
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); 
config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
"com.mycompany.MetricReporter"); 
config.put("custom-key-for-metric-reporter", "value");

From 0.11.0.0 onwards this is no longer possible, as you have to specify 
consumer.custom-key-for-metric-reporter or 
producer.custom-key-for-metric-reporter, otherwise it's stripped out of the 
configuration.

So, if you wish to use a metrics reporter and collect producer and consumer 
metrics, as well as kafka-streams metrics, you would need to specify 3 
distinct configs:

1) consumer.custom-key-for-metric-reporter 
2) producer.custom-key-for-metric-reporter 
3) custom-key-for-metric-reporter

This appears to be a regression.
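
For completeness, a sketch of the verbose workaround described above, using the 
existing StreamsConfig prefix helpers instead of hard-coded strings 
(custom-key-for-metric-reporter stands in for whatever key the reporter reads):

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public final class MetricReporterConfigSketch {

    public static Properties build() {
        // The custom reporter key has to be supplied three times so that
        // streams, consumer and producer metrics all pick it up.
        final Properties config = new Properties();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.mycompany.MetricReporter");
        config.put("custom-key-for-metric-reporter", "value");
        config.put(StreamsConfig.consumerPrefix("custom-key-for-metric-reporter"), "value");
        config.put(StreamsConfig.producerPrefix("custom-key-for-metric-reporter"), "value");
        return config;
    }
}
{code}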



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-6060) Add workload generation capabilities to Trogdor

2017-11-03 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-6060.
---
   Resolution: Fixed
Fix Version/s: 1.1.0

Issue resolved by pull request 4073
https://github.com/apache/kafka/pull/4073

> Add workload generation capabilities to Trogdor
> ---
>
> Key: KAFKA-6060
> URL: https://issues.apache.org/jira/browse/KAFKA-6060
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
> Fix For: 1.1.0
>
>
> Add workload generation capabilities to Trogdor



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6060) Add workload generation capabilities to Trogdor

2017-11-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237359#comment-16237359
 ] 

ASF GitHub Bot commented on KAFKA-6060:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4073


> Add workload generation capabilities to Trogdor
> ---
>
> Key: KAFKA-6060
> URL: https://issues.apache.org/jira/browse/KAFKA-6060
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
>
> Add workload generation capabilities to Trogdor



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-03 Thread kaushik srinivas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237358#comment-16237358
 ] 

kaushik srinivas commented on KAFKA-6165:
-

Sure, will try to reduce the heap size to 12GB.
Initially the config was an 8GB heap,
but then we observed OutOfMemory issues more frequently.
It was actually consuming around 10GB of heap, which was the reason the heap 
was increased to 16GB.

> Kafka Brokers goes down with outOfMemoryError.
> --
>
> Key: KAFKA-6165
> URL: https://issues.apache.org/jira/browse/KAFKA-6165
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
> Environment: DCOS cluster with 4 agent nodes and 3 masters.
> agent machine config :
> RAM : 384 GB
> DISK : 4TB
>Reporter: kaushik srinivas
>Priority: Major
> Attachments: kafka_config.txt, stderr_broker1.txt, 
> stderr_broker2.txt, stdout_broker1.txt, stdout_broker2.txt
>
>
> Performance testing Kafka with the following end-to-end pipelines:
> Kafka Data Producer -> kafka -> spark streaming -> hdfs -- stream1
> Kafka Data Producer -> kafka -> flume -> hdfs -- stream2
> stream1 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> stream2 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> Some important Kafka Configuration :
> "BROKER_MEM": "32768"(32GB)
> "BROKER_JAVA_HEAP": "16384"(16GB)
> "BROKER_COUNT": "3"
> "KAFKA_MESSAGE_MAX_BYTES": "112"(1MB)
> "KAFKA_REPLICA_FETCH_MAX_BYTES": "1048576"(1MB)
> "KAFKA_NUM_PARTITIONS": "20"
> "BROKER_DISK_SIZE": "5000" (5GB)
> "KAFKA_LOG_SEGMENT_BYTES": "5000",(50MB)
> "KAFKA_LOG_RETENTION_BYTES": "50"(5GB)
> Data Producer to kafka Throughput:
> message rate : 5 lakhs messages/sec approx across all the 3 brokers and 
> topics/partitions.
> message size : approx 300 to 400 bytes.
> Issues observed with this configs:
> Issue 1:
> stack trace:
> [2017-11-03 00:56:28,484] FATAL [Replica Manager on Broker 0]: Halting due to 
> unrecoverable I/O error while handling produce request:  
> (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log 
> 'store_sales-16'
>   at kafka.log.Log.append(Log.scala:349)
>   at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
>   at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
>   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
>   at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
>   at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
>   at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
>   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:425)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:78)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Map failed
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
>   at 
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116)
>   at 
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:106)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.log.AbstractIndex.resize(AbstractIndex.scala:106)
>   at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:160)
>   at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>   at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:159)
>  

[jira] [Commented] (KAFKA-6110) Warning when running the broker on Windows

2017-11-03 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237236#comment-16237236
 ] 

huxihx commented on KAFKA-6110:
---

Seems it's a duplicate of 
[KAFKA-6156|https://issues.apache.org/jira/browse/KAFKA-6156].

> Warning when running the broker on Windows
> --
>
> Key: KAFKA-6110
> URL: https://issues.apache.org/jira/browse/KAFKA-6110
> Project: Kafka
>  Issue Type: Bug
> Environment: Windows 10 VM
>Reporter: Vahid Hashemian
>Priority: Minor
>
> *This issue exists in 1.0.0-RC2.*
> The following warning appears in the broker log at startup:
> {code}
> [2017-10-23 15:29:49,370] WARN Error processing 
> kafka.log:type=LogManager,name=LogDirectoryOffline,logDirectory=C:\tmp\kafka-logs
>  (com.yammer.metrics.reporting.JmxReporter)
> javax.management.MalformedObjectNameException: Invalid character ':' in value 
> part of property
> at javax.management.ObjectName.construct(ObjectName.java:618)
> at javax.management.ObjectName.<init>(ObjectName.java:1382)
> at 
> com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
> at 
> com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
> at 
> com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
> at 
> com.yammer.metrics.core.MetricsRegistry.newGauge(MetricsRegistry.java:79)
> at 
> kafka.metrics.KafkaMetricsGroup$class.newGauge(KafkaMetricsGroup.scala:80)
> at kafka.log.LogManager.newGauge(LogManager.scala:50)
> at kafka.log.LogManager$$anonfun$6.apply(LogManager.scala:117)
> at kafka.log.LogManager$$anonfun$6.apply(LogManager.scala:116)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at kafka.log.LogManager.<init>(LogManager.scala:116)
> at kafka.log.LogManager$.apply(LogManager.scala:799)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:222)
> at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
> at kafka.Kafka$.main(Kafka.scala:92)
> at kafka.Kafka.main(Kafka.scala)
> {code}
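
The ':' that the ObjectName parser rejects comes from the drive letter in the 
logDirectory value (C:\tmp\kafka-logs). One way such a value can be made legal 
is by quoting it; a minimal illustration (not the actual JmxReporter code):

{code:java}
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public final class QuotedObjectNameExample {

    public static void main(final String[] args) throws MalformedObjectNameException {
        final String logDirectory = "C:\\tmp\\kafka-logs";

        // Unquoted, the ':' inside the value triggers
        // MalformedObjectNameException; ObjectName.quote() escapes the value
        // so it becomes a legal quoted property value.
        final ObjectName name = new ObjectName(
            "kafka.log:type=LogManager,name=LogDirectoryOffline,logDirectory="
                + ObjectName.quote(logDirectory));

        System.out.println(name);
    }
}
{code}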



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3049) VerifiableProperties does not respect 'default' properties of underlying java.util.Properties instance

2017-11-03 Thread Jeffrey Olchovy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237209#comment-16237209
 ] 

Jeffrey Olchovy commented on KAFKA-3049:


Saw this was updated. I believe the existing PR can still merge cleanly, but 
let me know if you want an updated patch / PR. 

> VerifiableProperties does not respect 'default' properties of underlying 
> java.util.Properties instance
> --
>
> Key: KAFKA-3049
> URL: https://issues.apache.org/jira/browse/KAFKA-3049
> Project: Kafka
>  Issue Type: Bug
>  Components: config, core
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
>Reporter: Jeffrey Olchovy
>Priority: Minor
>  Labels: easyfix
>
> When retrieving values from the underlying {{Properties}} instance with the 
> {{getString}}, {{get}}, etc. methods of a {{VerifiableProperties}} 
> instance, a call to the underlying {{Properties.containsKey}} method is made. 
> This method will not search the default property values of the instance, 
> rendering any default properties defined on the {{Properties}} instance 
> useless.
> A practical example is shown below:
> {noformat}
> // suppose we have a base, default set of properties to supply to all 
> consumer groups
> val baseProps = new Properties
> baseProps.setProperty("zookeeper.connect", "localhost:2181/kafka")
> baseProps.setProperty("zookeeper.connection.timeout.ms", "2000")
> // additionally, we have discrete properties instances for each consumer group 
> that utilize these defaults
> val groupProps1 = new Properties(baseProps)
> groupProps1.setProperty("group.id", "test-1")
> val groupProps2 = new Properties(baseProps)
> groupProps2.setProperty("group.id", "test-2")
> // when attempting to create an instance of a high-level Consumer with the 
> above properties an exception will be thrown due to the aforementioned 
> problem description
> java.lang.IllegalArgumentException: requirement failed: Missing required 
> property 'zookeeper.connect'
> at scala.Predef$.require(Predef.scala:233)
> at 
> kafka.utils.VerifiableProperties.getString(VerifiableProperties.scala:177)
> at kafka.utils.ZKConfig.<init>(ZkUtils.scala:879)
> at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:100)
> at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:104)
> // however, the groupProps instances will return the correct value for 
> "zookeeper.connect" when using `Properties.getProperty`
> assert(groupProps1.getProperty("zookeeper.connect") == "localhost:2181/kafka")
> assert(groupProps2.getProperty("zookeeper.connect") == "localhost:2181/kafka")
> {noformat}
> I believe it is worthwhile for Kafka to respect the default properties 
> feature of {{java.util.Properties}}, and further, that Kafka should 
> discourage the use of the methods on {{Properties}} that are inherited from 
> {{Hashtable}} (e.g. {{containsKey}}). One can argue that 
> {{VerifiableProperties}} is not 'correct' due to this behavior, but a user 
> can always work around this by creating discrete instances of {{Properties}} 
> with a set of default properties manually added to each instance. However, 
> this is inconvenient and may only encourage the use of the discouraged 
> {{Hashtable}} methods like {{putAll}}.
> Two proposed solutions follow:
> 1. Do not delegate to the {{Properties.containsKey}} method during the 
> invocation of {{VerifiableProperties.containsKey}}. One can use a null check 
> in conjunction with {{getProperty}} in its place.
> 2. Treat the underlying {{Properties}} instance as immutable and assign the 
> result of {{Properties.stringPropertyNames()}} to a member of 
> {{VerifiableProperties}}. One can check this set of known, available property 
> names, which respects the optional default properties, when 
> {{VerifiableProperties.containsKey}} is invoked.
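
Illustrating proposal 1 above, a minimal sketch (hypothetical code, not a 
patch): route the existence check through getProperty(), which does consult 
the defaults, instead of Hashtable.containsKey(), which does not:

{code:java}
import java.util.Properties;

public final class DefaultAwareProperties {

    private final Properties props;

    public DefaultAwareProperties(final Properties props) {
        this.props = props;
    }

    // getProperty() falls back to the Properties defaults, so this "contains"
    // check respects them, unlike Hashtable.containsKey().
    public boolean containsKey(final String name) {
        return props.getProperty(name) != null;
    }

    public String getString(final String name) {
        final String value = props.getProperty(name);
        if (value == null) {
            throw new IllegalArgumentException("Missing required property '" + name + "'");
        }
        return value;
    }
}
{code}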



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-03 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237205#comment-16237205
 ] 

huxihx commented on KAFKA-6165:
---

Could you try to decrease the heap size a little bit? 16GB seems to be quite 
large. 

> Kafka Brokers goes down with outOfMemoryError.
> --
>
> Key: KAFKA-6165
> URL: https://issues.apache.org/jira/browse/KAFKA-6165
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
> Environment: DCOS cluster with 4 agent nodes and 3 masters.
> agent machine config :
> RAM : 384 GB
> DISK : 4TB
>Reporter: kaushik srinivas
>Priority: Major
> Attachments: kafka_config.txt, stderr_broker1.txt, 
> stderr_broker2.txt, stdout_broker1.txt, stdout_broker2.txt
>
>
> Performance testing Kafka with the following end-to-end pipelines:
> Kafka Data Producer -> kafka -> spark streaming -> hdfs -- stream1
> Kafka Data Producer -> kafka -> flume -> hdfs -- stream2
> stream1 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> stream2 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> Some important Kafka Configuration :
> "BROKER_MEM": "32768"(32GB)
> "BROKER_JAVA_HEAP": "16384"(16GB)
> "BROKER_COUNT": "3"
> "KAFKA_MESSAGE_MAX_BYTES": "112"(1MB)
> "KAFKA_REPLICA_FETCH_MAX_BYTES": "1048576"(1MB)
> "KAFKA_NUM_PARTITIONS": "20"
> "BROKER_DISK_SIZE": "5000" (5GB)
> "KAFKA_LOG_SEGMENT_BYTES": "5000",(50MB)
> "KAFKA_LOG_RETENTION_BYTES": "50"(5GB)
> Data Producer to kafka Throughput:
> message rate : 5 lakhs messages/sec approx across all the 3 brokers and 
> topics/partitions.
> message size : approx 300 to 400 bytes.
> Issues observed with this configs:
> Issue 1:
> stack trace:
> [2017-11-03 00:56:28,484] FATAL [Replica Manager on Broker 0]: Halting due to 
> unrecoverable I/O error while handling produce request:  
> (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log 
> 'store_sales-16'
>   at kafka.log.Log.append(Log.scala:349)
>   at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
>   at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
>   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
>   at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
>   at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
>   at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
>   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:425)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:78)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Map failed
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
>   at 
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116)
>   at 
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:106)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.log.AbstractIndex.resize(AbstractIndex.scala:106)
>   at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:160)
>   at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>   at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:159)
>   at kafka.log.Log.roll(Log.scala:771)
>   at kafka.log.Log.maybeRoll(Log.scala:742)
>   at kafka.log.Log.append(Log.scala:405)
>   ... 22 more
> Ca

[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.

2017-11-03 Thread kaushik srinivas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237189#comment-16237189
 ] 

kaushik srinivas commented on KAFKA-6165:
-

java version "1.8.0_112"
Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)

> Kafka Brokers goes down with outOfMemoryError.
> --
>
> Key: KAFKA-6165
> URL: https://issues.apache.org/jira/browse/KAFKA-6165
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
> Environment: DCOS cluster with 4 agent nodes and 3 masters.
> agent machine config :
> RAM : 384 GB
> DISK : 4TB
>Reporter: kaushik srinivas
>Priority: Major
> Attachments: kafka_config.txt, stderr_broker1.txt, 
> stderr_broker2.txt, stdout_broker1.txt, stdout_broker2.txt
>
>
> Performance testing Kafka with the following end-to-end pipelines:
> Kafka Data Producer -> kafka -> spark streaming -> hdfs -- stream1
> Kafka Data Producer -> kafka -> flume -> hdfs -- stream2
> stream1 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> stream2 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> Some important Kafka Configuration :
> "BROKER_MEM": "32768"(32GB)
> "BROKER_JAVA_HEAP": "16384"(16GB)
> "BROKER_COUNT": "3"
> "KAFKA_MESSAGE_MAX_BYTES": "112"(1MB)
> "KAFKA_REPLICA_FETCH_MAX_BYTES": "1048576"(1MB)
> "KAFKA_NUM_PARTITIONS": "20"
> "BROKER_DISK_SIZE": "5000" (5GB)
> "KAFKA_LOG_SEGMENT_BYTES": "5000",(50MB)
> "KAFKA_LOG_RETENTION_BYTES": "50"(5GB)
> Data Producer to kafka Throughput:
> message rate : 5 lakhs messages/sec approx across all the 3 brokers and 
> topics/partitions.
> message size : approx 300 to 400 bytes.
> Issues observed with this configs:
> Issue 1:
> stack trace:
> [2017-11-03 00:56:28,484] FATAL [Replica Manager on Broker 0]: Halting due to 
> unrecoverable I/O error while handling produce request:  
> (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log 
> 'store_sales-16'
>   at kafka.log.Log.append(Log.scala:349)
>   at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
>   at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
>   at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
>   at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
>   at 
> kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>   at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
>   at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
>   at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:425)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:78)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Map failed
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
>   at 
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116)
>   at 
> kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:106)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.log.AbstractIndex.resize(AbstractIndex.scala:106)
>   at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:160)
>   at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>   at 
> kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:159)
>   at kafka.log.Log.roll(Log.scala:771)
>   at kafka.log.Log.maybeRoll(Log.scal