[jira] [Updated] (KAFKA-7523) TransformerSupplier/ProcessorSupplier enhancements

2018-12-10 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7523:
---
Description: 
KIP-401: 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756]

I have found that when writing "low level" {{Processors}} and {{Transformers}} 
that are stateful, I often want these processors to "own" one or more state 
stores, the details of which are not important to the business logic of the 
application. However, when incorporating these into the topologies defined by 
the high-level API, using {{KStream::transform}} or {{KStream::process}}, I'm 
forced to specify the stores so the topology is wired up correctly. This 
creates an unfortunate pattern where the {{TransformerSupplier}} or 
{{ProcessorSupplier}}, which (according to the pattern I've been following) holds 
the information about the names of the state stores, must be defined above the 
"high level", "fluent API"-style pipeline, which makes it hard to follow the 
data flow of the business logic.

 

What I currently have to do:
{code:java}
TransformerSupplier transformerSupplier =
    new TransformerSupplierWithState(topology, val -> businessLogic(val));
builder.stream("in.topic")
    .transform(transformerSupplier, transformerSupplier.stateStoreNames())
    .to("out.topic");{code}
I have to both define the {{TransformerSupplier}} above the "fluent block" and 
pass the topology in, so that I can call {{topology.addStateStore()}} inside the 
{{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what the 
state store names are for that point in the topology. The lambda {{val -> 
businessLogic(val)}} is really what I want to see in-line, because it is the 
crux of what is happening, along with the name of some factory method 
describing what the transformer does for me internally. This issue is 
obviously exacerbated when the "fluent block" is much longer than this example: 
it gets worse the farther away {{val -> businessLogic(val)}} is from 
{{KStream::transform}}.

 
 An improvement:
{code:java}
builder.stream("in.topic")
    .transform(transformerSupplierWithState(topology, val -> businessLogic(val)))
    .to("out.topic");{code}
Which implies the existence of a {{KStream::transform}} overload that takes a 
single argument adhering to this interface:
{code:java}
interface TransformerSupplierWithState {
    Transformer get();
    String[] stateStoreNames();
}{code}
Or better yet, I wouldn't have to pass in the topology; the caller of 
{{TransformerSupplierWithState}} could also handle the job of "adding" its 
state stores to the topology:
{code:java}
interface TransformerSupplierWithState {
    Transformer get();
    Map stateStores();
}{code}
Which would enable my ideal:
{code:java}
builder.stream("in.topic")
    .transform(transformerSupplierWithState(val -> businessLogic(val)))
    .to("out.topic");{code}
I think this would be a huge improvement in the usability of low-level 
processors with the high-level DSL.

Please let me know if I'm missing something as to why this cannot or should not 
happen, or if there is a better forum for this suggestion (presumably it would 
require a KIP?). I'd be happy to build it as well if there is a chance of it 
being merged; it doesn't seem like a huge challenge to me.

[jira] [Commented] (KAFKA-7680) fetching a refilled chunk of log can cause log divergence

2018-12-10 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716273#comment-16716273
 ] 

Jun Rao commented on KAFKA-7680:


[~NIzhikov], thanks for your interest. Feel free to assign the Jira to yourself.

> fetching a refilled chunk of log can cause log divergence
> -
>
> Key: KAFKA-7680
> URL: https://issues.apache.org/jira/browse/KAFKA-7680
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Priority: Major
>
> We use FileRecords.writeTo to send a fetch response for a follower. A log 
> could be truncated and refilled in the middle of the send process (due to 
> leader change). Then it's possible for the follower to append some 
> uncommitted messages followed by committed messages. Those uncommitted 
> messages may never be removed, causing log divergence.
>  





[jira] [Commented] (KAFKA-6820) Improve on StreamsMetrics Public APIs

2018-12-10 Thread Nikolay Izhikov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716257#comment-16716257
 ] 

Nikolay Izhikov commented on KAFKA-6820:


Hello, [~vvcephei]

Is this ticket still relevant?
I'd like to pick it up.

> Improve on StreamsMetrics Public APIs
> -
>
> Key: KAFKA-6820
> URL: https://issues.apache.org/jira/browse/KAFKA-6820
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> Our current `addLatencyAndThroughputSensor` and `addThroughputSensor` are not 
> very well designed and hence not very friendly for users adding their own 
> customized sensors. We could consider improving this feature. Some related 
> things to consider:
> 1. Our internal built-in metrics should be independent of these public APIs, 
> which are for user-customized sensors only. See KAFKA-6819 for a related 
> description.
> 2. We could enforce the possible values of scopeName and clearly document the 
> sensor hierarchies incurred by the function calls. In this way the library can 
> help close users' sensors automatically when the corresponding scope (store, 
> task, thread, etc.) is deconstructed.
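For context, a minimal sketch of how a customized sensor is registered through 
the current 2.1-era API from inside a {{Processor}}; the scope, entity, and 
operation names are illustrative free-form strings, which is part of what item 
2 above would tighten:
{code:java}
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;

class TimedProcessor extends AbstractProcessor<String, String> {
    private Sensor latencySensor;

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        final StreamsMetrics metrics = context.metrics();
        // the names are free-form today; nothing enforces a hierarchy or
        // closes the sensor automatically when the owning scope goes away
        latencySensor = metrics.addLatencyAndThroughputSensor(
                "my-scope", "my-entity", "my-operation",
                Sensor.RecordingLevel.DEBUG);
    }

    @Override
    public void process(final String key, final String value) {
        final long startNs = System.nanoTime();
        // ... business logic ...
        context().metrics().recordLatency(latencySensor, startNs, System.nanoTime());
    }
}{code}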





[jira] [Commented] (KAFKA-6819) Refactor build-in StreamsMetrics internal implementations

2018-12-10 Thread Nikolay Izhikov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716255#comment-16716255
 ] 

Nikolay Izhikov commented on KAFKA-6819:


Hello, [~vvcephei]

Is this ticket still relevant?
I'd like to pick it up.

> Refactor build-in StreamsMetrics internal implementations
> -
>
> Key: KAFKA-6819
> URL: https://issues.apache.org/jira/browse/KAFKA-6819
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Our current internal implementations of StreamsMetrics and the different layered 
> metrics like StreamsMetricsThreadImpl, TaskMetrics, NodeMetrics, etc. are a bit 
> messy nowadays. We could improve on the current situation by doing the 
> following:
> 0. For thread-level metrics, refactor the {{StreamsMetricsThreadImpl}} class 
> to {{ThreadMetrics}} such that a) it does not extend from 
> {{StreamsMetricsImpl}} but just takes the {{StreamsMetricsImpl}} as a 
> constructor parameter, and b) its constructor is replaced with a static 
> {{addAllSensors(threadName)}} that tries to register all the thread-level 
> sensors for the given thread name.
> 1. Add a static function for each of the built-in sensors of the thread-level 
> metrics in {{ThreadMetrics}} that relies on the internal 
> {{StreamsMetricsConventions}} to get thread-level sensor names. If the sensor 
> cannot be found in the internal {{Metrics}} registry, create the sensor 
> on the fly.
> 2.a Add a static {{removeAllSensors(threadName)}} function in 
> {{ThreadMetrics}} that tries to de-register all the thread-level metrics for 
> this thread; if there are no sensors it is a no-op. We will trigger this 
> function in {{StreamThread#close()}}, and similarly call it in 
> `TopologyTestDriver` when we close the driver. As a result, the 
> {{ThreadMetrics}} class itself would only contain static functions with no 
> member fields at all (see the sketch after this list).
> 2.b We can consider doing the same for {{TaskMetrics}}, {{NodeMetrics}} and 
> {{NamedCacheMetrics}} as well, and add a {{StoreMetrics}} following the 
> same pattern: although these metrics are not accessed outside their 
> enclosing classes today, this may change in the future.
> 3. Then, we only pass {{StreamsMetricsImpl}} around between the internal 
> classes, to access the specific sensor whenever trying to record it.
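A hypothetical skeleton of the {{ThreadMetrics}} shape that items 0 and 2.a 
describe; the method names come from the ticket, while the parameter lists and 
the {{Metrics}} handle are assumptions for illustration:
{code:java}
import org.apache.kafka.common.metrics.Metrics;

// static-only holder with no member fields (item 2.a)
final class ThreadMetrics {
    private ThreadMetrics() { }

    // register all thread-level sensors for the given thread, creating them
    // on the fly if they are not yet in the registry (items 0 and 1);
    // sensor names would come from StreamsMetricsConventions
    static void addAllSensors(final Metrics metrics, final String threadName) {
    }

    // de-register all thread-level sensors; a no-op if none exist (item 2.a);
    // called from StreamThread#close() and TopologyTestDriver#close()
    static void removeAllSensors(final Metrics metrics, final String threadName) {
    }
}{code}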





[jira] [Comment Edited] (KAFKA-6393) Add tool to view active brokers

2018-12-10 Thread Nikolay Izhikov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716245#comment-16716245
 ] 

Nikolay Izhikov edited comment on KAFKA-6393 at 12/11/18 5:42 AM:
--

Hello, [~hachikuji]

Is this still relevant?
I'd like to pick up and resolve this JIRA.


was (Author: nizhikov):
Helo, [~hachikuji]

Is this still actual?
I want to pick up and resolve this JIRA.

> Add tool to view active brokers
> ---
>
> Key: KAFKA-6393
> URL: https://issues.apache.org/jira/browse/KAFKA-6393
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: needs-kip
>
> It would be helpful to have a tool to view the active brokers in the cluster. 
> For example, it could include the following:
> 1. Broker id and version (maybe detected through ApiVersions request)
> 2. Broker listener information
> 3. Whether broker is online
> 4. Which broker is the active controller
> 5. Maybe some key configs (e.g. inter-broker version and message format 
> version)
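Much of items 2-4 in the list above is already reachable through the public 
{{AdminClient}}; a minimal sketch (version detection via ApiVersions, item 1, 
is not covered, and the tool's shape is illustrative only):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;

public class ActiveBrokersTool {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", args[0]);
        try (AdminClient admin = AdminClient.create(props)) {
            DescribeClusterResult cluster = admin.describeCluster();
            Node controller = cluster.controller().get();
            for (Node broker : cluster.nodes().get()) {
                // listener host/port plus whether this broker is the controller
                System.out.printf("broker %d at %s:%d%s%n",
                        broker.id(), broker.host(), broker.port(),
                        broker.id() == controller.id() ? " (controller)" : "");
            }
        }
    }
}{code}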





[jira] [Commented] (KAFKA-6393) Add tool to view active brokers

2018-12-10 Thread Nikolay Izhikov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716245#comment-16716245
 ] 

Nikolay Izhikov commented on KAFKA-6393:


Hello, [~hachikuji]

Is this still relevant?
I'd like to pick up and resolve this JIRA.

> Add tool to view active brokers
> ---
>
> Key: KAFKA-6393
> URL: https://issues.apache.org/jira/browse/KAFKA-6393
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: needs-kip
>
> It would be helpful to have a tool to view the active brokers in the cluster. 
> For example, it could include the following:
> 1. Broker id and version (maybe detected through ApiVersions request)
> 2. Broker listener information
> 3. Whether broker is online
> 4. Which broker is the active controller
> 5. Maybe some key configs (e.g. inter-broker version and message format 
> version)





[jira] [Commented] (KAFKA-7680) fetching a refilled chunk of log can cause log divergence

2018-12-10 Thread Nikolay Izhikov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716210#comment-16716210
 ] 

Nikolay Izhikov commented on KAFKA-7680:


Hello, [~junrao], [~hachikuji]

I'd like to pick up this ticket.
Would you mind?

> fetching a refilled chunk of log can cause log divergence
> -
>
> Key: KAFKA-7680
> URL: https://issues.apache.org/jira/browse/KAFKA-7680
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Priority: Major
>
> We use FileRecords.writeTo to send a fetch response for a follower. A log 
> could be truncated and refilled in the middle of the send process (due to 
> leader change). Then it's possible for the follower to append some 
> uncommitted messages followed by committed messages. Those uncommitted 
> messages may never be removed, causing log divergence.
>  





[jira] [Resolved] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-12-10 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7610.
--
   Resolution: Fixed
 Assignee: Jason Gustafson  (was: Boyang Chen)
Fix Version/s: 2.1.1
   2.2.0

> Detect consumer failures in initial JoinGroup
> -
>
> Key: KAFKA-7610
> URL: https://issues.apache.org/jira/browse/KAFKA-7610
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
>
> The session timeout and heartbeating logic in the consumer allow us to detect 
> failures after a consumer joins the group. However, we have no mechanism to 
> detect failures during a consumer's initial JoinGroup when its memberId is 
> empty. When a client fails (e.g. due to a disconnect), the newly created 
> MemberMetadata will be left in the group metadata cache. Typically when this 
> happens, the client simply retries the JoinGroup. Every retry results in a 
> new dangling member created and left in the group. These members are doomed 
> to a session timeout when the group finally finishes the rebalance, but 
> before that time, they are occupying memory. In extreme cases, when a 
> rebalance is delayed (possibly due to a buggy application), this cycle can 
> repeat and the cache can grow quite large.
> There are a couple options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP 
> connection fails. This is difficult at the moment because we do not have a 
> mechanism to propagate disconnects from the network layer. A potential option 
> is to treat the disconnect as just another type of request and pass it to the 
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of 
> time, we can return earlier with the generated memberId and an error code 
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the 
> rebalance. The consumer can then poll for the rebalance using its assigned 
> memberId. And we can detect failures through the session timeout. Obviously 
> this option requires a KIP (and some more thought).





[jira] [Commented] (KAFKA-7610) Detect consumer failures in initial JoinGroup

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715689#comment-16715689
 ] 

ASF GitHub Bot commented on KAFKA-7610:
---

guozhangwang closed pull request #5962: KAFKA-7610; Proactively timeout new 
group members if rebalance is delayed
URL: https://github.com/apache/kafka/pull/5962
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index 5f16acb6a85..93775182570 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -26,11 +26,11 @@ import kafka.server.DelayedOperation
 private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
   group: GroupMetadata,
   member: MemberMetadata,
-  heartbeatDeadline: Long,
-  sessionTimeout: Long)
-  extends DelayedOperation(sessionTimeout, Some(group.lock)) {
+  deadline: Long,
+  timeoutMs: Long)
+  extends DelayedOperation(timeoutMs, Some(group.lock)) {
 
-  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _)
-  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
+  override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, deadline, forceComplete _)
+  override def onExpiration() = coordinator.onExpireHeartbeat(group, member, deadline)
   override def onComplete() = coordinator.onCompleteHeartbeat()
 }
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index db89f14592f..007c6eea75a 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -600,7 +600,7 @@ class GroupCoordinator(val brokerId: Int,
       case Empty | Dead =>
       case PreparingRebalance =>
         for (member <- group.allMemberMetadata) {
-          group.invokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR))
+          group.maybeInvokeJoinCallback(member, joinError(member.memberId, Errors.NOT_COORDINATOR))
         }
 
         joinPurgatory.checkAndComplete(GroupKey(group.groupId))
@@ -674,14 +674,18 @@ class GroupCoordinator(val brokerId: Int,
    * Complete existing DelayedHeartbeats for the given member and schedule the next one
    */
   private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
+    completeAndScheduleNextExpiration(group, member, member.sessionTimeoutMs)
+  }
+
+  private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
     // complete current heartbeat expectation
     member.latestHeartbeat = time.milliseconds()
     val memberKey = MemberKey(member.groupId, member.memberId)
     heartbeatPurgatory.checkAndComplete(memberKey)
 
     // reschedule the next heartbeat expiration deadline
-    val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
-    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
+    val deadline = member.latestHeartbeat + timeoutMs
+    val delayedHeartbeat = new DelayedHeartbeat(this, group, member, deadline, timeoutMs)
     heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
   }
 
@@ -702,11 +706,23 @@ class GroupCoordinator(val brokerId: Int,
     val memberId = clientId + "-" + group.generateMemberIdSuffix
     val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, protocols)
+
+    member.isNew = true
+
     // update the newMemberAdded flag to indicate that the join group can be further delayed
     if (group.is(PreparingRebalance) && group.generationId == 0)
       group.newMemberAdded = true
 
     group.add(member, callback)
+
+    // The session timeout does not affect new members since they do not have their memberId and
+    // cannot send heartbeats. Furthermore, we cannot detect disconnects because sockets are muted
+    // while the JoinGroup is in purgatory. If the client does disconnect (e.g. because of a request
+    // timeout during a l

[jira] [Created] (KAFKA-7718) Allow customized header inheritance for stateful operators in DSL

2018-12-10 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-7718:


 Summary: Allow customized header inheritance for stateful 
operators in DSL
 Key: KAFKA-7718
 URL: https://issues.apache.org/jira/browse/KAFKA-7718
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


As follow-up work to 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API,
we want to allow users to customize how record headers are inherited while 
traversing the topology at the DSL layer (at the lower-level Processor API 
layer, users are already capable of customizing and inheriting headers as they 
forward records to the next processor nodes).

Today, headers are implicitly inherited throughout the topology without any 
modification by the Streams library. For stateless operators (filter, map, 
etc.) this default inheritance policy should be sufficient. For stateful 
operators, however, where multiple input records may generate a single output 
record (an n:1 transformation rather than a 1:1 mapping), we only inherit from 
the triggering record; this seems like a "random" choice to users, and the 
other records' headers are lost.

I'd propose we extend the DSL to allow users to customize the header 
inheritance policy for stateful operators, namely joins and aggregations. It 
would contain two parts:

1) On the DSL layer, I'd suggest we extend the `Joined` and `Grouped` control 
objects with an additional function that allows users to pass in a lambda 
(let's say it's called HeadersMerger, with the name subject to discussion in a 
KIP) that takes two Headers objects and returns a single merged Headers object.
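For illustration, a hypothetical sketch of such a merger; the ticket itself 
marks the name as subject to KIP discussion, so nothing here is a released API:
{code:java}
import org.apache.kafka.common.header.Headers;

// Hypothetical: receives the triggering record's headers plus the other
// side's headers (read back from the state store) and returns the merge.
@FunctionalInterface
interface HeadersMerger {
    Headers merge(Headers triggeringRecordHeaders, Headers otherRecordHeaders);
}{code}
A user who only cares about the triggering record could then pass something 
like {{(a, b) -> a}}, which also matches today's default behavior.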

2) On the implementation layer, we need to actually store the headers in the 
materialized state store so that they can be retrieved along with the record 
for the join / aggregation processor. This changes the organization of the 
state store's value bytes and hence should be considered carefully. When the 
join / aggregate processor is triggered, the Headers of both records will be 
retrieved (one from the triggering record, one read from the materialized 
state store) and passed to the HeadersMerger. Some low-hanging optimizations 
can be considered, e.g. if users have not overridden this interface, we can 
skip reading the headers from the other side entirely to save IO cost.





[jira] [Resolved] (KAFKA-6036) Enable logical materialization to physical materialization

2018-12-10 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6036.
--
   Resolution: Fixed
Fix Version/s: 2.2.0

> Enable logical materialization to physical materialization
> --
>
> Key: KAFKA-6036
> URL: https://issues.apache.org/jira/browse/KAFKA-6036
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.2.0
>
>
> Today, whenever users specify a queryable store name for a KTable, we 
> always add a physical state store in the translated processor topology.
> For some scenarios, we should consider not physically materializing the KTable 
> but only "logically" materializing it: when a simple transformation or even a 
> join generates a new KTable that needs to be materialized with a state store, 
> we can read from the changelog topic of the previous KTable and apply the 
> transformation logic upon restoration, instead of creating a new changelog 
> topic. For example:
> {code}
> table1 = builder.table("topic1");
> table2 = table1.filter(..).join(table3); // table2 needs to be materialized 
> for joining
> {code}
> We can set the {{getter}} function of table2's materialized store, 
> say {{state2}}, to read from {{topic1}} and then apply the filter 
> operator, instead of creating a new {{state2-changelog}} topic in this case.
> We can come up with general internal optimizations to determine when to 
> materialize logically, and decide whether we should allow users of the DSL 
> to "hint" whether to materialize or not (which may then need a KIP).





[jira] [Created] (KAFKA-7717) Enable security configs in kafka.tools.EndToEndLatency

2018-12-10 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-7717:
--

 Summary: Enable security configs in kafka.tools.EndToEndLatency
 Key: KAFKA-7717
 URL: https://issues.apache.org/jira/browse/KAFKA-7717
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski


The [end-to-end latency 
tool|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/EndToEndLatency.scala] 
does not support security configurations for authenticating to a secured 
broker. It only accepts `bootstrap.servers`, rendering it useless against 
SASL-secured clusters.
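For reference, these are the kinds of standard client security properties such 
a tool would need to accept; the keys are the real client configs, while the 
mechanism and credentials are illustrative:
{noformat}
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="alice" password="alice-secret";
{noformat}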





[jira] [Commented] (KAFKA-6144) Allow state stores to serve stale reads during rebalance

2018-12-10 Thread Navinder Brar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715471#comment-16715471
 ] 

Navinder Brar commented on KAFKA-6144:
--

Sure, [~NIzhikov], I will start writing a KIP.

> Allow state stores to serve stale reads during rebalance
> 
>
> Key: KAFKA-6144
> URL: https://issues.apache.org/jira/browse/KAFKA-6144
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Priority: Major
>  Labels: needs-kip
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large states can take a very long 
> time; even for small state stores, more than a few ms of unavailability can 
> be a deal breaker for microservice use cases.
> One workaround is to allow stale data to be read from the state stores when 
> the use case allows it.
> Relates to KAFKA-6145 - Warm up new KS instances before migrating tasks - 
> potentially a two-phase rebalance
> This is the description from KAFKA-6031 (keeping this JIRA as the title is 
> more descriptive):
> {quote}
> Currently reads for a key are served by a single replica, which has 2 drawbacks:
>  - if a replica is down, there is downtime in serving reads for the keys it 
> was responsible for, until a standby replica takes over
>  - in case of semantic partitioning some replicas might become hot and there 
> is no easy way to scale the read load
> If standby replicas had endpoints exposed in StreamsMetadata, it would enable 
> serving reads from several replicas, which would mitigate the 
> above drawbacks. 
> Due to the lag between replicas, reading from multiple replicas simultaneously 
> would have weaker (eventual) consistency compared to reads from a single 
> replica. This, however, should be an acceptable tradeoff in many cases.
> {quote}





[jira] [Commented] (KAFKA-7681) new metric for request thread utilization by request type

2018-12-10 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715264#comment-16715264
 ] 

Mayuresh Gharat commented on KAFKA-7681:


[~junrao], I will try to give this a shot.

> new metric for request thread utilization by request type
> -
>
> Key: KAFKA-7681
> URL: https://issues.apache.org/jira/browse/KAFKA-7681
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>Priority: Major
>
> When the request thread pool is saturated, it's often useful to know which 
> request type is using the thread pool the most. It would be useful to add a 
> metric that tracks the fraction of request thread pool usage by request type. 
> This would be equivalent to (request rate) * (request local time in ms) / 1000, 
> but more direct. This would require a new KIP.





[jira] [Assigned] (KAFKA-7681) new metric for request thread utilization by request type

2018-12-10 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat reassigned KAFKA-7681:
--

Assignee: Mayuresh Gharat

> new metric for request thread utilization by request type
> -
>
> Key: KAFKA-7681
> URL: https://issues.apache.org/jira/browse/KAFKA-7681
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>Priority: Major
>
> When the request thread pool is saturated, it's often useful to know which 
> request type is using the thread pool the most. It would be useful to add a 
> metric that tracks the fraction of request thread pool usage by request type. 
> This would be equivalent to (request rate) * (request local time in ms) / 1000, 
> but more direct. This would require a new KIP.





[jira] [Commented] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715218#comment-16715218
 ] 

ASF GitHub Bot commented on KAFKA-7549:
---

hachikuji closed pull request #5925: KAFKA-7549: Old ProduceRequest with zstd 
compression does not return error to client
URL: https://github.com/apache/kafka/pull/5925
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index f87090eba6a..9f9de42c866 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.CommonFields;
 import org.apache.kafka.common.protocol.Errors;
@@ -172,6 +173,21 @@ public Builder(short minVersion,
 
         @Override
         public ProduceRequest build(short version) {
+            return build(version, true);
+        }
+
+        // Visible for testing only
+        public ProduceRequest buildUnsafe(short version) {
+            return build(version, false);
+        }
+
+        private ProduceRequest build(short version, boolean validate) {
+            if (validate) {
+                // Validate the given records first
+                for (MemoryRecords records : partitionRecords.values()) {
+                    ProduceRequest.validateRecords(version, records);
+                }
+            }
             return new ProduceRequest(version, acks, timeout, partitionRecords, transactionalId);
         }
 
@@ -210,8 +226,9 @@ private ProduceRequest(short version, short acks, int timeout, Map createPartitionSizes(Map partitionRecords) {
@@ -231,7 +248,7 @@ public ProduceRequest(Struct struct, short version) {
             Struct partitionResponse = (Struct) partitionResponseObj;
             int partition = partitionResponse.get(PARTITION_ID);
             MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-            validateRecords(version, records);
+            setFlags(records);
             partitionRecords.put(new TopicPartition(topic, partition), records);
         }
     }
@@ -241,32 +258,11 @@ public ProduceRequest(Struct struct, short version) {
         transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
     }
 
-    private void validateRecords(short version, MemoryRecords records) {
-        if (version >= 3) {
-            Iterator iterator = records.batches().iterator();
-            if (!iterator.hasNext())
-                throw new InvalidRecordException("Produce requests with version " + version + " must have at least " +
-                    "one record batch");
-
-            MutableRecordBatch entry = iterator.next();
-            if (entry.magic() != RecordBatch.MAGIC_VALUE_V2)
-                throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
-                    "contain record batches with magic version 2");
-            if (version < 7 && entry.compressionType() == CompressionType.ZSTD) {
-                throw new InvalidRecordException("Produce requests with version " + version + " are note allowed to " +
-                    "use ZStandard compression");
-            }
-
-            if (iterator.hasNext())
-                throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
-                    "contain exactly one record batch");
-            idempotent = entry.hasProducerId();
-            transactional = entry.isTransactional();
-        }
-
-        // Note that we do not do similar validation for older versions to ensure compatibility with
-        // clients which send the wrong magic version in the wrong version of the produce request. The broker
-        // did not do this validation before, so we maintain that behavior here.
+    private void setFlags(MemoryRecords records) {
+        Iterator iterator = records.batches().iterator();
+        MutableRecordBatch entry = iterator.next();
+        idempotent = entry.hasProducerId();
+        transactional = entry.isTransactional();
     }
 
     /**
@@ -394,6 +390,32 @@ public void clearPartitionRecords() {
         partitionRecords = null;
     }
 

[jira] [Resolved] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client

2018-12-10 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7549.

Resolution: Fixed

> Old ProduceRequest with zstd compression does not return error to client
> 
>
> Key: KAFKA-7549
> URL: https://issues.apache.org/jira/browse/KAFKA-7549
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Magnus Edenhill
>Assignee: Lee Dongjin
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
>
> Kafka broker v2.1.0rc0.
>  
> KIP-110 states that:
> "Zstd will only be allowed for the bumped produce API. That is, for older 
> version clients(=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE 
> regardless of the message format."
>  
> However, sending a ProduceRequest V3 with zstd compression (which is a client 
> side bug) closes the connection with the following exception rather than 
> returning UNSUPPORTED_COMPRESSION_TYPE in the ProduceResponse:
>  
> {noformat}
> [2018-10-25 11:40:31,813] ERROR Exception while processing request from 
> 127.0.0.1:60723-127.0.0.1:60656-94 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: PRODUCE, apiVersion: 3, connectionId: 
> 127.0.0.1:60723-127.0.0.1:60656-94, listenerName: ListenerName(PLAINTEXT), 
> principal: User:ANONYMOUS
> Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce 
> requests with version 3 are note allowed to use ZStandard compression
> {noformat}





[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-12-10 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715220#comment-16715220
 ] 

Guozhang Wang commented on KAFKA-7678:
--

There are one or two edge cases which can cause the record collector to be 
closed multiple times; we have noticed them recently and are thinking about 
cleaning up the classes along the calling hierarchy (i.e. from Task Manager -> 
Task -> RecordCollector) to address this. One example:

1) a task is *suspended*, with EOS turned on (as in your case), and the record 
collector is closed.
2) then the instance gets killed (SIGTERM), which causes all threads to be 
closed, which in turn causes all their owned tasks to be *closed*. The same 
record collector close() call is triggered again.

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1, 2.0.1, 2.1.0
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Minor
> Fix For: 1.1.2, 2.2.0, 2.1.1, 2.0.2
>
>
> This occurs when the group is rebalancing in a Kafka Stream application and 
> the process (the Kafka Stream application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
> Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> Although I have checked the code, and the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class expects any kind of error to happen, since it is catching 
> `*Throwable*`.
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility in the 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> Change it to:
>  
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  if ( Objects.nonNull(producer) ) {
> producer.close();
> producer = null;
>  }
>  checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  





[jira] [Commented] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called

2018-12-10 Thread Viktor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715004#comment-16715004
 ] 

Viktor Somogyi commented on KAFKA-7703:
---

[~zsxwing], I just wanted to send a quick update: I have looked at the code, 
reproduced the issue based on your test, and I'm now trying to figure out the 
best solution. I'll post another update once I have a solution proposal.

> KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
> 
>
> Key: KAFKA-7703
> URL: https://issues.apache.org/jira/browse/KAFKA-7703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0
>Reporter: Shixiong Zhu
>Assignee: Viktor Somogyi
>Priority: Major
>
> After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong 
> offset set by another reset request.
> Here is a reproducer: 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246
> In this reproducer, "poll(0)" will send an "earliest" request in background. 
> However, after "seekToEnd" is called, due to a race condition in 
> "Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen 
> between the check 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
>  and the seek 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
>  "KafkaConsumer.position" may return an "earliest" offset.





[jira] [Commented] (KAFKA-7581) Issues in building kafka using gradle on a Ubuntu based docker container

2018-12-10 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-7581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714543#comment-16714543
 ] 

点儿郎当 commented on KAFKA-7581:
-

No progress; we rebuilt Kafka 2.0 directly. But we frequently see 
[java.io.IOException: Too many open files]. What could the problem be? The 
Linux ulimit has already been raised. The setup is 12 disks of 8 TB each, at 
more than 60,000 qps. Could the machines simply be unable to carry the load? 
Each of the three machines has 64 GB of memory and 32 CPU cores. GC was 
checked, no problem there.

> Issues in building kafka using gradle on a Ubuntu based docker container
> 
>
> Key: KAFKA-7581
> URL: https://issues.apache.org/jira/browse/KAFKA-7581
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Blocker
>
> The following issues are seen when kafka is built using gradle on a Ubuntu 
> based docker container:-
> /kafka-gradle/kafka-2.0.0/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:177:
>  File name too long
>  This can happen on some encrypted or legacy file systems. Please see SI-3623 
> for more details.
>  .foreach { txnMetadataCacheEntry =>
>  ^
>  56 warnings found
>  one error found
> > Task :core:compileScala FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
>  Execution failed for task ':core:compileScala'.
>  > Compilation failed





[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-12-10 Thread Sarvesh Tamba (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714524#comment-16714524
 ] 

Sarvesh Tamba commented on KAFKA-7580:
--

FYI, the following are the environment details:

p006vm10:~ # lscpu
Architecture: ppc64le
Byte Order: Little Endian
CPU(s): 16
On-line CPU(s) list: 0-15
Thread(s) per core: 1
Core(s) per socket: 1
Socket(s): 16
NUMA node(s): 1
Model: 2.1 (pvr 004b 0201)
Model name: POWER8E (raw), altivec supported
Hypervisor vendor: KVM
Virtualization type: para
L1d cache: 64K
L1i cache: 32K
NUMA node0 CPU(s): 0-15

p006vm10:~ # arch
ppc64le

p006vm10:~ # cat /etc/os-release
NAME="SLES"
VERSION="12-SP2"
VERSION_ID="12.2"
PRETTY_NAME="SUSE Linux Enterprise Server 12 SP2"
ID="sles"
ANSI_COLOR="0;32"
CPE_NAME="cpe:/o:suse:sles:12:sp2"
p006vm10:~ #

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Minor
>
> Created a non-root user and ran the following command to execute the failing 
> unit test:
> ./gradlew streams:unitTest --tests 
> org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir
> For a root user, the test case fails:-
> =
> > Task :streams:testClasses UP-TO-DATE
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
>  java.lang.AssertionError: Expected exception: 
> org.apache.kafka.streams.errors.ProcessorStateException
> 1 test completed, 1 failed
> > Task :streams:unitTest FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':streams:unitTest'.
> > There were failing tests. See the report at: 
> > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> BUILD FAILED in 20s
> 26 actionable tasks: 2 executed, 24 up-to-date
> =
> However, for a non-root user the test case passes:
> =
> > Task :streams:testClasses
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED
> BUILD SUCCESSFUL in 45s
> 26 actionable tasks: 4 executed, 22 up-to-date
> =
> The failing unit test - 
> "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary 
> file directory and sets it as readOnly. The unit test is intended to throw an 
> exception - "ProcessorStateException", when the readOnly temporary file 
> directory is opened/accessed.
> By default, non-root users are not allowed to open/access a read-only file 
> directory, so the unit test rightly throws an error/exception (which is the 
> intention of the unit test, and it passes for non-root users).
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
>  mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied
>  
>  sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
>  ls: cannot access '/tmp/readOnlyDir/..': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/.': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/child': Permission denied
>  total 0
>  d? ? ? ? ? ? ./
>  d? ? ? ? ? ? ../
>  d? ? ? ? ? ? child/
>  d? ? ? ? ? ? kid/
> However, by default, the root user can access any file in the system:
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 112
>  dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  
>  root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent
>  
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 116
>  dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/
> Hence the unit test does not throw an exception - "P

[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-12-10 Thread Sarvesh Tamba (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714530#comment-16714530
 ] 

Sarvesh Tamba commented on KAFKA-7580:
--

To summarize, the "pure virtual method" issue is seen on both Ubuntu 16.04 and 
SLES 12 SP2, both as root & non-root users on both these environments.

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Minor
>
> Created a non-root user and ran the following command to execute the failing 
> unit test:
> ./gradlew streams:unitTest --tests 
> org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir
> For a root user, the test case fails:-
> =
> > Task :streams:testClasses UP-TO-DATE
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
>  java.lang.AssertionError: Expected exception: 
> org.apache.kafka.streams.errors.ProcessorStateException
> 1 test completed, 1 failed
> > Task :streams:unitTest FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':streams:unitTest'.
> > There were failing tests. See the report at: 
> > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> BUILD FAILED in 20s
> 26 actionable tasks: 2 executed, 24 up-to-date
> =
> However, for a non-root user the test case passes:
> =
> > Task :streams:testClasses
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED
> BUILD SUCCESSFUL in 45s
> 26 actionable tasks: 4 executed, 22 up-to-date
> =
> The failing unit test - 
> "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary 
> file directory and sets it as readOnly. The unit test is intended to throw an 
> exception - "ProcessorStateException", when the readOnly temporary file 
> directory is opened/accessed.
> By default, non-root users are not allowed to open/access a read-only file 
> directory, so the unit test rightly throws an error/exception (which is the 
> intention of the unit test, and it passes for non-root users).
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
>  mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied
>  
>  sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
>  ls: cannot access '/tmp/readOnlyDir/..': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/.': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/child': Permission denied
>  total 0
>  d? ? ? ? ? ? ./
>  d? ? ? ? ? ? ../
>  d? ? ? ? ? ? child/
>  d? ? ? ? ? ? kid/
> However, by default, the root user can access any file in the system:
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 112
>  dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  
>  root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent
>  
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 116
>  dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/
> Hence the unit test does not throw an exception - "ProcessorStateException" 
> when the readOnly temporary file directory is opened, and the unit test 
> rightly fails for a root user.
> Two approaches for resolving this failing unit test case:
> 1.) Run the unit tests as non-root users (simplest).
> 2.) If running the unit tests as the root user, make the temporary file 
> directory immutable in the unit test code and then test for the exception 
> (needs code changes in the unit tests):
> root@p006vm18:/tmp# chattr +i /tmp/readOnlyD

[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-12-10 Thread Sarvesh Tamba (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714522#comment-16714522
 ] 

Sarvesh Tamba commented on KAFKA-7580:
--

[~mjsax] any plans to check up the issues of "pure virtual method"? Not been 
able to figure out why is this an issue and an intermittent/random one. Can you 
please check?

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Minor
>
> Created a non-root user and ran the following command to execute the failing 
> unit test:
> ./gradlew streams:unitTest --tests 
> org.apache.kafka.streams.state.internals.RocksDBStoreTest.shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir
> For a root user, the test case fails:-
> =
> > Task :streams:testClasses UP-TO-DATE
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir FAILED
>  java.lang.AssertionError: Expected exception: 
> org.apache.kafka.streams.errors.ProcessorStateException
> 1 test completed, 1 failed
> > Task :streams:unitTest FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
> Execution failed for task ':streams:unitTest'.
> > There were failing tests. See the report at: 
> > file:///root/sarvesh/kafka-gradle/kafka-2.0.0/streams/build/reports/tests/unitTest/index.html
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or 
> --debug option to get more log output. Run with --scan to get full insights.
> * Get more help at https://help.gradle.org
> BUILD FAILED in 20s
> 26 actionable tasks: 2 executed, 24 up-to-date
> =
> However, for a non-root user the test case passes:
> =
> > Task :streams:testClasses
> > Task :streams:unitTest
> org.apache.kafka.streams.state.internals.RocksDBStoreTest > 
> shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir PASSED
> BUILD SUCCESSFUL in 45s
> 26 actionable tasks: 4 executed, 22 up-to-date
> =
> The failing unit test - 
> "shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir" creates a temporary 
> file directory and sets it as readOnly. The unit test is intended to throw an 
> exception - "ProcessorStateException", when the readOnly temporary file 
> directory is opened/accessed.
> By default, non-root users are not allowed to open/access a read-only file 
> directory, so the unit test rightly throws an error/exception (which is the 
> intention of the unit test, and it passes for non-root users).
> sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ mkdir /tmp/readOnlyDir/parent
>  mkdir: cannot create directory '/tmp/readOnlyDir/parent': Permission denied
>  
>  sar@p006vm18:~/kafka-gradle/kafka-2.0.0$ ll /tmp/readOnlyDir/
>  ls: cannot access '/tmp/readOnlyDir/..': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/.': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/kid': Permission denied
>  ls: cannot access '/tmp/readOnlyDir/child': Permission denied
>  total 0
>  d? ? ? ? ? ? ./
>  d? ? ? ? ? ? ../
>  d? ? ? ? ? ? child/
>  d? ? ? ? ? ? kid/
> However, by default, the root user can access any file in the system:
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 112
>  dr--rw-rw- 4 root root 4096 Nov 1 03:47 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  
>  root@p006vm18:/tmp# mkdir /tmp/readOnlyDir/parent
>  
>  root@p006vm18:/tmp# ll /tmp/readOnlyDir/
>  total 116
>  dr--rw-rw- 5 root root 4096 Nov 1 04:03 ./
>  drwxrwxrwt 24 root root 98304 Nov 1 04:02 ../
>  drwxr-xr-x 2 root root 4096 Nov 1 03:44 child/
>  drwxrwxr-x 2 sar sar 4096 Nov 1 03:47 kid/
>  drwxr-xr-x 2 root root 4096 Nov 1 04:03 parent/
> Hence the unit test does not get the expected "ProcessorStateException" when 
> the read-only temporary directory is opened, and the unit test rightly fails 
> for a root user.
> Two approaches for resolving this failing unit test case:-
> 1.) Run the unit tests as a non-root user (simplest).
> 2.) If running the unit tests as the root user, make the temporary directory 
> immutable in the unit test code and then test for the exception (needs code 
> changes in the unit tests; a sketch follows after this quoted block):-
> root@p006vm18:/tmp# ch
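
A minimal sketch of approach 2 (an assumption for illustration, not necessarily 
the change in the actual fix): on Linux, chattr +i marks a directory immutable, 
which even the root user cannot bypass, so the test can still expect a 
ProcessorStateException. The helper name and the use of Runtime.exec are 
hypothetical.
{code:java}
import java.io.File;
import java.io.IOException;

public final class ReadOnlyDirTestHelper {

    // Hypothetical helper: marks the directory immutable via chattr so that
    // even root cannot create entries in it; opening the RocksDB store there
    // should then still fail with a ProcessorStateException.
    static void makeImmutable(final File dir) throws IOException, InterruptedException {
        final Process p = Runtime.getRuntime()
            .exec(new String[]{"chattr", "+i", dir.getAbsolutePath()});
        if (p.waitFor() != 0) {
            throw new IOException("chattr +i failed for " + dir);
        }
    }
}
{code}
The test would call makeImmutable(...) on the temporary directory before 
opening the store, and run "chattr -i" afterwards so the directory can still be 
cleaned up.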

[jira] [Created] (KAFKA-7716) Unprocessed messages when Broker fails

2018-12-10 Thread Finbarr Naughton (JIRA)
Finbarr Naughton created KAFKA-7716:
---

 Summary: Unprocessed messages when Broker fails
 Key: KAFKA-7716
 URL: https://issues.apache.org/jira/browse/KAFKA-7716
 Project: Kafka
  Issue Type: Bug
  Components: core, streams
Affects Versions: 2.0.1, 1.0.0
Reporter: Finbarr Naughton


This occurs when running on Kubernetes on bare metal.

A Streams application with a single topology listens to two input topics, A 
and B. A is read as a GlobalKTable, B as a KStream. The topology joins the 
stream to the GlobalKTable and writes an updated message to topic A. The 
application is configured to use exactly_once processing, as sketched below.
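
A minimal sketch of the described topology, assuming hypothetical topic names 
"topic-a" and "topic-b", default String serdes, and a trivial value joiner 
standing in for the real update logic:
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;

final StreamsBuilder builder = new StreamsBuilder();

// Topic A is read as a GlobalKTable, topic B as a KStream.
final GlobalKTable<String, String> tableA = builder.globalTable("topic-a");

builder.<String, String>stream("topic-b")
    .join(tableA,
        (key, value) -> key,                       // look up the table entry by the stream record's key
        (streamValue, tableValue) -> streamValue)  // stand-in for the real update logic
    .to("topic-a");                                // write the updated message back to topic A

// exactly_once processing, as the application is configured.
final Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
{code}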

There are three worker nodes. Kafka brokers are deployed as a StatefulSet on 
the three nodes using the Helm chart from 
[https://github.com/helm/charts/tree/master/incubator/kafka]

The application has three instances spread across the three nodes.

During a test, topic A is pre-populated with 50k messages over 5 minutes. Then 
50k messages with the same key-set are sent to topic B over 5 minutes. The 
expected behaviour is that Topic A will contain 50k updated messages 
afterwards. While all brokers are available this is the case, even when one of 
the application pods is deleted.

When a broker fails, however, a few expected updated messages fail to appear on 
topic A despite their existence on topic B.

 

A more complete description is available here - 
[https://stackoverflow.com/questions/53557247/some-events-unprocessed-by-kafka-streams-application-on-kubernetes-on-bare-metal]

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7581) Issues in building kafka using gradle on a Ubuntu based docker container

2018-12-10 Thread Sarvesh Tamba (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714498#comment-16714498
 ] 

Sarvesh Tamba commented on KAFKA-7581:
--

Is there any progress on this?

> Issues in building kafka using gradle on a Ubuntu based docker container
> 
>
> Key: KAFKA-7581
> URL: https://issues.apache.org/jira/browse/KAFKA-7581
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Blocker
>
> The following issues are seen when Kafka is built using Gradle on an 
> Ubuntu-based Docker container:-
> /kafka-gradle/kafka-2.0.0/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:177:
>  File name too long
>  This can happen on some encrypted or legacy file systems. Please see SI-3623 
> for more details.
>  .foreach { txnMetadataCacheEntry =>
>  ^
>  56 warnings found
>  one error found
> > Task :core:compileScala FAILED
> FAILURE: Build failed with an exception.
> * What went wrong:
>  Execution failed for task ':core:compileScala'.
>  > Compilation failed



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-12-10 Thread Sarvesh Tamba (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714492#comment-16714492
 ] 

Sarvesh Tamba commented on KAFKA-7580:
--

Created a PR for "Fix for Unit Test 
'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root 
user" at [https://github.com/apache/kafka/pull/6020]

 

The "pure virtual method" issues should be tracked separately.

> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Minor
>

[jira] [Commented] (KAFKA-7580) Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root user

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714486#comment-16714486
 ] 

ASF GitHub Bot commented on KAFKA-7580:
---

sarveshtamba opened a new pull request #6020: Fix for Unit Test 
'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when run as root 
user(#KAFKA-7580)
URL: https://github.com/apache/kafka/pull/6020
 
 
   Fix for Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' 
fails when run as root user(#KAFKA-7580)
   Refer https://issues.apache.org/jira/browse/KAFKA-7580
   
   gradle unitTest passes successfully with message "BUILD SUCCESSFUL"


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Unit Test 'shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir' fails when 
> run as root user
> --
>
> Key: KAFKA-7580
> URL: https://issues.apache.org/jira/browse/KAFKA-7580
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1, 2.0.2
> Environment: Ubuntu 16.04.3 LTS
>Reporter: Sarvesh Tamba
>Priority: Minor
>

[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-12-10 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16714439#comment-16714439
 ] 

Jonathan Santilli commented on KAFKA-7678:
--

Hello [~guozhang], I can confirm that the issue of calling the *.close()* 
method on a null reference has been solved.

However, I do not know why it was happening (I did not dig deep enough into 
the code to understand the reason).

Maybe we can create another Jira to explore it; what do you think?

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1, 2.0.1, 2.1.0
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Minor
> Fix For: 1.1.2, 2.2.0, 2.1.1, 2.0.2
>
>
> This occurs when the group is rebalancing in a Kafka Stream application and 
> the process (the Kafka Stream application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
> Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> I have checked the code, and the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class expects any kind of error to happen, since it catches 
> `*Throwable*`:
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility in the 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> Change it to:
>  
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  if (Objects.nonNull(producer)) {
> producer.close();
> producer = null;
>  }
>  checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  
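
For what it's worth, a runnable toy model of the failure mode this guard would 
cover (an assumption; the ticket above does not confirm the root cause): if 
close() runs a second time after the first call has already nulled the 
producer, the unguarded version throws a NullPointerException, while the 
guarded version is a no-op.
{code:java}
public final class GuardedCloseDemo {

    // Stand-in for the producer field in RecordCollectorImpl.
    private static AutoCloseable producer = () -> { };

    public static void close() throws Exception {
        if (producer != null) {      // the guard proposed above
            producer.close();
            producer = null;
        }
    }

    public static void main(final String[] args) throws Exception {
        close();  // first call closes the producer and nulls the field
        close();  // second call is now a no-op instead of an NPE
    }
}
{code}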



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)