[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626793#comment-16626793
 ] 

ASF GitHub Bot commented on KAFKA-7403:
---

vahidhashemian opened a new pull request #5690: KAFKA-7403: Follow-up fix for 
KAFKA-4682 (KIP-211) to correct some edge cases
URL: https://github.com/apache/kafka/pull/5690
 
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> -
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jon Lee
>Priority: Blocker
> Fix For: 2.1.0
>
>
> I am currently trying a broker upgrade from 0.11 to 2.0 with some patches 
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The 
> server experienced an unexpected error when processing the request at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>  ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
>  # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets 
> the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
>  # In the 2.0 broker code, upon receiving an OffsetCommitRequest with 
> DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the 
> "expireTimestamp" field of OffsetAndMetadata to None.
>  # Later in the code path, GroupMetadataManager.offsetCommitValue() expects 
> OffsetAndMetadata to have a non-empty "expireTimestamp" field if the 
> inter.broker.protocol.version is < KAFKA_2_1_IV0.
>  # However, the inter.broker.protocol.version was set to "1.0" prior to the 
> upgrade, and as a result, the following code in offsetCommitValue() raises an 
> error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, 
> offsetAndMetadata.expireTimestamp.get){code}
>  
> Here is the stack trace for the broker side error
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324)
>  

[jira] [Commented] (KAFKA-5066) KafkaMetricsConfig properties and description notably missing from documentation

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626775#comment-16626775
 ] 

ASF GitHub Bot commented on KAFKA-5066:
---

lindong28 closed pull request #5563: KAFKA-5066: Add KafkaMetricsConfig (Yammer 
metrics reporters) props to documentation
URL: https://github.com/apache/kafka/pull/5563
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala 
b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
index ad9eb20f3e7..a3a21bdbca0 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsConfig.scala
@@ -20,7 +20,8 @@
 
 package kafka.metrics
 
-import kafka.utils.{VerifiableProperties, CoreUtils}
+import kafka.server.{Defaults, KafkaConfig}
+import kafka.utils.{CoreUtils, VerifiableProperties}
 
 class KafkaMetricsConfig(props: VerifiableProperties) {
 
@@ -28,10 +29,12 @@ class KafkaMetricsConfig(props: VerifiableProperties) {
* Comma-separated list of reporter types. These classes should be on the
* classpath and will be instantiated at run-time.
*/
-  val reporters = 
CoreUtils.parseCsvList(props.getString("kafka.metrics.reporters", ""))
+  val reporters = 
CoreUtils.parseCsvList(props.getString(KafkaConfig.KafkaMetricsReporterClassesProp,
+Defaults.KafkaMetricReporterClasses))
 
   /**
* The metrics polling interval (in seconds).
*/
-  val pollingIntervalSecs = 
props.getInt("kafka.metrics.polling.interval.secs", 10)
+  val pollingIntervalSecs = 
props.getInt(KafkaConfig.KafkaMetricsPollingIntervalSecondsProp,
+Defaults.KafkaMetricsPollingIntervalSeconds)
 }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 1bc9707fa0b..90225024bfc 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -203,6 +203,11 @@ object Defaults {
   val MetricReporterClasses = ""
   val MetricRecordingLevel = Sensor.RecordingLevel.INFO.toString()
 
+
+  /** * Kafka Yammer Metrics Reporter Configuration ***/
+  val KafkaMetricReporterClasses = ""
+  val KafkaMetricsPollingIntervalSeconds = 10
+
   /** * SSL configuration ***/
   val SslProtocol = SslConfigs.DEFAULT_SSL_PROTOCOL
   val SslEnabledProtocols = SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS
@@ -410,6 +415,10 @@ object KafkaConfig {
   val MetricReporterClassesProp: String = 
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
   val MetricRecordingLevelProp: String = 
CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG
 
+  /** * Kafka Yammer Metrics Reporters Configuration ***/
+  val KafkaMetricsReporterClassesProp = "kafka.metrics.reporters"
+  val KafkaMetricsPollingIntervalSecondsProp = 
"kafka.metrics.polling.interval.secs"
+
   /**  Common Security Configuration */
   val PrincipalBuilderClassProp = 
BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
 
@@ -720,6 +729,17 @@ object KafkaConfig {
   val MetricReporterClassesDoc = 
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
   val MetricRecordingLevelDoc = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC
 
+
+  /** * Kafka Yammer Metrics Reporter Configuration ***/
+  val KafkaMetricsReporterClassesDoc = "A list of classes to use as Yammer 
metrics custom reporters." +
+" The reporters should implement 
kafka.metrics.KafkaMetricsReporter trait. If a client wants" +
+" to expose JMX operations on a custom reporter, the custom reporter needs 
to additionally implement an MBean" +
+" trait that extends kafka.metrics.KafkaMetricsReporterMBean 
trait so that the registered MBean is compliant with" +
+" the standard MBean convention."
+
+  val KafkaMetricsPollingIntervalSecondsDoc = s"The metrics polling interval 
(in seconds) which can be used" +
+s" in $KafkaMetricsReporterClassesProp implementations."
+
   /**  Common Security Configuration */
   val PrincipalBuilderClassDoc = 
BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC
 
@@ -945,6 +965,10 @@ object KafkaConfig {
   .define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, 
LOW, MetricReporterClassesDoc)
   .define(MetricRecordingLevelProp, STRING, Defaults.MetricRecordingLevel, 
LOW, MetricRecordingLevelDoc)
 
+  /** * Kafka Yammer Metrics Reporter Configuration for docs 
***/
+  .define(KafkaMetricsReporterClassesProp, LIST, 
Defaults.KafkaMetricReporterClasses, LOW, KafkaMetricsReporterClassesDoc)
+  

[jira] [Updated] (KAFKA-5066) KafkaMetricsConfig properties and description notably missing from documentation

2018-09-24 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-5066:

Fix Version/s: 2.1.0

> KafkaMetricsConfig properties and description notably missing from 
> documentation
> 
>
> Key: KAFKA-5066
> URL: https://issues.apache.org/jira/browse/KAFKA-5066
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Ryan P
>Priority: Major
> Fix For: 2.1.0
>
>
> `KafkaMetrics` implementations do not appear to be exposed to all of the Yammer 
> metrics that are exposed to implementations of `KafkaMetricsReporter`. 
> Currently the docs only cover `metric.reporters`, which allows clients to 
> configure a `MetricsReporter` plugin. Clients are then disappointed to learn 
> that this affords them access to only a small subset of metrics. 
> Proper monitoring of the broker requires access to the Yammer metrics, which 
> clients can gain access to via a `KafkaMetricsReporter` plugin. 
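
As a configuration illustration only, using the property names documented by the patch above (kafka.metrics.reporters and kafka.metrics.polling.interval.secs); the reporter class below is a hypothetical placeholder:
{noformat}
# Hypothetical server.properties snippet; com.example.MyYammerReporter is a
# placeholder for a class implementing the kafka.metrics.KafkaMetricsReporter trait.
kafka.metrics.reporters=com.example.MyYammerReporter
kafka.metrics.polling.interval.secs=10
{noformat}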



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-24 Thread Vahid Hashemian (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626774#comment-16626774
 ] 

Vahid Hashemian commented on KAFKA-7403:


[~hachikuji] Thanks for the pointer. I will try to submit a PR with that fix, 
but please feel free to take over (since you proposed the solution).

> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> -
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jon Lee
>Priority: Blocker
> Fix For: 2.1.0
>
>
> I am currently trying a broker upgrade from 0.11 to 2.0 with some patches 
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The 
> server experienced an unexpected error when processing the request at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>  ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
>  # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets 
> the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
>  # In the 2.0 broker code, upon receiving an OffsetCommitRequest with 
> DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the 
> "expireTimestamp" field of OffsetAndMetadata to None.
>  # Later in the code path, GroupMetadataManager.offsetCommitValue() expects 
> OffsetAndMetadata to have a non-empty "expireTimestamp" field if the 
> inter.broker.protocol.version is < KAFKA_2_1_IV0.
>  # However, the inter.broker.protocol.version was set to "1.0" prior to the 
> upgrade, and as a result, the following code in offsetCommitValue() raises an 
> error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, 
> offsetAndMetadata.expireTimestamp.get){code}
>  
> Here is the stack trace for the broker side error
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) 
> ~[scala-library-2.11.12.jar:?]
> at 

[jira] [Updated] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()

2018-09-24 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-5863:
--
Description: 
Here is the call chain:
{code}
RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
{code}
In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
InputStream is = connection.getInputStream();
T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public <T> T readValue(InputStream src, TypeReference valueTypeRef)
throws IOException, JsonParseException, JsonMappingException
{
return (T) _readMapAndClose(_jsonFactory.createParser(src), 
_typeFactory.constructType(valueTypeRef));
{code}
Then there would be NPE in constructType():
{code}
public JavaType constructType(TypeReference<?> typeRef)
{
// 19-Oct-2015, tatu: Simpler variant like so should work
return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}

  was:
Here is the call chain:

{code}
RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
{code}
In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
InputStream is = connection.getInputStream();
T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public <T> T readValue(InputStream src, TypeReference valueTypeRef)
throws IOException, JsonParseException, JsonMappingException
{
return (T) _readMapAndClose(_jsonFactory.createParser(src), 
_typeFactory.constructType(valueTypeRef));
{code}
Then there would be NPE in constructType():
{code}
public JavaType constructType(TypeReference<?> typeRef)
{
// 19-Oct-2015, tatu: Simpler variant like so should work
return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}


> Potential null dereference in DistributedHerder#reconfigureConnector()
> --
>
> Key: KAFKA-5863
> URL: https://issues.apache.org/jira/browse/KAFKA-5863
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the call chain:
> {code}
> RestServer.httpRequest(reconfigUrl, "POST", 
> taskProps, null);
> {code}
> In httpRequest():
> {code}
> } else if (responseCode >= 200 && responseCode < 300) {
> InputStream is = connection.getInputStream();
> T result = JSON_SERDE.readValue(is, responseFormat);
> {code}
> For readValue():
> {code}
> public <T> T readValue(InputStream src, TypeReference valueTypeRef)
> throws IOException, JsonParseException, JsonMappingException
> {
> return (T) _readMapAndClose(_jsonFactory.createParser(src), 
> _typeFactory.constructType(valueTypeRef));
> {code}
> Then there would be NPE in constructType():
> {code}
> public JavaType constructType(TypeReference<?> typeRef)
> {
> // 19-Oct-2015, tatu: Simpler variant like so should work
> return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
> {code}
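
One way to avoid the NPE, sketched below under hypothetical names (the actual fix in Connect may differ), is to guard the null before the TypeReference ever reaches Jackson:
{code:java}
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch of a guard for the NPE described above; names and
// structure are illustrative, not the actual Connect code.
class HttpResponseParsingSketch {
    private static final ObjectMapper JSON_SERDE = new ObjectMapper();

    // Only deserialize the response body when a response format was actually
    // requested, so a null TypeReference never reaches TypeFactory.constructType().
    static <T> T readBody(InputStream is, TypeReference<T> responseFormat) throws IOException {
        if (responseFormat == null) {
            return null; // the caller did not ask for a parsed response
        }
        return JSON_SERDE.readValue(is, responseFormat);
    }
}
{code}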



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7432) API Method on Kafka Streams for processing chunks/batches of data

2018-09-24 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626682#comment-16626682
 ] 

Matthias J. Sax commented on KAFKA-7432:


I tend to agree with John that batching async calls (if possible) would be an 
implementation detail (maybe exposed via some configs, if necessary). At the 
least, we should consider this in the design discussion; if it does not fit, we 
can still work on two independent features.

> API Method on Kafka Streams for processing chunks/batches of data
> -
>
> Key: KAFKA-7432
> URL: https://issues.apache.org/jira/browse/KAFKA-7432
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: sam
>Priority: Major
>
> For many situations in Big Data it is preferable to work with a small buffer 
> of records at a go, rather than one record at a time.
> The natural example is calling some external API that supports batching for 
> efficiency.
> How can we do this in Kafka Streams? I cannot find anything in the API that 
> looks like what I want.
> So far I have:
> {{builder.stream[String, String]("my-input-topic") 
> .mapValues(externalApiCall).to("my-output-topic")}}
> What I want is:
> {{builder.stream[String, String]("my-input-topic") .batched(chunkSize = 
> 2000).map(externalBatchedApiCall).to("my-output-topic")}}
> In Scala and Akka Streams the function is called {{grouped}} or {{batch}}. In 
> Spark Structured Streaming we can do 
> {{mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))}}.
>  
>  
> https://stackoverflow.com/questions/52366623/how-to-process-data-in-chunks-batches-with-kafka-streams



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7437) Store leader epoch in offset commit metadata

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626625#comment-16626625
 ] 

ASF GitHub Bot commented on KAFKA-7437:
---

hachikuji opened a new pull request #5689: KAFKA-7437; Persist leader epoch in 
offset commit metadata
URL: https://github.com/apache/kafka/pull/5689
 
 
   This patch implements the changes described in KIP-320 for the persistence 
of leader epoch information in the offset commit protocol.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Store leader epoch in offset commit metadata
> 
>
> Key: KAFKA-7437
> URL: https://issues.apache.org/jira/browse/KAFKA-7437
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> This patch implements the changes described in KIP-320 for the persistence of 
> leader epoch information in the offset commit metadata: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation
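
For context, on the consumer side KIP-320 surfaces the leader epoch through OffsetAndMetadata. Below is a minimal sketch of committing an offset with the epoch attached, assuming the 2.1 consumer API shape; the topic handling is illustrative only:
{code:java}
import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Minimal sketch, assuming the 2.1 consumer API added by KIP-320: the leader
// epoch observed on a consumed record is carried into the offset commit.
class CommitWithLeaderEpochSketch {
    static void commit(KafkaConsumer<String, String> consumer, ConsumerRecord<String, String> record) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        Optional<Integer> leaderEpoch = record.leaderEpoch();
        OffsetAndMetadata committed = new OffsetAndMetadata(record.offset() + 1, leaderEpoch, "");
        consumer.commitSync(Collections.singletonMap(tp, committed));
    }
}
{code}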



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7223) KIP-328: Add in-memory Suppression

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626533#comment-16626533
 ] 

ASF GitHub Bot commented on KAFKA-7223:
---

vvcephei opened a new pull request #5688: KAFKA-7223: Part 3 preview
URL: https://github.com/apache/kafka/pull/5688
 
 
   "preview" of Part 3 of suppression.
   
   This PR is provided to aid understanding of #5687.
   
   To keep the diff legible, I dropped all the test changes (so the tests will 
actually not pass on this PR).
   
   After #5687 is merged, I will revamp this one to become Part 3
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-328: Add in-memory Suppression
> --
>
> Key: KAFKA-7223
> URL: https://issues.apache.org/jira/browse/KAFKA-7223
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>  
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)
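
As a usage illustration only (hypothetical topology; method names follow the KIP-328 API as it eventually shipped and may differ slightly from the in-flight PRs referenced here):
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Suppressed;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;

// Hypothetical usage sketch of the suppress() operator: hold updates for up to
// five minutes in an in-memory buffer bounded to 1000 keys, emitting early if
// the buffer fills up.
class SuppressUsageSketch {
    static KTable<String, Long> throttleUpdates(KTable<String, Long> counts) {
        return counts.suppress(Suppressed.untilTimeLimit(Duration.ofMinutes(5), maxRecords(1000)));
    }
}
{code}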



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7223) KIP-328: Add in-memory Suppression

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626454#comment-16626454
 ] 

ASF GitHub Bot commented on KAFKA-7223:
---

vvcephei opened a new pull request #5687: KAFKA-7223: add unit tests in 
preparation for suppression
URL: https://github.com/apache/kafka/pull/5687
 
 
   This is Part 2 of suppression. 
   Part 1 was https://github.com/apache/kafka/pull/5567
   
   In an effort to control the scope of the review, I'm sending this PR with 
just the unit tests for buffered suppression.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-328: Add in-memory Suppression
> --
>
> Key: KAFKA-7223
> URL: https://issues.apache.org/jira/browse/KAFKA-7223
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>  
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7277) Migrate Streams API to Duration instead of longMs times

2018-09-24 Thread Nikolay Izhikov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626440#comment-16626440
 ] 

Nikolay Izhikov commented on KAFKA-7277:


[~vvcephei] Tests are now green. Please review the PR.

> Migrate Streams API to Duration instead of longMs times
> ---
>
> Key: KAFKA-7277
> URL: https://issues.apache.org/jira/browse/KAFKA-7277
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Nikolay Izhikov
>Priority: Major
>  Labels: kip, newbie
>
> Right now the Streams API universally represents time as ms-since-unix-epoch.
> There's nothing wrong, per se, with this, but Duration is more ergonomic for 
> an API.
> What we don't want is to present a heterogeneous API, so we need to make sure 
> the whole Streams API is in terms of Duration.
>  
> Implementation note: Durations potentially worsen memory pressure and gc 
> performance, so internally, we will still use longMs as the representation. 
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times]
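
A minimal sketch of the migration pattern described above, with hypothetical class and method names: the public API accepts a Duration, while the value is stored internally as a long in milliseconds.
{code:java}
import java.time.Duration;

// Hypothetical sketch of the KIP-358 pattern: Duration at the API surface,
// long milliseconds internally to avoid extra allocations and GC pressure.
class WindowSizeSketch {
    private final long sizeMs;

    private WindowSizeSketch(final long sizeMs) {
        this.sizeMs = sizeMs;
    }

    public static WindowSizeSketch ofSize(final Duration size) {
        return new WindowSizeSketch(size.toMillis());
    }

    long sizeMs() {
        return sizeMs;
    }
}
{code}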



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7223) KIP-328: Add in-memory Suppression

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626419#comment-16626419
 ] 

ASF GitHub Bot commented on KAFKA-7223:
---

guozhangwang closed pull request #5567: KAFKA-7223: Suppress API with only 
immediate emit
URL: https://github.com/apache/kafka/pull/5567
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index bdd6dc3b37a..293bc6b7a86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -389,6 +389,16 @@
  */
  KStream toStream(final KeyValueMapper mapper);
 
+/**
+ * Suppress some updates from this changelog stream, determined by the 
supplied {@link Suppressed} configuration.
+ *
+ * This controls what updates downstream table and stream operations will 
receive.
+ *
+ * @param suppressed Configuration object determining what, if any, 
updates to suppress
+ * @return A new KTable with the desired suppression characteristics.
+ */
+KTable suppress(final Suppressed suppressed);
+
 /**
  * Create a new {@code KTable} by transforming the value of each record in 
this {@code KTable} into a new value
  * (with possibly a new type), with default serializers, deserializers, 
and state store.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
new file mode 100644
index 000..7488ef6ff37
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import 
org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl;
+import 
org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder;
+import 
org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl;
+import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl;
+
+import java.time.Duration;
+
+public interface Suppressed {
+
+/**
+ * Marker interface for a buffer configuration that is "strict" in the 
sense that it will strictly
+ * enforce the time bound and never emit early.
+ */
+interface StrictBufferConfig extends BufferConfig {
+
+}
+
+interface BufferConfig> {
+/**
+ * Create a size-constrained buffer in terms of the maximum number of 
keys it will store.
+ */
+static BufferConfig maxRecords(final long recordLimit) {
+return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE);
+}
+
+/**
+ * Set a size constraint on the buffer in terms of the maximum number 
of keys it will store.
+ */
+BC withMaxRecords(final long recordLimit);
+
+/**
+ * Create a size-constrained buffer in terms of the maximum number of 
bytes it will use.
+ */
+static BufferConfig maxBytes(final long byteLimit) {
+return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit);
+}
+
+/**
+ * Set a size constraint on the buffer, the maximum number of bytes it 
will use.
+ */
+BC withMaxBytes(final long byteLimit);
+
+/**
+ * Create a buffer unconstrained by size (either keys or bytes).
+ *
+ * As a result, the buffer will consume as much memory as it needs, 
dictated by the time bound.
+ *
+ * If there isn't enough heap available to meet the demand, the 
application will encounter an
+ * {@link OutOfMemoryError} and shut down (not guaranteed to be a 
graceful exit). Also, note that
+ * JVM processes under extreme 

[jira] [Commented] (KAFKA-7412) Bug prone response from producer.send(ProducerRecord, Callback) if Kafka broker is not running

2018-09-24 Thread Suman B N (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626414#comment-16626414
 ] 

Suman B N commented on KAFKA-7412:
--

Even with the same client, broker, and code, I always get an exception: 
org.apache.kafka.common.errors.TimeoutException. Please re-run 
and see if you can reproduce.

> Bug prone response from producer.send(ProducerRecord, Callback) if Kafka 
> broker is not running
> --
>
> Key: KAFKA-7412
> URL: https://issues.apache.org/jira/browse/KAFKA-7412
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.0.0
>Reporter: Michal Turek
>Priority: Major
> Attachments: metadata_when_kafka_is_stopped.png
>
>
> Hi there, I have probably found a bug in the Java Kafka producer client.
> Scenario & current behavior:
> - Start Kafka broker, single instance.
> - Start application that produces messages to Kafka.
> - Let the application load partitions for a topic to warm up the producer, 
> e.g. send a message to Kafka. I'm not sure if this is a necessary step, but our 
> code does it.
> - Gracefully stop the Kafka broker.
> - The application log now contains "org.apache.kafka.clients.NetworkClient: 
> [Producer clientId=...] Connection to node 0 could not be established. Broker 
> may not be available." so the client is aware of the Kafka unavailability.
> - Trigger the producer to send a message using the 
> KafkaProducer.send(ProducerRecord, Callback) method.
> - The callback that notifies business code receives a non-null RecordMetadata 
> and a null Exception after request.timeout.ms. The metadata contains offset -1, 
> which is the value of ProduceResponse.INVALID_OFFSET.
> Expected behavior:
> - If Kafka is not running and the message is not appended to the log, the 
> callback should contain null RecordMetadata and non-null Exception. At least 
> I subjectively understand the Javadoc this way, "exception on production 
> error" in simple words.
> - A developer who is not aware of this behavior and doesn't test for 
> offset -1 may consider the message successfully sent and properly acked 
> by the broker.
> Known workaround:
> - Together with checking for a non-null exception in the callback, add another 
> condition for ProduceResponse.INVALID_OFFSET.
> {noformat}
> try {
> producer.send(record, (metadata, exception) -> {
> if (metadata != null) {
> if (metadata.offset() != 
> ProduceResponse.INVALID_OFFSET) {
> // Success
> } else {
> // Failure
> }
> } else {
> // Failure
> }
> });
> } catch (Exception e) {
> // Failure
> }
> {noformat}
> Used setup
> - Latest Kafka 2.0.0 for both broker and Java client.
> - Originally found with broker 0.11.0.1 and client 2.0.0.
> - The code is analogous to the one in the Javadoc of KafkaProducer.send().
> - Used producer configuration (others use defaults).
> {noformat}
> bootstrap.servers = "localhost:9092"
> client.id = "..."
> acks = "all"
> retries = 1
> linger.ms = "20"
> compression.type = "lz4"
> request.timeout.ms = 5000 # The same behavior is with default, this is to 
> speed up the tests
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-24 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626228#comment-16626228
 ] 

Jason Gustafson commented on KAFKA-7403:


[~vahid] I was looking at this as part of KAFKA-7437. I think when setting the 
expiration timestamp for v1, we can use 
`getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP)`. The code from old versions 
appears to already handle this value correctly by using the commit timestamp 
plus the retention time when loading the cache.
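
The actual broker code is Scala, but the fallback described here amounts to replacing an unconditional get on the optional expire timestamp with a default. A rough Java analogue, with the constant value assumed to match OffsetCommitRequest.DEFAULT_TIMESTAMP:
{code:java}
import java.util.Optional;

// Rough Java analogue of the suggested fix; the real code lives in the Scala
// GroupMetadataManager.offsetCommitValue(). DEFAULT_TIMESTAMP here is an
// assumed stand-in for OffsetCommitRequest.DEFAULT_TIMESTAMP (-1).
class OffsetCommitValueSketch {
    static final long DEFAULT_TIMESTAMP = -1L;

    static long expireTimestampForV1(Optional<Long> expireTimestamp) {
        // Instead of expireTimestamp.get(), which throws when the value is absent,
        // fall back to the sentinel that pre-KIP-211 brokers already interpret as
        // "commit timestamp plus retention time".
        return expireTimestamp.orElse(DEFAULT_TIMESTAMP);
    }
}
{code}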

> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> -
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jon Lee
>Priority: Blocker
> Fix For: 2.1.0
>
>
> I am currently trying a broker upgrade from 0.11 to 2.0 with some patches 
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The 
> server experienced an unexpected error when processing the request at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>  ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
>  # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets 
> the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
>  # In the 2.0 broker code, upon receiving an OffsetCommitRequest with 
> DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the 
> "expireTimestamp" field of OffsetAndMetadata to None.
>  # Later in the code path, GroupMetadataManager.offsetCommitValue() expects 
> OffsetAndMetadata to have a non-empty "expireTimestamp" field if the 
> inter.broker.protocol.version is < KAFKA_2_1_IV0.
>  # However, the inter.broker.protocol.version was set to "1.0" prior to the 
> upgrade, and as a result, the following code in offsetCommitValue() raises an 
> error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, 
> offsetAndMetadata.expireTimestamp.get){code}
>  
> Here is the stack trace for the broker side error
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) 
> 

[jira] [Updated] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-24 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-7403:
---
Priority: Blocker  (was: Major)

> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> -
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jon Lee
>Priority: Blocker
> Fix For: 2.1.0
>
>
> I am currently trying a broker upgrade from 0.11 to 2.0 with some patches 
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The 
> server experienced an unexpected error when processing the request at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>  ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
>  # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets 
> the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
>  # In the 2.0 broker code, upon receiving an OffsetCommitRequest with 
> DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the 
> "expireTimestamp" field of OffsetAndMetadata to None.
>  # Later in the code path, GroupMetadataManager.offsetCommitValue() expects 
> OffsetAndMetadata to have a non-empty "expireTimestamp" field if the 
> inter.broker.protocol.version is < KAFKA_2_1_IV0.
>  # However, the inter.broker.protocol.version was set to "1.0" prior to the 
> upgrade, and as a result, the following code in offsetCommitValue() raises an 
> error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, 
> offsetAndMetadata.expireTimestamp.get){code}
>  
> Here is the stack trace for the broker side error
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
> ~[scala-library-2.11.12.jar:?]
> 

[jira] [Updated] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

2018-09-24 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-7403:
---
Fix Version/s: 2.1.0

> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
> -
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jon Lee
>Priority: Major
> Fix For: 2.1.0
>
>
> I am currently trying a broker upgrade from 0.11 to 2.0 with some patches 
> including KIP-211/KAFKA-4682. After the upgrade, however, applications with 
> 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. 
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The 
> server experienced an unexpected error when processing the request at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
>  ~[kafka-clients-0.10.2.86.jar:?] at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
>  ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
>  # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets 
> the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
>  # In the 2.0 broker code, upon receiving an OffsetCommitRequest with 
> DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the 
> "expireTimestamp" field of OffsetAndMetadata to None.
>  # Later in the code path, GroupMetadataManager.offsetCommitValue() expects 
> OffsetAndMetadata to have a non-empty "expireTimestamp" field if the 
> inter.broker.protocol.version is < KAFKA_2_1_IV0.
>  # However, the inter.broker.protocol.version was set to "1.0" prior to the 
> upgrade, and as a result, the following code in offsetCommitValue() raises an 
> error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, 
> offsetAndMetadata.expireTimestamp.get){code}
>  
> Here is the stack trace for the broker side error
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324)
>  ~[kafka_2.11-2.0.0.10.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
> ~[scala-library-2.11.12.jar:?]
> at 
> 

[jira] [Assigned] (KAFKA-7437) Store leader epoch in offset commit metadata

2018-09-24 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-7437:
--

Assignee: Jason Gustafson

> Store leader epoch in offset commit metadata
> 
>
> Key: KAFKA-7437
> URL: https://issues.apache.org/jira/browse/KAFKA-7437
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> This patch implements the changes described in KIP-320 for the persistence of 
> leader epoch information in the offset commit metadata: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7437) Store leader epoch in offset commit metadata

2018-09-24 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-7437:
--

 Summary: Store leader epoch in offset commit metadata
 Key: KAFKA-7437
 URL: https://issues.apache.org/jira/browse/KAFKA-7437
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


This patch implements the changes described in KIP-320 for the persistence of 
leader epoch information in the offset commit metadata: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7435) Consider standardizing the config object pattern on interface/implementation.

2018-09-24 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626117#comment-16626117
 ] 

John Roesler commented on KAFKA-7435:
-

Thanks for the context, [~guozhang].

I agree it would not have been attractive with Java 7 interfaces.

Note that the "flexible implementation" argument is actually the justification 
for Suppressed being an interface in KAFKA-7223 / KIP-328 / 
[https://github.com/apache/kafka/pull/5567] . That changeset is what got me 
thinking about applying the pattern more broadly. If we are comfortable using 
interfaces on an as-needed basis, then we don't need to do anything at all!
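
A minimal sketch of the interface/implementation pattern under discussion, with hypothetical names (static factory methods on interfaces require Java 8+):
{code:java}
// Hypothetical sketch: the public API is an interface exposing only static
// factory methods, while the internal implementation keeps private state
// with getters, all in one class.
public interface ExampleConfig {
    static ExampleConfig withTimeoutMs(final long timeoutMs) {
        return new ExampleConfigImpl(timeoutMs);
    }
}

class ExampleConfigImpl implements ExampleConfig {
    private final long timeoutMs; // private state, visible only internally

    ExampleConfigImpl(final long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    long timeoutMs() {
        return timeoutMs;
    }
}
{code}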

> Consider standardizing the config object pattern on interface/implementation.
> -
>
> Key: KAFKA-7435
> URL: https://issues.apache.org/jira/browse/KAFKA-7435
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
> Fix For: 3.0.0
>
>
> Currently, the majority of Streams' config objects are structured as an 
> "external" builder class (with protected state) and an "internal" subclass 
> exposing getters to the state. This is serviceable, but there is an 
> alternative we can consider: to use an interface for the external API and the 
> implementation class for the internal one.
> Advantages:
>  * we could use private state, which improves maintainability
>  * the setters and getters would all be defined in the same class, improving 
> readability
>  * users browsing the public API would be able to look at an interface that 
> contains less extraneous internal details than the current class
>  * there is more flexibility in implementation
> Alternatives
>  * instead of external-class/internal-subclass, we could use an external 
> *final* class with package-protected state and an internal accessor class 
> (not a subclass, obviously). This would make it impossible for users to try 
> to create custom subclasses of our config objects, which is generally not 
> allowed already but currently only fails with a runtime ClassCastException.
> Example implementation: [https://github.com/apache/kafka/pull/5677]
> This change would break binary, but not source, compatibility, so the 
> earliest we could consider it is 3.0.
> To be clear, I'm *not* saying this *should* be done, just calling for a 
> discussion. Otherwise, I'd make a KIP.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-5462) Add a configuration for users to specify a template for building a custom principal name

2018-09-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar reassigned KAFKA-5462:


Assignee: Manikumar

> Add a configuration for users to specify a template for building a custom 
> principal name
> 
>
> Key: KAFKA-5462
> URL: https://issues.apache.org/jira/browse/KAFKA-5462
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 0.10.2.1
>Reporter: Koelli Mungee
>Assignee: Manikumar
>Priority: Major
>
> Add a configuration for users to specify a template for building a custom 
> principal name.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7430) Improve Transformer interface JavaDoc

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626089#comment-16626089
 ] 

ASF GitHub Bot commented on KAFKA-7430:
---

guozhangwang closed pull request #5675: KAFKA-7430: Improve Transformer 
interface JavaDoc
URL: https://github.com/apache/kafka/pull/5675
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
index 0ab34699cf7..b7487b8e13d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
@@ -71,9 +71,13 @@
  * attached} to this operator can be accessed and modified
  * arbitrarily (cf. {@link ProcessorContext#getStateStore(String)}).
  * 
- * If more than one output record should be forwarded downstream {@link ProcessorContext#forward(Object, Object)}
+ * If only one record should be forward downstream, {@code transform} can return a new {@link KeyValue}. If
+ * more than one output record should be forwarded downstream, {@link ProcessorContext#forward(Object, Object)}
  * and {@link ProcessorContext#forward(Object, Object, To)} can be used.
- * If record should not be forwarded downstream, {@code transform} can return {@code null}.
+ * If no record should be forwarded downstream, {@code transform} can return {@code null}.
+ *
+ * Note that returning a new {@link KeyValue} is merely for convenience. The same can be achieved by using
+ * {@link ProcessorContext#forward(Object, Object)} and returning {@code null}.
  *
  * @param key the key for the record
  * @param value the value for the record


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve Transformer interface JavaDoc
> -
>
> Key: KAFKA-7430
> URL: https://issues.apache.org/jira/browse/KAFKA-7430
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.2, 0.11.0.3, 1.1.1, 2.0.0
>Reporter: Eugen Feller
>Assignee: Eugen Feller
>Priority: Trivial
>  Labels: stream
>
> Currently Transformer JavaDoc mentions that it is possible to use both 
> ctx.forward() and returning a KeyValue(). It would be great if we could 
> mention that returning a KeyValue is merely a convenience thing. In other 
> words, everything can be achieved using ctx.forward():
> "return new KeyValue()" and "ctx.forward(); return null;" are equivalent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7435) Consider standardizing the config object pattern on interface/implementation.

2018-09-24 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626080#comment-16626080
 ] 

Guozhang Wang commented on KAFKA-7435:
--

John, thanks for the proposal.

We did talk about this approach when introducing the config objects for the 
first time, but since Java 7, which does not support static methods on interfaces, 
had not been dropped at that time, we had to ditch the idea.

Personally I think it is a good-to-have, but probably not a must-have given its 
compatibility implications compared with the listed advantages. If we encounter 
a case where, for example, "flexibility in implementation" is definitely 
needed, maybe we can propose this change then?

> Consider standardizing the config object pattern on interface/implementation.
> -
>
> Key: KAFKA-7435
> URL: https://issues.apache.org/jira/browse/KAFKA-7435
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
> Fix For: 3.0.0
>
>
> Currently, the majority of Streams's config objects are structured as a 
> "external" builder class (with protected state) and an "internal" subclass 
> exposing getters to the state. This is serviceable, but there is an 
> alternative we can consider: to use an interface for the external API and the 
> implementation class for the internal one.
> Advantages:
>  * we could use private state, which improves maintainability
>  * the setters and getters would all be defined in the same class, improving 
> readability
>  * users browsing the public API would be able to look at an interface that 
> contains less extraneous internal details than the current class
>  * there is more flexibility in implementation
> Alternatives
>  * instead of external-class/internal-subclass, we could use an external 
> *final* class with package-protected state and an internal accessor class 
> (not a subclass, obviously). This would make it impossible for users to try 
> and create custom subclasses of our config objects, which is generally not 
> allowed already but is currently only enforced via a runtime ClassCastException.
> Example implementation: [https://github.com/apache/kafka/pull/5677]
> This change would break binary, but not source, compatibility, so the 
> earliest we could consider it is 3.0.
> To be clear, I'm *not* saying this *should* be done, just calling for a 
> discussion. Otherwise, I'd make a KIP.
> Thoughts?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-2471) Replicas Order and Leader out of sync

2018-09-24 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-2471.
--
Resolution: Auto Closed

Closing inactive issue.  Please reopen if the issue still exists in newer 
versions.

> Replicas Order and Leader out of sync
> -
>
> Key: KAFKA-2471
> URL: https://issues.apache.org/jira/browse/KAFKA-2471
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8.2.1
>Reporter: Manish Sharma
>Priority: Major
>
> Our 2 Kafka brokers (1 & 5) were rebooted due to the hypervisor going down, and I 
> think we encountered an issue similar to the one discussed in the thread "Problem 
> with node after restart no partitions?". The resulting JIRA is closed without 
> conclusions or recovery steps.
> Brokers 5 and 1 were also running ZooKeeper for our cluster (along with broker 2), 
> and we are running Kafka version 0.8.2.1.
> After doing controlled restarts of all brokers a few times, our cluster seems OK now.
> But there are some topics whose replicas are out of sync with their leaders.
> Partition 2 below has leader 5, and the replicas order should be 5,1:
> {code}
> Topic:2015-01-12  PartitionCount:3  ReplicationFactor:2  Configs:
>   Topic: 2015-01-12  Partition: 0  Leader: 4  Replicas: 4,3  Isr: 3,4
>   Topic: 2015-01-12  Partition: 1  Leader: 0  Replicas: 0,4  Isr: 0,4
>   Topic: 2015-01-12  Partition: 2  Leader: 5  Replicas: 1,5  Isr: 5
> {code}
> I tried reassigning partition 2's replicas to broker 5 (the leader) and broker 0.
> The partition reassignment has now been stuck for more than a day:
> %) /usr/local/kafka/bin/kafka-reassign-partitions.sh --zookeeper 
> kafka-trgt05:2182 --reassignment-json-file 2015-01-12_2.json --verify
> Status of partition reassignment:
> Reassignment of partition [2015-01-12,2] is still in progress
> And in ZooKeeper, reassign_partitions is empty:
> [zk: kafka-trgt05:2182(CONNECTED) 2] ls /admin/reassign_partitions
> []
> This seems like a bug being triggered that leaves the cluster in an unhealthy 
> state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6989) Support Async Processing in Streams

2018-09-24 Thread sam (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626058#comment-16626058
 ] 

sam commented on KAFKA-6989:


[~virgilp] 

> num-stream-threads just allows you to process more than one partition in 
>parallel, in a single process. It's still synchronous, in that you can't have 
>multiple messages processed in parallel, from the same partition. 

 

Thanks, very helpful. That should be explicit in the documentation, i.e. that 
it's not currently async.

 

> at the end of processing, you need to use your own kafka producer for writing 
>back the result, and there are some pitfalls /misconfigurations that may 
>happen there


Yes we tried this, and we ended up reimplementing akka-streams.

 

> This ticket is about adding this sort of async processing as a "first class" 
>stream processor in Kafka Streams. 

Yes it's a good idea.

 

> Support Async Processing in Streams
> ---
>
> Key: KAFKA-6989
> URL: https://issues.apache.org/jira/browse/KAFKA-6989
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> Today Kafka Streams uses a single-thread-per-task architecture to achieve 
> embarrassing parallelism and good isolation. However, there are a couple of 
> scenarios where async processing may be preferable:
> 1) External resource access or heavy IO with high latency. Suppose you need 
> to access a remote REST API, read / write to an external store, or do a heavy 
> disk IO operation that may result in high latency. The current threading model 
> would block any other records until this record is done, waiting on the 
> remote call / IO to finish.
> 2) Robust failure handling with retries. Imagine the app-level processing of 
> a (non-corrupted) record fails (e.g. the user attempted to do an RPC to an 
> external system, and this call failed), and failed records are moved into a 
> separate "retry" topic. How can you process such failed records in a scalable 
> way? For example, imagine you need to implement a retry policy such as "retry 
> with exponential backoff". Here, you have the problem that 1. you can't 
> really pause processing a single record because this will pause the 
> processing of the full stream (bottleneck!) and 2. there is no 
> straight-forward way to "sort" failed records based on their "next retry 
> time" (think: priority queue).
> 3) Delayed processing. One use case is delaying re-processing (e.g. "delay 
> re-processing this event for 5 minutes") as mentioned in 2); another is 
> implementing a scheduler, e.g. doing some additional operations later based on 
> the processed record (Zalando Dublin, for example, is implementing a distributed 
> web crawler along these lines). Note that although this feature can be handled in 
> punctuation, it is not well aligned with our current offset committing 
> behavior, which always advances the offset once the record has finished 
> traversing the topology.
> I'm thinking of two options to support this feature:
> 1. Make the commit() mechanism more customizable to users so they can 
> implement multi-threaded processing themselves: users can always do async 
> processing in the Processor API by spawning a thread pool, e.g., but the key 
> is that the offset to be committed should only be advanced once such async 
> processing is done. This is a light-weight approach: we provide all the 
> pieces and tools, and users stack them up to build their own LEGOs.
> 2. Provide a general API to do async processing in the Processor API, and take 
> care of the offset committing internally. This is a heavy-weight approach: 
> the API may not cover all async scenarios, but it is an easy way to cover the 
> majority of the remaining scenarios, and users do not need to worry about internal 
> implementation details such as offsets and fault tolerance.
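
As a small illustration of scenario 1, here is a hypothetical processor in which a 
blocking call stands in for any slow external access; it is not Kafka code, just a 
sketch of why one slow record stalls the whole task today.

{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;

// Every record funnels through this single call on the task's stream thread,
// so one slow lookup delays all records behind it on the same partition.
class BlockingLookupProcessor extends AbstractProcessor<String, String> {
    @Override
    public void process(final String key, final String value) {
        final String enriched = slowRemoteLookup(value); // blocks the stream thread
        context().forward(key, enriched);
    }

    private String slowRemoteLookup(final String value) {
        try {
            Thread.sleep(500);                           // stand-in for a high-latency call
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }
}
{code}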



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7436) overriding auto.offset.reset to none for executor

2018-09-24 Thread Sanjay Kumar (JIRA)
Sanjay Kumar created KAFKA-7436:
---

 Summary: overriding auto.offset.reset to none for executor
 Key: KAFKA-7436
 URL: https://issues.apache.org/jira/browse/KAFKA-7436
 Project: Kafka
  Issue Type: Bug
Reporter: Sanjay Kumar


Hi All,

I am setting auto.offset.reset to "latest", but it's throwing a warning saying 
"overriding auto.offset.reset to none for executor".

For example, upon shutting down the stream application or after an unexpected 
failure, how will it be able to retrieve the previous offset?

Any help on this would be highly appreciated; looking forward to your valuable 
suggestions.

Thanks in advance.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7435) Consider standardizing the config object pattern on interface/implementation.

2018-09-24 Thread John Roesler (JIRA)
John Roesler created KAFKA-7435:
---

 Summary: Consider standardizing the config object pattern on 
interface/implementation.
 Key: KAFKA-7435
 URL: https://issues.apache.org/jira/browse/KAFKA-7435
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
 Fix For: 3.0.0


Currently, the majority of Streams's config objects are structured as a 
"external" builder class (with protected state) and an "internal" subclass 
exposing getters to the state. This is serviceable, but there is an alternative 
we can consider: to use an interface for the external API and the 
implementation class for the internal one.

Advantages:
 * we could use private state, which improves maintainability
 * the setters and getters would all be defined in the same class, improving 
readability
 * users browsing the public API would be able to look at an interface that 
contains less extraneous internal details than the current class
 * there is more flexibility in implementation

Alternatives
 * instead of external-class/internal-subclass, we could use an external 
*final* class with package-protected state and an internal accessor class (not 
a subclass, obviously). This would make it impossible for users to try and 
create custom subclasses of our config objects, which is generally not allowed 
already but is currently only enforced via a runtime ClassCastException.

Example implementation: [https://github.com/apache/kafka/pull/5677]

This change would break binary, but not source, compatibility, so the earliest 
we could consider it is 3.0.

To be clear, I'm *not* saying this *should* be done, just calling for a 
discussion. Otherwise, I'd make a KIP.

Thoughts?
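
For concreteness, a minimal sketch of the proposed shape, using invented names 
(ExampleConfig / ExampleConfigImpl rather than any real Streams class):

{code:java}
// Public API: an interface exposing only the factory and fluent setters.
public interface ExampleConfig {
    static ExampleConfig withRetries(final int retries) {
        return new ExampleConfigImpl().setRetries(retries);
    }

    ExampleConfig setRetries(int retries);
}

// Internal implementation: private state, with the internal getter living
// right next to the setter instead of in a separate subclass.
class ExampleConfigImpl implements ExampleConfig {
    private int retries;

    @Override
    public ExampleConfig setRetries(final int retries) {
        this.retries = retries;
        return this;
    }

    int retries() { // accessible to internal code in the same package only
        return retries;
    }
}
{code}

Note that the static factory on the interface is exactly the part that requires Java 8, 
which matches the earlier comment about why this was not viable while Java 7 was still 
supported.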



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment

2018-09-24 Thread Sandor Murakozi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625945#comment-16625945
 ] 

Sandor Murakozi commented on KAFKA-6794:


[~viktorsomogyi] you can build on my half-ready solution available in 
[https://github.com/smurakozi/kafka/tree/KAFKA-6794]

> Support for incremental replica reassignment
> 
>
> Key: KAFKA-6794
> URL: https://issues.apache.org/jira/browse/KAFKA-6794
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Sandor Murakozi
>Priority: Major
>
> Say you have a replication factor of 4 and you trigger a reassignment which 
> moves all replicas to new brokers. Now 8 replicas are fetching at the same 
> time which means you need to account for 8 times the current producer load 
> plus the catch-up replication. To make matters worse, the replicas won't all 
> become in-sync at the same time; in the worst case, you could have 7 replicas 
> in-sync while one is still catching up. Currently, the old replicas won't be 
> disabled until all new replicas are in-sync. This makes configuring the 
> throttle tricky since ISR traffic is not subject to it.
> Rather than trying to bring all 4 new replicas online at the same time, a 
> friendlier approach would be to do it incrementally: bring one replica 
> online, bring it in-sync, then remove one of the old replicas. Repeat until 
> all replicas have been changed. This would reduce the impact of a 
> reassignment and make configuring the throttle easier at the cost of a slower 
> overall reassignment.
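
To make the idea concrete, here is a rough, hypothetical sketch of how such an 
incremental plan could be generated for a single partition. It is plain Java over lists 
of broker ids, not Kafka code, and it glosses over waiting for ISR membership between 
steps.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IncrementalReassignmentSketch {

    /** Intermediate replica lists: add one new replica, then (once in sync) drop one old replica. */
    static List<List<Integer>> steps(final List<Integer> current, final List<Integer> target) {
        final List<List<Integer>> plan = new ArrayList<>();
        final List<Integer> working = new ArrayList<>(current);
        final List<Integer> retiring = new ArrayList<>(current);
        retiring.removeAll(target);             // replicas that will eventually be removed
        int nextToRetire = 0;

        for (final Integer replica : target) {
            if (working.contains(replica)) {
                continue;                       // already hosted, nothing to move
            }
            working.add(replica);               // bring one new replica online
            plan.add(new ArrayList<>(working)); // ... the controller would wait for ISR here ...
            if (nextToRetire < retiring.size()) {
                working.remove(retiring.get(nextToRetire++)); // then retire one old replica
                plan.add(new ArrayList<>(working));
            }
        }
        return plan;
    }

    public static void main(final String[] args) {
        // replicas 1,2,3,4 moving entirely to 5,6,7,8 for one partition
        for (final List<Integer> step : steps(Arrays.asList(1, 2, 3, 4), Arrays.asList(5, 6, 7, 8))) {
            System.out.println(step);
        }
    }
}
{code}

With this strategy at most five replicas fetch for the partition at any time, instead 
of all eight at once.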



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment

2018-09-24 Thread Sandor Murakozi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625935#comment-16625935
 ] 

Sandor Murakozi commented on KAFKA-6794:


[~viktorsomogyi] I think I won't be able to work on it for a while. Please feel 
free to reassign.

> Support for incremental replica reassignment
> 
>
> Key: KAFKA-6794
> URL: https://issues.apache.org/jira/browse/KAFKA-6794
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Sandor Murakozi
>Priority: Major
>
> Say you have a replication factor of 4 and you trigger a reassignment which 
> moves all replicas to new brokers. Now 8 replicas are fetching at the same 
> time which means you need to account for 8 times the current producer load 
> plus the catch-up replication. To make matters worse, the replicas won't all 
> become in-sync at the same time; in the worst case, you could have 7 replicas 
> in-sync while one is still catching up. Currently, the old replicas won't be 
> disabled until all new replicas are in-sync. This makes configuring the 
> throttle tricky since ISR traffic is not subject to it.
> Rather than trying to bring all 4 new replicas online at the same time, a 
> friendlier approach would be to do it incrementally: bring one replica 
> online, bring it in-sync, then remove one of the old replicas. Repeat until 
> all replicas have been changed. This would reduce the impact of a 
> reassignment and make configuring the throttle easier at the cost of a slower 
> overall reassignment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7432) API Method on Kafka Streams for processing chunks/batches of data

2018-09-24 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625919#comment-16625919
 ] 

John Roesler commented on KAFKA-7432:
-

Hi [~sams],

This sounds like a valuable optimization. 

I agree that it's not the same request, but it seems like we'll get the best 
final result by considering this and KAFKA-6989 together in one design. Would 
it be OK with you to move this request into KAFKA-6989 for that reason? Then 
we could be sure that any KIP coming out would consider both batching and async.

 

In case you're looking for an immediate solution, I'd recommend using the 
Processor API to define a custom Processor that uses a state store to save up 
records until you get the desired number, and then `context.forward` them as a 
list. After the map, you could have a reciprocal processor to turn the list 
back into individual records. I think the implementation of your feature would 
look something like this in practice, so you could also contribute valuable 
experience to the KIP discussion.

Thanks,

-John
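
To illustrate the suggestion, here is a hypothetical sketch only: the store name 
"batch-buffer", the chunk size handling, and the key forwarded with each chunk are all 
assumptions, not part of the comment or any KIP.

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Buffers values in a state store and forwards them downstream as one List per chunk.
// A store named "batch-buffer" must be attached to this processor in the topology.
class BatchingProcessor extends AbstractProcessor<String, String> {
    private final int chunkSize;
    private KeyValueStore<Long, String> buffer;
    private long pending = 0;   // note: not restored on restart in this simplified sketch

    BatchingProcessor(final int chunkSize) {
        this.chunkSize = chunkSize;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        buffer = (KeyValueStore<Long, String>) context.getStateStore("batch-buffer");
    }

    @Override
    public void process(final String key, final String value) {
        buffer.put(pending++, value);
        if (pending < chunkSize) {
            return;
        }
        final List<String> chunk = new ArrayList<>();
        try (KeyValueIterator<Long, String> it = buffer.all()) {
            while (it.hasNext()) {
                chunk.add(it.next().value);
            }
        }
        for (long i = 0; i < pending; i++) {
            buffer.delete(i);               // clear the buffer after draining it
        }
        pending = 0;
        context().forward(key, chunk);      // downstream sees one List<String> per chunk
    }
}
{code}

A reciprocal processor downstream can split each list back into individual records, as 
the comment suggests.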

> API Method on Kafka Streams for processing chunks/batches of data
> -
>
> Key: KAFKA-7432
> URL: https://issues.apache.org/jira/browse/KAFKA-7432
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: sam
>Priority: Major
>
> For many situations in Big Data it is preferable to work with a small buffer 
> of records at a go, rather than one record at a time.
> The natural example is calling some external API that supports batching for 
> efficiency.
> How can we do this in Kafka Streams? I cannot find anything in the API that 
> looks like what I want.
> So far I have:
> {{builder.stream[String, String]("my-input-topic") 
> .mapValues(externalApiCall).to("my-output-topic")}}
> What I want is:
> {{builder.stream[String, String]("my-input-topic") .batched(chunkSize = 
> 2000).map(externalBatchedApiCall).to("my-output-topic")}}
> In Scala and Akka Streams the function is called {{grouped}} or {{batch}}. In 
> Spark Structured Streaming we can do 
> {{mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))}}.
>  
>  
> https://stackoverflow.com/questions/52366623/how-to-process-data-in-chunks-batches-with-kafka-streams



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-24 Thread shadi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625887#comment-16625887
 ] 

shadi commented on KAFKA-1194:
--

Thanks for the feedback.

I tried to manually delete some log files older than a certain date on a cluster of 
three nodes. I took down the first node, deleted its log files, and brought it back 
up. I then took down the second node and deleted some log files; however, it was 
unable to come back up, as I was hit by this error:

_Replica xxx for partition xxx reset its fetch offset from x to current 
leader 1's latest offset xxx (kafka.server.ReplicaFetcherThread)_

_Error getting offset for partition x to broker 1 
(kafka.server.ReplicaFetcherThread)_

_java.lang.IllegalStateException: Compaction for partition  cannot be 
aborted and paused since it is in LogCleaningPaused state._

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because kafka tries to rename the log file when it 
> is still opened.  So we should close the file first before rename.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I 

[jira] [Updated] (KAFKA-7434) DeadLetterQueueReporter throws NPE if transform throws NPE

2018-09-24 Thread Michal Borowiecki (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-7434:
-
Description: 
A NPE thrown from a transform in a connector configured with

errors.deadletterqueue.context.headers.enable=true

causes DeadLetterQueueReporter to break with a NPE.
{code}
Executing stage 'TRANSFORMATION' with class 
'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is 
{topic='', partition=1, offset=0, timestamp=1537370573366, 
timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
java.lang.NullPointerException
Task threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)
at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)
at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)
at 
org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
at 
org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}
 

This is caused by populateContextHeaders only checking if the Throwable is not 
null, but not checking that the message in the Throwable is not null before 
trying to serialize the message:

[https://github.com/apache/kafka/blob/cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L170-L177]
{code:java}
if (context.error() != null) {
     headers.add(ERROR_HEADER_EXCEPTION, 
toBytes(context.error().getClass().getName()));
     headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, 
toBytes(context.error().getMessage()));
{code}
toBytes throws an NPE if passed null as the parameter.

 

  was:
A NPE thrown from a transform in a connector configured with

errors.deadletterqueue.context.headers.enable=true

causes DeadLetterQueueReporter to break with a NPE.
{quote}{{Executing stage 'TRANSFORMATION' with class 
'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is 
\{topic='', partition=1, offset=0, timestamp=1537370573366, 
timestampType=CreateTime}. 
(org.apache.kafka.connect.runtime.errors.LogReporter)}}
{{java.lang.NullPointerException}}
{{Task threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)}}
{{java.lang.NullPointerException}}
{{ at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)}}
{{ at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)}}
{{ at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)}}
{{ at 
org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)}}
{{ at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)}}
{{ at 
org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)}}
{{ at 

[jira] [Updated] (KAFKA-7434) DeadLetterQueueReporter throws NPE if transform throws NPE

2018-09-24 Thread Michal Borowiecki (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-7434:
-
Description: 
A NPE thrown from a transform in a connector configured with

errors.deadletterqueue.context.headers.enable=true

causes DeadLetterQueueReporter to break with a NPE.
{quote}{{Executing stage 'TRANSFORMATION' with class 
'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is 
\{topic='', partition=1, offset=0, timestamp=1537370573366, 
timestampType=CreateTime}. 
(org.apache.kafka.connect.runtime.errors.LogReporter)}}
{{java.lang.NullPointerException}}
{{Task threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)}}
{{java.lang.NullPointerException}}
{{ at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)}}
{{ at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)}}
{{ at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)}}
{{ at 
org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)}}
{{ at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)}}
{{ at 
org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)}}
{{ at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)}}
{{ at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)}}
{{ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)}}
{{ at java.util.concurrent.FutureTask.run(FutureTask.java:266)}}
{{ at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)}}
{{ at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)}}
{{ at java.lang.Thread.run(Thread.java:748)}}{quote}
 

This is caused by populateContextHeaders only checking if the Throwable is not 
null, but not checking that the message in the Throwable is not null before 
trying to serialize the message:

[https://github.com/apache/kafka/blob/cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L170-L177]

if (context.error() != null) {
     headers.add(ERROR_HEADER_EXCEPTION, 
toBytes(context.error().getClass().getName()));
     headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, 
toBytes(context.error().getMessage()));

toBytes throws an NPE if passed null as the parameter.

 

  was:
A NPE thrown from a transform in a connector configured with

errors.deadletterqueue.context.headers.enable=true

causes DeadLetterQueueReporter to break with a NPE.
{quote}{{Executing stage 'TRANSFORMATION' with class 
'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is 
\{topic='', partition=1, offset=0, timestamp=1537370573366, 
timestampType=CreateTime}. 
(org.apache.kafka.connect.runtime.errors.LogReporter)}}
 {{java.lang.NullPointerException}}
 {{Task threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)}}
 {{java.lang.NullPointerException}}
at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)
at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)
at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)
at 
org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
at 
org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at 

[jira] [Assigned] (KAFKA-7434) DeadLetterQueueReporter throws NPE if transform throws NPE

2018-09-24 Thread Michal Borowiecki (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki reassigned KAFKA-7434:


Assignee: Michal Borowiecki

> DeadLetterQueueReporter throws NPE if transform throws NPE
> --
>
> Key: KAFKA-7434
> URL: https://issues.apache.org/jira/browse/KAFKA-7434
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
> Environment: jdk 8
>Reporter: Michal Borowiecki
>Assignee: Michal Borowiecki
>Priority: Major
>
> A NPE thrown from a transform in a connector configured with
> errors.deadletterqueue.context.headers.enable=true
> causes DeadLetterQueueReporter to break with a NPE.
> {quote}{{Executing stage 'TRANSFORMATION' with class 
> 'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is 
> \{topic='', partition=1, offset=0, timestamp=1537370573366, 
> timestampType=CreateTime}. 
> (org.apache.kafka.connect.runtime.errors.LogReporter)}}
>  {{java.lang.NullPointerException}}
>  {{Task threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)}}
>  {{java.lang.NullPointerException}}
> at 
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)
> at 
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)
> at 
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)
> at 
> org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
> at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
> at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
> at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748){quote}
>  
> This is caused by populateContextHeaders only checking if the Throwable is 
> not null, but not checking that the message in the Throwable is not null 
> before trying to serialize the message:
> [https://github.com/apache/kafka/blob/cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L170-L177]
> if (context.error() != null) {
>      headers.add(ERROR_HEADER_EXCEPTION, 
> toBytes(context.error().getClass().getName()));
>      headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, 
> toBytes(context.error().getMessage()));
> toBytes throws an NPE if passed null as the parameter.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7434) DeadLetterQueueReporter throws NPE if transform throws NPE

2018-09-24 Thread Michal Borowiecki (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michal Borowiecki updated KAFKA-7434:
-
Description: 
A NPE thrown from a transform in a connector configured with

errors.deadletterqueue.context.headers.enable=true

causes DeadLetterQueueReporter to break with a NPE.
{quote}{{Executing stage 'TRANSFORMATION' with class 
'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is 
\{topic='', partition=1, offset=0, timestamp=1537370573366, 
timestampType=CreateTime}. 
(org.apache.kafka.connect.runtime.errors.LogReporter)}}
 {{java.lang.NullPointerException}}
 {{Task threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)}}
 {{java.lang.NullPointerException}}
at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)
at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)
at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)
at 
org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
at 
org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748){quote}
 

This is caused by populateContextHeaders only checking if the Throwable is not 
null, but not checking that the message in the Throwable is not null before 
trying to serialize the message:

[https://github.com/apache/kafka/blob/cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L170-L177]

if (context.error() != null) {
     headers.add(ERROR_HEADER_EXCEPTION, 
toBytes(context.error().getClass().getName()));
     headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, 
toBytes(context.error().getMessage()));

toBytes throws an NPE if passed null as the parameter.

 

  was:
A NPE thrown from a transform in a connector configured with

errors.deadletterqueue.context.headers.enable=true

causes DeadLetterQueueReporter to break with a NPE.
{quote}{{Executing stage 'TRANSFORMATION' with class 
'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is 
\{topic='', partition=1, offset=0, timestamp=1537370573366, 
timestampType=CreateTime}. 
(org.apache.kafka.connect.runtime.errors.LogReporter)}}
{{java.lang.NullPointerException}}
{{Task threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)}}
{{java.lang.NullPointerException}}
{{ at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)}}
{{ at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)}}
{{ at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)}}
{{ at 
org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)}}
{{ at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)}}
{{ at 
org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)}}
{{ at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)}}
{{ at 

[jira] [Created] (KAFKA-7434) DeadLetterQueueReporter throws NPE if transform throws NPE

2018-09-24 Thread Michal Borowiecki (JIRA)
Michal Borowiecki created KAFKA-7434:


 Summary: DeadLetterQueueReporter throws NPE if transform throws NPE
 Key: KAFKA-7434
 URL: https://issues.apache.org/jira/browse/KAFKA-7434
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0
 Environment: jdk 8
Reporter: Michal Borowiecki


A NPE thrown from a transform in a connector configured with

errors.deadletterqueue.context.headers.enable=true

causes DeadLetterQueueReporter to break with a NPE.
{quote}{{Executing stage 'TRANSFORMATION' with class 
'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is 
\{topic='', partition=1, offset=0, timestamp=1537370573366, 
timestampType=CreateTime}. 
(org.apache.kafka.connect.runtime.errors.LogReporter)}}
{{java.lang.NullPointerException}}
{{Task threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask)}}
{{java.lang.NullPointerException}}
{{ at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)}}
{{ at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)}}
{{ at 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)}}
{{ at 
org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)}}
{{ at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)}}
{{ at 
org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)}}
{{ at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)}}
{{ at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)}}
{{ at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)}}
{{ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)}}
{{ at java.util.concurrent.FutureTask.run(FutureTask.java:266)}}
{{ at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)}}
{{ at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)}}
{{ at java.lang.Thread.run(Thread.java:748)}}
{quote}
 

This is caused by populateContextHeaders only checking if the Throwable is not 
null, but not checking that the message in the Throwable is not null before 
trying to serialize the message:

[https://github.com/apache/kafka/blob/cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L170-L177]

if (context.error() != null) {
    headers.add(ERROR_HEADER_EXCEPTION, 
toBytes(context.error().getClass().getName()));
    headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, 
toBytes(context.error().getMessage()));



toBytes throws an NPE if passed null as the parameter.
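
One possible null-safe variant of the fragment quoted above (illustrative only; the 
actual fix for this ticket may look different) simply skips the message header when 
getMessage() returns null:

{code:java}
if (context.error() != null) {
    headers.add(ERROR_HEADER_EXCEPTION, toBytes(context.error().getClass().getName()));
    final String message = context.error().getMessage();
    if (message != null) {   // a bare NullPointerException has no message
        headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, toBytes(message));
    }
}
{code}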

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-24 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625600#comment-16625600
 ] 

M. Manna edited comment on KAFKA-1194 at 9/24/18 10:11 AM:
---

The best thing to do is to check the following:

1) Set the default value of log.segment.bytes to something slightly smaller, 
e.g. 20 MB.
2) Disable log.cleaner.enable (i.e. set it to false). This should ensure that you don't 
have Log.scala segments opened accidentally by the brokers. In fact, once the 
cleaner is disabled and the segments are rolled over, we should be able to 
delete them manually.
3) Allow rollover of new log files.
4) Now manually delete the files using some scripts.

This is still not something I would recommend, but if operating on Windows is a 
must (without using containers), you may want to try this out.

Regards,


was (Author: manme...@gmail.com):
The best thing to do is to check the following:

1) Set the default value of log.segment.bytes to something slightly smaller 
e.g. 20 MB, 
2) Disable log.cleaner.enable (i.e. false)
3) Allow rollover of new log files.
4) Now manually delete the files using some scripts.

This is still not something I would recommend, but if Windows Operation is a 
MUST (without using containers), you want to try this out.

Regards,

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> 

[jira] [Commented] (KAFKA-7433) Introduce broker options in TopicCommand to use AdminClient

2018-09-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625601#comment-16625601
 ] 

ASF GitHub Bot commented on KAFKA-7433:
---

viktorsomogyi opened a new pull request #5683: WIP - KAFKA-7433: Introduce 
broker options in TopicCommand to use AdminClient
URL: https://github.com/apache/kafka/pull/5683
 
 
   The PR adds --bootstrap-server and --admin.config options to TopicCommand 
and implements an alternative, AdminClient-based way of topic management.
   
   For testing, I've duplicated the existing tests and made them work with the 
AdminClient options.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Introduce broker options in TopicCommand to use AdminClient
> ---
>
> Key: KAFKA-7433
> URL: https://issues.apache.org/jira/browse/KAFKA-7433
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 2.1.0
>Reporter: Viktor Somogyi
>Assignee: Viktor Somogyi
>Priority: Major
>  Labels: kip
>
> This task aims to add --bootstrap-servers and --admin.config options, which 
> enable kafka.admin.TopicCommand to work with the Java-based AdminClient.
> Ideally, KAFKA-5561 might replace this task, but as an incremental step until 
> that succeeds it might be enough just to add these options to the existing 
> command.
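
For context, this is roughly what topic management through the Java AdminClient looks 
like (an illustrative sketch, not the PR's code; the topic name, partition count, and 
replication factor are made up):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class AdminClientTopicSketch {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        // what a --bootstrap-server option would ultimately feed into the client
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // roughly the --create path: topic "demo" with 3 partitions, replication factor 2
            admin.createTopics(Collections.singleton(new NewTopic("demo", 3, (short) 2))).all().get();
            // roughly the --list path
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}
{code}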



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-09-24 Thread M. Manna (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625600#comment-16625600
 ] 

M. Manna commented on KAFKA-1194:
-

The best thing to do is to check the following:

1) Set the default value of log.segment.bytes to something slightly smaller, 
e.g. 20 MB.
2) Disable log.cleaner.enable (i.e. set it to false).
3) Allow rollover of new log files.
4) Now manually delete the files using some scripts.

This is still not something I would recommend, but if operating on Windows is a 
must (without using containers), you may want to try this out.

Regards,

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because kafka tries to rename the log file when it 
> is still opened.  So we should close the file first before rename.
> The index file uses a special data structure, the MappedByteBuffer. Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I find a forceUnmap function in kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7433) Introduce broker options in TopicCommand to use AdminClient

2018-09-24 Thread Viktor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi updated KAFKA-7433:
--
Component/s: (was: core)
 admin

> Introduce broker options in TopicCommand to use AdminClient
> ---
>
> Key: KAFKA-7433
> URL: https://issues.apache.org/jira/browse/KAFKA-7433
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 2.1.0
>Reporter: Viktor Somogyi
>Assignee: Viktor Somogyi
>Priority: Major
>  Labels: kip
>
> This task aims to add --bootstrap-servers and --admin.config options, which 
> enable kafka.admin.TopicCommand to work with the Java-based AdminClient.
> Ideally KAFKA-5561 might replace this task, but as an incremental step until 
> that succeeds it might be enough just to add these options to the existing 
> command.
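
As a rough, non-authoritative illustration of what these options imply, the 
sketch below builds a Java AdminClient from a --bootstrap-servers value plus an 
optional --admin.config properties file and performs a create and a list 
operation. The class name, argument handling, and the example topic are 
assumptions made here; the eventual TopicCommand changes may look quite different.
{code:java}
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;

import java.io.FileInputStream;
import java.util.Collections;
import java.util.Properties;

public class TopicCommandSketch {
    // args[0] = value of --bootstrap-servers, args[1] (optional) = path given to --admin.config
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        if (args.length > 1) {
            try (FileInputStream in = new FileInputStream(args[1])) {
                props.load(in); // extra client settings, e.g. security configuration
            }
        }
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);

        try (AdminClient admin = AdminClient.create(props)) {
            // rough equivalent of --create --topic my-topic --partitions 8 --replication-factor 3
            admin.createTopics(Collections.singleton(new NewTopic("my-topic", 8, (short) 3)))
                 .all().get();
            // rough equivalent of --list
            for (TopicListing listing : admin.listTopics().listings().get()) {
                System.out.println(listing.name());
            }
        }
    }
}
{code}
Because the client settings come from a properties file, the same path can 
carry security.protocol and related settings for secured clusters.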



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7433) Introduce broker options in TopicCommand to use AdminClient

2018-09-24 Thread Viktor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi updated KAFKA-7433:
--
Description: 
This task aims to add --bootstrap-servers and --admin.config options, which 
enable kafka.admin.TopicCommand to work with the Java-based AdminClient.
Ideally KAFKA-5561 might replace this task, but as an incremental step until 
that succeeds it might be enough just to add these options to the existing 
command.

  was:This task aims to add --bootstrap-servers and --admin.config options which 
enable TopicCommand to work with the Java-based AdminClient.


> Introduce broker options in TopicCommand to use AdminClient
> ---
>
> Key: KAFKA-7433
> URL: https://issues.apache.org/jira/browse/KAFKA-7433
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Viktor Somogyi
>Assignee: Viktor Somogyi
>Priority: Major
>  Labels: kip
>
> This task aims to add --bootstrap-servers and --admin.config options, which 
> enable kafka.admin.TopicCommand to work with the Java-based AdminClient.
> Ideally KAFKA-5561 might replace this task, but as an incremental step until 
> that succeeds it might be enough just to add these options to the existing 
> command.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7433) Introduce broker options in TopicCommand to use AdminClient

2018-09-24 Thread Viktor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi updated KAFKA-7433:
--
Description: This task aims to add --bootstrap-servers and --admin.config 
options, which enable TopicCommand to work with the Java-based AdminClient.

> Introduce broker options in TopicCommand to use AdminClient
> ---
>
> Key: KAFKA-7433
> URL: https://issues.apache.org/jira/browse/KAFKA-7433
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Viktor Somogyi
>Assignee: Viktor Somogyi
>Priority: Major
>  Labels: kip
>
> This task aims to add --bootstrap-servers and --admin.config options, which 
> enable TopicCommand to work with the Java-based AdminClient.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7433) Introduce broker options in TopicCommand to use AdminClient

2018-09-24 Thread Viktor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi updated KAFKA-7433:
--
Labels: kip  (was: )

> Introduce broker options in TopicCommand to use AdminClient
> ---
>
> Key: KAFKA-7433
> URL: https://issues.apache.org/jira/browse/KAFKA-7433
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Viktor Somogyi
>Assignee: Viktor Somogyi
>Priority: Major
>  Labels: kip
>
> This task aims to add --bootstrap-servers and --admin.config options, which 
> enable TopicCommand to work with the Java-based AdminClient.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7433) Introduce broker options in TopicCommand to use AdminClient

2018-09-24 Thread Viktor Somogyi (JIRA)
Viktor Somogyi created KAFKA-7433:
-

 Summary: Introduce broker options in TopicCommand to use 
AdminClient
 Key: KAFKA-7433
 URL: https://issues.apache.org/jira/browse/KAFKA-7433
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.1.0
Reporter: Viktor Somogyi
Assignee: Viktor Somogyi






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5561) Java based TopicCommand to use the Admin client

2018-09-24 Thread Viktor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625590#comment-16625590
 ] 

Viktor Somogyi commented on KAFKA-5561:
---

[~ppatierno] if you don't mind, I've renamed this JIRA, as your effort is 
rather to create a Java-based command, while what I'm working on is fixing up 
the Scala one, which runs somewhat in parallel with yours. I hope that's okay.

> Java based TopicCommand to use the Admin client
> ---
>
> Key: KAFKA-5561
> URL: https://issues.apache.org/jira/browse/KAFKA-5561
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Major
>
> Hi, 
> as suggested in https://issues.apache.org/jira/browse/KAFKA-3331, it would 
> be great to have the TopicCommand use the new Admin client instead of the 
> way it works today.
> As pushed for by [~gwenshap] in the above JIRA, I'm going to work on it.
> Thanks,
> Paolo



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5561) Java based TopicCommand to use the Admin client

2018-09-24 Thread Viktor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi updated KAFKA-5561:
--
Summary: Java based TopicCommand to use the Admin client  (was: Refactor 
TopicCommand to use the Admin client)

> Java based TopicCommand to use the Admin client
> ---
>
> Key: KAFKA-5561
> URL: https://issues.apache.org/jira/browse/KAFKA-5561
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Major
>
> Hi, 
> as suggested in https://issues.apache.org/jira/browse/KAFKA-3331, it would 
> be great to have the TopicCommand use the new Admin client instead of the 
> way it works today.
> As pushed for by [~gwenshap] in the above JIRA, I'm going to work on it.
> Thanks,
> Paolo



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-3268) Refactor existing CLI scripts to use KafkaAdminClient

2018-09-24 Thread Viktor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625545#comment-16625545
 ] 

Viktor Somogyi edited comment on KAFKA-3268 at 9/24/18 8:58 AM:


[~enothereska] I'll publish a KIP this week about KAFKA-5561 (TopicCommand - PR 
too) and KAFKA-5722 is on its way too. Unfortunately, lately I got stuck on 
other things. I'd be very happy if you could pick up KAFKA-5723 or KAFKA-5601; 
I think neither got continued. I'd gladly help in the KIP discussions or in the 
implementation too.


was (Author: viktorsomogyi):
[~enothereska] I'll publish a KIP this week about KAFKA-5561 (TopicCommand - PR 
also) and KAFKA-5722 is on its way too. Unfortunately, lately I got stuck on 
other things. I'd be very happy if you could pick up KAFKA-5723 or KAFKA-5601; 
I think neither got continued. I'd gladly help in the KIP discussions or in the 
implementation too.

> Refactor existing CLI scripts to use KafkaAdminClient
> -
>
> Key: KAFKA-3268
> URL: https://issues.apache.org/jira/browse/KAFKA-3268
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Viktor Somogyi
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-3268) Refactor existing CLI scripts to use KafkaAdminClient

2018-09-24 Thread Viktor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625545#comment-16625545
 ] 

Viktor Somogyi edited comment on KAFKA-3268 at 9/24/18 8:57 AM:


[~enothereska] I'll publish a KIP this week about KAFKA-5561 (TopicCommand - PR 
also) and KAFKA-5722 is on its way too. Unfortunately, lately I got stuck on 
other things. I'd be very happy if you could pick up KAFKA-5723 or KAFKA-5601; 
I think neither got continued. I'd gladly help in the KIP discussions or in the 
implementation too.


was (Author: viktorsomogyi):
[~enothereska] I'll publish a KIP this week about KAFKA-5561 (TopicCommand) and 
KAFKA-5722 is on its way too. Unfortunately, lately I got stuck on other things. 
I'd be very happy if you could pick up KAFKA-5723 or KAFKA-5601; I think 
neither got continued. I'd gladly help in the KIP discussions or in the 
implementation too.

> Refactor existing CLI scripts to use KafkaAdminClient
> -
>
> Key: KAFKA-3268
> URL: https://issues.apache.org/jira/browse/KAFKA-3268
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Viktor Somogyi
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3268) Refactor existing CLI scripts to use KafkaAdminClient

2018-09-24 Thread Viktor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625545#comment-16625545
 ] 

Viktor Somogyi commented on KAFKA-3268:
---

[~enothereska] I'll publish a KIP this week about KAFKA-5561 (TopicCommand) and 
KAFKA-5722 is on its way too. Unfortunately, lately I got stuck on other things. 
I'd be very happy if you could pick up KAFKA-5723 or KAFKA-5601; I think 
neither got continued. I'd gladly help in the KIP discussions or in the 
implementation too.

> Refactor existing CLI scripts to use KafkaAdminClient
> -
>
> Key: KAFKA-3268
> URL: https://issues.apache.org/jira/browse/KAFKA-3268
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Viktor Somogyi
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7417) Some topics lost / cannot recover their ISR status following broker crash

2018-09-24 Thread Mikhail Khomenko (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mikhail Khomenko updated KAFKA-7417:

Summary: Some topics lost / cannot recover their ISR status following 
broker crash  (was: Some replicas cannot become in-sync)

> Some topics lost / cannot recover their ISR status following broker crash
> -
>
> Key: KAFKA-7417
> URL: https://issues.apache.org/jira/browse/KAFKA-7417
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Mikhail Khomenko
>Priority: Major
>
> Hi,
>  we have faced the following issue - some replicas cannot become in-sync. The 
> distribution of in-sync replicas amongst topics is random. For instance:
> {code:java}
> $ kafka-topics --zookeeper 1.2.3.4:8181 --describe --topic TEST
> Topic:TEST PartitionCount:8 ReplicationFactor:3 Configs:
> Topic: TEST Partition: 0 Leader: 2 Replicas: 0,2,1 Isr: 0,1,2
> Topic: TEST Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 0,1,2
> Topic: TEST Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 0,1,2
> Topic: TEST Partition: 3 Leader: 2 Replicas: 0,1,2 Isr: 0,1,2
> Topic: TEST Partition: 4 Leader: 1 Replicas: 1,2,0 Isr: 0,1,2
> Topic: TEST Partition: 5 Leader: 2 Replicas: 2,0,1 Isr: 0,1,2
> Topic: TEST Partition: 6 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
> Topic: TEST Partition: 7 Leader: 0 Replicas: 1,0,2 Isr: 0,2{code}
> Files in segment TEST-7 are identical (the same md5sum) on all 3 brokers. They 
> were also checked with kafka.tools.DumpLogSegments - the messages are the same.
> We have a 3-broker cluster running Confluent Kafka 5.0.0 (i.e. 
> Apache Kafka 2.0.0).
>  Each broker has the following configuration:
> {code:java}
> advertised.host.name = null
> advertised.listeners = PLAINTEXT://1.2.3.4:9200
> advertised.port = null
> alter.config.policy.class.name = null
> alter.log.dirs.replication.quota.window.num = 11
> alter.log.dirs.replication.quota.window.size.seconds = 1
> authorizer.class.name = 
> auto.create.topics.enable = true
> auto.leader.rebalance.enable = true
> background.threads = 10
> broker.id = 1
> broker.id.generation.enable = true
> broker.interceptor.class = class 
> org.apache.kafka.server.interceptor.DefaultBrokerInterceptor
> broker.rack = null
> client.quota.callback.class = null
> compression.type = producer
> connections.max.idle.ms = 60
> controlled.shutdown.enable = true
> controlled.shutdown.max.retries = 3
> controlled.shutdown.retry.backoff.ms = 5000
> controller.socket.timeout.ms = 3
> create.topic.policy.class.name = null
> default.replication.factor = 3
> delegation.token.expiry.check.interval.ms = 360
> delegation.token.expiry.time.ms = 8640
> delegation.token.master.key = null
> delegation.token.max.lifetime.ms = 60480
> delete.records.purgatory.purge.interval.requests = 1
> delete.topic.enable = true
> fetch.purgatory.purge.interval.requests = 1000
> group.initial.rebalance.delay.ms = 3000
> group.max.session.timeout.ms = 30
> group.min.session.timeout.ms = 6000
> host.name = 
> inter.broker.listener.name = null
> inter.broker.protocol.version = 2.0
> leader.imbalance.check.interval.seconds = 300
> leader.imbalance.per.broker.percentage = 10
> listener.security.protocol.map = 
> PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> listeners = PLAINTEXT://0.0.0.0:9200
> log.cleaner.backoff.ms = 15000
> log.cleaner.dedupe.buffer.size = 134217728
> log.cleaner.delete.retention.ms = 8640
> log.cleaner.enable = true
> log.cleaner.io.buffer.load.factor = 0.9
> log.cleaner.io.buffer.size = 524288
> log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
> log.cleaner.min.cleanable.ratio = 0.5
> log.cleaner.min.compaction.lag.ms = 0
> log.cleaner.threads = 1
> log.cleanup.policy = [delete]
> log.dir = /tmp/kafka-logs
> log.dirs = /var/lib/kafka/data
> log.flush.interval.messages = 9223372036854775807
> log.flush.interval.ms = null
> log.flush.offset.checkpoint.interval.ms = 6
> log.flush.scheduler.interval.ms = 9223372036854775807
> log.flush.start.offset.checkpoint.interval.ms = 6
> log.index.interval.bytes = 4096
> log.index.size.max.bytes = 10485760
> log.message.downconversion.enable = true
> log.message.format.version = 2.0
> log.message.timestamp.difference.max.ms = 9223372036854775807
> log.message.timestamp.type = CreateTime
> log.preallocate = false
> log.retention.bytes = -1
> log.retention.check.interval.ms = 30
> log.retention.hours = 8760
> log.retention.minutes = null
> log.retention.ms = null
> log.roll.hours = 168
> log.roll.jitter.hours = 0
> log.roll.jitter.ms = null
> log.roll.ms = null
> log.segment.bytes = 1073741824
> log.segment.delete.delay.ms = 6
> max.connections.per.ip = 2147483647
> max.connections.per.ip.overrides = 
> 

[jira] [Commented] (KAFKA-6989) Support Async Processing in Streams

2018-09-24 Thread Virgil Palanciuc (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625477#comment-16625477
 ] 

Virgil Palanciuc commented on KAFKA-6989:
-

num.stream.threads just allows you to process more than one partition in 
parallel within a single process. It's still synchronous, in that you can't 
have multiple messages from the same partition processed in parallel.

TBH the entire philosophy of Kafka Streams is that you increase parallelism by 
increasing the number of partitions, and this works well when the work you need 
to do is not massively IO-bound (e.g. you don't need to call an 
external/3rd-party HTTP endpoint). It is possible to do this "manually" by 
launching an async task and pretending that you have finished processing your 
message, but it is cumbersome and tedious (e.g. at the end of processing you 
need to use your own Kafka producer to write back the result, and there are 
some pitfalls and misconfigurations that can happen there). This ticket is 
about adding this sort of async processing as a "first-class" stream processor 
in Kafka Streams.
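
To make that workaround (and its main pitfall) concrete, here is a hypothetical 
Processor API sketch, not a recommendation: the processor hands the slow call to 
its own thread pool and writes the result back with its own producer. The class 
name, the pool size, the "out" topic, and slowExternalCall are assumptions, and 
the producer is presumed to be configured and closed appropriately elsewhere.
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.processor.AbstractProcessor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ManualAsyncProcessor extends AbstractProcessor<String, String> {

    private final ExecutorService pool = Executors.newFixedThreadPool(10); // assumed pool size
    private final KafkaProducer<String, String> producer;                  // configured elsewhere

    public ManualAsyncProcessor(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    @Override
    public void process(String key, String value) {
        // Returning from process() immediately means the record counts as
        // "processed" (and its offset may be committed) even though the slow
        // call has not happened yet - the main pitfall of this workaround.
        pool.submit(() -> {
            String result = slowExternalCall(value);                  // e.g. a 3rd-party HTTP call
            producer.send(new ProducerRecord<>("out", key, result));  // hand-rolled write-back
        });
    }

    // Placeholder for the IO-bound work.
    private String slowExternalCall(String value) {
        return value;
    }

    @Override
    public void close() {
        pool.shutdown();
        producer.close();
    }
}
{code}
Because process() returns before the work is done, offsets can be committed for 
records whose results were never produced - exactly the kind of gap a 
first-class async operator would have to close.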

> Support Async Processing in Streams
> ---
>
> Key: KAFKA-6989
> URL: https://issues.apache.org/jira/browse/KAFKA-6989
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> Today Kafka Streams uses a single-thread-per-task architecture to achieve 
> embarrassing parallelism and good isolation. However, there are a couple of 
> scenarios where async processing may be preferable:
> 1) External resource access or heavy IO with high latency. Suppose you need 
> to access a remote REST API, read / write to an external store, or do a heavy 
> disk IO operation that may result in high latency. The current threading 
> model would block all other records until this record is done, waiting on the 
> remote call / IO to finish.
> 2) Robust failure handling with retries. Imagine the app-level processing of 
> a (non-corrupted) record fails (e.g. the user attempted to do an RPC to an 
> external system, and this call failed), and failed records are moved into a 
> separate "retry" topic. How can you process such failed records in a scalable 
> way? For example, imagine you need to implement a retry policy such as "retry 
> with exponential backoff". Here, you have the problem that 1. you can't 
> really pause processing a single record because this will pause the 
> processing of the full stream (bottleneck!) and 2. there is no 
> straightforward way to "sort" failed records based on their "next retry 
> time" (think: priority queue).
> 3) Delayed processing. One use case is delaying re-processing (e.g. "delay 
> re-processing this event for 5 minutes") as mentioned in 2); another is 
> implementing a scheduler, e.g. doing some additional operations later based 
> on the processed record (Zalando Dublin, for example, are implementing a 
> distributed web crawler along these lines). Note that although this feature 
> can be handled in punctuation, it is not well aligned with our current offset 
> committing behavior, which always advances the offset once the record has 
> finished traversing the topology.
> I'm thinking of two options to support this feature:
> 1. Make the commit() mechanism more customizable so that users can implement 
> multi-threaded processing themselves: users can always do async processing in 
> the Processor API by spawning a thread pool, for example, but the key is that 
> the offset to be committed should only be advanced once such async processing 
> is done. This is a lightweight approach: we provide all the pieces and tools, 
> and users stack them up to build their own LEGOs.
> 2. Provide a general API to do async processing in the Processor API, and 
> take care of offset committing internally. This is a heavyweight approach: 
> the API may not cover all async scenarios, but it is an easy way to cover the 
> majority of the remaining scenarios, and users do not need to worry about 
> internal implementation details such as offsets and fault tolerance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7432) API Method on Kafka Streams for processing chunks/batches of data

2018-09-24 Thread sam (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625446#comment-16625446
 ] 

sam commented on KAFKA-7432:


[~mjsax] I see the similarity in use case, but KAFKA-6988 seems to be 
requesting something like this:
{code:java}
builder.stream[String, String]("in").async(numThreads = 
10).map(someCallThatIsSlowButNotCPUIntensive).to("out"){code}
but what we have is an external call that expects a batch of records, e.g. a 
signature like this:
{code:java}
def externalBatchedApiCall(it: List[A]): List[B]{code}

> API Method on Kafka Streams for processing chunks/batches of data
> -
>
> Key: KAFKA-7432
> URL: https://issues.apache.org/jira/browse/KAFKA-7432
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: sam
>Priority: Major
>
> For many situations in Big Data it is preferable to work with a small buffer 
> of records at a go, rather than one record at a time.
> The natural example is calling some external API that supports batching for 
> efficiency.
> How can we do this in Kafka Streams? I cannot find anything in the API that 
> looks like what I want.
> So far I have:
> {{builder.stream[String, String]("my-input-topic") 
> .mapValues(externalApiCall).to("my-output-topic")}}
> What I want is:
> {{builder.stream[String, String]("my-input-topic") .batched(chunkSize = 
> 2000).map(externalBatchedApiCall).to("my-output-topic")}}
> In Scala and Akka Streams the function is called {{grouped}} or {{batch}}. In 
> Spark Structured Streaming we can do 
> {{mapPartitions.map(_.grouped(2000).map(externalBatchedApiCall))}}.
>  
>  
> https://stackoverflow.com/questions/52366623/how-to-process-data-in-chunks-batches-with-kafka-streams
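
For reference, something close to the requested batching can be approximated 
today with a custom processor that buffers records and forwards the results of 
one batch call downstream. The sketch below is a hypothetical illustration 
(class name, chunk handling, and externalBatchedApiCall are placeholders); a 
production version would also flush on a punctuation schedule and keep the 
buffer in a state store so a crash does not drop a partially filled batch.
{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;

import java.util.ArrayList;
import java.util.List;

public class BatchingProcessor extends AbstractProcessor<String, String> {

    private final int chunkSize;
    private final List<KeyValue<String, String>> buffer = new ArrayList<>();

    public BatchingProcessor(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    @Override
    public void process(String key, String value) {
        buffer.add(KeyValue.pair(key, value));
        if (buffer.size() >= chunkSize) {
            // One call to the external API per chunk of records.
            for (KeyValue<String, String> out : externalBatchedApiCall(buffer)) {
                context().forward(out.key, out.value);
            }
            buffer.clear();
        }
    }

    // Placeholder for the external API that expects a whole batch.
    private List<KeyValue<String, String>> externalBatchedApiCall(List<KeyValue<String, String>> batch) {
        return new ArrayList<>(batch);
    }

    @Override
    public void close() {
        buffer.clear();
    }
}
{code}
It could be wired in with the low-level Topology API, e.g. 
new Topology().addSource("source", "my-input-topic").addProcessor("batcher", 
() -> new BatchingProcessor(2000), "source").addSink("sink", "my-output-topic", 
"batcher"), assuming string default serdes.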



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6989) Support Async Processing in Streams

2018-09-24 Thread sam (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625444#comment-16625444
 ] 

sam commented on KAFKA-6989:


This ticket is requesting something like this?
{code:java}
builder.stream[String, String]("in").async(numThreads = 
10).map(someCallThatIsSlowButNotCPUIntensive).to("out"){code}
so that someCallThatIsSlowButNotCPUIntensive will run in, say, 10 threads.

 

That seems like a good idea.  I don't see an issue with a high-level API call 
like this.  To be honest I actually assumed Kafka Streams would do this kind of 
thing already and that was the point of 
[https://docs.confluent.io/current/streams/developer-guide/config-streams.html#num-stream-threads.]
 Am I misunderstanding something?
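
For reference, num.stream.threads is just a StreamsConfig setting (a minimal 
sketch follows; the application id and bootstrap servers are placeholders): it 
adds more stream threads per instance, each owning a subset of tasks 
(partitions), but records from a single partition are still processed one at a 
time.
{code:java}
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class NumStreamThreadsConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // 10 threads in this instance: more tasks (partitions) run in parallel,
        // but each partition is still handled by exactly one thread at a time.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
        System.out.println(props.get(StreamsConfig.NUM_STREAM_THREADS_CONFIG));
    }
}
{code}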

> Support Async Processing in Streams
> ---
>
> Key: KAFKA-6989
> URL: https://issues.apache.org/jira/browse/KAFKA-6989
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> Today Kafka Streams uses a single-thread-per-task architecture to achieve 
> embarrassing parallelism and good isolation. However, there are a couple of 
> scenarios where async processing may be preferable:
> 1) External resource access or heavy IO with high latency. Suppose you need 
> to access a remote REST API, read / write to an external store, or do a heavy 
> disk IO operation that may result in high latency. The current threading 
> model would block all other records until this record is done, waiting on the 
> remote call / IO to finish.
> 2) Robust failure handling with retries. Imagine the app-level processing of 
> a (non-corrupted) record fails (e.g. the user attempted to do an RPC to an 
> external system, and this call failed), and failed records are moved into a 
> separate "retry" topic. How can you process such failed records in a scalable 
> way? For example, imagine you need to implement a retry policy such as "retry 
> with exponential backoff". Here, you have the problem that 1. you can't 
> really pause processing a single record because this will pause the 
> processing of the full stream (bottleneck!) and 2. there is no 
> straightforward way to "sort" failed records based on their "next retry 
> time" (think: priority queue).
> 3) Delayed processing. One use case is delaying re-processing (e.g. "delay 
> re-processing this event for 5 minutes") as mentioned in 2); another is 
> implementing a scheduler, e.g. doing some additional operations later based 
> on the processed record (Zalando Dublin, for example, are implementing a 
> distributed web crawler along these lines). Note that although this feature 
> can be handled in punctuation, it is not well aligned with our current offset 
> committing behavior, which always advances the offset once the record has 
> finished traversing the topology.
> I'm thinking of two options to support this feature:
> 1. Make the commit() mechanism more customizable so that users can implement 
> multi-threaded processing themselves: users can always do async processing in 
> the Processor API by spawning a thread pool, for example, but the key is that 
> the offset to be committed should only be advanced once such async processing 
> is done. This is a lightweight approach: we provide all the pieces and tools, 
> and users stack them up to build their own LEGOs.
> 2. Provide a general API to do async processing in the Processor API, and 
> take care of offset committing internally. This is a heavyweight approach: 
> the API may not cover all async scenarios, but it is an easy way to cover the 
> majority of the remaining scenarios, and users do not need to worry about 
> internal implementation details such as offsets and fault tolerance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)