[jira] [Commented] (KAFKA-6161) Introduce new serdes interfaces with empty configure() and close()
[ https://issues.apache.org/jira/browse/KAFKA-6161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238790#comment-16238790 ] Kamal Chandraprakash commented on KAFKA-6161: - My opinion is that we can do these cleanups when the Kafka source code migrates to Java 8. In Java 8, we can define `default` methods in the interface. Now, with your current patch, we can no longer extend any other class in `Serde`, `Serializer` and `Deserializer` implementations. > Introduce new serdes interfaces with empty configure() and close() > -- > > Key: KAFKA-6161 > URL: https://issues.apache.org/jira/browse/KAFKA-6161 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Evgeny Veretennikov >Assignee: Evgeny Veretennikov >Priority: Normal > > {{Serializer}}, {{Deserializer}} and {{Serde}} interfaces have methods > {{configure()}} and {{close()}}. Pretty often one wants to leave these methods > empty. For example, a lot of serializers inside > {{org.apache.kafka.common.serialization}} package have these methods empty: > {code} > @Override > public void configure(Map<String, ?> configs, boolean isKey) { > // nothing to do > } > @Override > public void close() { > // nothing to do > } > {code} > To avoid such boilerplate, we may create new interfaces (like > {{UnconfiguredSerializer}}), in which we will define these methods empty. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
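The Java 8 alternative mentioned in the comment can be sketched roughly as follows. The names SimpleSerializer and Utf8Serializer are hypothetical stand-ins and not Kafka's actual Serializer API; the sketch only assumes a Java 8 or later compiler.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical stand-in for a serializer interface; not the real Kafka API.
interface SimpleSerializer<T> {

    // Default no-op: implementations override this only when they need configuration.
    default void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to do
    }

    byte[] serialize(String topic, T data);

    // Default no-op: implementations override this only when they hold resources.
    default void close() {
        // nothing to do
    }
}

// Only serialize() has to be written, and the class stays free to extend another class.
class Utf8Serializer implements SimpleSerializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }
}
```

Because the no-op bodies live in the interface itself, no extra abstract base class is needed, which is the constraint the comment raises against the current patch.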
[jira] [Updated] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()
[ https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-5863: -- Description: Here is the call chain: {code} RestServer.httpRequest(reconfigUrl, "POST", taskProps, null); {code} In httpRequest(): {code} } else if (responseCode >= 200 && responseCode < 300) { InputStream is = connection.getInputStream(); T result = JSON_SERDE.readValue(is, responseFormat); {code} For readValue(): {code} public T readValue(InputStream src, TypeReference valueTypeRef) throws IOException, JsonParseException, JsonMappingException { return (T) _readMapAndClose(_jsonFactory.createParser(src), _typeFactory.constructType(valueTypeRef)); {code} Then there would be NPE in constructType(): {code} public JavaType constructType(TypeReference typeRef) { // 19-Oct-2015, tatu: Simpler variant like so should work return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS); {code} was: Here is the call chain: {code} RestServer.httpRequest(reconfigUrl, "POST", taskProps, null); {code} In httpRequest(): {code} } else if (responseCode >= 200 && responseCode < 300) { InputStream is = connection.getInputStream(); T result = JSON_SERDE.readValue(is, responseFormat); {code} For readValue(): {code} public T readValue(InputStream src, TypeReference valueTypeRef) throws IOException, JsonParseException, JsonMappingException { return (T) _readMapAndClose(_jsonFactory.createParser(src), _typeFactory.constructType(valueTypeRef)); {code} Then there would be NPE in constructType(): {code} public JavaType constructType(TypeReference typeRef) { // 19-Oct-2015, tatu: Simpler variant like so should work return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS); {code} > Potential null dereference in DistributedHerder#reconfigureConnector() > -- > > Key: KAFKA-5863 > URL: https://issues.apache.org/jira/browse/KAFKA-5863 > Project: Kafka > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > Here is the call chain: > {code} > 
RestServer.httpRequest(reconfigUrl, "POST", > taskProps, null); > {code} > In httpRequest(): > {code} > } else if (responseCode >= 200 && responseCode < 300) { > InputStream is = connection.getInputStream(); > T result = JSON_SERDE.readValue(is, responseFormat); > {code} > For readValue(): > {code} > public T readValue(InputStream src, TypeReference valueTypeRef) > throws IOException, JsonParseException, JsonMappingException > { > return (T) _readMapAndClose(_jsonFactory.createParser(src), > _typeFactory.constructType(valueTypeRef)); > {code} > Then there would be NPE in constructType(): > {code} > public JavaType constructType(TypeReference typeRef) > { > // 19-Oct-2015, tatu: Simpler variant like so should work > return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS); > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
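One way to picture the problem and a possible guard is the miniature below. It is only a sketch under assumptions: HttpResponseSketch, handleSuccess and the Function-based responseFormat are hypothetical and are neither Connect nor Jackson APIs.

```java
import java.util.Objects;
import java.util.function.Function;

// Hypothetical miniature of the call chain; none of these names are Connect or Jackson APIs.
class HttpResponseSketch {

    // Stands in for readValue(): it needs a non-null format and fails fast without one.
    static <T> T readValue(String body, Function<String, T> responseFormat) {
        Objects.requireNonNull(responseFormat, "responseFormat must not be null");
        return responseFormat.apply(body);
    }

    // Stands in for the 2xx branch of httpRequest(): skip deserialization when no format was passed.
    static <T> T handleSuccess(String body, Function<String, T> responseFormat) {
        if (responseFormat == null) {
            return null; // the caller is not interested in the response body
        }
        return readValue(body, responseFormat);
    }
}
```

With such a guard, a caller that passes null, as in the reconfigure call quoted above, would get a null result instead of a NullPointerException.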
[jira] [Updated] (KAFKA-6074) Use ZookeeperClient in ReplicaManager and Partition
[ https://issues.apache.org/jira/browse/KAFKA-6074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-6074: -- Attachment: 6074.v10.txt > Use ZookeeperClient in ReplicaManager and Partition > --- > > Key: KAFKA-6074 > URL: https://issues.apache.org/jira/browse/KAFKA-6074 > Project: Kafka > Issue Type: Sub-task > Components: core >Affects Versions: 1.1.0 >Reporter: Jun Rao >Assignee: Ted Yu >Priority: Major > Fix For: 1.1.0 > > Attachments: 6074.v1.txt, 6074.v10.txt > > > We want to replace the usage of ZkUtils with ZookeeperClient in > ReplicaManager and Partition. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6168) Connect Schema comparison is slow for large schemas
[ https://issues.apache.org/jira/browse/KAFKA-6168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238704#comment-16238704 ] ASF GitHub Bot commented on KAFKA-6168: --- GitHub user tedyu opened a pull request: https://github.com/apache/kafka/pull/4176 KAFKA-6168 Connect Schema comparison is slow for large schemas Re-arrange order of comparisons in equals() to evaluate non-composite fields first. Cache hash code. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tedyu/kafka trunk Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/4176.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4176 commit 4bae62c8ad64408b20e4925977beadaa42b54619 Author: tedyu Date: 2017-11-04T02:03:59Z KAFKA-6168 Connect Schema comparison is slow for large schemas > Connect Schema comparison is slow for large schemas > --- > > Key: KAFKA-6168 > URL: https://issues.apache.org/jira/browse/KAFKA-6168 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Randall Hauch >Assignee: Ted Yu >Priority: Critical > Attachments: 6168.v1.txt > > > The {{ConnectSchema}} implementation computes the hash code every time it's > needed, and {{equals(Object)}} is a deep equality check. This extra work can > be expensive for large schemas, especially in code like the {{AvroConverter}} > (or rather {{AvroData}} in the converter) that uses instances as keys in a > hash map that then requires significant use of {{hashCode}} and {{equals}}. > The {{ConnectSchema}} is an immutable object and should at a minimum > precompute the hash code. Also, the order that the fields are compared in > {{equals(...)}} should use the cheapest comparisons first (e.g., the {{name}} > field is one of the _last_ fields to be checked). 
Finally, it might be worth > considering having each instance precompute and cache a string or byte[] > representation of all fields that can be used for faster equality checking. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
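The two suggestions in the description (precompute the hash code, compare cheap fields first) can be sketched as below. SchemaSketch is a hypothetical, much simplified immutable class, not the real ConnectSchema, and its three fields are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical, simplified immutable schema; not the real ConnectSchema.
final class SchemaSketch {
    private final String name;
    private final boolean optional;
    private final List<SchemaSketch> fields; // composite part, expensive to compare
    private final int hash;                  // computed once; safe because the object is immutable

    SchemaSketch(String name, boolean optional, List<SchemaSketch> fields) {
        this.name = name;
        this.optional = optional;
        this.fields = Collections.unmodifiableList(new ArrayList<>(fields));
        this.hash = Objects.hash(name, optional, this.fields);
    }

    @Override
    public int hashCode() {
        return hash; // no recomputation on every call
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SchemaSketch)) return false;
        SchemaSketch other = (SchemaSketch) o;
        // Cheapest checks first: cached hash, primitive flag, short string, then the deep comparison.
        return hash == other.hash
                && optional == other.optional
                && Objects.equals(name, other.name)
                && fields.equals(other.fields);
    }
}
```

Comparing the cached hash first lets most unequal instances be rejected without touching the nested fields, while equal instances still go through the full comparison.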
[jira] [Commented] (KAFKA-6168) Connect Schema comparison is slow for large schemas
[ https://issues.apache.org/jira/browse/KAFKA-6168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238698#comment-16238698 ] Randall Hauch commented on KAFKA-6168: -- [~yuzhih...@gmail.com], thanks for offering to work on this. I think the first step would be to create a PR with a commit that has the changes you attached as a patch, and then we can look into possibly doing something about improving the performance of {{equals}}. > Connect Schema comparison is slow for large schemas > --- > > Key: KAFKA-6168 > URL: https://issues.apache.org/jira/browse/KAFKA-6168 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Randall Hauch >Assignee: Ted Yu >Priority: Critical > Attachments: 6168.v1.txt > > > The {{ConnectSchema}} implementation computes the hash code every time it's > needed, and {{equals(Object)}} is a deep equality check. This extra work can > be expensive for large schemas, especially in code like the {{AvroConverter}} > (or rather {{AvroData}} in the converter) that uses instances as keys in a > hash map that then requires significant use of {{hashCode}} and {{equals}}. > The {{ConnectSchema}} is an immutable object and should at a minimum > precompute the hash code. Also, the order that the fields are compared in > {{equals(...)}} should use the cheapest comparisons first (e.g., the {{name}} > field is one of the _last_ fields to be checked). Finally, it might be worth > considering having each instance precompute and cache a string or byte[] > representation of all fields that can be used for faster equality checking. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (KAFKA-6168) Connect Schema comparison is slow for large schemas
[ https://issues.apache.org/jira/browse/KAFKA-6168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch reassigned KAFKA-6168: Assignee: Ted Yu > Connect Schema comparison is slow for large schemas > --- > > Key: KAFKA-6168 > URL: https://issues.apache.org/jira/browse/KAFKA-6168 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Randall Hauch >Assignee: Ted Yu >Priority: Critical > Attachments: 6168.v1.txt > > > The {{ConnectSchema}} implementation computes the hash code every time it's > needed, and {{equals(Object)}} is a deep equality check. This extra work can > be expensive for large schemas, especially in code like the {{AvroConverter}} > (or rather {{AvroData}} in the converter) that uses instances as keys in a > hash map that then requires significant use of {{hashCode}} and {{equals}}. > The {{ConnectSchema}} is an immutable object and should at a minimum > precompute the hash code. Also, the order that the fields are compared in > {{equals(...)}} should use the cheapest comparisons first (e.g., the {{name}} > field is one of the _last_ fields to be checked). Finally, it might be worth > considering having each instance precompute and cache a string or byte[] > representation of all fields that can be used for faster equality checking. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6166) Streams configuration requires consumer. and producer. in order to be read
[ https://issues.apache.org/jira/browse/KAFKA-6166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238673#comment-16238673 ] James Cheng commented on KAFKA-6166: Thanks [~guozhang]. Just to make sure I understand, currently, if I want to apply my metric reporter to both the producer and consumer, what I need to do is: {code} Properties config = new Properties(); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.mycompany.MetricReporter"); config.put("consumer.custom-key-for-metric-reporter", "value"); config.put("producer.custom-key-for-metric-reporter", "value"); {code} And if I want to apply to kafka streams itself and the producer and the consumer, I need to do {code} Properties config = new Properties(); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.mycompany.MetricReporter"); config.put("consumer.custom-key-for-metric-reporter", "value"); config.put("producer.custom-key-for-metric-reporter", "value"); config.put("custom-key-for-metric-reporter", "value"); {code} Is that right? And in the future, if this JIRA is fixed, I will simply need to do: {code} Properties config = new Properties(); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.mycompany.MetricReporter"); config.put("custom-key-for-metric-reporter", "value"); {code} Is that right? > Streams configuration requires consumer. and producer. 
in order to be read > -- > > Key: KAFKA-6166 > URL: https://issues.apache.org/jira/browse/KAFKA-6166 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0 > Environment: Kafka 0.11.0.0 > JDK 1.8 > CoreOS >Reporter: Justin Manchester >Priority: Minor > > Problem: > In previous release you could specify a custom metrics reporter like so: > Properties config = new Properties(); > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); > config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, > "com.mycompany.MetricReporter"); > config.put("custom-key-for-metric-reporter", "value"); > From 0.11.0.0 onwards this is no longer possible, as you have to specify > consumer.custom-key-for-metric-reporter or > producer.custom-key-for-metric-reporter otherwise it's stripped out of the > configuration. > So, if you wish to use a metrics reporter and to collect producer and > consumer metrics, as well as kafka-streams metrics, that you would need to > specify 3 distinct configs: > 1) consumer.custom-key-for-metric-reporter > 2) producer.custom-key-for-metric-reporter > 3) custom-key-for-metric-reporter > This appears to be a regression. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
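The prefixed keys discussed in this thread can be pictured as prefix-based routing. The helper below is a rough sketch under that assumption; PrefixedConfigSketch and withPrefixStripped are hypothetical names and this is not the Kafka Streams implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; it mimics prefix-based routing but is not Kafka Streams code.
class PrefixedConfigSketch {

    // Returns the entries whose key starts with the prefix, with the prefix removed.
    static Map<String, Object> withPrefixStripped(Map<String, Object> all, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> e : all.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }
}
```

Under this model, a key set only as custom-key-for-metric-reporter never reaches the map built for the "consumer." or "producer." prefix, which matches the behaviour the reporter describes for 0.11.0.0.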
[jira] [Commented] (KAFKA-6148) ClassCastException in BigQuery connector
[ https://issues.apache.org/jira/browse/KAFKA-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238621#comment-16238621 ] Ewen Cheslack-Postava commented on KAFKA-6148: -- [~burd0047] Can you clarify which version of the BigQuery connector you are using? I don't see a branch or tag that was using 0.11.0.0 Kafka, they all still look like they are on 0.10.2 series. > ClassCastException in BigQuery connector > > > Key: KAFKA-6148 > URL: https://issues.apache.org/jira/browse/KAFKA-6148 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Eugene Burd >Assignee: Konstantine Karantasis >Priority: Major > > I am trying to run a com.wepay.kafka.connect.bigquery.BigQuerySinkConnector > connector, but getting the following exception. > [2017-10-30 21:48:49,007] ERROR WorkerSinkTask{id=bigquery-connector-log-0} > Offset commit failed, rewinding to last committed offsets > (org.apache.kafka.connect.runtime.WorkerSinkTask:311) > java.lang.ClassCastException: > org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to > org.apache.kafka.clients.consumer.OffsetAndMetadata > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107) > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96) > at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > [2017-10-30 21:48:49,012] ERROR Commit of > WorkerSinkTask{id=bigquery-connector-log-0} offsets threw an unexpected > exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:205) > java.lang.ClassCastException: > org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to > org.apache.kafka.clients.consumer.OffsetAndMetadata > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107) > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96) > at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > I have checked the version number of kafka client in the plug in and kafka > connect itself and they are the same. > - kafka-clients-0.11.0.0.jar matches > I am still suspecting a type of versioning issue. Do you have any advice? > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6167) Timestamp on streams directory contains a colon, which is an illegal character
[ https://issues.apache.org/jira/browse/KAFKA-6167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6167: - Labels: user-experience (was: ) > Timestamp on streams directory contains a colon, which is an illegal character > -- > > Key: KAFKA-6167 > URL: https://issues.apache.org/jira/browse/KAFKA-6167 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 > Environment: AK 1.0.0 > Kubernetes > CoreOS > JDK 1.8 > Windows >Reporter: Justin Manchester >Priority: Normal > Labels: user-experience > > Problem: > Development on Windows, which is not fully supported, however still a bug > that should be corrected. > It looks like a timestamp was added to the streams directory using a colon as > separator. I believe this is an illegal character and potentially the cause > for the exception below. > Error Stack: > 2017-11-02 16:06:41 ERROR > [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1] > org.apache.kafka.streams.processor.internals.AssignedTasks:301 - > stream-thread > [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1] > Failed to process stream task 0_0 due to the following error: > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=input-a_1, > partition=0, offset=0 > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942) > [kafka-streams-1.0.0.jar:?] 
> at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) > [kafka-streams-1.0.0.jar:?] > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error > opening store KSTREAM-JOINTHIS-04-store:150962400 at location > C:\Users\ADRIAN~1.MCC\AppData\Local\Temp\kafka3548813472740086814\StreamDeduplicatorAcceptanceTest1\0_0\KSTREAM-JOINTHIS-04-store\KSTREAM-JOINTHIS-04-store:150962400 > > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174) > ~[kafka-streams-1.0.0.jar:?] > at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:67) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33) > ~[kafka-streams-1.0.0.jar:?] 
> at > org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:96) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:89) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) > ~[kafka-
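The colon in the segment directory name is the character Windows rejects. A minimal sketch of one possible naming scheme follows; SegmentNameSketch and the choice of '.' as separator are assumptions for illustration, not necessarily what Kafka Streams does or will do.

```java
// Hypothetical naming helper; the separator choice is an assumption for illustration.
class SegmentNameSketch {
    // Characters that Windows rejects in file and directory names.
    private static final String WINDOWS_ILLEGAL = "<>:\"/\\|?*";

    static String segmentName(String storeName, long segmentId) {
        // '.' instead of ':' keeps the name valid on Windows as well as on POSIX systems.
        return storeName + "." + segmentId;
    }

    static boolean isWindowsSafe(String fileName) {
        for (int i = 0; i < fileName.length(); i++) {
            if (WINDOWS_ILLEGAL.indexOf(fileName.charAt(i)) >= 0) {
                return false;
            }
        }
        return true;
    }
}
```

A check such as isWindowsSafe could also run in a unit test so that a separator change is caught on any platform, not only when someone develops on Windows.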
[jira] [Assigned] (KAFKA-6148) ClassCastException in BigQuery connector
[ https://issues.apache.org/jira/browse/KAFKA-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava reassigned KAFKA-6148: Assignee: Konstantine Karantasis > ClassCastException in BigQuery connector > > > Key: KAFKA-6148 > URL: https://issues.apache.org/jira/browse/KAFKA-6148 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Eugene Burd >Assignee: Konstantine Karantasis >Priority: Major > > I am trying to run a com.wepay.kafka.connect.bigquery.BigQuerySinkConnector > connector, but getting the following exception. > [2017-10-30 21:48:49,007] ERROR WorkerSinkTask{id=bigquery-connector-log-0} > Offset commit failed, rewinding to last committed offsets > (org.apache.kafka.connect.runtime.WorkerSinkTask:311) > java.lang.ClassCastException: > org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to > org.apache.kafka.clients.consumer.OffsetAndMetadata > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107) > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96) > at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > [2017-10-30 
21:48:49,012] ERROR Commit of > WorkerSinkTask{id=bigquery-connector-log-0} offsets threw an unexpected > exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:205) > java.lang.ClassCastException: > org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to > org.apache.kafka.clients.consumer.OffsetAndMetadata > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107) > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96) > at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > I have checked the version number of kafka client in the plug in and kafka > connect itself and they are the same. > - kafka-clients-0.11.0.0.jar matches > I am still suspecting a type of versioning issue. Do you have any advice? > Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6148) ClassCastException in BigQuery connector
[ https://issues.apache.org/jira/browse/KAFKA-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238589#comment-16238589 ] Ewen Cheslack-Postava commented on KAFKA-6148: -- I think this is due to the new classpath isolation. I see from https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java#L120-L121 that we covered excluding org.apache.kafka.common and org.apache.kafka.connect and thought that covered the classes we needed to ensure were loaded from the system classloader, but SinkTask also uses a class from org.apache.kafka.clients. [~kkonstantine] does that seem like an accurate assessment? If so, any connector that overrides flush() or preCommit() could potentially run into issues (though I'm not sure if the issue is ultimately caused by having a separate method call there that uses the type). I think the code in the BigQuery connector is actually unnecessary -- just flushing the data is sufficient since the same offsets will be used and do not need to be set on the context object explicitly -- but this is also a bug in Connect itself. Looks like the fix is easy and should probably be backported back to the first release branch with classloader isolation. > ClassCastException in BigQuery connector > > > Key: KAFKA-6148 > URL: https://issues.apache.org/jira/browse/KAFKA-6148 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Eugene Burd >Priority: Major > > I am trying to run a com.wepay.kafka.connect.bigquery.BigQuerySinkConnector > connector, but getting the following exception. 
> [2017-10-30 21:48:49,007] ERROR WorkerSinkTask{id=bigquery-connector-log-0} > Offset commit failed, rewinding to last committed offsets > (org.apache.kafka.connect.runtime.WorkerSinkTask:311) > java.lang.ClassCastException: > org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to > org.apache.kafka.clients.consumer.OffsetAndMetadata > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107) > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96) > at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > [2017-10-30 21:48:49,012] ERROR Commit of > WorkerSinkTask{id=bigquery-connector-log-0} offsets threw an unexpected > exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:205) > java.lang.ClassCastException: > org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to > org.apache.kafka.clients.consumer.OffsetAndMetadata > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107) > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96) > at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117) > 
at > org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > I have checked the version number of kafka client in the plug in and kafka > connect itself and they are the same. > - kafka-clients-0.11.0.0.jar matches > I am still suspecting a type of vers
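The assessment in Ewen's comment can be illustrated with a simplified delegation filter. This is a sketch only: DelegationFilterSketch and both regular expressions are hypothetical simplifications and are not the patterns actually used in PluginUtils.

```java
import java.util.regex.Pattern;

// Hypothetical delegation filter; the patterns are simplified and are not the ones in PluginUtils.
class DelegationFilterSketch {
    // Covers only the two packages named in the comment.
    static final Pattern BEFORE = Pattern.compile(
            "^org\\.apache\\.kafka\\.(common|connect)\\..*$");
    // Adds org.apache.kafka.clients so that its classes also come from the parent loader.
    static final Pattern AFTER = Pattern.compile(
            "^org\\.apache\\.kafka\\.(clients|common|connect)\\..*$");

    // True when the class should be loaded by the parent (system) classloader.
    static boolean loadFromParent(Pattern filter, String className) {
        return filter.matcher(className).matches();
    }
}
```

If a class such as OffsetAndMetadata is not delegated to the parent, the plugin classloader and the Connect runtime can each define their own copy, and a cast between the two copies fails with the "X cannot be cast to X" message shown in the stack trace.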
[jira] [Created] (KAFKA-6170) Add the AdminClient in Streams' KafkaClientSupplier
Guozhang Wang created KAFKA-6170: Summary: Add the AdminClient in Streams' KafkaClientSupplier Key: KAFKA-6170 URL: https://issues.apache.org/jira/browse/KAFKA-6170 Project: Kafka Issue Type: New Feature Components: streams Reporter: Guozhang Wang Assignee: Guozhang Wang Priority: Major We will add Java AdminClient to Kafka Streams, in order to replace the internal StreamsKafkaClient. More details can be found in KIP-220 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-220%3A+Add+AdminClient+into+Kafka+Streams%27+ClientSupplier) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-6168) Connect Schema comparison is slow for large schemas
[ https://issues.apache.org/jira/browse/KAFKA-6168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238561#comment-16238561 ] Ted Yu edited comment on KAFKA-6168 at 11/3/17 11:35 PM: - bq. considering having each instance precompute and cache a string or byte[] representation of all fields Should this be done for all fields (quite a few)? Or just selected fields such as keySchema? was (Author: yuzhih...@gmail.com): Can I work on this? > Connect Schema comparison is slow for large schemas > --- > > Key: KAFKA-6168 > URL: https://issues.apache.org/jira/browse/KAFKA-6168 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Randall Hauch >Priority: Critical > Attachments: 6168.v1.txt > > > The {{ConnectSchema}} implementation computes the hash code every time it's > needed, and {{equals(Object)}} is a deep equality check. This extra work can > be expensive for large schemas, especially in code like the {{AvroConverter}} > (or rather {{AvroData}} in the converter) that uses instances as keys in a > hash map that then requires significant use of {{hashCode}} and {{equals}}. > The {{ConnectSchema}} is an immutable object and should at a minimum > precompute the hash code. Also, the order that the fields are compared in > {{equals(...)}} should use the cheapest comparisons first (e.g., the {{name}} > field is one of the _last_ fields to be checked). Finally, it might be worth > considering having each instance precompute and cache a string or byte[] > representation of all fields that can be used for faster equality checking. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6168) Connect Schema comparison is slow for large schemas
[ https://issues.apache.org/jira/browse/KAFKA-6168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-6168: -- Attachment: 6168.v1.txt > Connect Schema comparison is slow for large schemas > --- > > Key: KAFKA-6168 > URL: https://issues.apache.org/jira/browse/KAFKA-6168 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Randall Hauch >Priority: Critical > Attachments: 6168.v1.txt > > > The {{ConnectSchema}} implementation computes the hash code every time it's > needed, and {{equals(Object)}} is a deep equality check. This extra work can > be expensive for large schemas, especially in code like the {{AvroConverter}} > (or rather {{AvroData}} in the converter) that uses instances as keys in a > hash map that then requires significant use of {{hashCode}} and {{equals}}. > The {{ConnectSchema}} is an immutable object and should at a minimum > precompute the hash code. Also, the order in which the fields are compared in > {{equals(...)}} should use the cheapest comparisons first (e.g., the {{name}} > field is one of the _last_ fields to be checked). Finally, it might be worth > considering having each instance precompute and cache a string or byte[] > representation of all fields that can be used for faster equality checking.
[jira] [Updated] (KAFKA-6148) ClassCastException in BigQuery connector
[ https://issues.apache.org/jira/browse/KAFKA-6148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-6148: - Summary: ClassCastException in BigQuery connector (was: ERROR Commit of WorkerSinkTask{id=bigquery-connector-log-0} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:205) java.lang.ClassCastException: org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast) > ClassCastException in BigQuery connector > > > Key: KAFKA-6148 > URL: https://issues.apache.org/jira/browse/KAFKA-6148 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Eugene Burd >Priority: Major > > I am trying to run a com.wepay.kafka.connect.bigquery.BigQuerySinkConnector > connector, but getting the following exception. > [2017-10-30 21:48:49,007] ERROR WorkerSinkTask{id=bigquery-connector-log-0} > Offset commit failed, rewinding to last committed offsets > (org.apache.kafka.connect.runtime.WorkerSinkTask:311) > java.lang.ClassCastException: > org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to > org.apache.kafka.clients.consumer.OffsetAndMetadata > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107) > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96) > at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > [2017-10-30 21:48:49,012] ERROR Commit of > WorkerSinkTask{id=bigquery-connector-log-0} offsets threw an unexpected > exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:205) > java.lang.ClassCastException: > org.apache.kafka.clients.consumer.OffsetAndMetadata cannot be cast to > org.apache.kafka.clients.consumer.OffsetAndMetadata > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.updateOffsets(BigQuerySinkTask.java:107) > at > com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:96) > at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:117) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:164) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:748) > I have checked the version of the Kafka client in the plugin and in Kafka > Connect itself, and they are the same. > - kafka-clients-0.11.0.0.jar matches > I still suspect some kind of versioning issue. Do you have any advice? > Thanks.
[jira] [Commented] (KAFKA-6168) Connect Schema comparison is slow for large schemas
[ https://issues.apache.org/jira/browse/KAFKA-6168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238561#comment-16238561 ] Ted Yu commented on KAFKA-6168: --- Can I work on this? > Connect Schema comparison is slow for large schemas > --- > > Key: KAFKA-6168 > URL: https://issues.apache.org/jira/browse/KAFKA-6168 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Randall Hauch >Priority: Critical > > The {{ConnectSchema}} implementation computes the hash code every time it's > needed, and {{equals(Object)}} is a deep equality check. This extra work can > be expensive for large schemas, especially in code like the {{AvroConverter}} > (or rather {{AvroData}} in the converter) that uses instances as keys in a > hash map that then requires significant use of {{hashCode}} and {{equals}}. > The {{ConnectSchema}} is an immutable object and should at a minimum > precompute the hash code. Also, the order in which the fields are compared in > {{equals(...)}} should use the cheapest comparisons first (e.g., the {{name}} > field is one of the _last_ fields to be checked). Finally, it might be worth > considering having each instance precompute and cache a string or byte[] > representation of all fields that can be used for faster equality checking.
[jira] [Commented] (KAFKA-6166) Streams configuration requires consumer. and producer. in order to be read
[ https://issues.apache.org/jira/browse/KAFKA-6166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238531#comment-16238531 ] Guozhang Wang commented on KAFKA-6166: -- [~wushujames] asked about this issue as well, so I am just copy-pasting my reply here: {code} Thanks for bringing this up. The original design is to let users apply the prefix ONLY if they want to override a config for a specific client. For example, for `metric.reporters` or `interceptor.classes`, which are defined for both producer and consumer (and moving forward we are adding the admin client as well), without the prefix they will be applied to all embedded clients; and if users want to have different config value overrides for different clients, they can use the prefix. So if you define `metric.reporters`, it should be applied to both producer and consumer (is that not the case for you? Please confirm). For custom config k-v pairs, I think we did not discuss this in depth. But our current implementation does indeed restrict them to the producer / consumer client property definitions only, for example: getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames()); props.putAll(originalsWithPrefix(prefix)); so if you do not add the prefix to your custom k-v pairs, they will be skipped. Thinking about it a bit, I think we should probably do what you were inclined to do, i.e., apply custom k-v pairs globally as well unless overridden by a prefix. {code} > Streams configuration requires consumer. and producer. 
in order to be read > -- > > Key: KAFKA-6166 > URL: https://issues.apache.org/jira/browse/KAFKA-6166 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0 > Environment: Kafka 0.11.0.0 > JDK 1.8 > CoreOS >Reporter: Justin Manchester >Priority: Minor > > Problem: > In previous releases you could specify a custom metrics reporter like so: > Properties config = new Properties(); > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); > config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, > "com.mycompany.MetricReporter"); > config.put("custom-key-for-metric-reporter", "value"); > From 0.11.0.0 onwards this is no longer possible, as you have to specify > consumer.custom-key-for-metric-reporter or > producer.custom-key-for-metric-reporter; otherwise it is stripped out of the > configuration. > So, if you wish to use a metrics reporter and to collect producer and > consumer metrics, as well as kafka-streams metrics, you need to > specify 3 distinct configs: > 1) consumer.custom-key-for-metric-reporter > 2) producer.custom-key-for-metric-reporter > 3) custom-key-for-metric-reporter > This appears to be a regression.
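The resolution rule proposed in the comment on KAFKA-6166 (unprefixed keys apply to every embedded client unless a client-prefixed key overrides them) could look roughly like the sketch below. This is only an illustration, not the StreamsConfig implementation: the class name PrefixedConfigSketch, the method clientProps, and the hard-coded prefix list are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, NOT Kafka's StreamsConfig: resolves the effective
// config for one embedded client from a flat Streams config map.
class PrefixedConfigSketch {

    // Assumed client prefixes; keys starting with one of them are client-specific.
    static final String[] PREFIXES = {"consumer.", "producer."};

    static Map<String, Object> clientProps(Map<String, Object> all, String prefix) {
        Map<String, Object> result = new HashMap<>();
        // 1) Start from every key that carries no client prefix (global values).
        for (Map.Entry<String, Object> e : all.entrySet()) {
            if (!hasAnyPrefix(e.getKey())) {
                result.put(e.getKey(), e.getValue());
            }
        }
        // 2) Apply the overrides for this client, stripping the prefix.
        for (Map.Entry<String, Object> e : all.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    private static boolean hasAnyPrefix(String key) {
        for (String p : PREFIXES) {
            if (key.startsWith(p)) {
                return true;
            }
        }
        return false;
    }
}
```

With this rule, a single custom-key-for-metric-reporter entry would reach both clients, and consumer.custom-key-for-metric-reporter would only be needed when the consumer should see a different value.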
[jira] [Created] (KAFKA-6169) Kafka Log should reject negative timestamps
Ryan P created KAFKA-6169: - Summary: Kafka Log should reject negative timestamps Key: KAFKA-6169 URL: https://issues.apache.org/jira/browse/KAFKA-6169 Project: Kafka Issue Type: Bug Reporter: Ryan P Priority: Minor Currently it would appear we rely on the Java Producer to prevent negative timestamps from being appended to the log. The broker should also include a validation which prevents poorly written third-party clients from doing the same.
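The kind of broker-side check KAFKA-6169 asks for could be as small as the sketch below. It is a hypothetical illustration only: TimestampValidationSketch and validate are invented names, not part of the Kafka code base, and the sketch does not address how a real broker would treat records that carry no timestamp at all.

```java
// Hypothetical sketch of a server-side guard; NOT Kafka's actual log validation.
class TimestampValidationSketch {

    // Rejects a record timestamp that a well-behaved producer would never send.
    static void validate(long timestampMs) {
        if (timestampMs < 0) {
            throw new IllegalArgumentException(
                "Invalid negative timestamp " + timestampMs);
        }
    }
}
```

Performing the check where records are appended, rather than only in the Java producer, means third-party clients are covered as well.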
[jira] [Created] (KAFKA-6168) Connect Schema comparison is slow for large schemas
Randall Hauch created KAFKA-6168: Summary: Connect Schema comparison is slow for large schemas Key: KAFKA-6168 URL: https://issues.apache.org/jira/browse/KAFKA-6168 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 1.0.0 Reporter: Randall Hauch Priority: Critical The {{ConnectSchema}} implementation computes the hash code every time it's needed, and {{equals(Object)}} is a deep equality check. This extra work can be expensive for large schemas, especially in code like the {{AvroConverter}} (or rather {{AvroData}} in the converter) that uses instances as keys in a hash map that then requires significant use of {{hashCode}} and {{equals}}. The {{ConnectSchema}} is an immutable object and should at a minimum precompute the hash code. Also, the order in which the fields are compared in {{equals(...)}} should use the cheapest comparisons first (e.g., the {{name}} field is one of the _last_ fields to be checked). Finally, it might be worth considering having each instance precompute and cache a string or byte[] representation of all fields that can be used for faster equality checking.
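The first two suggestions in KAFKA-6168 (precompute the hash code of an immutable object, and order the comparisons in equals from cheapest to most expensive) can be sketched as follows. CachedHashSchemaSketch is a hypothetical stand-in with three made-up fields; it is not the real ConnectSchema and does not reflect the attached patch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical immutable schema-like class; NOT the real ConnectSchema.
final class CachedHashSchemaSketch {
    private final String name;
    private final Integer version;
    private final List<String> fieldNames; // stands in for the expensive deep part
    private final int hash;                // computed once, in the constructor

    CachedHashSchemaSketch(String name, Integer version, List<String> fieldNames) {
        this.name = name;
        this.version = version;
        // Defensive copy so the cached hash can never go stale.
        this.fieldNames = Collections.unmodifiableList(new ArrayList<>(fieldNames));
        this.hash = Objects.hash(name, version, this.fieldNames);
    }

    @Override
    public int hashCode() {
        return hash; // no recomputation on every hash-map lookup
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CachedHashSchemaSketch)) return false;
        CachedHashSchemaSketch other = (CachedHashSchemaSketch) o;
        // Cheapest checks first: cached hash, scalar fields, then the deep list.
        return hash == other.hash
            && Objects.equals(name, other.name)
            && Objects.equals(version, other.version)
            && fieldNames.equals(other.fieldNames);
    }
}
```

Comparing the cached hashes first lets most unequal instances be rejected with a single int comparison; the deep comparison only runs when the hashes and the cheap fields already match.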
[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active (KIP-211)
[ https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16238126#comment-16238126 ] Drew Kutcharian commented on KAFKA-4682: This just happened to us and I just stumbled upon this JIRA while trying to figure out the cause. A few questions: 1. Aren't consumer offset topics compacted? Shouldn't at least the last entry stay on disk after cleanup? 2. Considering that they are compacted, what is the real concern with workaround 2 in the description: "2. Turn the value of offsets.retention.minutes up really really high"? 3. As a workaround, would it make sense to set {{offsets.retention.ms}} to the same value as {{logs.retention.ms}} and {{auto.offset.reset}} to {{earliest}}? That way consumers and logs would "reset" the same time? 4. Is there a timeline for the release of KIP-211? > Committed offsets should not be deleted if a consumer is still active > (KIP-211) > --- > > Key: KAFKA-4682 > URL: https://issues.apache.org/jira/browse/KAFKA-4682 > Project: Kafka > Issue Type: Bug >Reporter: James Cheng >Assignee: Vahid Hashemian >Priority: Major > Labels: kip > > Kafka will delete committed offsets that are older than > offsets.retention.minutes > If there is an active consumer on a low traffic partition, it is possible > that Kafka will delete the committed offset for that consumer. Once the > offset is deleted, a restart or a rebalance of that consumer will cause the > consumer to not find any committed offset and start consuming from > earliest/latest (depending on auto.offset.reset). I'm not sure, but a broker > failover might also cause you to start reading from auto.offset.reset (due to > broker restart, or coordinator failover). > I think that Kafka should only delete offsets for inactive consumers. The > timer should only start after a consumer group goes inactive. For example, if > a consumer group goes inactive, then after 1 week, delete the offsets for > that consumer group. 
This is a solution that [~junrao] mentioned in > https://issues.apache.org/jira/browse/KAFKA-3806?focusedCommentId=15323521&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15323521 > The current workarounds are to: > # Commit an offset on every partition you own on a regular basis, making sure > that it is more frequent than offsets.retention.minutes (a broker-side > setting that a consumer might not be aware of) > or > # Turn the value of offsets.retention.minutes up really really high. You have > to make sure it is higher than any valid low-traffic rate that you want to > support. For example, if you want to support a topic where someone produces > once a month, you would have to set offsets.retention.minutes to 1 month. > or > # Turn on enable.auto.commit (this is essentially #1, but easier to > implement). > None of these are ideal. > #1 can be spammy. It requires your consumers to know something about how the > brokers are configured. Sometimes it is out of your control. MirrorMaker, for > example, only commits offsets on partitions where it receives data. And it is > duplication that you need to put into all of your consumers. > #2 has disk-space impact on the broker (in __consumer_offsets) as well as > memory-size on the broker (to answer OffsetFetch). > #3 I think has the potential for message loss (the consumer might commit on > messages that are not yet fully processed)
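The behavior requested in KAFKA-4682 (start the retention timer only once a consumer group has gone inactive) boils down to a rule like the one sketched below. The class, method, and parameter names are hypothetical and chosen for the example; this is not the KIP-211 implementation.

```java
// Hypothetical sketch of the proposed expiration rule; NOT broker code.
class OffsetExpirationSketch {

    // Offsets of a group with active members are never expired. For an
    // inactive group, the clock starts when the group became inactive,
    // not when the offset was last committed.
    static boolean shouldExpire(boolean groupHasActiveMembers,
                                long groupInactiveSinceMs,
                                long nowMs,
                                long retentionMs) {
        if (groupHasActiveMembers) {
            return false;
        }
        return nowMs - groupInactiveSinceMs >= retentionMs;
    }
}
```

Under this rule an active consumer on a low-traffic partition keeps its committed offset no matter how old the commit is, which is the case the issue describes.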
[jira] [Commented] (KAFKA-6163) Broker should fail fast on startup if an error occurs while loading logs
[ https://issues.apache.org/jira/browse/KAFKA-6163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237984#comment-16237984 ] Xavier Léauté commented on KAFKA-6163: -- [~ijuma] in this case it was a different fatal error: {{java.lang.InternalError: a fault occurred in an unsafe memory access operation}}. You probably don't want to try to recover from that. > Broker should fail fast on startup if an error occurs while loading logs > > > Key: KAFKA-6163 > URL: https://issues.apache.org/jira/browse/KAFKA-6163 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0, 1.0.0 >Reporter: Xavier Léauté >Priority: Normal > > If the broker fails to load one of the logs during startup, we currently > don't fail fast. The {{LogManager}} will log an error and initiate the > shutdown sequence, but continue loading all the remaining logs before > shutting down.
[jira] [Commented] (KAFKA-6164) ClientQuotaManager threads prevent shutdown when encountering an error loading logs
[ https://issues.apache.org/jira/browse/KAFKA-6164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237977#comment-16237977 ] Xavier Léauté commented on KAFKA-6164: -- [~ted_yu] that's certainly an option, assuming we don't care about orderly shutdown of the quota manager. It would probably be nicer, though, to properly manage the quota manager lifecycle. > ClientQuotaManager threads prevent shutdown when encountering an error > loading logs > --- > > Key: KAFKA-6164 > URL: https://issues.apache.org/jira/browse/KAFKA-6164 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0, 1.0.0 >Reporter: Xavier Léauté >Priority: Major > > While diagnosing KAFKA-6163, we noticed that when the broker initiates a > shutdown sequence in response to an error loading the logs, the process never > exits. The JVM appears to be waiting indefinitely for several non-daemon > threads to terminate. > The threads in question are {{ThrottledRequestReaper-Request}}, > {{ThrottledRequestReaper-Produce}}, and {{ThrottledRequestReaper-Fetch}}, so > it appears we don't properly shut down {{ClientQuotaManager}} in this > situation. > QuotaManager shutdown is currently handled by KafkaApis; however, KafkaApis > will never be instantiated in the first place if we encounter an error > loading the logs, so the quota managers are left dangling in that case.
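The two options discussed for KAFKA-6164 (mark the background threads as daemon threads, or give their owner an explicit lifecycle) are illustrated by the minimal sketch below. ReaperLifecycleSketch is a hypothetical stand-in for a background reaper thread; it is not the ClientQuotaManager code.

```java
// Hypothetical stand-in for a background "reaper" thread; NOT Kafka code.
class ReaperLifecycleSketch {
    private volatile boolean running = true;
    private final Thread reaper;

    // daemon = true: the JVM may exit while the thread is still running.
    // daemon = false: the JVM waits for the thread, so shutdown() must be called.
    ReaperLifecycleSketch(boolean daemon) {
        reaper = new Thread(() -> {
            while (running) {
                try {
                    Thread.sleep(50); // placeholder for the periodic work
                } catch (InterruptedException e) {
                    // Woken up by shutdown(): loop around and re-check the flag.
                }
            }
        }, "reaper-sketch");
        reaper.setDaemon(daemon);
    }

    void start() {
        reaper.start();
    }

    // Orderly shutdown: signal the thread, wake it up, and wait for it to finish.
    void shutdown() {
        running = false;
        reaper.interrupt();
        try {
            reaper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
        }
    }

    boolean isAlive() {
        return reaper.isAlive();
    }
}
```

A daemon thread avoids the hang but is simply abandoned at JVM exit, whereas the explicit shutdown() path lets the owner stop the thread in an orderly way from whichever component created it.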
[jira] [Commented] (KAFKA-6167) Timestamp on streams directory contains a colon, which is an illegal character
[ https://issues.apache.org/jira/browse/KAFKA-6167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237875#comment-16237875 ] Adrian McCague commented on KAFKA-6167: --- `.` feels quite standard here > Timestamp on streams directory contains a colon, which is an illegal character > -- > > Key: KAFKA-6167 > URL: https://issues.apache.org/jira/browse/KAFKA-6167 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 > Environment: AK 1.0.0 > Kubernetes > CoreOS > JDK 1.8 > Windows >Reporter: Justin Manchester >Priority: Normal > > Problem: > Development on Windows, which is not fully supported, however still a bug > that should be corrected. > It looks like a timestamp was added to the streams directory using a colon as > separator. I believe this is an illegal character and potentially the cause > for the exception below. > Error Stack: > 2017-11-02 16:06:41 ERROR > [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1] > org.apache.kafka.streams.processor.internals.AssignedTasks:301 - > stream-thread > [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1] > Failed to process stream task 0_0 due to the following error: > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=input-a_1, > partition=0, offset=0 > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942) > [kafka-streams-1.0.0.jar:?] 
> at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) > [kafka-streams-1.0.0.jar:?] > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error > opening store KSTREAM-JOINTHIS-04-store:150962400 at location > C:\Users\ADRIAN~1.MCC\AppData\Local\Temp\kafka3548813472740086814\StreamDeduplicatorAcceptanceTest1\0_0\KSTREAM-JOINTHIS-04-store\KSTREAM-JOINTHIS-04-store:150962400 > > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174) > ~[kafka-streams-1.0.0.jar:?] > at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:67) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33) > ~[kafka-streams-1.0.0.jar:?] 
> at > org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:96) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:89) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:12
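The separator change suggested in the comment on KAFKA-6167 could be sketched as below: the segment directory name is built with `.` instead of `:`, since a colon is not allowed in Windows file names. SegmentNameSketch and segmentName are hypothetical names for illustration; the store name and number are taken from the error message above, and this is not the actual Streams code.

```java
// Hypothetical helper; NOT the actual Kafka Streams segment-naming code.
class SegmentNameSketch {

    // Builds "<storeName>.<segmentId>" rather than "<storeName>:<segmentId>",
    // so the result is usable as a directory name on Windows as well.
    static String segmentName(String storeName, long segmentId) {
        return storeName + "." + segmentId;
    }
}
```

Any real change would also have to deal with state directories that were already created with the old separator, which this sketch does not cover.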
[jira] [Comment Edited] (KAFKA-6167) Timestamp on streams directory contains a colon, which is an illegal character
[ https://issues.apache.org/jira/browse/KAFKA-6167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237875#comment-16237875 ] Adrian McCague edited comment on KAFKA-6167 at 11/3/17 4:39 PM: {code:java}.{code} feels quite standard here was (Author: amccague): `.` feels quite standard here > Timestamp on streams directory contains a colon, which is an illegal character > -- > > Key: KAFKA-6167 > URL: https://issues.apache.org/jira/browse/KAFKA-6167 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 > Environment: AK 1.0.0 > Kubernetes > CoreOS > JDK 1.8 > Windows >Reporter: Justin Manchester >Priority: Normal > > Problem: > Development on Windows, which is not fully supported, however still a bug > that should be corrected. > It looks like a timestamp was added to the streams directory using a colon as > separator. I believe this is an illegal character and potentially the cause > for the exception below. > Error Stack: > 2017-11-02 16:06:41 ERROR > [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1] > org.apache.kafka.streams.processor.internals.AssignedTasks:301 - > stream-thread > [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1] > Failed to process stream task 0_0 due to the following error: > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=input-a_1, > partition=0, offset=0 > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317) > [kafka-streams-1.0.0.jar:?] 
> at > org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) > [kafka-streams-1.0.0.jar:?] > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error > opening store KSTREAM-JOINTHIS-04-store:150962400 at location > C:\Users\ADRIAN~1.MCC\AppData\Local\Temp\kafka3548813472740086814\StreamDeduplicatorAcceptanceTest1\0_0\KSTREAM-JOINTHIS-04-store\KSTREAM-JOINTHIS-04-store:150962400 > > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174) > ~[kafka-streams-1.0.0.jar:?] > at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:67) > ~[kafka-streams-1.0.0.jar:?] 
> at > org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:96) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:89) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) > ~[kafk
[jira] [Commented] (KAFKA-6167) Timestamp on streams directory contains a colon, which is an illegal character
[ https://issues.apache.org/jira/browse/KAFKA-6167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237856#comment-16237856 ] Ted Yu commented on KAFKA-6167: --- Any suggestion for a replacement ? Semicolon ? > Timestamp on streams directory contains a colon, which is an illegal character > -- > > Key: KAFKA-6167 > URL: https://issues.apache.org/jira/browse/KAFKA-6167 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 > Environment: AK 1.0.0 > Kubernetes > CoreOS > JDK 1.8 > Windows >Reporter: Justin Manchester >Priority: Normal > > Problem: > Development on Windows, which is not fully supported, however still a bug > that should be corrected. > It looks like a timestamp was added to the streams directory using a colon as > separator. I believe this is an illegal character and potentially the cause > for the exception below. > Error Stack: > 2017-11-02 16:06:41 ERROR > [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1] > org.apache.kafka.streams.processor.internals.AssignedTasks:301 - > stream-thread > [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1] > Failed to process stream task 0_0 due to the following error: > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=input-a_1, > partition=0, offset=0 > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942) > [kafka-streams-1.0.0.jar:?] 
> at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) > [kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) > [kafka-streams-1.0.0.jar:?] > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error > opening store KSTREAM-JOINTHIS-04-store:150962400 at location > C:\Users\ADRIAN~1.MCC\AppData\Local\Temp\kafka3548813472740086814\StreamDeduplicatorAcceptanceTest1\0_0\KSTREAM-JOINTHIS-04-store\KSTREAM-JOINTHIS-04-store:150962400 > > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174) > ~[kafka-streams-1.0.0.jar:?] > at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:67) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33) > ~[kafka-streams-1.0.0.jar:?] 
> at > org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:96) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:89) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) > ~[kafka-streams-1.0.0.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:1
[jira] [Commented] (KAFKA-6166) Streams configuration requires consumer. and producer. in order to be read
[ https://issues.apache.org/jira/browse/KAFKA-6166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237846#comment-16237846 ] Ted Yu commented on KAFKA-6166: --- How about populating the first two configs with the value for custom-key-for-metric-reporter, by default ? > Streams configuration requires consumer. and producer. in order to be read > -- > > Key: KAFKA-6166 > URL: https://issues.apache.org/jira/browse/KAFKA-6166 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0 > Environment: Kafka 0.11.0.0 > JDK 1.8 > CoreOS >Reporter: Justin Manchester >Priority: Minor > > Problem: > In previous release you could specify a custom metrics reporter like so: > Properties config = new Properties(); > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); > config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, > "com.mycompany.MetricReporter"); > config.put("custom-key-for-metric-reporter", "value"); > From 0.11.0.0 onwards this is no longer possible, as you have to specify > consumer.custom-key-for-metric-reporter or > producer.custom-key-for-metric-reporter otherwise it's stripped out of the > configuration. > So, if you wish to use a metrics reporter and to collect producer and > consumer metrics, as well as kafka-streams metrics, that you would need to > specify 3 distinct configs: > 1) consumer.custom-key-for-metric-reporter > 2) producer.custom-key-for-metric-reporter > 3) custom-key-for-metric-reporter > This appears to be a regression. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
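The three keys listed in the report can be written in one step. The following is only a minimal sketch using plain `java.util.Properties`: the prefix strings `consumer.` and `producer.` are taken from the report, while the class name `PrefixedReporterConfig` and the helper `putForAllClients` are hypothetical and not part of Kafka.

```java
import java.util.Properties;

class PrefixedReporterConfig {
    // Prefix strings as quoted in the report.
    static final String CONSUMER_PREFIX = "consumer.";
    static final String PRODUCER_PREFIX = "producer.";

    /** Hypothetical helper: stores the value under the plain key and under both prefixed keys. */
    static void putForAllClients(Properties config, String key, String value) {
        config.setProperty(key, value);
        config.setProperty(CONSUMER_PREFIX + key, value);
        config.setProperty(PRODUCER_PREFIX + key, value);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        putForAllClients(config, "custom-key-for-metric-reporter", "value");
        // Print the resulting keys in a stable order.
        config.stringPropertyNames().stream().sorted().forEach(
            name -> System.out.println(name + "=" + config.getProperty(name)));
    }
}
```

This only works around the behaviour described in the report; whether the unprefixed key should be propagated by default, as suggested in the comment, is a separate question.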
[jira] [Created] (KAFKA-6167) Timestamp on streams directory contains a colon, which is an illegal character
Justin Manchester created KAFKA-6167: Summary: Timestamp on streams directory contains a colon, which is an illegal character Key: KAFKA-6167 URL: https://issues.apache.org/jira/browse/KAFKA-6167 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.0 Environment: AK 1.0.0 Kubernetes CoreOS JDK 1.8 Windows Reporter: Justin Manchester Priority: Normal Problem: Development on Windows, which is not fully supported, however still a bug that should be corrected. It looks like a timestamp was added to the streams directory using a colon as separator. I believe this is an illegal character and potentially the cause for the exception below. Error Stack: 2017-11-02 16:06:41 ERROR [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1] org.apache.kafka.streams.processor.internals.AssignedTasks:301 - stream-thread [StreamDeduplicatorAcceptanceTest1-a3ae0ac6-a024-4006-bcb1-01ff0f433f6e-StreamThread-1] Failed to process stream task 0_0 due to the following error: org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=input-a_1, partition=0, offset=0 at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403) [kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317) [kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942) [kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) [kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) [kafka-streams-1.0.0.jar:?] 
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) [kafka-streams-1.0.0.jar:?] Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store KSTREAM-JOINTHIS-04-store:150962400 at location C:\Users\ADRIAN~1.MCC\AppData\Local\Temp\kafka3548813472740086814\StreamDeduplicatorAcceptanceTest1\0_0\KSTREAM-JOINTHIS-04-store\KSTREAM-JOINTHIS-04-store:150962400 at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:67) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:96) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:89) ~[kafka-streams-1.0.0.jar:?] 
at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80) ~[kafka-streams-1.0.0.jar:?] at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ~[kafka-streams-1.0.0.jar:?] ... 6 more Caused by: org.rocksdb.Rock
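To illustrate the suspicion in the report, the sketch below checks a directory name against the printable characters that Windows rejects in file names and builds a name with a different separator. It is an assumption-laden illustration: `SegmentNames`, `isPortableFileName`, and `segmentName` are hypothetical names, and the `.` separator is just one possible choice, not necessarily what Kafka uses.

```java
class SegmentNames {
    // Printable characters that Windows does not allow in file or directory names.
    private static final String WINDOWS_RESERVED = "<>:\"/\\|?*";

    /** Returns true if the name is non-empty and contains none of the reserved characters. */
    static boolean isPortableFileName(String name) {
        for (int i = 0; i < name.length(); i++) {
            if (WINDOWS_RESERVED.indexOf(name.charAt(i)) >= 0) {
                return false;
            }
        }
        return !name.isEmpty();
    }

    /** Hypothetical naming scheme: joins store name and segment id with '.' instead of ':'. */
    static String segmentName(String storeName, long segmentId) {
        return storeName + "." + segmentId;
    }

    public static void main(String[] args) {
        String reported = "KSTREAM-JOINTHIS-04-store:150962400";
        System.out.println(reported + " portable? " + isPortableFileName(reported));
        String candidate = segmentName("KSTREAM-JOINTHIS-04-store", 150962400L);
        System.out.println(candidate + " portable? " + isPortableFileName(candidate));
    }
}
```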
[jira] [Commented] (KAFKA-6161) Introduce new serdes interfaces with empty configure() and close()
[ https://issues.apache.org/jira/browse/KAFKA-6161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237785#comment-16237785 ] ASF GitHub Bot commented on KAFKA-6161: --- GitHub user evis opened a pull request: https://github.com/apache/kafka/pull/4175 KAFKA-6161 add base classes for (De)Serializers with empty conf methods All (de)serializers, which have empty configure() and/or close() methods, are now inherit NoConf(De)Serializer. Also, such classes are created for extended (de)serializers. You can merge this pull request into a Git repository by running: $ git pull https://github.com/evis/kafka KAFKA-6161-serde-empty-conf-close-methods Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/4175.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4175 commit 297115a53f6ea6c1bfd2383c0f73f011cb94c505 Author: Evgeny Veretennikov Date: 2017-11-03T15:38:18Z KAFKA-6161 add base classes for (De)Serializers with empty conf methods All (de)serializers, which have empty configure() and/or close() methods, are now inherit NoConf(De)Serializer. Also, such classes are created for extended (de)serializers. > Introduce new serdes interfaces with empty configure() and close() > -- > > Key: KAFKA-6161 > URL: https://issues.apache.org/jira/browse/KAFKA-6161 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Evgeny Veretennikov >Assignee: Evgeny Veretennikov >Priority: Normal > > {{Serializer}}, {{Deserializer}} and {{Serde}} interfaces have methods > {{configure()}} and {{close()}}. Pretty often one want to leave these methods > empty. 
For example, a lot of serializers inside > {{org.apache.kafka.common.serialization}} package have these methods empty: > {code} > @Override > public void configure(Map configs, boolean isKey) { > // nothing to do > } > @Override > public void close() { > // nothing to do > } > {code} > To avoid such boilerplate, we may create new interfaces (like > {{UnconfiguredSerializer}}), in which we will define these methods empty.
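As a rough illustration of how the boilerplate could disappear on Java 8, the sketch below declares `configure()` and `close()` as `default` methods. `SketchSerializer` is a hypothetical stand-in that mirrors the method shapes quoted above; it is not Kafka's `Serializer` interface, and `Utf8Serializer` is likewise invented for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for a serializer interface; not Kafka's actual type.
interface SketchSerializer<T> {
    default void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to do unless an implementation overrides this
    }

    byte[] serialize(String topic, T data);

    default void close() {
        // nothing to do unless an implementation overrides this
    }
}

// An implementation now only has to supply serialize().
class Utf8Serializer implements SketchSerializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }
}

class DefaultMethodDemo {
    public static void main(String[] args) {
        SketchSerializer<String> serializer = new Utf8Serializer();
        serializer.configure(Collections.emptyMap(), false); // inherited no-op
        System.out.println(serializer.serialize("topic", "hello").length);
        serializer.close(); // inherited no-op
    }
}
```

Because `serialize` is the only abstract method left in this sketch, an implementation can also be written as a lambda, which a base-class approach would not allow.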
[jira] [Resolved] (KAFKA-6159) Link to upgrade docs in 1.0.0 release notes is broken
[ https://issues.apache.org/jira/browse/KAFKA-6159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6159. -- Resolution: Fixed > Link to upgrade docs in 1.0.0 release notes is broken > - > > Key: KAFKA-6159 > URL: https://issues.apache.org/jira/browse/KAFKA-6159 > Project: Kafka > Issue Type: Bug > Components: documentation >Affects Versions: 1.0.0 >Reporter: Martin Schröder >Assignee: Guozhang Wang > Labels: release-notes > > The release notes for 1.0.0 > (https://dist.apache.org/repos/dist/release/kafka/1.0.0/RELEASE_NOTES.html) > point to http://kafka.apache.org/100/documentation.html#upgrade for "upgrade > documentation", but that gives a 404. > Maybe you mean http://kafka.apache.org/documentation.html#upgrade ?
[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.
[ https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237567#comment-16237567 ] kaushik srinivas commented on KAFKA-6165: - Update for my previous comment, Once the brokers went down and recovered, i see below exception in 2 of the brokers, [2017-11-03 08:28:00,772] WARN [ReplicaFetcherThread-0-2], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@7da5bee5 (kafka.server.ReplicaFetcherThread) java.io.IOException: Connection to 2 was disconnected before the response was read at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112) at scala.Option.foreach(Option.scala:257) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112) at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108) at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137) at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143) at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108) at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253) at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238) at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) > Kafka Brokers goes down with 
outOfMemoryError. > -- > > Key: KAFKA-6165 > URL: https://issues.apache.org/jira/browse/KAFKA-6165 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 > Environment: DCOS cluster with 4 agent nodes and 3 masters. > agent machine config : > RAM : 384 GB > DISK : 4TB >Reporter: kaushik srinivas >Priority: Major > Attachments: kafka_config.txt, stderr_broker1.txt, > stderr_broker2.txt, stdout_broker1.txt, stdout_broker2.txt > > > Performance testing kafka with end to end pipe lines of, > Kafka Data Producer -> kafka -> spark streaming -> hdfs -- stream1 > Kafka Data Producer -> kafka -> flume -> hdfs -- stream2 > stream1 kafka configs : > No of topics : 10 > No of partitions : 20 for all the topics > stream2 kafka configs : > No of topics : 10 > No of partitions : 20 for all the topics > Some important Kafka Configuration : > "BROKER_MEM": "32768"(32GB) > "BROKER_JAVA_HEAP": "16384"(16GB) > "BROKER_COUNT": "3" > "KAFKA_MESSAGE_MAX_BYTES": "112"(1MB) > "KAFKA_REPLICA_FETCH_MAX_BYTES": "1048576"(1MB) > "KAFKA_NUM_PARTITIONS": "20" > "BROKER_DISK_SIZE": "5000" (5GB) > "KAFKA_LOG_SEGMENT_BYTES": "5000",(50MB) > "KAFKA_LOG_RETENTION_BYTES": "50"(5GB) > Data Producer to kafka Throughput: > message rate : 5 lakhs messages/sec approx across all the 3 brokers and > topics/partitions. > message size : approx 300 to 400 bytes. 
> Issues observed with this configs: > Issue 1: > stack trace: > [2017-11-03 00:56:28,484] FATAL [Replica Manager on Broker 0]: Halting due to > unrecoverable I/O error while handling produce request: > (kafka.server.ReplicaManager) > kafka.common.KafkaStorageException: I/O exception in append to log > 'store_sales-16' > at kafka.log.Log.append(Log.scala:349) > at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443) > at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240) > at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) > at > s
[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.
[ https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237563#comment-16237563 ] Ismael Juma commented on KAFKA-6165: The memory map is failing. Maybe you're running into a file descriptors limit? > Kafka Brokers goes down with outOfMemoryError. > -- > > Key: KAFKA-6165 > URL: https://issues.apache.org/jira/browse/KAFKA-6165 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 > Environment: DCOS cluster with 4 agent nodes and 3 masters. > agent machine config : > RAM : 384 GB > DISK : 4TB >Reporter: kaushik srinivas >Priority: Major > Attachments: kafka_config.txt, stderr_broker1.txt, > stderr_broker2.txt, stdout_broker1.txt, stdout_broker2.txt > > > Performance testing kafka with end to end pipe lines of, > Kafka Data Producer -> kafka -> spark streaming -> hdfs -- stream1 > Kafka Data Producer -> kafka -> flume -> hdfs -- stream2 > stream1 kafka configs : > No of topics : 10 > No of partitions : 20 for all the topics > stream2 kafka configs : > No of topics : 10 > No of partitions : 20 for all the topics > Some important Kafka Configuration : > "BROKER_MEM": "32768"(32GB) > "BROKER_JAVA_HEAP": "16384"(16GB) > "BROKER_COUNT": "3" > "KAFKA_MESSAGE_MAX_BYTES": "112"(1MB) > "KAFKA_REPLICA_FETCH_MAX_BYTES": "1048576"(1MB) > "KAFKA_NUM_PARTITIONS": "20" > "BROKER_DISK_SIZE": "5000" (5GB) > "KAFKA_LOG_SEGMENT_BYTES": "5000",(50MB) > "KAFKA_LOG_RETENTION_BYTES": "50"(5GB) > Data Producer to kafka Throughput: > message rate : 5 lakhs messages/sec approx across all the 3 brokers and > topics/partitions. > message size : approx 300 to 400 bytes. 
> Issues observed with this configs: > Issue 1: > stack trace: > [2017-11-03 00:56:28,484] FATAL [Replica Manager on Broker 0]: Halting due to > unrecoverable I/O error while handling produce request: > (kafka.server.ReplicaManager) > kafka.common.KafkaStorageException: I/O exception in append to log > 'store_sales-16' > at kafka.log.Log.append(Log.scala:349) > at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443) > at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240) > at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:99) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393) > at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330) > at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:425) > at kafka.server.KafkaApis.handle(KafkaApis.scala:78) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Caused by: 
java.io.IOException: Map failed > at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940) > at > kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116) > at > kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:106) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) > at kafka.log.AbstractIndex.resize(AbstractIndex.scala:106) > at > kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:160) > at > kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160) > at > kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) > at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:159) > at kafka.log.Log.roll(Log.scala:771) > at kafka.log.Log.maybeRoll(Log.scala:742) > at kafka.log.Log.append(Log.scala:405) > ... 22 more >
[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.
[ https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237555#comment-16237555 ] kaushik srinivas commented on KAFKA-6165: - Tried with 12GB of Heap space. Observed kafka brokers crashing again with, [2017-11-03 08:02:12,424] FATAL [ReplicaFetcherThread-0-0], Disk error while replicating data for store_sales-15 (kafka.server.ReplicaFetcherThread) kafka.common.KafkaStorageException: I/O exception in append to log 'store_sales-15' at kafka.log.Log.append(Log.scala:349) at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:130) at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:42) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:159) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:141) at scala.Option.foreach(Option.scala:257) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:141) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:138) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:138) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:138) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:136) at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) Caused by: java.io.IOException: Map failed at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940) at kafka.log.AbstractIndex.(AbstractIndex.scala:61) at kafka.log.TimeIndex.(TimeIndex.scala:55) at kafka.log.LogSegment.(LogSegment.scala:68) at kafka.log.Log.roll(Log.scala:776) at kafka.log.Log.maybeRoll(Log.scala:742) at kafka.log.Log.append(Log.scala:405) ... 16 more Caused by: java.lang.OutOfMemoryError: Map failed at sun.nio.ch.FileChannelImpl.map0(Native Method) at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:937) Observing 300k messages/sec on each broker (3 brokers) at the time of broker crash. > Kafka Brokers goes down with outOfMemoryError. > -- > > Key: KAFKA-6165 > URL: https://issues.apache.org/jira/browse/KAFKA-6165 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 > Environment: DCOS cluster with 4 agent nodes and 3 masters. 
> agent machine config : > RAM : 384 GB > DISK : 4TB >Reporter: kaushik srinivas >Priority: Major > Attachments: kafka_config.txt, stderr_broker1.txt, > stderr_broker2.txt, stdout_broker1.txt, stdout_broker2.txt > > > Performance testing kafka with end to end pipe lines of, > Kafka Data Producer -> kafka -> spark streaming -> hdfs -- stream1 > Kafka Data Producer -> kafka -> flume -> hdfs -- stream2 > stream1 kafka configs : > No of topics : 10 > No of partitions : 20 for all the topics > stream2 kafka configs : > No of topics : 10 > No of partitions : 20 for all the topics > Some important Kafka Configuration : > "BROKER_MEM": "32768"(32GB) > "BROKER_JAVA_HEAP": "16384"(16GB) > "BROKER_COUNT": "3" > "KAFKA_MESSAGE_MAX_BYTES": "112"(1MB) > "KAFKA_REPLICA_FETCH_MAX_BYTES": "1048576"(1MB) > "KAFKA_NUM_PARTITIONS": "20" > "BROKER_DISK_SIZE": "5000" (5GB) > "KAFKA_LOG_SEGMENT_BYTES": "5000",(50MB) > "KAFKA_LOG_RETENTION_BYTES": "50"(5GB) > Data Producer to kafka Throughput: > message rate : 5 lakhs messages/sec approx across all the 3 brokers and > topics/partitions. > message size : approx 300 to 400 bytes. > Issues observed with this configs: > Issue 1: > stack trace: > [2017-11-03 00:56:28,484] FATAL [Replica Manager on Broker 0]: Halting due to > unrecoverable I/O error while handling produce request: > (kafka.server.ReplicaManager) > kafka.common.KafkaStorageException: I/O exception in append to log > 'store_sales-16' > at kafka.log.Log.append(Log.scala:349) > at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443) >
[jira] [Created] (KAFKA-6166) Streams configuration requires consumer. and producer. in order to be read
Justin Manchester created KAFKA-6166: Summary: Streams configuration requires consumer. and producer. in order to be read Key: KAFKA-6166 URL: https://issues.apache.org/jira/browse/KAFKA-6166 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 0.11.0.0 Environment: Kafka 0.11.0.0 JDK 1.8 CoreOS Reporter: Justin Manchester Priority: Minor Problem: In previous releases you could specify a custom metrics reporter like so: Properties config = new Properties(); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.mycompany.MetricReporter"); config.put("custom-key-for-metric-reporter", "value"); From 0.11.0.0 onwards this is no longer possible, as you have to specify consumer.custom-key-for-metric-reporter or producer.custom-key-for-metric-reporter; otherwise the key is stripped out of the configuration. So, if you wish to use a metrics reporter to collect producer and consumer metrics as well as kafka-streams metrics, you need to specify 3 distinct configs: 1) consumer.custom-key-for-metric-reporter 2) producer.custom-key-for-metric-reporter 3) custom-key-for-metric-reporter This appears to be a regression.
[jira] [Resolved] (KAFKA-6060) Add workload generation capabilities to Trogdor
[ https://issues.apache.org/jira/browse/KAFKA-6060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6060. --- Resolution: Fixed Fix Version/s: 1.1.0 Issue resolved by pull request 4073 https://github.com/apache/kafka/pull/4073 > Add workload generation capabilities to Trogdor > --- > > Key: KAFKA-6060 > URL: https://issues.apache.org/jira/browse/KAFKA-6060 > Project: Kafka > Issue Type: Sub-task >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Major > Fix For: 1.1.0 > > > Add workload generation capabilities to Trogdor
[jira] [Commented] (KAFKA-6060) Add workload generation capabilities to Trogdor
[ https://issues.apache.org/jira/browse/KAFKA-6060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237359#comment-16237359 ] ASF GitHub Bot commented on KAFKA-6060: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/4073 > Add workload generation capabilities to Trogdor > --- > > Key: KAFKA-6060 > URL: https://issues.apache.org/jira/browse/KAFKA-6060 > Project: Kafka > Issue Type: Sub-task >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Major > > Add workload generation capabilities to Trogdor
[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.
[ https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237358#comment-16237358 ] kaushik srinivas commented on KAFKA-6165: - Sure will try to reduce the heap size to 12gb. Initially the config was 8gb heap. But then observed outOfMemory issues more frequently. Actually it was consuming around 10gb heap, that was the reason heap was increased to 16gb. > Kafka Brokers goes down with outOfMemoryError. > -- > > Key: KAFKA-6165 > URL: https://issues.apache.org/jira/browse/KAFKA-6165 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 > Environment: DCOS cluster with 4 agent nodes and 3 masters. > agent machine config : > RAM : 384 GB > DISK : 4TB >Reporter: kaushik srinivas >Priority: Major > Attachments: kafka_config.txt, stderr_broker1.txt, > stderr_broker2.txt, stdout_broker1.txt, stdout_broker2.txt > > > Performance testing kafka with end to end pipe lines of, > Kafka Data Producer -> kafka -> spark streaming -> hdfs -- stream1 > Kafka Data Producer -> kafka -> flume -> hdfs -- stream2 > stream1 kafka configs : > No of topics : 10 > No of partitions : 20 for all the topics > stream2 kafka configs : > No of topics : 10 > No of partitions : 20 for all the topics > Some important Kafka Configuration : > "BROKER_MEM": "32768"(32GB) > "BROKER_JAVA_HEAP": "16384"(16GB) > "BROKER_COUNT": "3" > "KAFKA_MESSAGE_MAX_BYTES": "112"(1MB) > "KAFKA_REPLICA_FETCH_MAX_BYTES": "1048576"(1MB) > "KAFKA_NUM_PARTITIONS": "20" > "BROKER_DISK_SIZE": "5000" (5GB) > "KAFKA_LOG_SEGMENT_BYTES": "5000",(50MB) > "KAFKA_LOG_RETENTION_BYTES": "50"(5GB) > Data Producer to kafka Throughput: > message rate : 5 lakhs messages/sec approx across all the 3 brokers and > topics/partitions. > message size : approx 300 to 400 bytes. 
> Issues observed with this configs: > Issue 1: > stack trace: > [2017-11-03 00:56:28,484] FATAL [Replica Manager on Broker 0]: Halting due to > unrecoverable I/O error while handling produce request: > (kafka.server.ReplicaManager) > kafka.common.KafkaStorageException: I/O exception in append to log > 'store_sales-16' > at kafka.log.Log.append(Log.scala:349) > at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443) > at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240) > at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407) > at > kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) > at > scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99) > at > scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230) > at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) > at scala.collection.mutable.HashMap.foreach(HashMap.scala:99) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393) > at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330) > at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:425) > at kafka.server.KafkaApis.handle(KafkaApis.scala:78) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) > at java.lang.Thread.run(Thread.java:745) > Caused by: 
java.io.IOException: Map failed > at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940) > at > kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116) > at > kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:106) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) > at kafka.log.AbstractIndex.resize(AbstractIndex.scala:106) > at > kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:160) > at > kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160) > at > kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234) > at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:159) >
[jira] [Commented] (KAFKA-6110) Warning when running the broker on Windows
[ https://issues.apache.org/jira/browse/KAFKA-6110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237236#comment-16237236 ]

huxihx commented on KAFKA-6110:
---
Seems it's a duplicate of [KAFKA-6156|https://issues.apache.org/jira/browse/KAFKA-6156].

> Warning when running the broker on Windows
> --
>
> Key: KAFKA-6110
> URL: https://issues.apache.org/jira/browse/KAFKA-6110
> Project: Kafka
> Issue Type: Bug
> Environment: Windows 10 VM
> Reporter: Vahid Hashemian
> Priority: Minor
>
> *This issue exists in 1.0.0-RC2.*
> The following warning appears in the broker log at startup:
> {code}
> [2017-10-23 15:29:49,370] WARN Error processing kafka.log:type=LogManager,name=LogDirectoryOffline,logDirectory=C:\tmp\kafka-logs (com.yammer.metrics.reporting.JmxReporter)
> javax.management.MalformedObjectNameException: Invalid character ':' in value part of property
>     at javax.management.ObjectName.construct(ObjectName.java:618)
>     at javax.management.ObjectName.<init>(ObjectName.java:1382)
>     at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
>     at com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
>     at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
>     at com.yammer.metrics.core.MetricsRegistry.newGauge(MetricsRegistry.java:79)
>     at kafka.metrics.KafkaMetricsGroup$class.newGauge(KafkaMetricsGroup.scala:80)
>     at kafka.log.LogManager.newGauge(LogManager.scala:50)
>     at kafka.log.LogManager$$anonfun$6.apply(LogManager.scala:117)
>     at kafka.log.LogManager$$anonfun$6.apply(LogManager.scala:116)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at kafka.log.LogManager.<init>(LogManager.scala:116)
>     at kafka.log.LogManager$.apply(LogManager.scala:799)
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:222)
>     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
>     at kafka.Kafka$.main(Kafka.scala:92)
>     at kafka.Kafka.main(Kafka.scala)
> {code}
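For readers who want to reproduce the warning outside the broker: the exception comes from the JMX rule that an unquoted property value may not contain ':', which a Windows path such as C:\tmp\kafka-logs does. The following is only a minimal sketch of that rule using the standard `javax.management.ObjectName` API. The class and helper names (`ObjectNameQuotingSketch`, `mbeanName`, `isValidName`, `roundTrip`) are made up for illustration, and quoting the value with `ObjectName.quote` is one possible workaround, not necessarily how Kafka or the metrics library handles it.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper class, not part of Kafka or the Yammer metrics library.
class ObjectNameQuotingSketch {

    // Builds the MBean name from the warning above; the logDirectory value is
    // optionally wrapped with ObjectName.quote so both forms can be compared.
    static String mbeanName(String logDirectory, boolean quoteValue) {
        String value = quoteValue ? ObjectName.quote(logDirectory) : logDirectory;
        return "kafka.log:type=LogManager,name=LogDirectoryOffline,logDirectory=" + value;
    }

    // Returns true if the JMX implementation accepts the name.
    static boolean isValidName(String name) {
        try {
            new ObjectName(name);
            return true;
        } catch (MalformedObjectNameException e) {
            return false;
        }
    }

    // Registers nothing; only parses the quoted name and recovers the original path.
    static String roundTrip(String logDirectory) {
        try {
            ObjectName name = new ObjectName(mbeanName(logDirectory, true));
            return ObjectName.unquote(name.getKeyProperty("logDirectory"));
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        String dir = "C:\\tmp\\kafka-logs";
        // Unquoted value containing ':' is rejected, as in the broker log.
        System.out.println(isValidName(mbeanName(dir, false)));
        // Quoted value is accepted.
        System.out.println(isValidName(mbeanName(dir, true)));
        // The original path can be recovered from the quoted property.
        System.out.println(roundTrip(dir));
    }
}
```

A Unix-style path such as /tmp/kafka-logs contains none of the reserved characters, which would explain why the warning only shows up on Windows.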
[jira] [Commented] (KAFKA-3049) VerifiableProperties does not respect 'default' properties of underlying java.util.Properties instance
[ https://issues.apache.org/jira/browse/KAFKA-3049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237209#comment-16237209 ]

Jeffrey Olchovy commented on KAFKA-3049:

Saw this was updated. I believe the existing PR can still merge cleanly, but let me know if you want an updated patch / PR.

> VerifiableProperties does not respect 'default' properties of underlying java.util.Properties instance
> --
>
> Key: KAFKA-3049
> URL: https://issues.apache.org/jira/browse/KAFKA-3049
> Project: Kafka
> Issue Type: Bug
> Components: config, core
> Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Reporter: Jeffrey Olchovy
> Priority: Minor
> Labels: easyfix
>
> When retrieving values from the underlying {{Properties}} instance with the {{getString}}, {{get}}, etc. methods of a {{VerifiableProperties}} instance, a call to the underlying {{Properties.containsKey}} method is made. This method will not search the default property values of the instance, rendering any default properties defined on the {{Properties}} instance useless.
> A practical example is shown below:
> {noformat}
> // suppose we have a base, default set of properties to supply to all consumer groups
> val baseProps = new Properties
> baseProps.setProperty("zookeeper.connect", "localhost:2181/kafka")
> baseProps.setProperty("zookeeper.connection.timeout.ms", "2000")
> // additionally, we have discrete properties instances for each consumer group that utilize these defaults
> val groupProps1 = new Properties(baseProps)
> groupProps1.setProperty("group.id", "test-1")
> val groupProps2 = new Properties(baseProps)
> groupProps2.setProperty("group.id", "test-2")
> // when attempting to create an instance of a high-level Consumer with the above properties,
> // an exception is thrown because of the problem described above:
> // java.lang.IllegalArgumentException: requirement failed: Missing required property 'zookeeper.connect'
> //     at scala.Predef$.require(Predef.scala:233)
> //     at kafka.utils.VerifiableProperties.getString(VerifiableProperties.scala:177)
> //     at kafka.utils.ZKConfig.<init>(ZkUtils.scala:879)
> //     at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:100)
> //     at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:104)
> // however, the groupProps instances will return the correct value for "zookeeper.connect" when using `Properties.getProperty`
> assert(groupProps1.getProperty("zookeeper.connect") == "localhost:2181/kafka")
> assert(groupProps2.getProperty("zookeeper.connect") == "localhost:2181/kafka")
> {noformat}
> I believe it is worthwhile for Kafka to respect the default properties feature of {{java.util.Properties}}, and further, that Kafka should discourage the use of the methods on {{Properties}} that are inherited from {{Hashtable}} (e.g. {{containsKey}}). One can argue that {{VerifiableProperties}} is not 'correct' due to this behavior, but a user can always work around this by creating discrete instances of {{Properties}} with a set of default properties manually added to each instance. However, this is inconvenient and may only encourage the use of the discouraged {{Hashtable}} methods like {{putAll}}.
> Two proposed solutions follow:
> 1. Do not delegate to the {{Properties.containsKey}} method during the invocation of {{VerifiableProperties.containsKey}}. One can use a null check in conjunction with {{getProperty}} in its place.
> 2. Treat the underlying {{Properties}} instance as immutable and assign the result of {{Properties.stringPropertyNames()}} to a member of {{VerifiableProperties}}. One can check this set of known, available property names, which respects the optional default properties, when {{VerifiableProperties.containsKey}} is invoked.
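The difference between the three lookups discussed in the issue can be seen with plain `java.util.Properties`, without any Kafka classes. This is a minimal sketch only: the class and helper names (`PropertiesDefaultsSketch`, `containsViaGetProperty`, `containsViaPropertyNames`, `groupProps`) are hypothetical and do not reflect the actual `VerifiableProperties` code or the pending PR.

```java
import java.util.Properties;

// Hypothetical illustration class; not Kafka's VerifiableProperties.
class PropertiesDefaultsSketch {

    // Proposed solution 1: a null check on getProperty, which consults the defaults.
    static boolean containsViaGetProperty(Properties props, String name) {
        return props.getProperty(name) != null;
    }

    // Proposed solution 2: look the key up in stringPropertyNames(), which includes
    // keys from the defaults. The issue suggests caching this set in a member;
    // the sketch recomputes it on every call for brevity.
    static boolean containsViaPropertyNames(Properties props, String name) {
        return props.stringPropertyNames().contains(name);
    }

    // Mirrors groupProps1 from the example in the issue description.
    static Properties groupProps() {
        Properties baseProps = new Properties();
        baseProps.setProperty("zookeeper.connect", "localhost:2181/kafka");
        baseProps.setProperty("zookeeper.connection.timeout.ms", "2000");
        Properties groupProps = new Properties(baseProps);
        groupProps.setProperty("group.id", "test-1");
        return groupProps;
    }

    public static void main(String[] args) {
        Properties props = groupProps();
        // containsKey only sees entries set directly on this instance, not the defaults.
        System.out.println(props.containsKey("zookeeper.connect"));              // false
        System.out.println(containsViaGetProperty(props, "zookeeper.connect"));   // true
        System.out.println(containsViaPropertyNames(props, "zookeeper.connect")); // true
    }
}
```

One trade-off between the two: the `getProperty` check always reflects the current state of the instance, while a cached `stringPropertyNames()` set is a snapshot and only stays accurate if the `Properties` instance really is treated as immutable.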
[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.
[ https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237205#comment-16237205 ]

huxihx commented on KAFKA-6165:
---
Could you try to decrease the heap size a little bit? 16GB seems to be quite large.

> Kafka Brokers goes down with outOfMemoryError.
> --
>
> Key: KAFKA-6165
> URL: https://issues.apache.org/jira/browse/KAFKA-6165
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.11.0.0
> Environment: DCOS cluster with 4 agent nodes and 3 masters.
> agent machine config :
> RAM : 384 GB
> DISK : 4TB
> Reporter: kaushik srinivas
> Priority: Major
> Attachments: kafka_config.txt, stderr_broker1.txt, stderr_broker2.txt, stdout_broker1.txt, stdout_broker2.txt
>
>
> Performance testing kafka with the following end-to-end pipelines:
> Kafka Data Producer -> kafka -> spark streaming -> hdfs -- stream1
> Kafka Data Producer -> kafka -> flume -> hdfs -- stream2
> stream1 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> stream2 kafka configs :
> No of topics : 10
> No of partitions : 20 for all the topics
> Some important Kafka Configuration :
> "BROKER_MEM": "32768"(32GB)
> "BROKER_JAVA_HEAP": "16384"(16GB)
> "BROKER_COUNT": "3"
> "KAFKA_MESSAGE_MAX_BYTES": "112"(1MB)
> "KAFKA_REPLICA_FETCH_MAX_BYTES": "1048576"(1MB)
> "KAFKA_NUM_PARTITIONS": "20"
> "BROKER_DISK_SIZE": "5000" (5GB)
> "KAFKA_LOG_SEGMENT_BYTES": "5000",(50MB)
> "KAFKA_LOG_RETENTION_BYTES": "50"(5GB)
> Data Producer to kafka Throughput:
> message rate : 5 lakhs messages/sec approx across all the 3 brokers and topics/partitions.
> message size : approx 300 to 400 bytes.
> Issues observed with these configs:
> Issue 1:
> stack trace:
> [2017-11-03 00:56:28,484] FATAL [Replica Manager on Broker 0]: Halting due to unrecoverable I/O error while handling produce request: (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log 'store_sales-16'
>     at kafka.log.Log.append(Log.scala:349)
>     at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
>     at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>     at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
>     at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
>     at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
>     at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
>     at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
>     at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:425)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:78)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Map failed
>     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940)
>     at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:116)
>     at kafka.log.AbstractIndex$$anonfun$resize$1.apply(AbstractIndex.scala:106)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>     at kafka.log.AbstractIndex.resize(AbstractIndex.scala:106)
>     at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(AbstractIndex.scala:160)
>     at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>     at kafka.log.AbstractIndex$$anonfun$trimToValidSize$1.apply(AbstractIndex.scala:160)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>     at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:159)
>     at kafka.log.Log.roll(Log.scala:771)
>     at kafka.log.Log.maybeRoll(Log.scala:742)
>     at kafka.log.Log.append(Log.scala:405)
>     ... 22 more
> Ca
[jira] [Commented] (KAFKA-6165) Kafka Brokers goes down with outOfMemoryError.
[ https://issues.apache.org/jira/browse/KAFKA-6165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16237189#comment-16237189 ]

kaushik srinivas commented on KAFKA-6165:
-
java version "1.8.0_112"
Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)

> Kafka Brokers goes down with outOfMemoryError.
> --
>
> Key: KAFKA-6165
> URL: https://issues.apache.org/jira/browse/KAFKA-6165