[jira] [Commented] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean

2020-02-07 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17032739#comment-17032739
 ] 

Ted Yu commented on KAFKA-9504:
---

It seems that closing the metrics is not enough to prevent the memory
leak:
{code}
Utils.closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException);
Utils.closeQuietly(metrics, "consumer metrics", firstException);
{code}
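
One way to see which beans survive close() is to query the platform MBeanServer for the consumer domain afterwards (a minimal sketch; filtering on the kafka.consumer domain is an assumption about where the leaked beans live):
{code}
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class LeakCheck {
    public static void main(String[] args) throws MalformedObjectNameException {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // any bean still matching this pattern after KafkaConsumer#close() is a leak candidate
        Set<ObjectName> remaining = server.queryNames(new ObjectName("kafka.consumer:*"), null);
        remaining.forEach(name -> System.out.println("still registered: " + name));
    }
}
{code}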

> Memory leak in KafkaMetrics registered to MBean
> ---
>
> Key: KAFKA-9504
> URL: https://issues.apache.org/jira/browse/KAFKA-9504
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.0
>Reporter: Andreas Holmén
>Priority: Major
>
> After close() is called on a KafkaConsumer, some registered MBeans are not
> unregistered, causing a leak.
>  
>  
> {code:java}
> import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
> import java.lang.management.ManagementFactory;
> import java.util.HashMap;
> import java.util.Map;
> import javax.management.MBeanServer;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
> public class Leaker {
>   private static String bootstrapServers = "hostname:9092";
>
>   public static void main(String[] args) throws InterruptedException {
>     MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
>     Map<String, Object> props = new HashMap<>();
>     props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>
>     int beans = mBeanServer.getMBeanCount();
>     for (int i = 0; i < 100; i++) {
>       KafkaConsumer<byte[], byte[]> consumer =
>           new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
>       consumer.close();
>     }
>     int newBeans = mBeanServer.getMBeanCount();
>     System.out.println("\nbeans delta: " + (newBeans - beans));
>   }
> }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [KAFKA-557] Add emit on change support for Kafka Streams

2020-02-05 Thread Ted Yu
Thanks for the comments, Matthias.

w.r.t. the requirement of an `equals()` implementation, each template type
would have an equals() method. We can use the following code to determine
whether it is the default inherited from java.lang.Object or provided by the user.

boolean customEquals = false;
try {
    Class<?> cls = value.getClass().getMethod("equals", Object.class)
        .getDeclaringClass();
    if (!Object.class.equals(cls)) {
        customEquals = true;
    }
} catch (NoSuchMethodException nsme) {
    // equals(Object) is always defined, so this cannot be hit
}

The next question is: what if the user doesn't provide an equals() method?
Would we automatically fall back to emit-on-update?
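
If not, one fallback might be a byte-wise comparison (a sketch only; `serializer`
and `topic` are assumed to come from the surrounding processor context):

boolean unchanged;
if (customEquals) {
    unchanged = newValue.equals(oldValue);
} else {
    // fall back to comparing serialized forms when equals() is not overridden
    unchanged = java.util.Arrays.equals(
        serializer.serialize(topic, newValue),
        serializer.serialize(topic, oldValue));
}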

Cheers

On Tue, Feb 4, 2020 at 1:37 PM Matthias J. Sax  wrote:

>
> First a high level comment:
>
> Overall, I would like to take one step back, and make sure we are
> discussing on the same level. Originally, I understood this KIP as a
> proposed change of _semantics_, however, given the latest discussion
> it seems it's actually not -- it's more an _optimization_ proposal.
> Hence, we only need to make sure that this optimization does not break
> existing semantics. Is this the right way to think about it?
>
> If yes, then it might actually be ok to have different behavior
> depending on whether there is a materialized KTable or not. So far, we
> never defined a public contract about our emit strategy and it seems this
> KIP does not define one either.
>
> Hence, I don't have as strong of an opinion about sending oldValues
> for example any longer. I guess the question is really, what can we
> implement in a reasonable way.
>
>
>
> Other comments:
>
>
> @Richard:
>
> Can you please add the KIP to the KIP overview table? It's missing
> (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals).
>
>
> @Bruno:
>
> You mentioned caching. I think it's irrelevant (orthogonal) and we can
> discuss this KIP without considering it.
>
>
> @John:
>
> > Even in the source table, we forward the updated record with the
> > higher of the two timestamps. So the example is more like:
>
> That is not correct. Currently, we forward with the smaller
> out-of-order timestamp (changing the timestamp would corrupt the data
> -- we don't know, because we don't check, if the value is the same or
> a different one, hence, we must emit the out-of-order record as-is).
>
> If we start to do emit-on-change, we also need to emit a new record if
> the timestamp changes due to out-of-order data, hence, we would still
> need to emit the record with the lower timestamp because that gives us
> correct semantics: assume you have a filter() and afterward use the
> filtered KTable in a stream-table join -- the lower T1 timestamp must be
> propagated to the filtered KTable to ensure that the stream-table join
> computes the correct result.
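
(For illustration, the scenario described above might be wired up like the following sketch; the topic names and builder are assumptions, not code from the KIP:)

KTable<String, String> table = builder.table("input-table");
KTable<String, String> filtered = table.filter((key, value) -> value != null);
KStream<String, String> stream = builder.stream("events");
// The stream-table join looks up the table-side state per stream record, so an
// out-of-order (lower) timestamp arriving on the table side must be propagated
// through filter() for the join to be computed with correct semantics.
KStream<String, String> joined = stream.join(filtered, (event, state) -> event + "/" + state);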
>
>
>
> Your point about requiring an `equals()` implementation is actually a
> quite interesting one and boils down to my statement from above about
> "what can we actually implement". What I don't understand is:
>
> > This way, we still don't have to rely on the existence of an
> > equals() method, but if it is there, we can benefit from it.
>
> Your bullet point (2) says it uses `equals()` -- hence, it seems we
> actually do rely on it? Also, how can we detect if there is an
> `equals()` method to do the comparison? Would we fail if we have neither
> `equals()` nor corresponding serializers to do the comparison?
>
>
>
> > Wow, really good catch! Yes, we absolutely need metrics and logs if
> > we're going to drop any records. And, yes, we should propose
> > metrics and logs that are similar to the existing ones when we drop
> > records for other reasons.
>
> I am not sure about this point. In fact, we already have some no-ops
> in Kafka Streams in our join operators and don't report any of those
> either. Emit-on-change is operator semantics and I don't see why we
> would need to have a metric for it? It seems to be quite different
> compared to dropping late or malformed records.
>
>
> -Matthias
>
>
>
> On 2/4/20 7:13 AM, Thomas Becker wrote:
> > Thanks John for your thoughtful reply. Some comments inline.
> >
> >
> > On Mon, 2020-02-03 at 11:51 -0600, John Roesler wrote:
> >>
> >>
> >> Hi Tommy,
> >>
> >> Thanks for the context. I can see the attraction of considering
> >> these use cases together.
> >>
> >> To answer your question, if a part of the record is not relevant
> >> to downstream consumers, I was thinking you could just use a
> >> mapValue to remove it.
> >>
> >> E.g., suppose you wanted to do a join between two tables.
> >>
> >> employeeInfo.join( employeePayroll, (info, payroll) -> new
> >> Result(info.name(), payroll.salary()) )
> >>
> >> We only care about one attribute from the Info table (name), and
> >> one from the Payroll table (salary), and 

Re: [KAFKA-557] Add emit on change support for Kafka Streams

2020-02-02 Thread Ted Yu
w.r.t. new metric, there is already droppedRecordsSensor which logs:

"Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",

It seems we should introduce another metric which records the skipped
(duplicate) values.

This way, it is easier to observe the effect when this feature is in production.
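
A rough sketch of what such a sensor could look like (the metric and group names
here are assumptions, not the final KIP naming):

// assumes an org.apache.kafka.common.metrics.Metrics instance and a tags map
Sensor skippedNoOps = metrics.sensor("skipped-noop-updates");
skippedNoOps.add(
    new MetricName("skipped-noop-updates-total",
        "stream-processor-node-metrics",
        "number of no-op updates dropped by emit-on-change",
        tags),
    new CumulativeCount());
// skippedNoOps.record() would then be called whenever a duplicate value is dropped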

Cheers


>
> -- Forwarded message -
> From: Richard Yu 
> Date: Sun, Feb 2, 2020 at 10:21 AM
> Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams
> To: 
>
>
> Hi Bruno,
>
> Thanks for the reply!
>
> I've included some basic description on the reporting strategies in the
> KIP (I might include more information on that later). I've also worked to
> add some more details on behavior changes as well as rejected alternatives.
> Hope it will help facilitate the process. :)
>
> I just want to add something on a relevant topic: we need metrics. I think
> this should also be included with this change for a number of reasons. For
> some users, they already know that their Streams application is
> experiencing a lot of no-op traffic. But that doesn't mean other users are
> aware of the same problem. Also, if we are dropping no-ops, then we might
> as well record exactly how many we have dropped out of how many total
> operations we've done. Therefore, I argue that we also include some metric
> which records this data and reports it to the user.
>
> Beyond that, let me know if we might need to address anything else. :)
>
> Cheers,
> Richard
>
>
>
> On Sun, Feb 2, 2020 at 3:57 AM Bruno Cadonna  wrote:
>
>> Hi,
>>
>> Richard, thank you for the updated KIP.
>>
>> Regarding your question about the survey, IMO the survey should
>> contain a brief description of the emit (report) strategy of each
>> system and a list of pros and cons. I personally would be interested in
>> what emit strategy Flink uses.
>>
>> I have a few comments about the KIP and its documentation:
>>
>> KIP-specific:
>>
>> 1. I agree with Matthias that we should also include aggregations
>> where neither the value nor the timestamp change.
>>
>> 2. Regarding Matthias' concerns about the dependency of the result of
>> a stateless operation on the materialization, I have two
>> questions/observations:
>> a) Is the result not already dependent on the materialization since in
>> case of materialized results the cache would not emit all records for
>> the same key downstream?
>> b) Emitting more records from a non-materialized operation should not
>> affect the semantics because we are emitting changelog records. The
>> next time these changelog records are materialized the result should
>> be correct. Right? However, I see the issue when a KTable is
>> transformed to a KStream with `toStream()`. The stream would then
>> differ depending on the materialization. But that seems to me an issue
>> that is not specific to the emit strategy and that we already have
>> when we use a cache, don't we? Is it even an issue?
>>
>> 3. With out-of-order records we would emit more records. Let's assume
>> the following records
>> K, V, T3
>> K, V, T1
>> K, V, T2
>> with T1 < T2 < T3
>>
>> A KTable that reads these records in this order would emit (assuming no
>> cache):
>> K, V, T3
>> K, V, T1
>>
>> The record at T3 is emitted because it is the first.
>> The record at T1 is emitted because T1 < T3.
>> The record at T2 is not emitted because T2 >= T1.
>> Correct?
>>
>> Richard, it would be good to add a section about out-of-order records
>> to the KIP.
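
(As an aside, the emit rule in this example reduces to roughly the following check for an unchanged key; this is an illustration, not code from the KIP:)

// emit when the value changed, or when an out-of-order record lowers the timestamp
boolean emit = !newValue.equals(oldValue) || newTimestamp < storedTimestamp;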
>>
>>
>> Documentation-specific:
>>
>> 1. I agree with John on his feedback on the KIP document. It is really
>> important to clearly state what this KIP will improve and what not,
>> otherwise it becomes hard to vote on the KIP and to decide whether the
>> KIP is fully implemented or not.
>>
>> 2. Could you please already state in the "Motivation" section of the
>> KIP where you list the current emit strategies that the emit strategy
>> only applies to operations that involve a KTable? Probably for most it
>> will be clear what you mean, but IMO KIPs should be easily
>> approachable and it doesn't cost much to add this information.
>>
>> 3. Could you please list the rejected suppress extension in the
>> "Rejected Alternatives" section?
>>
>> 4. In the discussion about materializing results of stateless
>> operations, could you please add that those stateless operations are
>> on KTables? IMO adding this information makes the KIP more easily
>> approachable by people that are not that familiar with the matter.
>>
>> Best,
>> Bruno
>>
>> On Sat, Feb 1, 2020 at 11:33 PM Richard Yu 
>> wrote:
>> >
>> > Hi all,
>> >
>> > You all have good points!
>> >
>> > I took a look, and I thought it over. After some thinking, it appears the
>> > main point of contention is whether or not we can support emit on change
>> > for stateless operations. I agree with John in that we probably should
>> > restrict ourselves to materialized KTables:
>> >
>> >1. Loading any prior results would incur performance 

Re: [VOTE] KIP-526: Reduce Producer Metadata Lookups for Large Number of Topics

2020-01-28 Thread Ted Yu
+1

On Tue, Jan 28, 2020 at 10:52 AM Rajini Sivaram 
wrote:

> +1 (binding)
>
> Thanks for the KIP, Brian!
>
> Regards,
>
> Rajini
>
> On Thu, Jan 23, 2020 at 7:34 PM Jason Gustafson 
> wrote:
>
> > Sounds good. +1 from me.
> >
> > On Thu, Jan 23, 2020 at 9:00 AM Brian Byrne  wrote:
> >
> > > Thanks Jason,
> > >
> > > I'm in favor of the latter: metadata.max.idle.ms. I agree that describing
> > > it as a "period" is inaccurate. With metadata.max.idle.ms, it also aligns
> > > with metadata.max.age.ms for determining refresh period (which is an
> > > actual period).
> > >
> > > I've updated the docs.
> > >
> > > Thanks,
> > > Brian
> > >
> > > On Wed, Jan 22, 2020 at 6:19 PM Jason Gustafson 
> > > wrote:
> > >
> > > > Thanks for the proposal. Looks good overall. I wanted to suggest a possible
> > > > name change. I was considering something like `idle.metadata.expiration.ms`
> > > > or maybe `metadata.max.idle.ms`. Thoughts?
> > > >
> > > > -Jason
> > > >
> > > >
> > > > On Tue, Jan 21, 2020 at 11:38 AM Guozhang Wang 
> > > wrote:
> > > >
> > > > > Got it.
> > > > >
> > > > > I was proposing that we do the "delayed async batch" but I think your
> > > > > argument for complexity and pushing it out of the scope is convincing, so
> > > > > instead I propose we do the synchronous mini batching still but obviously
> > > > > it is already there :)  I'm +1 on the current proposal scope.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Tue, Jan 21, 2020 at 10:16 AM Brian Byrne 
> > > > wrote:
> > > > >
> > > > > > Hi Guozhang,
> > > > > >
> > > > > > Ah, sorry, I misunderstood. Actually, this is solved for us today. How the
> > > > > > producer works is that it maintains at most one inflight metadata fetch
> > > > > > request at any time, where each request is tagged with the current
> > > > > > (monotonically increasing) request version. This version is bumped whenever
> > > > > > a new topic is encountered, and metadata fetching will continue to process
> > > > > > while the latest metadata response's version is below the current version.
> > > > > >
> > > > > > So if a metadata request is in flight, and a number of threads produce to
> > > > > > new topics, they'll be added to the working set but the next metadata
> > > > > > request won't take place until the outstanding one returns. So their
> > > > > > updates will be batched together. As you suggest, we can have a simple list
> > > > > > that tracks unknown topics to isolate new vs. old topics.
> > > > > >
> > > > > > Thanks,
> > > > > > Brian
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, Jan 21, 2020 at 10:04 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Brian,
> > > > > > >
> > > > > > > I think I buy the complexity and extra end-to-end-latency argument :) I'm
> > > > > > > fine with delaying the asynchronous metadata fetching to future work and
> > > > > > > keeping the current KIP's scope as-is for now. Under that case can we
> > > > > > > consider just a minor implementation detail (since it is not affecting
> > > > > > > public APIs we probably do not even need to list it, but just thinking
> > > > > > > loud here):
> > > > > > >
> > > > > > > In your proposal when we request a topic of unknown metadata, we are
> > > > > > > going to directly set the topic name as that singleton in the request.
> > > > > > > I'm wondering, for the scenario that KAFKA-8904 described, whether the
> > > > > > > producer#send for thousands of new topics is triggered sequentially by a
> > > > > > > single thread or by concurrent threads? If it's the latter, and we expect
> > > > > > > in such scenarios we may have multiple topics being requested within a
> > > > > > > very short time, then we can probably do sth. like this internally in a
> > > > > > > synchronized manner:
> > > > > > >
> > > > > > > 1) put the topic name into a list, as "unknown topics", then
> > > > > > > 2) exhaust the list, and put all topics from that list to the request; if
> > > > > > > the list is empty, it means it has been emptied by another thread so we
> > > > > > > skip sending a new request and just wait for the returned metadata
> > > > > > > refresh.
> > > > > > >
> > > > > > > In most cases the list would just be a singleton with the one that thread
> > > > > > > has just enqueued, but under extreme scenarios it can help batching a few
> > > > > > > topic names probably (of course, I'm thinking about very extreme cases
> > > > > > > here, assuming that's what we've seen in 8904). Since these two steps
> > > > > > > are very lightweight, doing that in 
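
(For illustration, the version-gated batching described above boils down to roughly the following sketch; all names here are assumptions, not the actual producer internals:)

import java.util.HashSet;
import java.util.Set;

class MetadataFetchState {
    private int requestVersion = 0;         // bumped whenever a new topic appears
    private int lastResponseVersion = 0;    // version carried by the latest response
    private final Set<String> unknownTopics = new HashSet<>();

    synchronized void onNewTopic(String topic) {
        unknownTopics.add(topic);
        requestVersion++;                   // guarantees another fetch after the inflight one
    }

    // drains every topic enqueued while the previous fetch was inflight,
    // so concurrent senders get batched into one request
    synchronized Set<String> drainForNextRequest() {
        Set<String> batch = new HashSet<>(unknownTopics);
        unknownTopics.clear();
        return batch;
    }

    synchronized boolean fetchNeeded() {
        return lastResponseVersion < requestVersion;
    }

    synchronized void onResponse(int version) {
        lastResponseVersion = version;
    }
}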

[jira] [Resolved] (KAFKA-9471) Throw exception for DEAD StreamThread.State

2020-01-24 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved KAFKA-9471.
---
Resolution: Duplicate

> Throw exception for DEAD StreamThread.State
> ---
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>    Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> If the user cannot retry anymore, we should throw the exception that is handled
> in the else block.
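
A sketch of the proposed change (InvalidStateStoreException is what Streams already throws for non-queryable stores, though the exact exception type here is an assumption):
{code}
if (streamThread.state() == StreamThread.State.DEAD) {
    // a DEAD thread will never become queryable again, so retrying cannot help
    throw new InvalidStateStoreException(
        "StreamThread is DEAD; state stores on this thread are no longer queryable");
}
{code}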



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9471) Throw exception for DEAD StreamThread.State

2020-01-24 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17022970#comment-17022970
 ] 

Ted Yu commented on KAFKA-9471:
---

[~vitojeng]
Please let me know if I should proceed with this, or do you plan to include the
exception throwing in KIP-216?

> Throw exception for DEAD StreamThread.State
> ---
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>    Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> If the user cannot retry anymore, we should throw the exception that is handled
> in the else block.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9471) Throw exception for DEAD StreamThread.State

2020-01-23 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17022595#comment-17022595
 ] 

Ted Yu commented on KAFKA-9471:
---

[~vitojeng]
Looks like the following test depends on the empty collection for the DEAD state:
{code}
@Test
public void shouldAllowToQueryAfterThreadDied() throws Exception {
{code}
It fails when the exception is thrown.
What do you think?

> Throw exception for DEAD StreamThread.State
> ---
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>    Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> If the user cannot retry anymore, we should throw the exception that is handled
> in the else block.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-9471) Throw exception for DEAD StreamThread.State

2020-01-23 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-9471:
--
Comment: was deleted

(was: I will send out a PR soon along the above line of thinking.)

> Throw exception for DEAD StreamThread.State
> ---
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>    Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> If the user cannot retry anymore, we should throw the exception that is handled
> in the else block.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9471) Throw exception for DEAD StreamThread.State

2020-01-23 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-9471:
--
Summary: Throw exception for DEAD StreamThread.State  (was: Return empty 
collection for PENDING_SHUTDOWN)

> Throw exception for DEAD StreamThread.State
> ---
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>    Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> If the user cannot retry anymore, we should throw the exception that is handled
> in the else block.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9471) Return empty collection for PENDING_SHUTDOWN

2020-01-23 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-9471:
--
Description: 
In StreamThreadStateStoreProvider we have:
{code}
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
{code}
If the user cannot retry anymore, we should throw the exception that is handled in
the else block.

  was:
In StreamThreadStateStoreProvider we have:
{code}
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
{code}
PENDING_SHUTDOWN is a precursor to the DEAD state.
PENDING_SHUTDOWN should be treated the same way as DEAD.

This makes more sense than the current behavior of throwing an exception for
PENDING_SHUTDOWN.


> Return empty collection for PENDING_SHUTDOWN
> 
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>    Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> If the user cannot retry anymore, we should throw the exception that is handled
> in the else block.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9471) Return empty collection for PENDING_SHUTDOWN

2020-01-23 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17022589#comment-17022589
 ] 

Ted Yu commented on KAFKA-9471:
---

I will send out a PR soon along the above line of thinking.

> Return empty collection for PENDING_SHUTDOWN
> 
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>    Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> PENDING_SHUTDOWN is a precursor to the DEAD state.
> PENDING_SHUTDOWN should be treated the same way as DEAD.
> This makes more sense than the current behavior of throwing an exception for
> PENDING_SHUTDOWN.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9471) Return empty collection for PENDING_SHUTDOWN

2020-01-23 Thread Ted Yu (Jira)
Ted Yu created KAFKA-9471:
-

 Summary: Return empty collection for PENDING_SHUTDOWN
 Key: KAFKA-9471
 URL: https://issues.apache.org/jira/browse/KAFKA-9471
 Project: Kafka
  Issue Type: Improvement
Reporter: Ted Yu
Assignee: Ted Yu


In StreamThreadStateStoreProvider we have:
{code}
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
{code}
PENDING_SHUTDOWN should be treated the same way as DEAD.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9471) Return empty collection for PENDING_SHUTDOWN

2020-01-23 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-9471:
--
Description: 
In StreamThreadStateStoreProvider we have:
{code}
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
{code}
PENDING_SHUTDOWN is a precursor to the DEAD state.
PENDING_SHUTDOWN should be treated the same way as DEAD.

This makes more sense than the current behavior of throwing an exception for
PENDING_SHUTDOWN.

  was:
In StreamThreadStateStoreProvider we have:
{code}
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
{code}
PENDING_SHUTDOWN should be treated the same way as DEAD.


> Return empty collection for PENDING_SHUTDOWN
> 
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>    Reporter: Ted Yu
>    Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> PENDING_SHUTDOWN is a precursor to the DEAD state.
> PENDING_SHUTDOWN should be treated the same way as DEAD.
> This makes more sense than the current behavior of throwing an exception for
> PENDING_SHUTDOWN.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9471) Return empty collection for PENDING_SHUTDOWN

2020-01-23 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17022373#comment-17022373
 ] 

Ted Yu commented on KAFKA-9471:
---

[~mjsax][~guozhang]
Please comment.

> Return empty collection for PENDING_SHUTDOWN
> 
>
> Key: KAFKA-9471
> URL: https://issues.apache.org/jira/browse/KAFKA-9471
> Project: Kafka
>  Issue Type: Improvement
>    Reporter: Ted Yu
>    Assignee: Ted Yu
>Priority: Minor
>
> In StreamThreadStateStoreProvider we have:
> {code}
> if (streamThread.state() == StreamThread.State.DEAD) {
> return Collections.emptyList();
> {code}
> PENDING_SHUTDOWN is a precursor to the DEAD state.
> PENDING_SHUTDOWN should be treated the same way as DEAD.
> This makes more sense than the current behavior of throwing an exception for
> PENDING_SHUTDOWN.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9464) Close the producer in completeShutdown

2020-01-23 Thread Ted Yu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved KAFKA-9464.
---
Resolution: Not A Problem

> Close the producer in completeShutdown
> --
>
> Key: KAFKA-9464
> URL: https://issues.apache.org/jira/browse/KAFKA-9464
> Project: Kafka
>  Issue Type: Bug
>    Reporter: Ted Yu
>    Assignee: Ted Yu
>Priority: Minor
>
> In StreamThread#completeShutdown, the producer (if not null) should be closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9465) Enclose consumer call with catching InvalidOffsetException

2020-01-22 Thread Ted Yu (Jira)
Ted Yu created KAFKA-9465:
-

 Summary: Enclose consumer call with catching InvalidOffsetException
 Key: KAFKA-9465
 URL: https://issues.apache.org/jira/browse/KAFKA-9465
 Project: Kafka
  Issue Type: Improvement
Reporter: Ted Yu


In maybeUpdateStandbyTasks, the try block encloses both restoreConsumer.poll and
the record handling.
Since InvalidOffsetException is thrown by restoreConsumer.poll, it is this call
that should be enclosed in the try block.
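
A sketch of the suggested shape (names follow the description above; keeping the
record handling outside the guarded region is an assumption about the intent):
{code}
// imports assumed: org.apache.kafka.clients.consumer.ConsumerRecords,
// org.apache.kafka.clients.consumer.InvalidOffsetException,
// org.apache.kafka.common.TopicPartition, java.time.Duration
ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
try {
    // restoreConsumer.poll is the call that can throw InvalidOffsetException
    records = restoreConsumer.poll(Duration.ZERO);
} catch (final InvalidOffsetException recoverableException) {
    // existing recovery path: re-initialize the affected standby tasks
}
for (final TopicPartition partition : records.partitions()) {
    // ... hand this partition's records to the matching standby task ...
}
{code}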



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9464) Close the producer in completeShutdown

2020-01-22 Thread Ted Yu (Jira)
Ted Yu created KAFKA-9464:
-

 Summary: Close the producer in completeShutdown
 Key: KAFKA-9464
 URL: https://issues.apache.org/jira/browse/KAFKA-9464
 Project: Kafka
  Issue Type: Bug
Reporter: Ted Yu


In StreamThread#completeShutdown, the producer (if not null) should be closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9463) Transient failure in KafkaAdminClientTest.testListOffsets

2020-01-21 Thread Ted Yu (Jira)
Ted Yu created KAFKA-9463:
-

 Summary: Transient failure in KafkaAdminClientTest.testListOffsets
 Key: KAFKA-9463
 URL: https://issues.apache.org/jira/browse/KAFKA-9463
 Project: Kafka
  Issue Type: Test
Reporter: Ted Yu


When running tests with Java 11, I got the following test failure:
{code}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment.
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at 
org.apache.kafka.clients.admin.KafkaAdminClientTest.testListOffsets(KafkaAdminClientTest.java:2336)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
for a node assignment.
{code}
KafkaAdminClientTest.testListOffsets passes when it is run alone.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9462) Correct exception message in DistributedHerder

2020-01-21 Thread Ted Yu (Jira)
Ted Yu created KAFKA-9462:
-

 Summary: Correct exception message in DistributedHerder
 Key: KAFKA-9462
 URL: https://issues.apache.org/jira/browse/KAFKA-9462
 Project: Kafka
  Issue Type: Bug
Reporter: Ted Yu


There are a few exception messages in DistributedHerder which were copied from
other exception messages.

This task corrects the messages to reflect the actual condition.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-553: Disable all SSL protocols except TLSV1.2 by default.

2020-01-21 Thread Ted Yu
+1

On Tue, Jan 21, 2020 at 8:24 AM Rajini Sivaram 
wrote:

> +1 (binding)
>
> Thanks for the KIP!
>
> Regards,
>
> Rajini
>
>
> On Tue, Jan 21, 2020 at 3:43 PM Николай Ижиков 
> wrote:
>
> > Hello.
> >
> > I would like to start a vote for KIP-553: Disable all SSL protocols except
> > TLSV1.2 by default.
> >
> > KIP -
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=142641956
> > Discussion thread -
> >
> https://lists.apache.org/thread.html/9c6201fe403a24f84fc3aa27f47dd06b718c1d80de0ee3412b9b877c%40%3Cdev.kafka.apache.org%3E
>


[jira] [Commented] (KAFKA-9461) Limit DEBUG statement size when logging failed record value

2020-01-21 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020262#comment-17020262
 ] 

Ted Yu commented on KAFKA-9461:
---

Since we may not see the complete record under any size threshold, we can use a
hardcoded value when the limit is added.
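
For example, the truncation could be applied where the value is rendered for
logging (a sketch; the constant and helper method are assumptions):
{code}
private static final int MAX_LOGGED_CHARS = 1024;

static String abbreviated(final Object value) {
    final String rendered = String.valueOf(value);
    return rendered.length() <= MAX_LOGGED_CHARS
        ? rendered
        : rendered.substring(0, MAX_LOGGED_CHARS) + "... (" + rendered.length() + " chars total)";
}
{code}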

> Limit DEBUG statement size when logging failed record value
> ---
>
> Key: KAFKA-9461
> URL: https://issues.apache.org/jira/browse/KAFKA-9461
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.4.0
>Reporter: Nicolas Guyomar
>Priority: Minor
>
> Hi,
> It is possible with the current implementation that we log a full record's
> content at DEBUG level, which can overwhelm the log4j buffer and OOM it.
> The stack trace below was due to a 70MB message refused by a broker:
> {code:java}
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:3332)
> at 
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
> at java.lang.StringBuffer.append(StringBuffer.java:270)
> at 
> org.apache.log4j.helpers.PatternParser$LiteralPatternConverter.format(PatternParser.java:419)
> at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
> at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
> at 
> org.apache.log4j.RollingFileAppender.subAppend(RollingFileAppender.java:276)
> at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
> at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> at 
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> at org.apache.log4j.Category.callAppenders(Category.java:206)
> at org.apache.log4j.Category.forcedLog(Category.java:391)
> at org.apache.log4j.Category.log(Category.java:856)
> at org.slf4j.impl.Log4jLoggerAdapter.debug(Log4jLoggerAdapter.java:252)
> at 
> org.apache.kafka.connect.runtime.WorkerSourceTask$1.onCompletion(WorkerSourceTask.java:330){code}
> in
> [https://github.com/apache/kafka/blob/da4337271ef0b72643c0cf47ae51e69f79cf1901/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L348]
> Would it make sense to protect Connect directly in the ConnectRecord
> toString() method and set a configurable limit?
> Thank you
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9455) Consider using TreeMap for In-memory stores of Streams

2020-01-19 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17019191#comment-17019191
 ] 

Ted Yu commented on KAFKA-9455:
---

Maybe we can also look at (profile) Maps from fastutil such as:

http://fastutil.di.unimi.it/docs/it/unimi/dsi/fastutil/objects/Object2ObjectSortedMap.html
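
For context, the trade-off under discussion looks roughly like this (a sketch; Bytes is the key type used by the in-memory stores):
{code}
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.kafka.common.utils.Bytes;

class MapChoice {
    // ConcurrentSkipListMap: thread-safe range scans for IQ, but slower point lookups
    static final NavigableMap<Bytes, byte[]> SKIP_LIST = new ConcurrentSkipListMap<>();
    // TreeMap: faster point lookups, but reads need external synchronization
    static final NavigableMap<Bytes, byte[]> TREE = new TreeMap<>();

    static byte[] pointQuery(final Bytes key) {
        return TREE.get(key);                      // the pattern used by aggregations
    }

    static NavigableMap<Bytes, byte[]> rangeQuery(final Bytes from, final Bytes to) {
        return TREE.subMap(from, true, to, true);  // the pattern used by windowed joins
    }
}
{code}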

> Consider using TreeMap for In-memory stores of Streams
> --
>
> Key: KAFKA-9455
> URL: https://issues.apache.org/jira/browse/KAFKA-9455
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> From [~ableegoldman]: It's worth noting that it might be a good idea to 
> switch to TreeMap for different reasons. Right now the ConcurrentSkipListMap 
> allows us to safely perform range queries without copying over the entire 
> keyset, but the performance on point queries seems to scale noticeably worse 
> with the number of unique keys. Point queries are used by aggregations while 
> range queries are used by windowed joins, but of course both are available 
> within the PAPI and for interactive queries so it's hard to say which we 
> should prefer. Maybe rather than make that tradeoff we should have one 
> version for efficient range queries (a "JoinWindowStore") and one for 
> efficient point queries ("AggWindowStore") - or something. I know we've had 
> similar thoughts for a different RocksDB store layout for Joins (although I 
> can't find that ticket anywhere..), it seems like the in-memory stores could 
> benefit from a special "Join" version as well cc/ Guozhang Wang
> Here are some random thoughts:
> 1. For kafka streams processing logic (i.e. without IQ), it's better to make 
> all processing logic relying on point queries rather than range queries. 
> Right now the only processor that use range queries are, as mentioned above, 
> windowed stream-stream joins. I think we should consider using a different 
> window implementation for this (and as a result also get rid of the 
> retainDuplicate flags) to refactor the windowed stream-stream join operation.
> 2. With 1), range queries would only be exposed as IQ. Depending on its usage 
> frequency I think it makes lots of sense to optimize for single-point queries.
> Of course, even without step 1) we should still consider using tree-map for 
> windowed in-memory stores to have a better scaling effect.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-18 Thread Ted Yu
Looking at the current KIP-562:

bq. Create a taskId from the combination of store name and partition
provided by the user

I wonder if a single taskId would be used for the “all partitions” case.
If so, we need to choose a numerical value for the partition portion of the
taskId.
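
For reference, the API shape discussed below could end up looking something like
this (names are taken from the thread; the final KIP may differ):

public final class StoreQueryParams {
    private final Integer partition;   // null means "all local partitions"

    private StoreQueryParams(final Integer partition) {
        this.partition = partition;
    }

    public static StoreQueryParams storeQueryParams() {
        return new StoreQueryParams(null);
    }

    public StoreQueryParams withPartition(final int partition) {
        return new StoreQueryParams(partition);
    }

    public StoreQueryParams withAllLocalPartitions() {
        return new StoreQueryParams(null);
    }

    public Integer getPartition() {            // null if unset or "all partitions"
        return partition;
    }

    public boolean getAllLocalPartitions() {   // true when no single partition was chosen
        return partition == null;
    }
}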

On Sat, Jan 18, 2020 at 10:27 AM John Roesler  wrote:

> Thanks, Ted!
>
> This makes sense, but it seems like we should lean towards explicit
> semantics in the public API. ‘-1’ meaning “all partitions” is reasonable,
> but not explicit. That’s why I suggested the Boolean for “all partitions”.
> I guess this also means getPartition() should either throw an exception or
> return null if the partition is unspecified.
>
> Thanks,
> John
>
> On Sat, Jan 18, 2020, at 08:43, Ted Yu wrote:
> > I wonder if the following two methods can be combined:
> >
> > Integer getPartition() // would be null if unset or if "all partitions"
> > boolean getAllLocalPartitions() // true/false if "all partitions" requested
> >
> > into:
> >
> > Integer getPartition() // would be null if unset or -1 if "all
> partitions"
> >
> > Cheers
> >
> > On Fri, Jan 17, 2020 at 9:56 PM John Roesler 
> wrote:
> >
> > > Thanks, Navinder!
> > >
> > > I took a look at the KIP.
> > >
> > > We tend to use static factory methods instead of public constructors, and
> > > also builders for optional parameters.
> > >
> > > Given that, I think it would be more typical to have a factory method:
> > > storeQueryParams()
> > >
> > > and also builders for setting the optional parameters, like:
> > > withPartitions(List<Integer> partitions)
> > > withStaleStoresEnabled()
> > > withStaleStoresDisabled()
> > >
> > >
> > > I was also thinking this over today, and it really seems like there are
> > > two main cases for specifying partitions,
> > > 1. you know exactly what partition you want. In this case, you'll only
> > > pass in a single number.
> > > 2. you want to get a handle on all the stores for this instance (the
> > > current behavior). In this case, it's not clear how to use withPartitions
> > > to achieve the goal, unless you want to apply a-priori knowledge of the
> > > number of partitions in the store. We could consider an empty list, or a
> > > null, to indicate "all", but that seems a little complicated.
> > >
> > > Thus, maybe it would actually be better to eschew withPartitions for now
> > > and instead just offer:
> > > withPartition(int partition)
> > > withAllLocalPartitions()
> > >
> > > and the getters:
> > > Integer getPartition() // would be null if unset or if "all partitions"
> > > boolean getAllLocalPartitions() // true/false if "all partitions" requested
> > >
> > > Sorry, I know I'm stirring the pot, but what do you think about this?
> > >
> > > Oh, also, the KIP is missing the method signature for the new
> > > KafkaStreams#store overload.
> > >
> > > Thanks!
> > > -John
> > >
> > > On Fri, Jan 17, 2020, at 08:07, Navinder Brar wrote:
> > > > Hi all,
> > > > I have created a new
> > > > KIP:
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
> > > > Please take a look if you get a chance.
> > > > ~Navinder
> > >
> >
>


Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-18 Thread Ted Yu
I wonder if the following two methods can be combined:

Integer getPartition() // would be null if unset or if "all partitions"
boolean getAllLocalPartitions() // true/false if "all partitions" requested

into:

Integer getPartition() // would be null if unset or -1 if "all partitions"

Cheers

On Fri, Jan 17, 2020 at 9:56 PM John Roesler  wrote:

> Thanks, Navinder!
>
> I took a look at the KIP.
>
> We tend to use static factory methods instead of public constructors, and
> also builders for optional parameters.
>
> Given that, I think it would be more typical to have a factory method:
> storeQueryParams()
>
> and also builders for setting the optional parameters, like:
> withPartitions(List<Integer> partitions)
> withStaleStoresEnabled()
> withStaleStoresDisabled()
>
>
> I was also thinking this over today, and it really seems like there are
> two main cases for specifying partitions,
> 1. you know exactly what partition you want. In this case, you'll only
> pass in a single number.
> 2. you want to get a handle on all the stores for this instance (the
> current behavior). In this case, it's not clear how to use withPartitions
> to achieve the goal, unless you want to apply a-priori knowledge of the
> number of partitions in the store. We could consider an empty list, or a
> null, to indicate "all", but that seems a little complicated.
>
> Thus, maybe it would actually be better to eschew withPartitions for now
> and instead just offer:
> withPartition(int partition)
> withAllLocalPartitions()
>
> and the getters:
> Integer getPartition() // would be null if unset or if "all partitions"
> boolean getAllLocalPartitions() // true/false if "all partitions" requested
>
> Sorry, I know I'm stirring the pot, but what do you think about this?
>
> Oh, also, the KIP is missing the method signature for the new
> KafkaStreams#store overload.
>
> Thanks!
> -John
>
> On Fri, Jan 17, 2020, at 08:07, Navinder Brar wrote:
> > Hi all,
> > I have created a new
> > KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
> > Please take a look if you get a chance.
> > ~Navinder
>


[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-18 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018602#comment-17018602
 ] 

Ted Yu commented on KAFKA-9450:
---

w.r.t. a separate column family: since the data in this family tends to be small
compared to the data family, wouldn't we end up with small files, similar to the
rocksdb memtable flush?

> Decouple inner state flushing from committing with EOS
> --
>
> Key: KAFKA-9450
> URL: https://issues.apache.org/jira/browse/KAFKA-9450
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> When EOS is turned on, the commit interval is set quite low (100ms) and all 
> the store layers are flushed during a commit. This is necessary for 
> forwarding records in the cache to the changelog, but unfortunately also 
> forces rocksdb to flush the current memtable before it's full. The result is 
> a large number of small writes to disk, losing the benefits of batching, and 
> a large number of very small L0 files that are likely to slow compaction.
> Since we have to delete the stores to recreate from scratch anyways during an 
> unclean shutdown with EOS, we may as well skip flushing the innermost 
> StateStore during a commit and only do so during a graceful shutdown, before 
> a rebalance, etc. This is currently blocked on a refactoring of the state 
> store layers to allow decoupling the flush of the caching layer from the 
> actual state store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:26 AM:


Looking at js2.log, I am not sure a deadlock was observed. Maybe
handleRequests() took very long to execute (yet to be verified by checking whether
there was an exception in this code path in the server log).
I wonder if we can utilize the following form of await:
{code}
public boolean await(long timeout, TimeUnit unit)
{code}
so that the execution time of handleRequests() can be bounded.
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
   countDownLatch.countDown()
 }
   }
-  countDownLatch.await()
+  countDownLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS)
   responseQueue.asScala.toBuffer
 }
   }
{code}
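
One caveat with the bounded form: await(timeout, unit) returns false on timeout instead of throwing, so the result should probably be checked as well (a Java sketch; the Scala change would be analogous, and the logger is an assumption):
{code}
if (!countDownLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS)) {
    // not all inflight requests completed; surface this rather than returning silently
    log.warn("Timed out after {} ms waiting for ZooKeeper responses", sessionTimeoutMs);
}
{code}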


was (Author: yuzhih...@gmail.com):
Looking at js2.log, I am not sure a deadlock was observed. Maybe
handleRequests() took very long to execute (yet to be verified by checking whether
there was an exception in this code path in the server log).
I wonder if we can utilize the following form of await:
{code}
public boolean await(long timeout, TimeUnit unit)
{code}
so that the execution time of handleRequests() can be bounded.
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
   countDownLatch.countDown()
 }
   }
-  countDownLatch.await()
+  countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
   responseQueue.asScala.toBuffer
 }
   }
{code}

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and
> the zk-session-expiry-handler thread. When this issue occurred, the only way
> to recover the kafka cluster was to restart the kafka server. The following is the
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) //
> waiting for the controller-event-thread to process the expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookee

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:22 AM:


Looking at js2.log, I am not sure a deadlock was observed. Maybe
handleRequests() took very long to execute (yet to be verified by checking whether
there was an exception in this code path in the server log).
I wonder if we can utilize the following form of await:
{code}
public boolean await(long timeout, TimeUnit unit)
{code}
so that the execution time of handleRequests() can be bounded.
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
   countDownLatch.countDown()
 }
   }
-  countDownLatch.await()
+  countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
   responseQueue.asScala.toBuffer
 }
   }
{code}


was (Author: yuzhih...@gmail.com):
Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether there was an exception in this code path in the server log).
I wonder if we can use the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.
```
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
   countDownLatch.countDown()
 }
   }
-  countDownLatch.await()
+  countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
   responseQueue.asScala.toBuffer
 }
   }
```

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.s

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/18/20 2:21 AM:


Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether there was an exception in this code path in the server log).
I wonder if we can use the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.
```
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 6a0809e16..32e4380c0 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -165,7 +165,7 @@ class ZooKeeperClient(connectString: String,
   countDownLatch.countDown()
 }
   }
-  countDownLatch.await()
+  countDownLatch.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)
   responseQueue.asScala.toBuffer
 }
   }
```


was (Author: yuzhih...@gmail.com):
Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether there was an exception in this code path in the server log).
I wonder if we can use the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.ja

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/17/20 10:42 PM:
-

Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute (yet to be verified by checking 
whether there was an exception in this code path in the server log).
I wonder if we can use the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.


was (Author: yuzhih...@gmail.com):
Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute.
I wonder if we can use the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Work

[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018368#comment-17018368
 ] 

Ted Yu commented on KAFKA-8532:
---

Looking at js2.log, I am not sure a deadlock was observed. Maybe 
handleRequests() took very long to execute.
I wonder if we can use the following form of await:
```
public boolean await(long timeout, TimeUnit unit)
```
so that the execution time of handleRequests() can be bounded.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchroni

[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018352#comment-17018352
 ] 

Ted Yu commented on KAFKA-8532:
---

Looking at KafkaController.scala in trunk, I don't see the 
Expire.waitUntilProcessingStarted method shown in the stack trace.
It seems the class has gone through refactoring / bug fixes.

[~lbdai3190]
If you can attach the server log, that may help us find the root cause.

Thanks

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchroni

[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018175#comment-17018175
 ] 

Ted Yu commented on KAFKA-8532:
---

Created https://github.com/apache/kafka/pull/7978

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.ja

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-17 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17018152#comment-17018152
 ] 

Ted Yu edited comment on KAFKA-8532 at 1/17/20 4:13 PM:


How about making the following change?
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 7b995931f..6a0809e16 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -158,14 +158,11 @@ class ZooKeeperClient(connectString: String,
   inReadLock(initializationLock) {
 send(request) { response =>
   responseQueue.add(response)
-  inFlightRequests.release()
-  countDownLatch.countDown()
 }
   }
-} catch {
-  case e: Throwable =>
-inFlightRequests.release()
-throw e
+   } finally {
+  inFlightRequests.release()
+  countDownLatch.countDown()
 }
   }
   countDownLatch.await()
{code}
countDownLatch is handled consistently with inFlightRequests.

I have run the core:test suite, which passed.

I can send out a PR.
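
The general shape of the fix, sketched outside the ZooKeeperClient context (the 
class and method names below are illustrative): the original catch block 
released the permit but skipped the countDown when send threw, which could 
leave the untimed await() blocked forever. Moving both into a finally block 
covers every exit path.
{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

public class FinallyReleaseSketch {
    private final Semaphore inFlightRequests = new Semaphore(10);

    void sendOne(Runnable send, CountDownLatch countDownLatch) throws InterruptedException {
        inFlightRequests.acquire();
        try {
            send.run(); // may throw
        } finally {
            // Runs whether send succeeded or threw, so neither the
            // permit nor the latch count can leak.
            inFlightRequests.release();
            countDownLatch.countDown();
        }
    }
}
{code}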


was (Author: yuzhih...@gmail.com):
How about making the following change?
{code}
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 7b995931f..6a0809e16 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -158,14 +158,11 @@ class ZooKeeperClient(connectString: String,
   inReadLock(initializationLock) {
 send(request) { response =>
   responseQueue.add(response)
-  inFlightRequests.release()
-  countDownLatch.countDown()
 }
   }
-} catch {
-  case e: Throwable =>
-inFlightRequests.release()
-throw e
+   } finally {
+  inFlightRequests.release()
+  countDownLatch.countDown()
 }
   }
   countDownLatch.await()
{code}
countDownLatch is handled consistently with inFlightRequests.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: js.log, js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way to 
> recover the Kafka cluster is to restart the Kafka server. The following is the 
> jstack log of the controller-event-thread and zk-session-expiry-handler threads.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$D

Re: [VOTE] Apache Bahir 2.4.0 (RC1)

2019-09-21 Thread Ted Yu
+1

On Fri, Sep 20, 2019 at 12:37 PM Luciano Resende 
wrote:

> Of course, my +1
> I know a lot of us are just between travels (returning from ApacheCon)
> but are any other volunteers available to review the release?
>
> On Fri, Sep 13, 2019 at 10:38 AM Luciano Resende 
> wrote:
> >
> > Dear community member,
> >
> > Please vote to approve the release of Apache Bahir 2.4.0 (RC1) based on
> > Apache Spark 2.4.0.
> >
> > Tag: v2.4.0-rc1 (f908ec0dc1bbc4c6d11cde446f2bfd89ea39155f)
> >
> > https://github.com/apache/bahir/tree/v2.4.0-rc1
> >
> > Release files:
> >
> > https://repository.apache.org/content/repositories/orgapachebahir-1031
> >
> > Source distribution:
> >
> > https://dist.apache.org/repos/dist/dev/bahir/bahir-spark/2.4.0-rc1/
> >
> > The vote is open for at least 72 hours and passes if a majority of at
> least
> > 3 +1 PMC votes are cast.
> >
> >   [ ] +1 Release this package as Apache Bahir 2.4.0
> >   [ ] -1 Do not release this package because ...
> >
> >
> > Thanks for your vote!
> >
> > --
> > Luciano Resende
> > http://twitter.com/lresende1975
> > http://lresende.blogspot.com/
>
>
>
> --
> Luciano Resende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>


Re: [VOTE] Apache Bahir 2.3.4 (RC1)

2019-09-21 Thread Ted Yu
+1

On Fri, Sep 20, 2019 at 12:37 PM Luciano Resende 
wrote:

> Of course, my +1
> I know a lot of us are just between travels (returning from ApacheCon)
> but are any other volunteers available to review the release?
>
> On Wed, Sep 11, 2019 at 5:42 PM Luciano Resende 
> wrote:
> >
> > Dear community member,
> >
> > Please vote to approve the release of Apache Bahir 2.3.4 (RC1) based on
> > Apache Spark 2.3.4.
> >
> > Tag: v2.3.4-rc1 (716107f420ac3e0afd76e61b74069e551d9a7e15)
> >
> > https://github.com/apache/bahir/tree/v2.3.4-rc1
> >
> > Release files:
> >
> > https://repository.apache.org/content/repositories/orgapachebahir-1030
> >
> > Source distribution:
> >
> > https://dist.apache.org/repos/dist/dev/bahir/bahir-spark/2.3.4-rc1/
> >
> > Also, here is a list of changes between the v2.3.3 and v2.3.4-rc1 tags,
> > which amount to updating Apache Spark to 2.3.4 and release build
> > preparation.
> >
> > https://github.com/apache/bahir/compare/v2.3.3...v2.3.4-rc1
> >
> > The vote is open for at least 72 hours and passes if a majority of at
> least
> > 3 +1 PMC votes are cast.
> >
> >   [ ] +1 Release this package as Apache Bahir 2.3.4
> >   [ ] -1 Do not release this package because ...
> >
> >
> > Thanks for your vote!
> >
> > --
> > Luciano Resende
> > http://twitter.com/lresende1975
> > http://lresende.blogspot.com/
>
>
>
> --
> Luciano Resende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>


Re: [VOTE] Apache Bahir 2.2.3 (RC1)

2019-05-31 Thread Ted Yu
+1

On Thu, May 30, 2019 at 9:01 AM Luciano Resende 
wrote:

> Thanks, Christian,
>
> We are still looking for one more vote. Note that we had two
> concurrent votes going on and this is a different vote thread (e.g.
> 2.2.3 versus 2.3.3)
>
> On Tue, May 28, 2019 at 12:20 AM Christian Kadner 
> wrote:
> >
> > +1
> >
> > Sorry for the delay, I was traveling last week.
> >
> > Thanks Luciano!
> >
> > ~ Christian
> >
> > On 2019/05/19 14:53:32, Luciano Resende  wrote:
> > > Dear community member,
> > >
> > > Please vote to approve the release of Apache Bahir 2.2.3 (RC1) based on
> > > Apache Spark 2.2.3.
> > >
> > > Tag: v2.2.3-rc1 (963c644ff96615bf53ed6570abcf6930d1532776)
> > >
> > > https://github.com/apache/bahir/tree/v2.2.3-rc1
> > >
> > > Release files:
> > >
> > > https://repository.apache.org/content/repositories/orgapachebahir-1028
> > >
> > > Source distribution:
> > >
> > > https://dist.apache.org/repos/dist/dev/bahir/bahir-spark/2.2.3-rc1/
> > >
> > >
> > > The vote is open for at least 72 hours and passes if a majority of at
> least
> > > 3 +1 PMC votes are cast.
> > >
> > >   [ ] +1 Release this package as Apache Bahir 2.2.3
> > >   [ ] -1 Do not release this package because ...
> > >
> > >
> > > Thanks for your vote!
> > >
> > > --
> > > Luciano Resende
> > > http://twitter.com/lresende1975
> > > http://lresende.blogspot.com/
> > >
>
>
>
> --
> Luciano Resende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>


Re: [VOTE] Apache Bahir 2.3.3 (RC1)

2019-05-26 Thread Ted Yu
+1

On Sat, May 25, 2019 at 5:51 PM Luciano Resende 
wrote:

> Of course, my +1 for the release.
>
> We are still looking for an extra vote to approve the release; please
> take a little time and help us approve it.
>
> On Sun, May 19, 2019 at 9:15 AM Luciano Resende 
> wrote:
> >
> > Dear community member,
> >
> > Please vote to approve the release of Apache Bahir 2.3.3 (RC1) based on
> > Apache Spark 2.3.3.
> >
> > Tag: v2.3.3-rc1 (e29034cad9bec11da1b81324b1f67118772861d2)
> >
> > https://github.com/apache/bahir/tree/v2.3.3-rc1
> >
> > Release files:
> >
> > https://repository.apache.org/content/repositories/orgapachebahir-1029
> >
> > Source distribution:
> >
> > https://dist.apache.org/repos/dist/dev/bahir/bahir-spark/2.3.3-rc1/
> >
> >
> > The vote is open for at least 72 hours and passes if a majority of at
> least
> > 3 +1 PMC votes are cast.
> >
> >   [ ] +1 Release this package as Apache Bahir 2.3.3
> >   [ ] -1 Do not release this package because ...
> >
> >
> > Thanks for your vote!
> >
> > --
> > Luciano Resende
> > http://twitter.com/lresende1975
> > http://lresende.blogspot.com/
>
>
>
> --
> Luciano Resende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>


[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-05-23 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16846843#comment-16846843
 ] 

Ted Yu commented on KAFKA-5998:
---

Can you move the state directory outside of /tmp, which is subject to cleaning 
by the OS?
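
For reference, a minimal sketch of pointing the state directory somewhere 
durable via StreamsConfig (the application id, bootstrap address, and target 
path below are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StateDirSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hostname:9092");  // placeholder
        // Keep local state (including .checkpoint files) out of /tmp so
        // OS tmp cleaners cannot delete it under a running application.
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
        return props;
    }
}
{code}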

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... They have 
> been running for two days under a load of around 2500 msgs per second. On the 
> third day I am getting the below exception for some of the partitions. I have 
> 16 partitions; only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>  

[jira] [Resolved] (FLINK-10446) Use the "guava beta checker" plugin to keep off of @Beta API

2019-04-29 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved FLINK-10446.

Resolution: Won't Fix

> Use the "guava beta checker" plugin to keep off of @Beta API
> 
>
> Key: FLINK-10446
> URL: https://issues.apache.org/jira/browse/FLINK-10446
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Ji Liu
>Priority: Major
>
> The Guava people publish an Error Prone plugin to detect when stuff that's 
> annotated with @Beta gets used. Those things shouldn't be used because the 
> project gives no promises about deprecating before removal.
> plugin:
> https://github.com/google/guava-beta-checker



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-26 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827123#comment-16827123
 ] 

Ted Yu edited comment on KAFKA-5998 at 4/26/19 5:31 PM:


https://pastebin.com/vyvQ8pkF shows what I mentioned earlier.

In StateDirectory#cleanRemovedTasks, we use both the last modification time of 
the directory and whether the task has been committed as criteria. When both are 
satisfied, we clean the directory for the task.
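
To make the two criteria concrete, a sketch (names are illustrative; the actual 
check lives in StateDirectory#cleanRemovedTasks):
{code:java}
import java.io.File;

public class CleanupCriteriaSketch {
    static boolean eligibleForCleanup(File taskDir, boolean taskCommitted,
                                      long cleanupDelayMs, long nowMs) {
        // Criterion 1: the directory has been idle longer than the delay.
        boolean idleLongEnough = nowMs - taskDir.lastModified() > cleanupDelayMs;
        // Criterion 2: the task's offsets have been committed.
        // Only when both hold is the task directory deleted.
        return idleLongEnough && taskCommitted;
    }
}
{code}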


was (Author: yuzhih...@gmail.com):
https://pastebin.com/Y247UZgb shows what I mentioned earlier (incomplete).

In StateDirectory#cleanRemovedTasks, we use both the last modification time of 
the directory and whether the task has been committed as criteria. When both are 
satisfied, we clean the directory for the task.

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... They have 
> been running for two days under a load of around 2500 msgs per second. On the 
> third day I am getting the below exception for some of the partitions. I have 
> 16 partitions; only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundExc

[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-26 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827123#comment-16827123
 ] 

Ted Yu edited comment on KAFKA-5998 at 4/26/19 5:12 PM:


https://pastebin.com/Y247UZgb shows what I mentioned earlier (incomplete).

In StateDirectory#cleanRemovedTasks, we use both the last modification time of 
the directory and whether the task has been committed as criteria. When both are 
satisfied, we clean the directory for the task.


was (Author: yuzhih...@gmail.com):
https://pastebin.com/Y247UZgb shows what I mentioned earlier.
In StateDirectory#cleanRemovedTasks, we don't rely on the last modification 
time - we check whether the task has been committed.

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... They have 
> been running for two days under a load of around 2500 msgs per second. On the 
> third day I am getting the below exception for some of the partitions. I have 
> 16 partitions; only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-26 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827123#comment-16827123
 ] 

Ted Yu commented on KAFKA-5998:
---

https://pastebin.com/Y247UZgb shows what I mentioned earlier.
In StateDirectory#cleanRemovedTasks, we don't rely on the last modification 
time - we check whether the task has been committed.

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... They have 
> been running for two days under a load of around 2500 msgs per second. On the 
> third day I am getting the below exception for some of the partitions. I have 
> 16 partitions; only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-26 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16827089#comment-16827089
 ] 

Ted Yu commented on KAFKA-5998:
---

[~mparthas]:
From your comment on Apr 8th, it seems that making the value for 
"state.cleanup.delay.ms" longer didn't prevent .checkpoint.tmp from disappearing.

Did you see a log similar to the following prior to the error?
{code}
April 25th 2019, 21:03:49.332 2019-04-25 21:03:49,332 INFO 
[org.apache.kafka.streams.processor.internals.StateDirectory] 
(application-fde85da6-9d2f-4457-8bdb-ea1c78c8c1e2-CleanupThread) - 
[short-component-name:; transaction-id:; user-id:; creation-time:] 
stream-thread [application-fde85da6-9d2f-4457-8bdb-ea1c78c8c1e2-CleanupThread] 
Deleting obsolete state directory 1_1 for task 1_1 as 813332ms has elapsed 
(cleanup delay is 60ms).
{code}
If so, that might indicate the new value for "state.cleanup.delay.ms" was still 
too short.
If not, there could be another reason for the problem.

In case the log has been swapped out, please keep an eye out for future occurrences.

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams app running. It has been running 
> for two days under a load of around 2500 msgs per second. On the third day I am 
> getting the exception below for some of the partitions; I have 16 partitions and 
> only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThr

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-20 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822564#comment-16822564
 ] 

Ted Yu commented on KAFKA-5998:
---

One doubt I had after reading recent comments: if the state directory is used by 
more than one process, wouldn't the problem manifest itself in ways other than a 
disappearing checkpoint file (such as a corrupted checkpoint)?

My earlier comment on reference counting was not fully refined (pending an answer 
to the above doubt, we may be able to come up with a good solution).

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams app running. It has been running 
> for two days under a load of around 2500 msgs per second. On the third day I am 
> getting the exception below for some of the partitions; I have 16 partitions and 
> only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-20 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822446#comment-16822446
 ] 

Ted Yu commented on KAFKA-5998:
---

I wonder if we can use a file that records a reference count (of processor 
instances) alongside the checkpoint file.

The checkpoint file would only be deleted when the count reaches zero.
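A minimal sketch of the idea, assuming a hypothetical sidecar file named .checkpoint.refs kept next to the checkpoint (the file name and locking scheme are illustrative; coordinating multiple processes would additionally need file locking, e.g. FileChannel.lock):
{code}
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: keep a reference count in a sidecar file so that the
// checkpoint file only becomes eligible for deletion once the count is zero.
public final class CheckpointRefCount {
    private final Path refFile;

    public CheckpointRefCount(final Path taskDir) {
        this.refFile = taskDir.resolve(".checkpoint.refs"); // illustrative name
    }

    private int read() throws IOException {
        if (!Files.exists(refFile)) {
            return 0;
        }
        final String text = new String(Files.readAllBytes(refFile), StandardCharsets.UTF_8);
        return Integer.parseInt(text.trim());
    }

    private void write(final int count) throws IOException {
        Files.write(refFile, Integer.toString(count).getBytes(StandardCharsets.UTF_8));
    }

    public synchronized void acquire() throws IOException {
        write(read() + 1);
    }

    // Returns true when the count has reached zero, i.e. cleanup may proceed.
    public synchronized boolean release() throws IOException {
        final int count = Math.max(0, read() - 1);
        write(count);
        return count == 0;
    }
}
{code}
Note that synchronized only coordinates threads within a single JVM, which is one reason the idea would need further refinement for multi-process access.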

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams app running. It has been running 
> for two days under a load of around 2500 msgs per second. On the third day I am 
> getting the exception below for some of the partitions; I have 16 partitions and 
> only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-06 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16811681#comment-16811681
 ] 

Ted Yu commented on KAFKA-5998:
---

I made the following change:
{code}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index badaa36cd..be29ebe56 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -138,6 +138,7 @@ public abstract class AbstractJoinIntegrationTest {
         RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
         RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        STREAMS_CONFIG.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, "1");
         STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
{code}
For the following check in KafkaStreams#start:
{code}
if (state == State.RUNNING) {
    stateDirectory.cleanRemovedTasks(cleanupDelay);
}
{code}
The state was REBALANCING during TableTableJoinIntegrationTest, so the cleanup was not triggered.

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams app running. It has been running 
> for two days under a load of around 2500 msgs per second. On the third day I am 
> getting the exception below for some of the partitions; I have 16 partitions and 
> only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-06 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16811603#comment-16811603
 ] 

Ted Yu commented on KAFKA-5998:
---

It seems StreamTask can keep track of active tasks. After commit() finishes, 
StreamTask can inform stateDirectory.cleanRemovedTasks that the underlying task 
is eligible for cleaning.
This would be more robust than depending on the period of inactivity alone.
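For illustration, the shape of that idea might be as follows (markTaskEligibleForCleanup is a hypothetical method; only cleanRemovedTasks exists on StateDirectory today):
{code}
// Hypothetical sketch: after a successful commit, mark the task explicitly so
// the cleanup thread need not rely on directory-inactivity time alone.
void onCommitCompleted(final TaskId taskId) {
    stateDirectory.markTaskEligibleForCleanup(taskId); // hypothetical hook
}
{code}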

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams app running. It has been running 
> for two days under a load of around 2500 msgs per second. On the third day I am 
> getting the exception below for some of the partitions; I have 16 partitions and 
> only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)

[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-06 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16811551#comment-16811551
 ] 

Ted Yu edited comment on KAFKA-5998 at 4/6/19 10:55 AM:


Cleanup delay is controlled by:
{code}
public static final String STATE_CLEANUP_DELAY_MS_CONFIG = 
"state.cleanup.delay.ms";
{code}
Maybe a longer value can be specified so that the CleanupThread doesn't clean up 
the task directory prematurely (in case the task directory hasn't been modified 
for a while).


was (Author: yuzhih...@gmail.com):
Cleanup delay is controlled by:
{code}
public static final String STATE_CLEANUP_DELAY_MS_CONFIG = 
"state.cleanup.delay.ms";
{code}
Maybe a longer value can be specified so that the CleanupThread doesn't clean up 
the task directory prematurely.

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams app running. It has been running 
> for two days under a load of around 2500 msgs per second. On the third day I am 
> getting the exception below for some of the partitions; I have 16 partitions and 
> only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-06 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16811551#comment-16811551
 ] 

Ted Yu commented on KAFKA-5998:
---

Cleanup delay is controlled by:
{code}
public static final String STATE_CLEANUP_DELAY_MS_CONFIG = 
"state.cleanup.delay.ms";
{code}
Maybe a longer value can be specified so that the CleanupThread doesn't clean up 
the task directory prematurely.
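For illustration, a sketch of specifying a longer delay when building the Streams configuration (the 30-minute value is arbitrary; the default is 600000 ms, i.e. 10 minutes):
{code}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Raise state.cleanup.delay.ms so an idle task directory is not removed
// prematurely; 30 minutes here is only an example value.
final Properties props = new Properties();
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(30 * 60 * 1000L));
{code}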

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams app running. It has been running 
> for two days under a load of around 2500 msgs per second. On the third day I am 
> getting the exception below for some of the partitions; I have 16 partitions and 
> only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)

[jira] [Comment Edited] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-17 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794549#comment-16794549
 ] 

Ted Yu edited comment on KAFKA-3729 at 3/18/19 3:06 AM:


3729.v6.txt shows the progress toward passing StreamsConfig to Store#init().

The assertion in KStreamAggregationDedupIntegrationTest#shouldGroupByKey passes.
The assertion fails without proper Store init.

Note: PR #6461 doesn't require a KIP.


was (Author: yuzhih...@gmail.com):
3729.v6.txt shows the progress toward passing StreamsConfig to Store#init().

The assertion in KStreamAggregationDedupIntegrationTest#shouldGroupByKey passes.
The assertion fails without proper Store init.

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Assignee: Ted Yu
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt, 3729.v6.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-17 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-3729:
--
Attachment: (was: 3729.v6.txt)

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Assignee: Ted Yu
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt, 3729.v6.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-17 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794549#comment-16794549
 ] 

Ted Yu edited comment on KAFKA-3729 at 3/17/19 6:39 PM:


3729.v6.txt shows the progress toward passing StreamsConfig to Store#init().

The assertion in KStreamAggregationDedupIntegrationTest#shouldGroupByKey passes.
The assertion fails without proper Store init.


was (Author: yuzhih...@gmail.com):
3729.v6.txt shows the progress toward passing StreamsConfig to Store#init().

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Assignee: Ted Yu
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt, 3729.v6.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-17 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-3729:
--
Attachment: 3729.v6.txt

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Assignee: Ted Yu
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt, 3729.v6.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-17 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16794549#comment-16794549
 ] 

Ted Yu commented on KAFKA-3729:
---

3729.v6.txt shows the progress toward passing StreamsConfig to Store#init().

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Assignee: Ted Yu
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt, 3729.v6.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-17 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-3729:
--
Attachment: 3729.v6.txt

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Assignee: Ted Yu
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt, 3729.v6.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-07 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787312#comment-16787312
 ] 

Ted Yu commented on KAFKA-3729:
---

https://github.com/apache/kafka/pull/6399

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-07 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16787272#comment-16787272
 ] 

Ted Yu commented on KAFKA-3729:
---

The following seems to result in compilation warnings:
{code}
/Users/yute/kafka/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:667: warning: [unchecked] unchecked call to configure(Map<String,?>,boolean) as a member of the raw type Serializer
        sn.getKeySer().configure(config.originals(), true);
                                ^
/Users/yute/kafka/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:668: warning: [unchecked] unchecked call to configure(Map<String,?>,boolean) as a member of the raw type Serializer
        sn.getValueSer().configure(config.originals(), false);
                                  ^
{code}
I wonder how the warnings can be suppressed.
I checked existing calls to configure(), which didn't give me much of a clue.
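For illustration, one common way to silence an [unchecked] warning is to annotate the smallest enclosing scope; a sketch, where SerdeNode stands in for whatever type sn actually has in the patch:
{code}
@SuppressWarnings("unchecked")
private static void configureSerializers(final SerdeNode sn, final StreamsConfig config) {
    // The raw Serializer type is what triggers the [unchecked] warning;
    // suppressing at this method keeps the rest of the class fully checked.
    sn.getKeySer().configure(config.originals(), true);    // isKey = true
    sn.getValueSer().configure(config.originals(), false); // isKey = false
}
{code}
An alternative is to carry proper type parameters through (avoiding the raw Serializer type) so that no suppression is needed.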

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-05 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16785119#comment-16785119
 ] 

Ted Yu commented on KAFKA-3729:
---

Attached a tentative patch.
If it is on the right track, I can send out a PR.

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-05 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-3729:
--
Attachment: 3729.txt

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Priority: Major
>  Labels: api, newbie
> Attachments: 3729.txt
>
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-05 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16784755#comment-16784755
 ] 

Ted Yu commented on KAFKA-3729:
---

Can AbstractProcessorContext#appConfigs() be used to obtain the Map that 
configure() uses?
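For illustration, a sketch of what that could look like, assuming a ProcessorContext named context and a Serde named serde:
{code}
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.ProcessorContext;

// appConfigs() exposes the application's original configuration as a Map,
// which matches the Map-based configure(configs, isKey) signature on Serde.
final Map<String, Object> configs = context.appConfigs();
serde.configure(configs, true); // true = configuring a key serde
{code}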

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Priority: Major
>  Labels: api, newbie
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-04 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-3729:
--
Comment: was deleted

(was: Joined class was added Sep 6 10:55:43 2017

Guozhang's comment above was made in 2016.

I wonder what needs to be done for this JIRA based on the current codebase.)

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Priority: Major
>  Labels: api, newbie
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-04 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16784019#comment-16784019
 ] 

Ted Yu commented on KAFKA-3729:
---

The Joined class was added on Sep 6 10:55:43 2017.

Guozhang's comment above was made in 2016.

I wonder what needs to be done for this JIRA based on the current codebase.

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Priority: Major
>  Labels: api, newbie
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3729) Auto-configure non-default SerDes passed alongside the topology builder

2019-03-04 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16783778#comment-16783778
 ] 

Ted Yu commented on KAFKA-3729:
---

I wonder if a third person can work on this, considering the lack of response 
for two years.

>  Auto-configure non-default SerDes passed alongside the topology builder
> 
>
> Key: KAFKA-3729
> URL: https://issues.apache.org/jira/browse/KAFKA-3729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Fred Patton
>Priority: Major
>  Labels: api, newbie
>
> From Guozhang Wang:
> "Only default serdes provided through configs are auto-configured today. But 
> we could auto-configure other serdes passed alongside the topology builder as 
> well."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Upgrading to Scala 2.12

2019-01-22 Thread Ted Yu
+1 on keeping backward compatibility with Scala 2.11
 Original message 
From: Łukasz Antoniak
Date: 1/22/19 2:54 AM (GMT-08:00)
To: dev@bahir.apache.org
Subject: Upgrading to Scala 2.12
Team,

Since version 2.4.0 the Spark team decided to introduce support for Scala 2.12
and to retain 2.11 for backward compatibility. As part of BAHIR-107, I have
been working to upgrade code base to Scala 2.12 and Spark 2.4.0 (
https://github.com/apache/bahir/pull/76/files). Do we plan to continue
support for Scala 2.11? In my opinion, Bahir should follow Spark in terms
of Scala binary compatibility. I have already checked that transitive
dependencies that I had to upgrade (org.json4s:json4s-jackson and
com.twitter:algebird-core) provide artifacts for both - Scala 2.11 and 2.12.

Lukasz


[jira] [Commented] (HADOOP-16018) DistCp won't reassemble chunks when blocks per chunk > 0

2018-12-22 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/HADOOP-16018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16727525#comment-16727525
 ] 

Ted Yu commented on HADOOP-16018:
-

Looking at 
https://github.com/apache/hadoop/commits/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
 , it was not touched by HADOOP-15850

> DistCp won't reassemble chunks when blocks per chunk > 0
> 
>
> Key: HADOOP-16018
> URL: https://issues.apache.org/jira/browse/HADOOP-16018
> Project: Hadoop Common
>  Issue Type: Bug
>  Components: tools/distcp
>Affects Versions: 3.2.0, 2.9.2
>Reporter: Kai X
>Priority: Major
>
> I was investigating why hadoop-distcp-2.9.2 won't reassemble chunks of the 
> same file when blocks per chunk has been set > 0.
> In the CopyCommitter::commitJob, this logic can prevent chunks from 
> reassembling if blocks per chunk is equal to 0:
> {code:java}
> if (blocksPerChunk > 0) {
>   concatFileChunks(conf);
> }
> {code}
> Then in CopyCommitter's ctor, blocksPerChunk is initialised from the config:
> {code:java}
> blocksPerChunk = context.getConfiguration().getInt(
> DistCpOptionSwitch.BLOCKS_PER_CHUNK.getConfigLabel(), 0);
> {code}
>  
> But here the config key DistCpOptionSwitch.BLOCKS_PER_CHUNK.getConfigLabel() 
> will always return an empty string because it is constructed without a config 
> label:
> {code:java}
> BLOCKS_PER_CHUNK("",
> new Option("blocksperchunk", true, "If set to a positive value, files"
> + "with more blocks than this value will be split into chunks of "
> + " blocks to be transferred in parallel, and "
> + "reassembled on the destination. By default,  is "
> + "0 and the files will be transmitted in their entirety without "
> + "splitting. This switch is only applicable when the source file "
> + "system implements getBlockLocations method and the target file "
> + "system implements concat method"))
> {code}
> As a result it will fall back to the default value 0 for blocksPerChunk, and 
> prevent the chunks from reassembling.
>  
>  
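A sketch of the apparent fix, namely constructing the switch with a non-empty config label so that CopyCommitter can read the value back (the label string is illustrative, and the description is abbreviated):
{code}
// Hypothetical fix sketch: supply a real config label instead of "".
BLOCKS_PER_CHUNK("distcp.blocks.per.chunk",
    new Option("blocksperchunk", true, "If set to a positive value, files "
        + "with more blocks than this value will be split into chunks of "
        + "<blocksperchunk> blocks to be transferred in parallel, and "
        + "reassembled on the destination."))
{code}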



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org



[jira] [Commented] (HBASE-21619) Fix warning message caused by incorrect ternary operator evaluation

2018-12-19 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/HBASE-21619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725397#comment-16725397
 ] 

Ted Yu commented on HBASE-21619:


lgtm

> Fix warning message caused by incorrect ternary operator evaluation
> ---
>
> Key: HBASE-21619
> URL: https://issues.apache.org/jira/browse/HBASE-21619
> Project: HBase
>  Issue Type: Bug
>Reporter: Wei-Chiu Chuang
>Assignee: Wei-Chiu Chuang
>Priority: Trivial
> Attachments: HBASE-21619.master.001.patch
>
>
> {code:title=LoadIncrementalHFiles#doBulkLoad}
> LOG.warn(
>   "Bulk load operation did not find any files to load in " + 
> "directory " + hfofDir != null
>   ? hfofDir.toUri().toString()
>   : "" + ".  Does it contain files in " +
>   "subdirectories that correspond to column family names?");
> {code}
> JDK complains {{"Bulk load operation did not find any files to load in " + 
> "directory " + hfofDir != null}} is always true, which is not what is 
> intended, and that produces a wrong message.
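For illustration, the intended behavior needs parentheses so the ternary tests hfofDir itself; a sketch of the corrected call:
{code}
// Corrected sketch: the condition must be (hfofDir != null); otherwise the
// string concatenation binds first and the comparison is always true.
LOG.warn("Bulk load operation did not find any files to load in directory "
    + (hfofDir != null ? hfofDir.toUri().toString() : "")
    + ". Does it contain files in subdirectories that correspond to column family names?");
{code}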



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [PROPOSAL] Move to gitbox.apache.org

2018-12-07 Thread Ted Yu
+1
 Original message 
From: Prashant Sharma
Date: 12/7/18 9:01 PM (GMT-08:00)
To: dev@bahir.apache.org
Subject: Re: [PROPOSAL] Move to gitbox.apache.org
+1



On Sat, Dec 8, 2018 at 2:56 AM Christian Kadner  wrote:

> +1
>
> Thanks JB!
>
> On 2018/12/07 17:20:59, Luciano Resende  wrote:
> > I think we can drive this with consensus (unless infra requires a vote)
> >
> > +1
> > On Fri, Dec 7, 2018 at 9:05 AM Jean-Baptiste Onofré 
> wrote:
> > >
> > > Hi all,
> > >
> > > our repositories are currently located on git-wip-us.apache.org.
> > >
> > > This service will be decommissioned in the coming months.
> > >
> > > I propose to move to gitbox.apache.org.
> > >
> > > I can start a formal vote and if it's OK, I'm volunteer to deal with
> infra.
> > >
> > > Thoughts ?
> > >
> > > Regards
> > > JB
> > > --
> > > Jean-Baptiste Onofré
> > > jbono...@apache.org
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
> >
> >
> >
> > --
> > Luciano Resende
> > http://twitter.com/lresende1975
> > http://lresende.blogspot.com/
> >
>


[jira] [Updated] (HBASE-21246) Introduce WALIdentity interface

2018-12-07 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated HBASE-21246:
---
Status: Patch Available  (was: Open)

> Introduce WALIdentity interface
> ---
>
> Key: HBASE-21246
> URL: https://issues.apache.org/jira/browse/HBASE-21246
> Project: HBase
>  Issue Type: Sub-task
>    Reporter: Ted Yu
>    Assignee: Ted Yu
>Priority: Major
> Fix For: HBASE-20952
>
> Attachments: 21246.003.patch, 21246.20.txt, 21246.21.txt, 
> 21246.23.txt, 21246.24.txt, 21246.25.txt, 21246.26.txt, 21246.34.txt, 
> 21246.37.txt, 21246.39.txt, 21246.41.txt, 21246.43.txt, 
> 21246.HBASE-20952.001.patch, 21246.HBASE-20952.002.patch, 
> 21246.HBASE-20952.004.patch, 21246.HBASE-20952.005.patch, 
> 21246.HBASE-20952.007.patch, 21246.HBASE-20952.008.patch, 
> HBASE-21246.master.001.patch, replication-src-creates-wal-reader.jpg, 
> wal-factory-providers.png, wal-providers.png, wal-splitter-reader.jpg, 
> wal-splitter-writer.jpg
>
>
> We are introducing the WALIdentity interface so that the WAL representation can 
> be decoupled from the distributed filesystem.
> The interface provides a getName method whose return value can represent a 
> filename in a distributed-filesystem environment, or the name of the stream 
> when the WAL is backed by a log stream.
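For illustration, the smallest form such an interface could take (a sketch; the actual patch may carry additional methods):
{code}
// Hypothetical sketch of the proposed abstraction.
public interface WALIdentity {
  /**
   * @return the WAL's name: a file name in a distributed-filesystem
   * environment, or the stream name when the WAL is backed by a log stream.
   */
  String getName();
}
{code}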



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] Apache Bahir 2.3.1 (RC1)

2018-12-01 Thread Ted Yu
+1

Ran test suite which passed.

On Fri, Nov 30, 2018 at 9:13 PM Jean-Baptiste Onofré 
wrote:

> +1 (binding)
>
> Regards
> JB
>
> On 30/11/2018 17:11, Luciano Resende wrote:
> > Dear community member,
> >
> > Please vote to approve the release of Apache Bahir 2.3.1 (RC1) based on
> > Apache Spark 2.3.1.
> >
> > Tag: v2.3.1-rc1 (6c4d67e0a99bcbbad199b7d1d26f3624491070b4)
> >
> > https://github.com/apache/bahir/tree/v2.3.1-rc1
> >
> > Release files:
> >
> > https://repository.apache.org/content/repositories/orgapachebahir-1026
> >
> > Source distribution:
> >
> > https://dist.apache.org/repos/dist/dev/bahir/bahir-spark/2.3.1-rc1/
> >
> >
> > The vote is open for at least 72 hours and passes if a majority of at
> least
> > 3 +1 PMC votes are cast.
> >
> >   [ ] +1 Release this package as Apache Bahir 2.3.1
> >   [ ] -1 Do not release this package because ...
> >
> >
> > Thanks for your vote!
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


[jira] [Commented] (HBASE-21479) Individual tests in TestHRegionReplayEvents class are failing

2018-12-01 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/HBASE-21479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16705903#comment-16705903
 ] 

Ted Yu commented on HBASE-21479:


Ran TestHRegionReplayEvents#testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent 
locally, which passed.

> Individual tests in TestHRegionReplayEvents class are failing
> -
>
> Key: HBASE-21479
> URL: https://issues.apache.org/jira/browse/HBASE-21479
> Project: HBase
>  Issue Type: Bug
>    Reporter: Ted Yu
>Assignee: Peter Somogyi
>Priority: Major
> Fix For: 3.0.0, 2.2.0, 2.1.2
>
> Attachments: HBASE-21479-v1.patch, testHRegionReplayEvents-output.txt
>
>
> The test fails in both master branch and branch-2 :
> {code}
> testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent(org.apache.hadoop.hbase.regionserver.TestHRegionReplayEvents)
>   Time elapsed: 3.74 sec  <<< ERROR!
> java.lang.IndexOutOfBoundsException: Index: 2, Size: 1
>   at 
> org.apache.hadoop.hbase.regionserver.TestHRegionReplayEvents.testSkippingEditsWithSmallerSeqIdAfterRegionOpenEvent(TestHRegionReplayEvents.java:1042)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] Apache Bahir 2.3.2 (RC1)

2018-12-01 Thread Ted Yu
+1

Ran test suite which passed

On Fri, Nov 30, 2018 at 12:57 PM Luciano Resende 
wrote:

> Dear community member,
>
> Please vote to approve the release of Apache Bahir 2.3.2 (RC1) based on
> Apache Spark 2.3.2.
>
> Tag: v2.3.2-rc1 (6d24b270fd713352faf1c93f08f835e39e496489)
>
> https://github.com/apache/bahir/tree/v2.3.2-rc1
>
> Release files:
>
> https://repository.apache.org/content/repositories/orgapachebahir-1027
>
> Source distribution:
>
> https://dist.apache.org/repos/dist/dev/bahir/bahir-spark/2.3.2-rc1/
>
>
> The vote is open for at least 72 hours and passes if a majority of at least
> 3 +1 PMC votes are cast.
>
>   [ ] +1 Release this package as Apache Bahir 2.3.2
>   [ ] -1 Do not release this package because ...
>
>
> Thanks for your vote!
>
> --
> Luciano Resende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>


Re: [VOTE] Apache Bahir 2.3.0 (RC1)

2018-11-30 Thread Ted Yu
+1

Ran test suite which passed.

On Fri, Nov 30, 2018 at 6:43 AM Luciano Resende 
wrote:

> Dear community member,
>
> Please vote to approve the release of Apache Bahir 2.3.0 (RC1) based on
> Apache Spark 2.3.0.
>
> Tag: v2.3.0-rc1 (e2d138bd2a927d0ab42e45739d8938eefafca352)
>
> https://github.com/apache/bahir/tree/v2.3.0-rc1
>
> Release files:
>
> https://repository.apache.org/content/repositories/orgapachebahir-1025
>
> Source distribution:
>
> https://dist.apache.org/repos/dist/dev/bahir/bahir-spark/2.3.0-rc1/
>
>
> The vote is open for at least 72 hours and passes if a majority of at least
> 3 +1 PMC votes are cast.
>
>   [ ] +1 Release this package as Apache Bahir 2.3.0
>   [ ] -1 Do not release this package because ...
>
>
> Thanks for your vote!
>
> --
> Luciano Resende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>


[jira] [Updated] (HBASE-21511) Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles

2018-11-26 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated HBASE-21511:
---
Fix Version/s: 1.4.9
   1.3.3

> Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles
> ---
>
> Key: HBASE-21511
> URL: https://issues.apache.org/jira/browse/HBASE-21511
> Project: HBase
>  Issue Type: Improvement
>    Reporter: Ted Yu
>    Assignee: Ted Yu
>Priority: Minor
> Fix For: 3.0.0, 1.5.0, 1.3.3, 2.2.0, 1.4.9, 2.1.2
>
> Attachments: 21511.branch-1.v3.txt, 21511.branch-1.v4.txt, 
> 21511.v1.txt, 21511.v2.txt, 21511.v3.txt
>
>
> During review of HBASE-21387, [~Apache9] mentioned that the check for 
> in-progress snapshots in SnapshotFileCache#getUnreferencedFiles is no longer 
> needed now that the snapshot hfile cleaner and snapshot taking are mutually 
> exclusive.
> This issue addresses that review comment by removing the check for 
> in-progress snapshots.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (HBASE-21511) Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles

2018-11-25 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated HBASE-21511:
---
   Resolution: Fixed
 Hadoop Flags: Reviewed
Fix Version/s: (was: 2.0.4)
   (was: 1.4.9)
   (was: 1.3.3)
   Status: Resolved  (was: Patch Available)

Thanks for the review, Zheng.

> Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles
> ---
>
> Key: HBASE-21511
> URL: https://issues.apache.org/jira/browse/HBASE-21511
> Project: HBase
>  Issue Type: Improvement
>    Reporter: Ted Yu
>    Assignee: Ted Yu
>Priority: Minor
> Fix For: 3.0.0, 1.5.0, 2.2.0, 2.1.2
>
> Attachments: 21511.branch-1.v3.txt, 21511.branch-1.v4.txt, 
> 21511.v1.txt, 21511.v2.txt, 21511.v3.txt
>
>
> During review of HBASE-21387, [~Apache9] mentioned that the check for 
> in-progress snapshots in SnapshotFileCache#getUnreferencedFiles is no longer 
> needed now that the snapshot hfile cleaner and snapshot taking are mutually 
> exclusive.
> This issue addresses that review comment by removing the check for 
> in-progress snapshots.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (HBASE-21511) Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles

2018-11-25 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated HBASE-21511:
---
Attachment: 21511.branch-1.v4.txt

> Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles
> ---
>
> Key: HBASE-21511
> URL: https://issues.apache.org/jira/browse/HBASE-21511
> Project: HBase
>  Issue Type: Improvement
>    Reporter: Ted Yu
>    Assignee: Ted Yu
>Priority: Minor
> Fix For: 3.0.0, 1.5.0, 1.3.3, 2.2.0, 1.4.9, 2.1.2, 2.0.4
>
> Attachments: 21511.branch-1.v3.txt, 21511.branch-1.v4.txt, 
> 21511.v1.txt, 21511.v2.txt, 21511.v3.txt
>
>
> During review of HBASE-21387, [~Apache9] mentioned that the check for 
> in-progress snapshots in SnapshotFileCache#getUnreferencedFiles is no longer 
> needed now that the snapshot hfile cleaner and snapshot taking are mutually 
> exclusive.
> This issue addresses that review comment by removing the check for 
> in-progress snapshots.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (HBASE-21511) Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles

2018-11-25 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated HBASE-21511:
---
Attachment: 21511.branch-1.v3.txt

> Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles
> ---
>
> Key: HBASE-21511
> URL: https://issues.apache.org/jira/browse/HBASE-21511
> Project: HBase
>  Issue Type: Improvement
>    Reporter: Ted Yu
>    Assignee: Ted Yu
>Priority: Minor
> Fix For: 3.0.0, 1.5.0, 1.3.3, 2.2.0, 1.4.9, 2.1.2, 2.0.4
>
> Attachments: 21511.branch-1.v3.txt, 21511.v1.txt, 21511.v2.txt, 
> 21511.v3.txt
>
>
> During review of HBASE-21387, [~Apache9] mentioned that the check for 
> in-progress snapshots in SnapshotFileCache#getUnreferencedFiles is no longer 
> needed now that the snapshot hfile cleaner and snapshot taking are mutually 
> exclusive.
> This issue addresses that review comment by removing the check for 
> in-progress snapshots.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (HBASE-21511) Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles

2018-11-25 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu reassigned HBASE-21511:
--

Assignee: Ted Yu

> Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles
> ---
>
> Key: HBASE-21511
> URL: https://issues.apache.org/jira/browse/HBASE-21511
> Project: HBase
>  Issue Type: Improvement
>    Reporter: Ted Yu
>    Assignee: Ted Yu
>Priority: Minor
> Fix For: 3.0.0, 1.5.0, 1.3.3, 2.2.0, 1.4.9, 2.1.2, 2.0.4
>
> Attachments: 21511.v1.txt, 21511.v2.txt, 21511.v3.txt
>
>
> During review of HBASE-21387, [~Apache9] mentioned that the check for 
> in-progress snapshots in SnapshotFileCache#getUnreferencedFiles is no longer 
> needed now that the snapshot hfile cleaner and snapshot taking are mutually 
> exclusive.
> This issue addresses that review comment by removing the check for 
> in-progress snapshots.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (HBASE-21511) Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles

2018-11-24 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated HBASE-21511:
---
Attachment: 21511.v3.txt

> Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles
> ---
>
> Key: HBASE-21511
> URL: https://issues.apache.org/jira/browse/HBASE-21511
> Project: HBase
>  Issue Type: Improvement
>    Reporter: Ted Yu
>Priority: Minor
> Attachments: 21511.v1.txt, 21511.v2.txt, 21511.v3.txt
>
>
> During review of HBASE-21387, [~Apache9] mentioned that the check for 
> in-progress snapshots in SnapshotFileCache#getUnreferencedFiles is no longer 
> needed now that the snapshot hfile cleaner and snapshot taking are mutually 
> exclusive.
> This issue addresses that review comment by removing the check for 
> in-progress snapshots.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (HBASE-21511) Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles

2018-11-24 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated HBASE-21511:
---
Attachment: 21511.v2.txt

> Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles
> ---
>
> Key: HBASE-21511
> URL: https://issues.apache.org/jira/browse/HBASE-21511
> Project: HBase
>  Issue Type: Improvement
>    Reporter: Ted Yu
>Priority: Minor
> Attachments: 21511.v1.txt, 21511.v2.txt
>
>
> During review of HBASE-21387, [~Apache9] mentioned that the check for 
> in-progress snapshots in SnapshotFileCache#getUnreferencedFiles is no longer 
> needed now that the snapshot hfile cleaner and snapshot taking are mutually 
> exclusive.
> This issue addresses that review comment by removing the check for 
> in-progress snapshots.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (HBASE-21387) Race condition surrounding in progress snapshot handling in snapshot cache leads to loss of snapshot files

2018-11-24 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated HBASE-21387:
---
Attachment: 21511.v2.txt

> Race condition surrounding in progress snapshot handling in snapshot cache 
> leads to loss of snapshot files
> --
>
> Key: HBASE-21387
> URL: https://issues.apache.org/jira/browse/HBASE-21387
> Project: HBase
>  Issue Type: Bug
>  Components: snapshots
>Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Major
>  Labels: snapshot
> Fix For: 3.0.0, 1.5.0, 1.3.3, 2.2.0, 2.0.3, 1.4.9, 2.1.2, 1.2.10
>
> Attachments: 0001-UT.patch, 21387-suggest.txt, 21387.addendum.txt, 
> 21387.dbg.txt, 21387.v10.txt, 21387.v11.txt, 21387.v12.txt, 21387.v2.txt, 
> 21387.v3.txt, 21387.v6.txt, 21387.v7.txt, 21387.v8.txt, 21387.v9.txt, 
> 21511.v2.txt, HBASE-21387.branch-1.2.patch, HBASE-21387.branch-1.3.patch, 
> HBASE-21387.branch-1.patch, HBASE-21387.v13.patch, HBASE-21387.v14.patch, 
> HBASE-21387.v15.patch, HBASE-21387.v16.patch, HBASE-21387.v17.patch, 
> two-pass-cleaner.v4.txt, two-pass-cleaner.v6.txt, two-pass-cleaner.v9.txt
>
>
> From a recent customer report where ExportSnapshot failed:
> {code}
> 2018-10-09 18:54:32,559 ERROR [VerifySnapshot-pool1-t2] 
> snapshot.SnapshotReferenceUtil: Can't find hfile: 
> 44f6c3c646e84de6a63fe30da4fcb3aa in the real 
> (hdfs://in.com:8020/apps/hbase/data/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa)
>  or archive 
> (hdfs://in.com:8020/apps/hbase/data/archive/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa)
>  directory for the primary table. 
> {code}
> We found the following in the log:
> {code}
> 2018-10-09 18:54:23,675 DEBUG 
> [00:16000.activeMasterManager-HFileCleaner.large-1539035367427] 
> cleaner.HFileCleaner: Removing: 
> hdfs:///apps/hbase/data/archive/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa 
> from archive
> {code}
> The root cause is a race condition in the in-progress snapshot(s) handling 
> between refreshCache() and getUnreferencedFiles().
> There are two callers of refreshCache: one from RefreshCacheTask#run and the 
> other from SnapshotHFileCleaner.
> Let's look at the code of refreshCache:
> {code}
>   if (!name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) {
> {code}
> whose intention is to exclude in-progress snapshot(s).
> Suppose that when the RefreshCacheTask runs refreshCache, some in-progress 
> snapshot is about to finish.
> When SnapshotHFileCleaner calls getUnreferencedFiles(), it sees that 
> lastModifiedTime is up to date, so the cleaner proceeds to check in-progress 
> snapshot(s). However, the snapshot has completed by that time, so some 
> file(s) end up deemed unreferenced.
> Here is a timeline given by Josh illustrating the scenario:
> At time T0, we are checking if F1 is referenced. At time T1, there is a 
> snapshot S1 in progress that is referencing a file F1. refreshCache() is 
> called, but no completed snapshot references F1. At T2, the snapshot S1, 
> which references F1, completes. At T3, we check in-progress snapshots and S1 
> is not included. Thus, F1 is marked as unreferenced even though S1 references 
> it.
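
The interleaving is easier to follow laid out per thread. Below is a sketch of 
the T0-T3 timeline above (illustrative only; these are not the actual class 
names):
{code}
// cleaner thread                          snapshot thread
// T0: begin "is F1 referenced?" check
// T1: refreshCache(): no completed        S1 (references F1) still lives in
//     snapshot references F1              the tmp dir, so it is skipped
//                                         T2: S1 completes; its directory
//                                             moves out of the tmp dir
// T3: re-check in-progress snapshots:
//     S1 is no longer in progress either
// result: F1 is classified as unreferenced and deleted, breaking S1
{code}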



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (HBASE-21511) Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles

2018-11-24 Thread Ted Yu (JIRA)
Ted Yu created HBASE-21511:
--

 Summary: Remove in progress snapshot check in 
SnapshotFileCache#getUnreferencedFiles
 Key: HBASE-21511
 URL: https://issues.apache.org/jira/browse/HBASE-21511
 Project: HBase
  Issue Type: Improvement
Reporter: Ted Yu
 Attachments: 21511.v1.txt

During review of HBASE-21387, [~Apache9] mentioned that the check for 
in-progress snapshots in SnapshotFileCache#getUnreferencedFiles is no longer 
needed now that the snapshot hfile cleaner and snapshot taking are mutually 
exclusive.

This issue addresses that review comment by removing the check for 
in-progress snapshots.
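
For context, a minimal sketch of the simplified method, assuming the in-progress 
check is simply dropped (the helper and cache field names are illustrative, not 
the actual patch):
{code}
// Sketch only: once snapshot taking and the cleaner are mutually exclusive,
// no snapshot can complete mid-scan, so the cached set of referenced file
// names is authoritative and the in-progress re-check can be removed.
public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files)
    throws IOException {
  List<FileStatus> unReferencedFiles = new ArrayList<>();
  refreshCacheIfStale();  // hypothetical helper: reload when the snapshot dir changed
  for (FileStatus file : files) {
    if (!cache.contains(file.getPath().getName())) {
      unReferencedFiles.add(file);  // no in-progress snapshot scan needed here
    }
  }
  return unReferencedFiles;
}
{code}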



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (HBASE-21511) Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles

2018-11-24 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated HBASE-21511:
---
Attachment: 21511.v1.txt

> Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles
> ---
>
> Key: HBASE-21511
> URL: https://issues.apache.org/jira/browse/HBASE-21511
> Project: HBase
>  Issue Type: Improvement
>    Reporter: Ted Yu
>Priority: Minor
> Attachments: 21511.v1.txt
>
>
> During review of HBASE-21387, [~Apache9] mentioned that the check for 
> in-progress snapshots in SnapshotFileCache#getUnreferencedFiles is no longer 
> needed now that the snapshot hfile cleaner and snapshot taking are mutually 
> exclusive.
> This issue addresses that review comment by removing the check for 
> in-progress snapshots.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (HBASE-21511) Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles

2018-11-24 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated HBASE-21511:
---
Attachment: (was: 21511.v1.txt)

> Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles
> ---
>
> Key: HBASE-21511
> URL: https://issues.apache.org/jira/browse/HBASE-21511
> Project: HBase
>  Issue Type: Improvement
>    Reporter: Ted Yu
>Priority: Minor
> Attachments: 21511.v1.txt
>
>
> During review of HBASE-21387, [~Apache9] mentioned that the check for 
> in-progress snapshots in SnapshotFileCache#getUnreferencedFiles is no longer 
> needed now that the snapshot hfile cleaner and snapshot taking are mutually 
> exclusive.
> This issue addresses that review comment by removing the check for 
> in-progress snapshots.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (HBASE-21511) Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles

2018-11-24 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated HBASE-21511:
---
Status: Patch Available  (was: Open)

> Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles
> ---
>
> Key: HBASE-21511
> URL: https://issues.apache.org/jira/browse/HBASE-21511
> Project: HBase
>  Issue Type: Improvement
>    Reporter: Ted Yu
>Priority: Minor
> Attachments: 21511.v1.txt
>
>
> During review of HBASE-21387, [~Apache9] mentioned that the check for 
> in-progress snapshots in SnapshotFileCache#getUnreferencedFiles is no longer 
> needed now that the snapshot hfile cleaner and snapshot taking are mutually 
> exclusive.
> This issue addresses that review comment by removing the check for 
> in-progress snapshots.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (HBASE-21511) Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles

2018-11-24 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated HBASE-21511:
---
Attachment: 21511.v1.txt

> Remove in progress snapshot check in SnapshotFileCache#getUnreferencedFiles
> ---
>
> Key: HBASE-21511
> URL: https://issues.apache.org/jira/browse/HBASE-21511
> Project: HBase
>  Issue Type: Improvement
>    Reporter: Ted Yu
>Priority: Minor
> Attachments: 21511.v1.txt
>
>
> During review of HBASE-21387, [~Apache9] mentioned that the check for 
> in-progress snapshots in SnapshotFileCache#getUnreferencedFiles is no longer 
> needed now that the snapshot hfile cleaner and snapshot taking are mutually 
> exclusive.
> This issue addresses that review comment by removing the check for 
> in-progress snapshots.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (HBASE-21387) Race condition surrounding in progress snapshot handling in snapshot cache leads to loss of snapshot files

2018-11-23 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/HBASE-21387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697623#comment-16697623
 ] 

Ted Yu commented on HBASE-21387:


Ran the failed test locally with the addendum; it passed.

> Race condition surrounding in progress snapshot handling in snapshot cache 
> leads to loss of snapshot files
> --
>
> Key: HBASE-21387
> URL: https://issues.apache.org/jira/browse/HBASE-21387
> Project: HBase
>  Issue Type: Bug
>    Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Major
>  Labels: snapshot
> Fix For: 3.0.0, 1.5.0, 1.3.3, 2.2.0, 2.0.3, 1.4.9, 2.1.2, 1.2.10
>
> Attachments: 0001-UT.patch, 21387-suggest.txt, 21387.addendum.txt, 
> 21387.dbg.txt, 21387.v10.txt, 21387.v11.txt, 21387.v12.txt, 21387.v2.txt, 
> 21387.v3.txt, 21387.v6.txt, 21387.v7.txt, 21387.v8.txt, 21387.v9.txt, 
> HBASE-21387.branch-1.2.patch, HBASE-21387.branch-1.3.patch, 
> HBASE-21387.branch-1.patch, HBASE-21387.v13.patch, HBASE-21387.v14.patch, 
> HBASE-21387.v15.patch, HBASE-21387.v16.patch, HBASE-21387.v17.patch, 
> two-pass-cleaner.v4.txt, two-pass-cleaner.v6.txt, two-pass-cleaner.v9.txt
>
>
> From a recent customer report where ExportSnapshot failed:
> {code}
> 2018-10-09 18:54:32,559 ERROR [VerifySnapshot-pool1-t2] 
> snapshot.SnapshotReferenceUtil: Can't find hfile: 
> 44f6c3c646e84de6a63fe30da4fcb3aa in the real 
> (hdfs://in.com:8020/apps/hbase/data/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa)
>  or archive 
> (hdfs://in.com:8020/apps/hbase/data/archive/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa)
>  directory for the primary table. 
> {code}
> We found the following in the log:
> {code}
> 2018-10-09 18:54:23,675 DEBUG 
> [00:16000.activeMasterManager-HFileCleaner.large-1539035367427] 
> cleaner.HFileCleaner: Removing: 
> hdfs:///apps/hbase/data/archive/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa 
> from archive
> {code}
> The root cause is a race condition in the in-progress snapshot(s) handling 
> between refreshCache() and getUnreferencedFiles().
> There are two callers of refreshCache: one from RefreshCacheTask#run and the 
> other from SnapshotHFileCleaner.
> Let's look at the code of refreshCache:
> {code}
>   if (!name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) {
> {code}
> whose intention is to exclude in-progress snapshot(s).
> Suppose that when the RefreshCacheTask runs refreshCache, some in-progress 
> snapshot is about to finish.
> When SnapshotHFileCleaner calls getUnreferencedFiles(), it sees that 
> lastModifiedTime is up to date, so the cleaner proceeds to check in-progress 
> snapshot(s). However, the snapshot has completed by that time, so some 
> file(s) end up deemed unreferenced.
> Here is a timeline given by Josh illustrating the scenario:
> At time T0, we are checking if F1 is referenced. At time T1, there is a 
> snapshot S1 in progress that is referencing a file F1. refreshCache() is 
> called, but no completed snapshot references F1. At T2, the snapshot S1, 
> which references F1, completes. At T3, we check in-progress snapshots and S1 
> is not included. Thus, F1 is marked as unreferenced even though S1 references 
> it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (HBASE-21387) Race condition surrounding in progress snapshot handling in snapshot cache leads to loss of snapshot files

2018-11-23 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated HBASE-21387:
---
Status: Patch Available  (was: Reopened)

> Race condition surrounding in progress snapshot handling in snapshot cache 
> leads to loss of snapshot files
> --
>
> Key: HBASE-21387
> URL: https://issues.apache.org/jira/browse/HBASE-21387
> Project: HBase
>  Issue Type: Bug
>    Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Major
>  Labels: snapshot
> Fix For: 3.0.0, 1.5.0, 1.3.3, 2.2.0, 2.0.3, 1.4.9, 2.1.2, 1.2.10
>
> Attachments: 0001-UT.patch, 21387-suggest.txt, 21387.addendum.txt, 
> 21387.dbg.txt, 21387.v10.txt, 21387.v11.txt, 21387.v12.txt, 21387.v2.txt, 
> 21387.v3.txt, 21387.v6.txt, 21387.v7.txt, 21387.v8.txt, 21387.v9.txt, 
> HBASE-21387.branch-1.2.patch, HBASE-21387.branch-1.3.patch, 
> HBASE-21387.branch-1.patch, HBASE-21387.v13.patch, HBASE-21387.v14.patch, 
> HBASE-21387.v15.patch, HBASE-21387.v16.patch, HBASE-21387.v17.patch, 
> two-pass-cleaner.v4.txt, two-pass-cleaner.v6.txt, two-pass-cleaner.v9.txt
>
>
> From a recent customer report where ExportSnapshot failed:
> {code}
> 2018-10-09 18:54:32,559 ERROR [VerifySnapshot-pool1-t2] 
> snapshot.SnapshotReferenceUtil: Can't find hfile: 
> 44f6c3c646e84de6a63fe30da4fcb3aa in the real 
> (hdfs://in.com:8020/apps/hbase/data/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa)
>  or archive 
> (hdfs://in.com:8020/apps/hbase/data/archive/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa)
>  directory for the primary table. 
> {code}
> We found the following in the log:
> {code}
> 2018-10-09 18:54:23,675 DEBUG 
> [00:16000.activeMasterManager-HFileCleaner.large-1539035367427] 
> cleaner.HFileCleaner: Removing: 
> hdfs:///apps/hbase/data/archive/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa 
> from archive
> {code}
> The root cause is a race condition in the in-progress snapshot(s) handling 
> between refreshCache() and getUnreferencedFiles().
> There are two callers of refreshCache: one from RefreshCacheTask#run and the 
> other from SnapshotHFileCleaner.
> Let's look at the code of refreshCache:
> {code}
>   if (!name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) {
> {code}
> whose intention is to exclude in-progress snapshot(s).
> Suppose that when the RefreshCacheTask runs refreshCache, some in-progress 
> snapshot is about to finish.
> When SnapshotHFileCleaner calls getUnreferencedFiles(), it sees that 
> lastModifiedTime is up to date, so the cleaner proceeds to check in-progress 
> snapshot(s). However, the snapshot has completed by that time, so some 
> file(s) end up deemed unreferenced.
> Here is a timeline given by Josh illustrating the scenario:
> At time T0, we are checking if F1 is referenced. At time T1, there is a 
> snapshot S1 in progress that is referencing a file F1. refreshCache() is 
> called, but no completed snapshot references F1. At T2, the snapshot S1, 
> which references F1, completes. At T3, we check in-progress snapshots and S1 
> is not included. Thus, F1 is marked as unreferenced even though S1 references 
> it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (HBASE-21387) Race condition surrounding in progress snapshot handling in snapshot cache leads to loss of snapshot files

2018-11-23 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/HBASE-21387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16697520#comment-16697520
 ] 

Ted Yu commented on HBASE-21387:


TestSnapshotFileCache fails across branches.

https://builds.apache.org/job/HBase-Flaky-Tests/job/master/1987/testReport/junit/org.apache.hadoop.hbase.master.snapshot/TestSnapshotFileCache/

One condition on SnapshotFileCache was incorrect, resulting in the following 
being logged repeatedly:
{code}
  LOG.warn("Not checking unreferenced files since snapshot is running, it will "
      + "skip to clean the HFiles this time");
{code}
With the addendum, the test passes.
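
A plausible shape for such a fix, assuming the skip condition was inverted (an 
assumption for illustration; the actual addendum may differ):
{code}
// Assumed bug shape: the cleaner skipped whenever tryLock() SUCCEEDED, so
// every pass logged the warning and nothing was ever cleaned. A corrected
// guard skips only when the write lock cannot be acquired:
if (!takeSnapshotLock.writeLock().tryLock()) {
  LOG.warn("Not checking unreferenced files since snapshot is running, it will "
      + "skip to clean the HFiles this time");
  return Collections.emptyList();
}
try {
  // ... proceed with the unreferenced-file check
} finally {
  takeSnapshotLock.writeLock().unlock();
}
{code}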

> Race condition surrounding in progress snapshot handling in snapshot cache 
> leads to loss of snapshot files
> --
>
> Key: HBASE-21387
> URL: https://issues.apache.org/jira/browse/HBASE-21387
> Project: HBase
>      Issue Type: Bug
>        Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Major
>  Labels: snapshot
> Fix For: 3.0.0, 1.5.0, 1.3.3, 2.2.0, 2.0.3, 1.4.9, 2.1.2, 1.2.10
>
> Attachments: 0001-UT.patch, 21387-suggest.txt, 21387.addendum.txt, 
> 21387.dbg.txt, 21387.v10.txt, 21387.v11.txt, 21387.v12.txt, 21387.v2.txt, 
> 21387.v3.txt, 21387.v6.txt, 21387.v7.txt, 21387.v8.txt, 21387.v9.txt, 
> HBASE-21387.branch-1.2.patch, HBASE-21387.branch-1.3.patch, 
> HBASE-21387.branch-1.patch, HBASE-21387.v13.patch, HBASE-21387.v14.patch, 
> HBASE-21387.v15.patch, HBASE-21387.v16.patch, HBASE-21387.v17.patch, 
> two-pass-cleaner.v4.txt, two-pass-cleaner.v6.txt, two-pass-cleaner.v9.txt
>
>
> From a recent customer report where ExportSnapshot failed:
> {code}
> 2018-10-09 18:54:32,559 ERROR [VerifySnapshot-pool1-t2] 
> snapshot.SnapshotReferenceUtil: Can't find hfile: 
> 44f6c3c646e84de6a63fe30da4fcb3aa in the real 
> (hdfs://in.com:8020/apps/hbase/data/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa)
>  or archive 
> (hdfs://in.com:8020/apps/hbase/data/archive/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa)
>  directory for the primary table. 
> {code}
> We found the following in the log:
> {code}
> 2018-10-09 18:54:23,675 DEBUG 
> [00:16000.activeMasterManager-HFileCleaner.large-1539035367427] 
> cleaner.HFileCleaner: Removing: 
> hdfs:///apps/hbase/data/archive/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa 
> from archive
> {code}
> The root cause is a race condition in the in-progress snapshot(s) handling 
> between refreshCache() and getUnreferencedFiles().
> There are two callers of refreshCache: one from RefreshCacheTask#run and the 
> other from SnapshotHFileCleaner.
> Let's look at the code of refreshCache:
> {code}
>   if (!name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) {
> {code}
> whose intention is to exclude in-progress snapshot(s).
> Suppose that when the RefreshCacheTask runs refreshCache, some in-progress 
> snapshot is about to finish.
> When SnapshotHFileCleaner calls getUnreferencedFiles(), it sees that 
> lastModifiedTime is up to date, so the cleaner proceeds to check in-progress 
> snapshot(s). However, the snapshot has completed by that time, so some 
> file(s) end up deemed unreferenced.
> Here is a timeline given by Josh illustrating the scenario:
> At time T0, we are checking if F1 is referenced. At time T1, there is a 
> snapshot S1 in progress that is referencing a file F1. refreshCache() is 
> called, but no completed snapshot references F1. At T2, the snapshot S1, 
> which references F1, completes. At T3, we check in-progress snapshots and S1 
> is not included. Thus, F1 is marked as unreferenced even though S1 references 
> it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (HBASE-21387) Race condition surrounding in progress snapshot handling in snapshot cache leads to loss of snapshot files

2018-11-23 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated HBASE-21387:
---
Attachment: 21387.addendum.txt

> Race condition surrounding in progress snapshot handling in snapshot cache 
> leads to loss of snapshot files
> --
>
> Key: HBASE-21387
> URL: https://issues.apache.org/jira/browse/HBASE-21387
> Project: HBase
>  Issue Type: Bug
>    Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Major
>  Labels: snapshot
> Fix For: 3.0.0, 1.5.0, 1.3.3, 2.2.0, 2.0.3, 1.4.9, 2.1.2, 1.2.10
>
> Attachments: 0001-UT.patch, 21387-suggest.txt, 21387.addendum.txt, 
> 21387.dbg.txt, 21387.v10.txt, 21387.v11.txt, 21387.v12.txt, 21387.v2.txt, 
> 21387.v3.txt, 21387.v6.txt, 21387.v7.txt, 21387.v8.txt, 21387.v9.txt, 
> HBASE-21387.branch-1.2.patch, HBASE-21387.branch-1.3.patch, 
> HBASE-21387.branch-1.patch, HBASE-21387.v13.patch, HBASE-21387.v14.patch, 
> HBASE-21387.v15.patch, HBASE-21387.v16.patch, HBASE-21387.v17.patch, 
> two-pass-cleaner.v4.txt, two-pass-cleaner.v6.txt, two-pass-cleaner.v9.txt
>
>
> From a recent customer report where ExportSnapshot failed:
> {code}
> 2018-10-09 18:54:32,559 ERROR [VerifySnapshot-pool1-t2] 
> snapshot.SnapshotReferenceUtil: Can't find hfile: 
> 44f6c3c646e84de6a63fe30da4fcb3aa in the real 
> (hdfs://in.com:8020/apps/hbase/data/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa)
>  or archive 
> (hdfs://in.com:8020/apps/hbase/data/archive/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa)
>  directory for the primary table. 
> {code}
> We found the following in the log:
> {code}
> 2018-10-09 18:54:23,675 DEBUG 
> [00:16000.activeMasterManager-HFileCleaner.large-1539035367427] 
> cleaner.HFileCleaner: Removing: 
> hdfs:///apps/hbase/data/archive/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa 
> from archive
> {code}
> The root cause is a race condition in the in-progress snapshot(s) handling 
> between refreshCache() and getUnreferencedFiles().
> There are two callers of refreshCache: one from RefreshCacheTask#run and the 
> other from SnapshotHFileCleaner.
> Let's look at the code of refreshCache:
> {code}
>   if (!name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) {
> {code}
> whose intention is to exclude in-progress snapshot(s).
> Suppose that when the RefreshCacheTask runs refreshCache, some in-progress 
> snapshot is about to finish.
> When SnapshotHFileCleaner calls getUnreferencedFiles(), it sees that 
> lastModifiedTime is up to date, so the cleaner proceeds to check in-progress 
> snapshot(s). However, the snapshot has completed by that time, so some 
> file(s) end up deemed unreferenced.
> Here is a timeline given by Josh illustrating the scenario:
> At time T0, we are checking if F1 is referenced. At time T1, there is a 
> snapshot S1 in progress that is referencing a file F1. refreshCache() is 
> called, but no completed snapshot references F1. At T2, the snapshot S1, 
> which references F1, completes. At T3, we check in-progress snapshots and S1 
> is not included. Thus, F1 is marked as unreferenced even though S1 references 
> it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (HBASE-21387) Race condition surrounding in progress snapshot handling in snapshot cache leads to loss of snapshot files

2018-11-23 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu reopened HBASE-21387:


> Race condition surrounding in progress snapshot handling in snapshot cache 
> leads to loss of snapshot files
> --
>
> Key: HBASE-21387
> URL: https://issues.apache.org/jira/browse/HBASE-21387
> Project: HBase
>  Issue Type: Bug
>    Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Major
>  Labels: snapshot
> Fix For: 3.0.0, 1.5.0, 1.3.3, 2.2.0, 2.0.3, 1.4.9, 2.1.2, 1.2.10
>
> Attachments: 0001-UT.patch, 21387-suggest.txt, 21387.dbg.txt, 
> 21387.v10.txt, 21387.v11.txt, 21387.v12.txt, 21387.v2.txt, 21387.v3.txt, 
> 21387.v6.txt, 21387.v7.txt, 21387.v8.txt, 21387.v9.txt, 
> HBASE-21387.branch-1.2.patch, HBASE-21387.branch-1.3.patch, 
> HBASE-21387.branch-1.patch, HBASE-21387.v13.patch, HBASE-21387.v14.patch, 
> HBASE-21387.v15.patch, HBASE-21387.v16.patch, HBASE-21387.v17.patch, 
> two-pass-cleaner.v4.txt, two-pass-cleaner.v6.txt, two-pass-cleaner.v9.txt
>
>
> From a recent customer report where ExportSnapshot failed:
> {code}
> 2018-10-09 18:54:32,559 ERROR [VerifySnapshot-pool1-t2] 
> snapshot.SnapshotReferenceUtil: Can't find hfile: 
> 44f6c3c646e84de6a63fe30da4fcb3aa in the real 
> (hdfs://in.com:8020/apps/hbase/data/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa)
>  or archive 
> (hdfs://in.com:8020/apps/hbase/data/archive/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa)
>  directory for the primary table. 
> {code}
> We found the following in the log:
> {code}
> 2018-10-09 18:54:23,675 DEBUG 
> [00:16000.activeMasterManager-HFileCleaner.large-1539035367427] 
> cleaner.HFileCleaner: Removing: 
> hdfs:///apps/hbase/data/archive/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa 
> from archive
> {code}
> The root cause is a race condition in the in-progress snapshot(s) handling 
> between refreshCache() and getUnreferencedFiles().
> There are two callers of refreshCache: one from RefreshCacheTask#run and the 
> other from SnapshotHFileCleaner.
> Let's look at the code of refreshCache:
> {code}
>   if (!name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) {
> {code}
> whose intention is to exclude in-progress snapshot(s).
> Suppose that when the RefreshCacheTask runs refreshCache, some in-progress 
> snapshot is about to finish.
> When SnapshotHFileCleaner calls getUnreferencedFiles(), it sees that 
> lastModifiedTime is up to date, so the cleaner proceeds to check in-progress 
> snapshot(s). However, the snapshot has completed by that time, so some 
> file(s) end up deemed unreferenced.
> Here is a timeline given by Josh illustrating the scenario:
> At time T0, we are checking if F1 is referenced. At time T1, there is a 
> snapshot S1 in progress that is referencing a file F1. refreshCache() is 
> called, but no completed snapshot references F1. At T2, the snapshot S1, 
> which references F1, completes. At T3, we check in-progress snapshots and S1 
> is not included. Thus, F1 is marked as unreferenced even though S1 references 
> it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (HBASE-21387) Race condition surrounding in progress snapshot handling in snapshot cache leads to loss of snapshot files

2018-11-22 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/HBASE-21387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated HBASE-21387:
---
Hadoop Flags: Reviewed
Release Note: To prevent a race condition between an in-progress snapshot 
(performed by TakeSnapshotHandler) and the HFileCleaner, which results in data 
loss, this JIRA introduced mutual exclusion between taking a snapshot and 
running the HFileCleaner. That is, at any given moment, either some snapshot 
is being taken or the HFileCleaner is checking for unreferenced hfiles, but 
not both.
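
A minimal sketch of that mutual exclusion, assuming a shared read-write lock 
(the field and helper names are illustrative, not the exact patch):
{code}
// Snapshots may run concurrently with each other, so each holds the READ
// lock; the cleaner takes the WRITE lock, so it never overlaps a snapshot.
final ReentrantReadWriteLock takeSnapshotLock = new ReentrantReadWriteLock();

void takeSnapshot(SnapshotDescription snapshot) throws IOException {
  takeSnapshotLock.readLock().lock();
  try {
    // ... write the snapshot manifest and move it out of the tmp dir
  } finally {
    takeSnapshotLock.readLock().unlock();
  }
}

Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
  if (!takeSnapshotLock.writeLock().tryLock()) {
    return Collections.emptyList();  // a snapshot is running; skip this round
  }
  try {
    return findUnreferenced(files);  // hypothetical helper over the cache
  } finally {
    takeSnapshotLock.writeLock().unlock();
  }
}
{code}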

> Race condition surrounding in progress snapshot handling in snapshot cache 
> leads to loss of snapshot files
> --
>
> Key: HBASE-21387
> URL: https://issues.apache.org/jira/browse/HBASE-21387
> Project: HBase
>  Issue Type: Bug
>    Reporter: Ted Yu
>Assignee: Ted Yu
>Priority: Major
>  Labels: snapshot
> Fix For: 3.0.0, 1.5.0, 1.3.3, 2.2.0, 2.0.3, 1.4.9, 2.1.2, 1.2.10
>
> Attachments: 0001-UT.patch, 21387-suggest.txt, 21387.dbg.txt, 
> 21387.v10.txt, 21387.v11.txt, 21387.v12.txt, 21387.v2.txt, 21387.v3.txt, 
> 21387.v6.txt, 21387.v7.txt, 21387.v8.txt, 21387.v9.txt, 
> HBASE-21387.branch-1.2.patch, HBASE-21387.branch-1.3.patch, 
> HBASE-21387.branch-1.patch, HBASE-21387.v13.patch, HBASE-21387.v14.patch, 
> HBASE-21387.v15.patch, HBASE-21387.v16.patch, HBASE-21387.v17.patch, 
> two-pass-cleaner.v4.txt, two-pass-cleaner.v6.txt, two-pass-cleaner.v9.txt
>
>
> From a recent customer report where ExportSnapshot failed:
> {code}
> 2018-10-09 18:54:32,559 ERROR [VerifySnapshot-pool1-t2] 
> snapshot.SnapshotReferenceUtil: Can't find hfile: 
> 44f6c3c646e84de6a63fe30da4fcb3aa in the real 
> (hdfs://in.com:8020/apps/hbase/data/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa)
>  or archive 
> (hdfs://in.com:8020/apps/hbase/data/archive/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa)
>  directory for the primary table. 
> {code}
> We found the following in the log:
> {code}
> 2018-10-09 18:54:23,675 DEBUG 
> [00:16000.activeMasterManager-HFileCleaner.large-1539035367427] 
> cleaner.HFileCleaner: Removing: 
> hdfs:///apps/hbase/data/archive/data/.../a/44f6c3c646e84de6a63fe30da4fcb3aa 
> from archive
> {code}
> The root cause is a race condition in the in-progress snapshot(s) handling 
> between refreshCache() and getUnreferencedFiles().
> There are two callers of refreshCache: one from RefreshCacheTask#run and the 
> other from SnapshotHFileCleaner.
> Let's look at the code of refreshCache:
> {code}
>   if (!name.equals(SnapshotDescriptionUtils.SNAPSHOT_TMP_DIR_NAME)) {
> {code}
> whose intention is to exclude in-progress snapshot(s).
> Suppose that when the RefreshCacheTask runs refreshCache, some in-progress 
> snapshot is about to finish.
> When SnapshotHFileCleaner calls getUnreferencedFiles(), it sees that 
> lastModifiedTime is up to date, so the cleaner proceeds to check in-progress 
> snapshot(s). However, the snapshot has completed by that time, so some 
> file(s) end up deemed unreferenced.
> Here is a timeline given by Josh illustrating the scenario:
> At time T0, we are checking if F1 is referenced. At time T1, there is a 
> snapshot S1 in progress that is referencing a file F1. refreshCache() is 
> called, but no completed snapshot references F1. At T2, the snapshot S1, 
> which references F1, completes. At T3, we check in-progress snapshots and S1 
> is not included. Thus, F1 is marked as unreferenced even though S1 references 
> it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

