[jira] [Resolved] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2020-05-14 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-7965.

Resolution: Fixed

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 1.1.1, 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: David Jacot
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.6.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-613: Add end-to-end latency metrics to Streams

2020-05-14 Thread Bruno Cadonna
Hi Sophie,

Thank you for the KIP.

The KIP looks good to me.

50th percentile:
I think we do not need it now. If we need it, we can add it. Here the
old truism applies: Adding is always easier than removing.

processor-node-level metrics:
I think it is good to have the staleness metrics also on
processor-node-level. If we do not want to record them on all
processor nodes, you could restrict the recording to stateful
processor-nodes, since those are the ones that would benefit most from
the staleness metrics.
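Just to make the semantics concrete (purely illustrative; the metric group and
sensor names below are placeholders, not what the KIP will register), recording
a staleness value with the common metrics classes could look roughly like this:

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;

public class RecordStalenessSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        Sensor staleness = metrics.sensor("record-staleness");
        // Register the stats; percentile stats would be added to the same sensor.
        staleness.add(metrics.metricName("record-staleness-max", "stream-processor-node-metrics"), new Max());
        staleness.add(metrics.metricName("record-staleness-avg", "stream-processor-node-metrics"), new Avg());
        staleness.add(metrics.metricName("record-staleness-min", "stream-processor-node-metrics"), new Min());

        long recordTimestamp = System.currentTimeMillis() - 250L; // pretend the record is 250 ms old
        // Staleness = wall-clock time when the node receives the record minus the record timestamp.
        staleness.record((double) (System.currentTimeMillis() - recordTimestamp));
    }
}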

Best,
Bruno

On Thu, May 14, 2020 at 4:15 AM Sophie Blee-Goldman  wrote:
>
> Yeah, the specific reason was just to align with the current metrics.
>
> Is it better to conform than to be right? History has a lot to say on that
> matter
> but I'm not sure how much of it applies to the fine details of metrics
> naming :P
>
> More seriously, I figured if people are looking at this metric they're
> likely to
> be looking at all the others. Then naming this one "-mean" would probably
> lead some to conclude that the "-avg" suffix in the other metrics has a
> different meaning.
>
> As for the percentiles, I actually like p99 (and p75) better. I'll swap
> that out
>
> On Wed, May 13, 2020 at 7:07 PM John Roesler  wrote:
>
> > Thanks Sophie,
> >
> > I hope this isn't too nit-picky, but is there a reason to choose "avg"
> > instead
> > of "mean"? Maybe this is too paranoid, and I might be oversensitive because
> > of the mistake I just made earlier, but it strikes me that "avg" is
> > actually
> > ambiguous, as it refers to a family of statistics, whereas "mean" is
> > specific.
> > I see other Kafka metrics with "avg", but none with "mean"; was that the
> > reason? If so, I'm +1.
> >
> > Regarding the names of the percentile, I actually couldn't find _any_ other
> > metrics that use percentile. Was there a reason to choose "99th" as opposed
> > to "p99" or any other scheme? This is not a criticism, I'm just primarily
> > asking
> > for consistency's sake.
> >
> > Thanks again,
> > -John
> >
> > On Wed, May 13, 2020, at 19:19, Sophie Blee-Goldman wrote:
> > > Alright, I can get behind adding the min metric for the sake of pretty
> > > graphs
> > > (and trivial computation).
> > >
> > > I'm still on the fence regarding the mean (or 50th percentile) but I can
> > see
> > > how users might expect it and find it a bit disorienting not to have. So
> > the
> > > updated proposed metrics are
> > >
> > >
> > >- record-staleness-max [ms]
> > >- record-staleness-99th [ms] *(99th percentile)*
> > >- record-staleness-75th [ms] *(75th percentile)*
> > >- record-staleness-avg [ms] *(mean)*
> > >- record-staleness-min [ms]
> > >
> > >
> > > On Wed, May 13, 2020 at 4:42 PM John Roesler 
> > wrote:
> > >
> > > > Oh boy, I never miss an opportunity to embarrass myself. I guess the
> > mean
> > > > seems more interesting to me than the median, but neither are as
> > > > interesting as the higher percentiles (99th and max).
> > > >
> > > > Min isn’t really important for any SLAs, but it does round out the
> > mental
> > > > picture of the distribution. I’ve always graphed min along with the
> > other
> > > > metrics to help me understand how fast the system can be, which helps
> > in
> > > > optimization decisions. It’s also a relatively inexpensive metric to
> > > > compute, so it might be nice to just throw it in.
> > > >
> > > > On Wed, May 13, 2020, at 18:18, Sophie Blee-Goldman wrote:
> > > > > G1:
> > > > > I was considering it as the "end-to-end latency *up* to the specific
> > > > task"
> > > > > but
> > > > > I'm happy with "record-staleness" if that drives the point home
> > better.
> > > > So
> > > > > it's the
> > > > > "staleness of the record when it is received by that task" -- will
> > update
> > > > > the KIP
> > > > >
> > > > > B1/J:
> > > > > I'm struggling to imagine a case where the min would actually be
> > useful,
> > > > > rather than
> > > > > just intellectually interesting. I don't feel strongly that we
> > shouldn't
> > > > > add it, but that's
> > > > > why I didn't include it from the start. Can you enlighten me with an
> > > > > example?
> > > > >
> > > > > I was also vaguely concerned about the overhead of adding multiple
> > > > > percentile
> > > > > metrics. Do we have any data to indicate what kind of performance
> > hit we
> > > > > take on
> > > > > metrics computation?
> > > > >
> > > > > Also, not to be too pedantic but the 50th percentile would be the
> > median
> > > > > not the
> > > > > mean. Would you propose to add the mean *and* the 50th percentile, or
> > > > just
> > > > > one
> > > > > of the two?
> > > > >
> > > > > Thanks all!
> > > > > Sophie
> > > > >
> > > > > On Wed, May 13, 2020 at 3:34 PM John Roesler 
> > > > wrote:
> > > > >
> > > > > > Hello all, and thanks for the KIP, Sophie,
> > > > > >
> > > > > > Just some comments on the discussion so far:
> > > > > >
> > > > > > B2/G1:
> > > > > > In principle, it shouldn't matter whether we report "spans" or

Re: [DISCUSS] KIP-572: Improve timeouts and retries in Kafka Streams

2020-05-14 Thread Bruno Cadonna
Hi Matthias,

Thank you for the KIP. I like your KIP.

Here my feedback:

1. The KIP is not clear about what should happen when task.timeout.ms
expires. To facilitate the mapping from the error users might
encounter due to timeouts to this KIP, it would be good to state the
error that will be thrown when task.timeout.ms expires.

2. The KIP also does not clearly state how task.timeout.ms is
measured. Does the time start with the first timeout exception and
then run until either the timeout expires or the task receives a
successful reply? Or is it started each time the task is processed by
the stream thread and stopped when its turn is over, with an error
thrown once the sum of the individual times without a successful
reply reaches the timeout?
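To make the first interpretation concrete, here is a tiny sketch of what
"start the clock at the first timeout exception and reset it on success"
could look like (illustration only; the class and method names are made up,
and the exception type is exactly what question 1 asks about):

public class TaskTimeoutTracker {
    private final long taskTimeoutMs;
    private long deadlineMs = -1L; // -1 means no timeout is currently pending

    public TaskTimeoutTracker(long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    // Called whenever a client call made on behalf of this task throws a TimeoutException.
    public void onTimeoutException(long nowMs) {
        if (deadlineMs == -1L) {
            deadlineMs = nowMs + taskTimeoutMs; // the clock starts with the first timeout
        } else if (nowMs >= deadlineMs) {
            throw new RuntimeException("Task made no progress within task.timeout.ms");
        }
    }

    // Called when the task finally gets a successful reply.
    public void onSuccess() {
        deadlineMs = -1L; // reset; the next timeout starts a fresh clock
    }
}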

Best,
Bruno

On Tue, May 12, 2020 at 10:14 PM Matthias J. Sax  wrote:
>
> John, Guozhang,
>
> thanks a lot for your feedback. I updated the KIP on a slightly
> different angle: instead of using retries, we should switch to a timeout
> based approach. I also extended the KIP to deprecate producer/admin
> `retries` config.
>
> I like the proposal to skip a task if a client TimeoutException occurs
> and just retry it later; I updated the KIP accordingly. However, I would
> not retry forever by default. In general, all points you raised are
> valid and the question is just what _default_ we want to have. Given
> the issue that tasks might get "out-of-sync" regarding their event-time
> progress and that inexperienced users might not do proper monitoring, I
> prefer to have a "reasonable" default timeout if a task does not make
> progress at all, and to fail in this case.
>
> I would also argue (following Guozhang) that we don't necessarily need
> new metrics. Monitoring the number of alive threads (recently added),
> consumer lag, processing rate, etc. should give an operator enough insight
> into the application. I don't see the need atm to add specific "task
> timeout" metrics.
>
> For the issue of cascading failures, I would want to exclude it from
> this KIP to keep it focused.
>
>
> -Matthias
>
>
> On 2/27/20 1:31 PM, Guozhang Wang wrote:
> > Hello John,
> >
> > I'll make note that you owe me a beer now :)
> >
> > I think I'm leaning towards your approach as well based on my observations
> > on previously reported timeout exceptions in the past. I once left some
> > thoughts on Matthias' PR here
> > https://github.com/apache/kafka/pull/8122#pullrequestreview-360749510
> and I
> > think I can better summarize my thoughts in this thread:
> >
> > 1) First of all, we need to think from user's perspective, what they'd
> > really want to be notified:
> >
> > a. "If my application cannot make progress temporarily due to various
> > transient issues (John had listed several examples already), just handle
> > that internally and I do not wanna be notified and worry about how to tune
> > my timeout configs at all".
> > b. "If my application cannot make progress for a long time, which is
> likely
> > due to a bad config, a human error, network issues, etc such that I should
> > be involved in the loop of trouble shooting, let me know sooner than
> later".
> >
> > and what they'd not preferred but may happen today:
> >
> > c. "one transient error cause a thread to die, but then after tasks
> > migrated everything goes to normal; so the application silently lost a
> > thread without letting me know"; in fact, in such cases even a cascading
> > exception that eventually kills all thread may be better since at
> least the
> > users would be notified.
> >
> > Based on that, instead of retrying immediately at the granularity of each
> > blocking call, it should be sufficient to only consider the handling logic
> > at the thread level. That is, within an iteration of the thread, it would
> > try to:
> >
> > * initialized some created tasks;
> > * restored some restoring tasks;
> > * processed some running tasks who have buffered records that are
> > processable;
> > * committed some tasks.
> >
> > In each of these steps, we may need to make some blocking calls in the
> > underlying embedded clients, and if any of them time out, we would only
> > be able to make partial progress, or not be able to make any progress
> > at all. If we still want to set a configured value of "retries", I think a
> > better idea would be to say "if we cannot make progress for consecutive N
> > iterations of a thread, the user should be notified".
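> > Purely as an illustration of that thread-level idea (names are made up, and
> > how the user gets notified is exactly the open question):
> >
> > public class ThreadProgressCheck {
> >     private final int maxIdleIterations;
> >     private int idleIterations = 0;
> >
> >     public ThreadProgressCheck(int maxIdleIterations) {
> >         this.maxIdleIterations = maxIdleIterations;
> >     }
> >
> >     // Called at the end of each StreamThread iteration.
> >     public void onIterationEnd(boolean madeProgress) {
> >         if (madeProgress) {
> >             idleIterations = 0;
> >         } else if (++idleIterations >= maxIdleIterations) {
> >             // Notify the user, e.g. via a metric or the uncaught exception handler.
> >             throw new IllegalStateException("No progress in " + idleIterations + " consecutive iterations");
> >         }
> >     }
> > }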
> >
> > ---
> >
> > 2) Second, let's consider what's a good way to notify the user. Today our
> > way of notification is simple: throw the exception all the way up to
> user's
> > uncaught-exception-handler (if it's registered) and let the thread
> die. I'm
> > wondering if we could instead educate users to watch some key metrics
> > for "progress indication" rather than relying on the exception handler though. Some
> > candidates in mind:
> >
> > * consumer-lag: this is for both source topics and for repartition topics,
> > it indicates if one or more of t

[jira] [Created] (KAFKA-9992) EmbeddedKafka not working with kafka_2.13

2020-05-14 Thread Andras Katona (Jira)
Andras Katona created KAFKA-9992:


 Summary: EmbeddedKafka not working with kafka_2.13
 Key: KAFKA-9992
 URL: https://issues.apache.org/jira/browse/KAFKA-9992
 Project: Kafka
  Issue Type: Bug
  Components: packaging, streams
Affects Versions: 2.4.1
Reporter: Andras Katona


The Kafka Streams artifact depends on kafka_2.12 as of now; it is in the 
[kafka-streams-2.4.1.pom|https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams/2.4.1/kafka-streams-2.4.1.pom]:
{code}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>2.4.1</version>
  <scope>test</scope>
</dependency>
{code}
But it is not hardcoded: whatever Scala version was used to compile this 
component before uploading will be present in the pom.

When I'm using these deps:
{code}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>2.4.1</version>
  <classifier>test</classifier>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.13</artifactId>
  <version>2.4.1</version>
  <classifier>test</classifier>
  <scope>test</scope>
</dependency>
{code}

My test fails with the following exception (deleteTopicAndWait is called in my 
@After method):
{noformat}
java.lang.NoSuchMethodError: 
scala.collection.JavaConverters.setAsJavaSetConverter(Lscala/collection/Set;)Lscala/collection/convert/Decorators$AsJava;
at 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster$TopicsDeletedCondition.conditionMet(EmbeddedKafkaCluster.java:316)
at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:370)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:356)
at 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:266)
at 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicAndWait(EmbeddedKafkaCluster.java:221)
{noformat}

I modified the Kafka build locally to separate artifacts based on Scala version, 
just like it is done for Kafka core, pulled in kafka-streams_2.13 from 
my local mvn repo, and the test was working again.

I was only trying with 2.4.1, but I'm assuming other versions are also 
affected; please add the proper versions and proper components too (in case 
it's not packaging).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-585: Conditional SMT

2020-05-14 Thread Tom Bentley
Hi Konstantine,

Thanks very much for your input. It's helpful to have a steer from a
committer on the syntax point especially. I've made all those changes and
added a note about compatibility.

Cheers,

Tom

On Thu, May 14, 2020 at 6:00 AM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi Tom.
>
> Thanks for the KIP. I like how the proposal has ended up to be and I think
> it describes a practical approach.
>
> I have to say that, for a moment, earlier in the discussion I thought we
> were leaning a bit towards an unconventional mini assembly language based
> on java properties.
> The reliance on java properties for SMT definition and configuration has
> drawn some criticism in the past, so keeping the model lean and appealing
> as well as expressive is worth the effort in my opinion.
>
> With that in mind, I feel that we could move forward without the question
> mark notation. That approach would be similar to what was introduced when
> SMTs were implemented by adding the `transforms` prefix. We'll have a top
> level property called `predicates` and a namespace starting with the
> `predicates.` prefix followed by aliases. Avoiding the '?' in the
> transforms seems also ok to me in order to keep things simple and
> intuitive.
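> For comparison, without the question-mark notation the example from the KIP
> might then look something like this (aliases and exact property names are just
> illustrative, not final syntax):
>
> transforms: t2
> transforms.t2.predicate: has-prefix
> transforms.t2.type: org.apache.kafka.connect.transforms.ExtractField$Key
> transforms.t2.field: c1
> predicates: has-prefix
> predicates.has-prefix.type:
> org.apache.kafka.connect.transforms.predicates.TopicNameMatch
> predicates.has-prefix.negate: true
> predicates.has-prefix.pattern: my-prefix-.*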
>
> Besides these suggestions, I have a few additional minor comments:
>
> 1. Should we extend the `AutoCloseable` interface instead of `Closeable`
> since we ignore the Exception in the signature anyways? I know this will
> make Predicate different from Transformation but it also corresponds better
> to the intended use.
> 2. As Andrew mentioned, Predicate needs to be declared an interface in the
> KIP.
> 3. Currently examples are neither java properties nor json. Should we
> comply with one of the two formats to avoid confusing users that will read
> the KIP before trying this feature?
>
> Thanks again for driving this proposal.
>
> Konstantine
>
> On Mon, May 11, 2020 at 7:57 AM Tom Bentley  wrote:
>
> > Hi Andrew,
> >
> > That works nicely enough for the proposal where the predicate is
> configured
> > directly on the transformation. But I thought there was more consensus
> > around the proposal to have the transformation configuration refer to a
> > predicate indirectly, defined with the ?predicates key. I guess an
> example
> > would look like this:
> >
> > transforms: t2
> > transforms.t2?predicate: has-prefix
> > transforms.t2.type: org.apache.kafka.connect.transforms.ExtractField$Key
> > transforms.t2.field: c1
> > ?predicates: has-prefix
> > ?predicates.has-prefix.type:
> > org.apache.kafka.connect.transforms.predicates.TopicNameMatch
> > ?predicates.has-prefix.negate: true
> > ?predicates.has-prefix.pattern: my-prefix-.*
> >
> > I agree that this seems to reduce the chance of conflict to practically
> > nothing. I'm happy enough to go with this and I've updated the KIP
> > accordingly.
> >
> > Assuming no one raises more concerns I'll start a vote soon.
> >
> > Kind regards,
> >
> > Tom
> >
> > On Mon, May 11, 2020 at 10:54 AM Andrew Schofield <
> > andrew_schofi...@live.com>
> > wrote:
> >
> > > Hi,
> > > I have implemented some of this and configured some predicates and I
> > > prefer this slight modification of the ? syntax:
> > >
> > > transforms: t2
> > > transforms.t2?type:
> > > org.apache.kafka.connect.transforms.predicates.TopicNameMatch
> > > transforms.t2?negate: true
> > > transforms.t2?pattern: my-prefix-.*
> > > transforms.t2.type:
> org.apache.kafka.connect.transforms.ExtractField$Key
> > > transforms.t2.field: c1
> > >
> > > So, "transforms.." introduces the config for the transformation,
> > > while "transforms.?" introduces the
> > > config for the predicate. The risk of a clash now is that someone has
> > used
> > > a ? in the alias name, which is unlikely.
> > >
> > > Also, there's a slight typo in the Java definition of the Predicate
> > > interface. It should be:
> > >
> > > public interface Predicate<R extends ConnectRecord<R>> extends
> > > Configurable, Closeable
> > >
> > > Looks like discussion has died down so I wonder whether it's time to
> > begin
> > > a vote.
> > >
> > > Cheers,
> > > Andrew
> > >
> > > On 06/05/2020, 08:57, "Andrew Schofield" 
> > > wrote:
> > >
> > > >Hi Tom,
> > > >I think that either proposal is sufficiently readable and they
> both
> > > have elements of
> > > >cryptic syntax. I remember talking to an engineering director once
> > > who said he didn't hire
> > > >"curly braces people" any more. This is an interface for curly
> > braces
> > > people whichever
> > > >way we go here.
> > >
> > > >I slightly prefer the predicates as a top-level concept rather
> than
> > > as a feature
> > > >of the If transform. I also don't like the namespacing behaviour
> of
> > > the If transform which
> > > >I guess works like variable declaration so each scope has its own
> > > namespace of transform
> > > >aliases. I expect the following is valid but ill-advised.
> > >
> > > >transforms: t1
> > > >  

Re: [DISCUSS] KIP-601: Configurable socket connection timeout

2020-05-14 Thread Rajini Sivaram
Hi Cheng,

1) Thanks for the update, the KIP now says
`socket.connections.setup.timeout.ms`, which sounds good.

2) Not sure 10s is a good default because it unnecessarily times out
connections, only to attempt re-connecting to the same broker (except in
the leastLoadedNode case where it would be useful to have a lower timeout).
Couldn't we just use 30s (request timeout) as the default value, retaining
the current behaviour? Or alternatively, only apply the timeout where it
makes sense to have a lower timeout (i.e. a timeout for leastLoadedNode)?

Regards,

Rajini

On Thu, May 14, 2020 at 5:00 AM Cheng Tan  wrote:

> Hi Rajini,
>
> Thanks for the comments.
>
> > I think
> > they started off as connection timeouts but now include authentication
> time
> > as well. Have we considered using similar configs for this case?
>
>
> The new config I proposed focuses on connections to unreachable
> servers. The timeout count won’t involve authentication. I think the
> “socket.” prefix makes sense. I’ll change it. Colin mentioned that adding
> the keyword “setup” can help people understand that this config only
> applies to the connection setup. What about “socket.connection.setup.ms”?
>
> > The KIP proposes 10s as the default. What does this mean for typical
> > connections like a produce request going to the leader?
>
> The new timeout config applies to each connection. It’s at the
> NetworkClient level and won’t consider the underlying connection logic.
> Specifically, by default, every connection will have 10 seconds to move from
> “connecting” to “connected”, which means the corresponding socket
> channel has finished connecting (SocketChannel.finishConnect() returns true), no
> matter what the request logic and abstraction is.
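> As a concrete, purely illustrative example of what "finishing the connection
> setup within the timeout" means at the socket level (plain NIO, not the actual
> NetworkClient code; the broker address is a placeholder):
>
> import java.io.IOException;
> import java.net.InetSocketAddress;
> import java.nio.channels.SocketChannel;
>
> public class ConnectionSetupTimeoutDemo {
>     public static void main(String[] args) throws Exception {
>         long timeoutMs = 10_000L;                 // the proposed default
>         SocketChannel channel = SocketChannel.open();
>         channel.configureBlocking(false);         // non-blocking, like the NetworkClient
>         channel.connect(new InetSocketAddress("broker.example.com", 9092)); // placeholder host
>
>         long deadline = System.currentTimeMillis() + timeoutMs;
>         while (!channel.finishConnect()) {        // still "connecting"
>             if (System.currentTimeMillis() > deadline) {
>                 channel.close();
>                 throw new IOException("Connection setup timed out after " + timeoutMs + " ms");
>             }
>             Thread.sleep(10);
>         }
>         System.out.println("Connected");          // now "connected"
>         channel.close();
>     }
> }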
>
> Please let me know if these make sense to you or if you have more
> thoughts. Thank you.
>
> Best, -Cheng
>
> > 在 2020年5月13日,上午7:11,Rajini Sivaram  写道:
> >
> > Hi Cheng,
> >
> > Thanks for the KIP, sounds like a good improvement. A couple of comments:
> >
> > 1) We currently have client connection timeouts on the broker with
> configs
> > named `xxx.socket.timeout.ms` (e.g. controller.socket.timeout.ms). I
> think
> > they started off as connection timeouts but now include authentication
> time
> > as well. Have we considered using similar configs for this case? We may
> > want to prefix the new config with `socket.` anyway - something along the
> > lines of `socket.connection.timeout.ms` if it is just the connection
> time.
> >
> > 2) The KIP proposes 10s as the default. What does this mean for typical
> > connections like a produce request going to the leader? Instead of one
> > connection attempt to the leader, we want three separate connection
> > attempts within the request timeout to the leader?
> >
> > Regards,
> >
> > Rajini
> >
> >
> >> On Thu, May 7, 2020 at 11:51 PM Jose Garcia Sancio <
> jsan...@confluent.io>
> >> wrote:
> >> Cheng,
> >> Thanks for the KIP and the detailed proposal section. LGTM!
>  On Thu, May 7, 2020 at 3:38 PM Cheng Tan  wrote:
>  I think more about the potential wider use cases, I modified the
> >> proposal to target all the connection. Thanks.
> >>> - Best, - Cheng Tan
>  On May 7, 2020, at 1:41 AM, Cheng Tan  wrote:
>  Hi Colin,
>  Sorry for the confusion. I’m proposing to implement the timeout in the
> >> NetworkClient.leastLoadedNode() when iterating over all the cached nodes. The
> >> alternative I can think of is to implement the timeout in
> NetworkClient.poll().
>  I’d prefer to implement it in leastLoadedNode(). Here are the reasons:
>  Usually when clients send a request, they will ask the network
> >> client to send the request to a specific node. In this case, the
> >> connection.setup.timeout won’t matter too much because the client
> doesn’t
> >> want to try other nodes for that specific request. The request-level
> >> timeout would be enough. The metadata fetcher fetches the node status
> >> periodically so the clients can reassign the request to another node
> after
> >> timeout.
>  Consumer, producer, and AdminClient are all using leastLoadedNode()
> >> for metadata fetch, where the connection setup timeout can play an
> >> important role. Unlike other requests, which can refer to the metadata for node
> >> condition, the metadata requests can only blindly choose a node for
> retry
> >> in the worst scenario. We want to make sure the client can get the
> metadata
> >> smoothly and as soon as possible. As a result, we need this
> >> connection.setup.timeout.
>  Implementing the timeout in poll() or anywhere else might need an
> >> extra iteration of all nodes, which might degrade the network client
> >> performance.
>  I also updated the KIP content and KIP status. Please let me know if
> >> the above ideas make sense. Thanks.
>  Best, - Cheng Tan
> > On May 4, 2020, at 5:26 PM, Colin McCabe   >> cmcc...@apache.org>> wrote:
> > Hi Cheng,
> > On the KIP page, it lists this KIP as "draft."  

Build failed in Jenkins: kafka-trunk-jdk11 #1454

2020-05-14 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-6145: KIP-441: Improve assignment balance (#8588)


--
Started by an SCM change
Running as SYSTEM
[EnvInject] - Loading node environment variables.
Building remotely on H30 (ubuntu) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
[WS-CLEANUP] Done
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress -- https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress -- https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision d62f6ebdfe38adf894187e76546eedf13ee98432 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f d62f6ebdfe38adf894187e76546eedf13ee98432
Commit message: "KAFKA-6145: KIP-441: Improve assignment balance (#8588)"
 > git rev-list --no-walk fad8db67bb74d1ebffdde22fd4e8ecfd1881 # timeout=10
ERROR: No tool found matching GRADLE_4_10_2_HOME
Setting GRADLE_4_10_3_HOME=/home/jenkins/tools/gradle/4.10.3
[kafka-trunk-jdk11] $ /bin/bash -xe /tmp/jenkins6180996940548855431.sh
+ rm -rf 
ERROR: No tool found matching GRADLE_4_10_2_HOME
Setting GRADLE_4_10_3_HOME=/home/jenkins/tools/gradle/4.10.3
[kafka-trunk-jdk11] $ /bin/bash -xe /tmp/jenkins1804903382464549630.sh
+ export GRADLE_OPTS=-Xmx1024m
+ GRADLE_OPTS=-Xmx1024m
+ ./gradlew --no-daemon --continue -PmaxParallelForks=2 
-PtestLoggingEvents=started,passed,skipped,failed -PxmlFindBugsReport=true 
clean test -PscalaVersion=2.12
[0.074s][warning][os,thread] Failed to start thread - pthread_create failed 
(EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
Error occurred during initialization of VM
java.lang.OutOfMemoryError: unable to create native thread: possibly out of 
memory or process/resource limits reached
Build step 'Execute shell' marked build as failure
FATAL: Remote call on H30 failed
Also:   hudson.remoting.Channel$CallSiteStackTrace: Remote call to H30
at 
hudson.remoting.Channel.attachCallSiteStackTrace(Channel.java:1743)
at 
hudson.remoting.UserRequest$ExceptionResponse.retrieve(UserRequest.java:357)
at hudson.remoting.Channel.call(Channel.java:957)
at hudson.Launcher$RemoteLauncher.kill(Launcher.java:1086)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:510)
at hudson.model.Run.execute(Run.java:1815)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at 
hudson.model.ResourceController.execute(ResourceController.java:97)
at hudson.model.Executor.run(Executor.java:429)
java.lang.NoClassDefFoundError: Could not initialize class 
hudson.slaves.SlaveComputer
at hudson.util.ProcessTree.get(ProcessTree.java:414)
at hudson.Launcher$RemoteLauncher$KillTask.call(Launcher.java:1103)
at hudson.Launcher$RemoteLauncher$KillTask.call(Launcher.java:1094)
at hudson.remoting.UserRequest.perform(UserRequest.java:212)
at hudson.remoting.UserRequest.perform(UserRequest.java:54)
at hudson.remoting.Request$2.run(Request.java:369)
at 
hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused: java.io.IOException: Remote call on H30 failed
at hudson.remoting.Channel.call(Channel.java:963)
at hudson.Launcher$RemoteLauncher.kill(Launcher.java:1086)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:510)
at hudson.model.Run.execute(Run.java:1815)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at hudson.model.ResourceController.execute(ResourceC

KIP Request

2020-05-14 Thread David Mollitor
Hello,

I would like to create a KIP in regards to:

https://github.com/apache/kafka/pull/7944

Can you please add my Apache ID (dmollitor) to the access list?

Thanks.
-- 

*David A. Mollitor* | Principal Customer Operations Engineer

cloudera.com 

--


Kafka Integration with Microsoft Dynamics 365 CE

2020-05-14 Thread Ranjalkar, Vrushali
Hi team,

We are looking for SCADA integration with Dynamics 365 CE via Kafka. We 
already have the SCADA and Kafka integration in place.
Just wanted to understand if there are ready-made connectors available for Kafka 
and Dynamics 365 integration.
Any support is appreciated.

Regards
Vrushali






Re: KIP Request

2020-05-14 Thread Bill Bejeck
Hi David,

Thanks for your interest.  You're all set now.

-Bill

On Thu, May 14, 2020 at 10:43 AM David Mollitor
 wrote:

> Hello,
>
> I would like to create a KIP in regards to:
>
> https://github.com/apache/kafka/pull/7944
>
> Can you please add my Apache ID (dmollitor) to the access list?
>
> Thanks.
> --
>
> *David A. Mollitor* | Principal Customer Operations Engineer
>
> cloudera.com 
>
>
> --
>


Re: [DISCUSS] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-14 Thread David Jacot
Hi Anna,

Thanks for your answers and the updated KIP. Looks good to me!

Best,
David

On Thu, May 14, 2020 at 12:54 AM Anna Povzner  wrote:

> I updated the KIP to add a new broker configuration to limit connection
> creation rate per IP: max.connection.creation.rate.per.ip. Once the limit
> is reached for a particular IP address, the broker will reject the
> connection from that IP (close the connection it accepted) and continue
> rejecting them until the rate is back within the rate limit.
>
> On Wed, May 13, 2020 at 11:46 AM Anna Povzner  wrote:
>
> > Hi David and Alexandre,
> >
> > Thanks so much for your feedback! Here are my answers:
> >
> > 1. Yes, we have seen several cases of clients that create a new
> connection
> > per produce/consume request. One hypothesis is that someone who is used to
> > connection pooling may accidentally write a Kafka client that creates a
> new
> > connection every time.
> >
> > 2 & 4. That's a good point I haven't considered. I think it makes sense
> to
> > provide an ability to limit connection creations per IP as well. This is
> > not hard to implement -- the broker already keeps track of the number of
> > connections per IP, and immediately closes a new connection if it comes
> > from an IP that reached the connection limit. So, we could additionally
> > track the rate, and close the connection from IP that exceeds the rate.
> One
> > slight concern is whether keeping track of per IP rates and quotas adds
> > overhead (CPU and memory). But perhaps it is not a problem if we use
> > expiring sensors.
> >
> > It would still make sense to limit the overall connection creation rate
> > for the Kafka clusters which are shared among many different
> > applications/clients, since they may spike at the same time bringing the
> > total rate too high.
> >
> > 3. Controlling connection queue sizes only controls the share of time
> > network threads use for creating new connections (and accepting on
> Acceptor
> > thread) vs. doing other work on each Processor iteration. It does not
> > directly control how processing connection creations would be related to
> > other processing done by brokers like on request handler threads. So,
> while
> > controlling queue size may mitigate the issue for some of the workloads,
> it
> > does not guarantee that. Plus, if we want to limit how many connections
> are
> > created per IP, the queue size approach would not work, unless we go
> with a
> > "share" of the queue, which I think even further obscures what that
> setting
> > means (and what we would achieve as an end result). Does this answer the
> > question?
> >
> > If there are no objections, I will update the KIP to add per IP
> connection
> > rate limits (config and enforcement).
> >
> > Thanks,
> >
> > Anna
> >
> >
> > On Tue, May 12, 2020 at 11:25 AM Alexandre Dupriez <
> > alexandre.dupr...@gmail.com> wrote:
> >
> >> Hello,
> >>
> >> Thank you for the KIP.
> >>
> >> I experienced in the past genuine broker brownouts due to connection
> >> storms consuming most of the CPU available on the server and I think
> >> it is useful to protect against it.
> >>
> >> I tend to share the questions asked in points 2 and 4 from David. Is
> >> there still a risk of denial of service if the limit applies at the
> >> listener-level without differentiating between (an) “offending”
> >> client(s) and the others?
> >>
> >> To rebound on point 3 - conceptually one difference between capping
> >> the queue size or throttling as presented in the KIP would come from
> >> the time it takes to accept a connection and how that time evolves
> >> with the connection rate.
> >> Assuming that that time increases monotonically with resource
> >> utilization, the admissible rate of connections would decrease as the
> >> server becomes more loaded, if the limit was set on queue size.
> >>
> >> Thanks,
> >> Alexandre
> >>
> >> Le mar. 12 mai 2020 à 08:49, David Jacot  a écrit
> :
> >> >
> >> > Hi Anna,
> >> >
> >> > Thanks for the KIP! I have few questions:
> >> >
> >> > 1. You mention that some clients may create new connections for each
> >> > request: "Another example is clients that create a new connection for
> >> each
> >> > produce/consume request". I am curious here but do we know of any clients
> >> > behaving like this?
> >> >
> >> > 2. I am a bit concerned by the impact of misbehaving clients on the
> >> other
> >> > ones. Let's say that we define a quota of 10 connections / sec for a
> >> broker
> >> > and that we have a misbehaving application constantly trying to create
> >> 20
> >> > connections on that broker. That application will constantly hit the
> >> quota
> >> > and
> >> > always have many pending connections in the queue waiting to be
> >> accepted.
> >> > Regular clients trying to connect would need to wait until all the
> >> pending
> >> > connections upfront in the queue are drained in the best case scenario
> >> or
> >> > won't be able to connect at all in the worst case scenario if the
> queu

Re: [VOTE] KIP-577: Allow HTTP Response Headers Configured for Kafka Connect

2020-05-14 Thread Jeff Huang
All,
The vote for this KIP has passed.
This KIP is accepted with 3 binding +1 votes (Randall, Manikumar, Konstantine) 
and 1 non-binding +1 vote (Aneel).

Thanks to all who offered suggestions and ideas.

Jeff Huang.


On 2020/04/17 18:28:20, Zhiguo Huang  wrote: 
> Thanks to everyone for their input. I've incorporated the changes, and I
> think this is ready for voting.
> 
> To summarize, the KIP simply proposes to add a feature which allows HTTP
> response headers to be configured for Kafka Connect. The KIP can be found here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+577%3A+Allow+HTTP+Response+Headers+to+be+Configured+for+Kafka+Connect
> 
> Thanks!
> 
> Jeff.
> 


Re: [DISCUSS] KIP-572: Improve timeouts and retries in Kafka Streams

2020-05-14 Thread John Roesler
Thanks for the update, Matthias!

Other than Bruno’s good points, this proposal looks good to me. 

Thanks,
John

On Thu, May 14, 2020, at 07:17, Bruno Cadonna wrote:
> Hi Matthias,
> 
> Thank you for the KIP. I like your KIP.
> 
> Here my feedback:
> 
> 1. The KIP is not clear about what should happen when task.timeout.ms
> expires. To facilitate the mapping from the error users might
> encounter due to timeouts to this KIP, it would be good to state the
> error that will be thrown when task.timeout.ms expires.
> 
> 2. The KIP also does not clearly state how task.timeout.ms is
> measured. Does the time start with the first timeout exception and
> then run until either the timeout expires or the task receives a
> successful reply? Or is it started each time the task is processed by
> the stream thread and stopped when its turn is over, with an error
> thrown once the sum of the individual times without a successful
> reply reaches the timeout?
> 
> Best,
> Bruno
> 
> On Tue, May 12, 2020 at 10:14 PM Matthias J. Sax  wrote:
> >
> > John, Guozhang,
> >
> > thanks a lot for your feedback. I updated the KIP on a slightly
> > different angle: instead of using retries, we should switch to a timeout
> > based approach. I also extended the KIP to deprecate producer/admin
> > `retries` config.
> >
> > I like the proposal to skip a task if a client TimeoutException occurs
> > and just retry it later; I updated the KIP accordingly. However, I would
> > not retry forever by default. In general, all points you raised are
> > valid and the question is just what _default_ we want to have. Given
> > the issue that tasks might get "out-of-sync" regarding their event-time
> > progress and that inexperienced users might not do proper monitoring, I
> > prefer to have a "reasonable" default timeout if a task does not make
> > progress at all, and to fail in this case.
> >
> > I would also argue (following Guozhang) that we don't necessarily need
> > new metrics. Monitoring the number of alive threads (recently added),
> > consumer lag, processing rate, etc. should give an operator enough insight
> > into the application. I don't see the need atm to add specific "task
> > timeout" metrics.
> >
> > For the issue of cascading failures, I would want to exclude it from
> > this KIP to keep it focused.
> >
> >
> > -Matthias
> >
> >
> > On 2/27/20 1:31 PM, Guozhang Wang wrote:
> > > Hello John,
> > >
> > > I'll make note that you owe me a beer now :)
> > >
> > > I think I'm leaning towards your approach as well based on my observations
> > > on previously reported timeout exceptions in the past. I once left some
> > > thoughts on Matthias' PR here
> > > https://github.com/apache/kafka/pull/8122#pullrequestreview-360749510
> > and I
> > > think I can better summarize my thoughts in this thread:
> > >
> > > 1) First of all, we need to think from user's perspective, what they'd
> > > really want to be notified:
> > >
> > > a. "If my application cannot make progress temporarily due to various
> > > transient issues (John had listed several examples already), just handle
> > > that internally and I do not wanna be notified and worry about how to tune
> > > my timeout configs at all".
> > > b. "If my application cannot make progress for a long time, which is
> > likely
> > > due to a bad config, a human error, network issues, etc such that I should
> > > be involved in the loop of trouble shooting, let me know sooner than
> > later".
> > >
> > > and what they'd not preferred but may happen today:
> > >
> > > c. "one transient error cause a thread to die, but then after tasks
> > > migrated everything goes to normal; so the application silently lost a
> > > thread without letting me know"; in fact, in such cases even a cascading
> > > exception that eventually kills all thread may be better since at
> > least the
> > > users would be notified.
> > >
> > > Based on that, instead of retrying immediately at the granularity of each
> > > blocking call, it should be sufficient to only consider the handling logic
> > > at the thread level. That is, within an iteration of the thread, it would
> > > try to:
> > >
> > > * initialized some created tasks;
> > > * restored some restoring tasks;
> > > * processed some running tasks who have buffered records that are
> > > processable;
> > > * committed some tasks.
> > >
> > > In each of these steps, we may need to make some blocking calls in the
> > > underlying embedded clients, and if any of them time out, we would only
> > > be able to make partial progress, or not be able to make any progress
> > > at all. If we still want to set a configured value of "retries", I think a
> > > better idea would be to say "if we cannot make progress for consecutive N
> > > iterations of a thread, the user should be notified".
> > >
> > > ---
> > >
> > > 2) Second, let's consider what's a good way to notify the user. Today our
> > > way of notification is simple: throw the exception all the way up to
> > u

[jira] [Created] (KAFKA-9993) Think about inheritance in the protocol generation framework

2020-05-14 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9993:
--

 Summary: Think about inheritance in the protocol generation 
framework
 Key: KAFKA-9993
 URL: https://issues.apache.org/jira/browse/KAFKA-9993
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen


We have seen that there are a lot of common fields inside the request/response 
templates that could be extracted into a super class for auto-generated classes. 
For example, most responses contain a top-level error code. Currently, to build a 
service receiving multiple RPCs, the code template produces a lot of redundant 
error-code extraction logic, which is far from ideal. 

What we want to discuss is whether to enable a general inheritance mechanism 
in this framework, what the trade-offs and added complexity are, and whether there 
is any workaround that would reduce the boilerplate.
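As one possible shape of the idea, generated response classes with a top-level error 
code could implement a small shared interface (illustrative only, not an existing Kafka API):

{code}
// Names are illustrative; this interface does not exist in the codebase today.
public interface TopLevelErrorCode {
    short errorCode();
}

// A service handling many RPC types could then extract the error code generically:
//   short code = ((TopLevelErrorCode) responseData).errorCode();
{code}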



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9994) Catch TaskMigrated exception in task corruption code path

2020-05-14 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9994:
--

 Summary: Catch TaskMigrated exception in task corruption code path 
 Key: KAFKA-9994
 URL: https://issues.apache.org/jira/browse/KAFKA-9994
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Boyang Chen


We have seen a case where the TaskMigrated exception gets thrown from 
taskManager.commit(). This should be prevented by proper catching.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9995) IllegalStateException when offsets not found

2020-05-14 Thread James Hay (Jira)
James Hay created KAFKA-9995:


 Summary: IllegalStateException when offsets not found
 Key: KAFKA-9995
 URL: https://issues.apache.org/jira/browse/KAFKA-9995
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.4.1
Reporter: James Hay


 

I have a recently upgraded Kafka Streams 2.4.1 application and we have started 
seeing the application periodically crash due to the following error:
{code:java}
2020-05-14T16:53:03.839Z DEBUG <> 
[chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2] 
o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2-consumer, 
groupId=chat] Fetching committed offsets for partitions: 
[private.chat.endpoint-0, public.chat.message-0] 2020-05-14T16:53:03.841Z INFO  
<> [chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2] 
o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2-consumer, 
groupId=chat] Found no committed offset for partition private.chat.endpoint-0 
2020-05-14T16:53:03.842Z ERROR <> 
[chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2] 
o.a.k.s.p.internals.StreamThread - stream-thread 
[chat-c199e52b-9344-457c-a2ae-04a669a7e0fb-StreamThread-2] Encountered the 
following error during processing: java.lang.IllegalStateException: Offset 
limit should monotonically increase, but was reduced. New limit: 0. Previous 
limit: 857859at 
org.apache.kafka.streams.processor.internals.StandbyTask.updateOffsetLimits(StandbyTask.java:215)
at 
org.apache.kafka.streams.processor.internals.StandbyTask.update(StandbyTask.java:181)
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1048)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:825)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
 at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
{code}
Is this a known issue? What can cause offsets to not be found?

Other things to note:
 * We have TOPOLOGY_OPTIMIZATION set to OPTIMIZE
 * We are only seeing this on two of our three environments (at the moment). 
The two environments where we are seeing the problem have little traffic, and it only 
seems to impact machines that are mostly idle. Our prod environment, which 
consumes regular events, is showing no signs of having the same problem.
 * There is some evidence to suggest there is a pattern to the timing of this 
error. Although not always the case, 24hrs between errors is common.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-05-14 Thread Satish Duggana
Hi Jun,
Thanks for your comments.  We updated the KIP with more details.

>100. For each of the operations related to tiering, it would be useful to
provide a description on how it works with the new API. These include
things like consumer fetch, replica fetch, offsetForTimestamp, retention
(remote and local) by size, time and logStartOffset, topic deletion, etc.
This will tell us if the proposed APIs are sufficient.

We addressed most of these APIs in the KIP. We can add more details if
needed.

>101. For the default implementation based on internal topic, is it meant
as a proof of concept or for production usage? I assume that it's the
former. However, if it's the latter, then the KIP needs to describe the
design in more detail.

It is meant for production usage, as was mentioned in an earlier mail. We plan to
update this section in the next few days.

>102. When tiering a segment, the segment is first written to the object
store and then its metadata is written to RLMM using the api "void
putRemoteLogSegmentData()".
One potential issue with this approach is that if the system fails after
the first operation, it leaves a garbage in the object store that's never
reclaimed. One way to improve this is to have two separate APIs, sth like
preparePutRemoteLogSegmentData() and commitPutRemoteLogSegmentData().

That is a good point. We currently have a different way using markers in
the segment but your suggestion is much better.
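For illustration, the two-phase variant could have roughly this shape (the method
names follow your suggestion; the interface name and parameter type here are just
placeholders, not the KIP's API):

public interface RemoteLogSegmentMetadataWriter {

    // Record that a segment upload is about to start (or is in flight), so that an
    // orphaned copy in the object store can later be identified and cleaned up.
    void preparePutRemoteLogSegmentData(String segmentId);

    // Mark the previously prepared segment as fully uploaded and visible to readers.
    void commitPutRemoteLogSegmentData(String segmentId);
}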

>103. It seems that the transactional support and the ability to read from
follower are missing.

KIP is updated with transactional support, follower fetch semantics, and
reading from a follower.

>104. It would be useful to provide a testing plan for this KIP.

We added a few tests by introducing test util for tiered storage in the PR.
We will provide the testing plan in the next few days.

Thanks,
Satish.


On Wed, Feb 26, 2020 at 9:43 PM Harsha Chintalapani  wrote:

>
>
>
>
> On Tue, Feb 25, 2020 at 12:46 PM, Jun Rao  wrote:
>
>> Hi, Satish,
>>
>> Thanks for the updated doc. The new API seems to be an improvement
>> overall. A few more comments below.
>>
>> 100. For each of the operations related to tiering, it would be useful to
>> provide a description on how it works with the new API. These include
>> things like consumer fetch, replica fetch, offsetForTimestamp, retention
>> (remote and local) by size, time and logStartOffset, topic deletion, etc.
>> This will tell us if the proposed APIs are sufficient.
>>
>
> Thanks for the feedback Jun. We will add more details around this.
>
>
>> 101. For the default implementation based on internal topic, is it meant
>> as a proof of concept or for production usage? I assume that it's the
>> former. However, if it's the latter, then the KIP needs to describe the
>> design in more detail.
>>
>
> Yes, it is meant to be for production use.  Ideally it would be good to merge
> this in as the default implementation for the metadata service. We can add more
> details around design and testing.
>
> 102. When tiering a segment, the segment is first written to the object
>> store and then its metadata is written to RLMM using the api "void
>> putRemoteLogSegmentData()".
>> One potential issue with this approach is that if the system fails after
>> the first operation, it leaves a garbage in the object store that's never
>> reclaimed. One way to improve this is to have two separate APIs, sth like
>> preparePutRemoteLogSegmentData() and commitPutRemoteLogSegmentData().
>>
>> 103. It seems that the transactional support and the ability to read from
>> follower are missing.
>>
>> 104. It would be useful to provide a testing plan for this KIP.
>>
>
> We are working on adding more details around transactional support and
> coming up with a test plan, including system tests and integration tests.
>
> Thanks,
>>
>> Jun
>>
>> On Mon, Feb 24, 2020 at 8:10 AM Satish Duggana 
>> wrote:
>>
>> Hi Jun,
>> Please look at the earlier reply and let us know your comments.
>>
>> Thanks,
>> Satish.
>>
>> On Wed, Feb 12, 2020 at 4:06 PM Satish Duggana 
>> wrote:
>>
>> Hi Jun,
>> Thanks for your comments on the separation of remote log metadata storage
>> and remote log storage.
>> We had a few discussions since early Jan on how to support eventually
>> consistent stores like S3 by uncoupling remote log segment metadata and
>> remote log storage. It is written with details in the doc here(1). Below is
>> the brief summary of the discussion from that doc.
>>
>> The current approach consists of pulling the remote log segment metadata
>> from remote log storage APIs. It worked fine for storages like HDFS. But
>> one of the problems of relying on the remote storage to maintain metadata
>> is that tiered-storage needs to be strongly consistent, with an impact not
>> only on the metadata(e.g. LIST in S3) but also on the segment data(e.g. GET
>> after a DELETE in S3). The cost of maintaining metadata in remote storage
>> needs to be factored in. This is true in the case of S3, LIST APIs incur
>> huge co

Jenkins build is back to normal : kafka-trunk-jdk8 #4528

2020-05-14 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-05-14 Thread Sagar
Hey @Adam,

Thanks for sharing your experience with using prefix seek. I did look at
your code for RocksDBPrefixIterator; in fact, I have repurposed that class
itself since it wasn't being used elsewhere. Regarding how I plan to
expose this throughout the state stores, what I have tried to do is add it
as a separate interface. So, basically, it is not at the same level as the
range function, so to speak. The reason I did that is that currently I feel not
all state stores are a natural fit for prefix seek. As I mentioned in the
KIP as well, the current equivalent to it could be BulkLoadingStore (not in
terms of functionality but in terms of how it is also not implemented by
all of them). That way, I don't need to stub it across all the
state stores and we can implement it only where needed. For example, in the
PR that I have put for reference in the KIP, you can see that I have it
implemented only for RocksDB.
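Just to illustrate the shape of it (names below are illustrative, not the exact
API in the KIP/PR):

import org.apache.kafka.streams.state.KeyValueIterator;

// A stand-alone interface that individual stores may opt into,
// rather than a new method on the base store interfaces.
public interface PrefixSeekableStore<K, V> {

    // Returns an iterator over all entries whose key starts with the given prefix.
    KeyValueIterator<K, V> prefixScan(K prefix);
}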

@Guozhang,

Thanks for the feedback. Those are very interesting questions and I will
try my best to answer based upon whatever limited understanding I have
developed so far :)

1) Regarding the usage of useFixedLengthPrefixExtractor, honestly, I hadn't
looked at that config. I did look it up after you pointed it out, and it seems
it's more for hash-based memtables? I may be wrong though. But what I would
say is that the changes I had made were not exactly from a correctness
standpoint but more from trying to showcase how we can implement these
changes. The idea was that once we see the merit in this approach, then we
can add some of the tunings (and I would need your team's assistance there
:D).

2) Regarding the similarity of `RocksDBPrefixIterator` and
`RocksDBRangeIterator`, yes, the implementations look more or less similar.
So, in terms of performance, they might be similar. But semantically, they
can solve 2 different use-cases. The range seek is useful when we know both
from and to. But if we consider use-cases where we want to find keys with a
certain prefix, but we don't know what its start and end are, then
prefix seek would come in more handy. The point that I am trying to make is
that it can extend the scope of state stores from just point lookups to
somewhat being able to serve speculative queries whereby users can check
whether a certain pattern exists. I can vouch for this personally because I wanted to
use state stores for one such use case and since this option wasn't there,
I had to do some other things. An equivalent to this could be the SCAN operator
in Redis. (Not trying to compare Redis and state stores but trying to
give some context).
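For anyone who wants to see what this looks like at the RocksDB level, here is a
rough stand-alone RocksJava sketch of prefix-style iteration (outside of Streams,
purely illustrative; the database path and the 4-byte prefix length are arbitrary):

import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;

public class PrefixSeekExample {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        Options options = new Options()
            .setCreateIfMissing(true)
            .useFixedLengthPrefixExtractor(4);      // treat the first 4 bytes as the prefix
        try (RocksDB db = RocksDB.open(options, "/tmp/prefix-seek-demo")) {
            db.put("user-1".getBytes(), "a".getBytes());
            db.put("user-2".getBytes(), "b".getBytes());
            db.put("item-1".getBytes(), "c".getBytes());

            ReadOptions readOptions = new ReadOptions().setPrefixSameAsStart(true);
            try (RocksIterator it = db.newIterator(readOptions)) {
                // We only know the prefix, not the start and end keys of the range.
                for (it.seek("user".getBytes()); it.isValid(); it.next()) {
                    System.out.println(new String(it.key()) + " -> " + new String(it.value()));
                }
            }
        }
    }
}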

Regarding the point on bloom filter, I think there are certain
optimisations that are being talked about in case of prefix seek here:

https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#prefix-vs-whole-key
Again, this isn't something that I have explored fully. Also, on the prefix seek
page on RocksDB they mention that there's a prefix iterating technique
called Prefix Bloom Filter.

3) Regarding the question on length of bytes for seek vs. prefix seek, I am
not entirely sure about that scenario. What I have understood is that,
at least for RocksDB, it is more performant for short iterator queries
than for longer ones.

4) Regarding the last question on placing it within Segment, the reason I
didn't do it that way is that I thought we shouldn't tie this feature only to
RocksDB. I agree that I got this idea while looking/reading about RocksDB
but if we keep it outside the purview of RocksDB and keep it as a pluggable
entity, then a) it remains generic by not being tied to any specific store
and b) no change is needed at all for any of the other stores which haven't
implemented it.

I am not sure if any of the above points make sense but as I said, this is
based on my limited understanding of the codebase. So, pardon any
incorrect/illogical statements plz!

@Sophie,

Thanks for bringing that point up! I have mentioned about that PR in the
KIP under a section called Other considerations. Nonetheless, thanks for
pointing it out!

Thanks!
Sagar.


On Thu, May 14, 2020 at 5:17 AM Sophie Blee-Goldman 
wrote:

> Not to derail this KIP discussion, but to leave a few notes on some of the
> RocksDB points that have come up:
>
> Someone actually merged some long overdue performance improvements to
> the RocksJava implementation (the PR was opened back in 2017! yikes).
> I haven't looked into the prefix seek API closely enough to know how
> relevant
> this particular change is, and they are still improving things, but it
> gives me some
> faith.
>
> There are some pretty promising results reported on the PR:
> https://github.com/facebook/rocksdb/pull/2283#issuecomment-561563037
>
> Regarding the custom comparator, they also recently merged this performance
> improvement. The tl;dr is
> they reduced the slowdown of a custom comparator in Java
> (relative to the

[jira] [Created] (KAFKA-9996) upgrade zookeeper to 3.5.8 to address security vulnerabilities

2020-05-14 Thread Emanuele Maccherani (Jira)
Emanuele Maccherani created KAFKA-9996:
--

 Summary: upgrade zookeeper to 3.5.8 to address security 
vulnerabilities
 Key: KAFKA-9996
 URL: https://issues.apache.org/jira/browse/KAFKA-9996
 Project: Kafka
  Issue Type: Bug
  Components: packaging
Affects Versions: 2.5.0
Reporter: Emanuele Maccherani


Kafka is currently using ZooKeeper 3.5.7, which is affected by CVE-2020-8840 and
CVE-2020-9488. Both are resolved in 3.5.8.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-613: Add end-to-end latency metrics to Streams

2020-05-14 Thread Sophie Blee-Goldman
Hey all,

After discussing with Bruno I'd like to propose a small amendment,
which is to record the processor-node-level metrics only for *stateful*
*operators*. They would still be considered a "processor-node-level"
metric and not a "state-store-level" metric as the staleness is still
a property of the node rather than of the state itself. However, it seems
that this information is primarily useful for stateful operators that might
be exposing state via IQ or are otherwise dependent on the record time,
unlike a stateless operator.

It's worth calling out that recent performance improvements to the metrics
framework mean that we no longer fetch the system time at the operator
level, but only once per task. In other words, the system time is not updated
between each process call as a record flows through the subtopology, so
debugging the processor-level latency via the staleness will not be
possible. Note that this doesn't mean the operator-level metrics are not
*useful* relative to the task-level metric. Upstream caching and/or
suppression can still cause a record's staleness at some downstream stateful
operator to deviate from the task-level staleness (recorded at the source node).

Please let me know if you have any concerns about this change. The
KIP has been updated with the new proposal
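
For anyone curious how the proposed percentiles could be wired up, here is a
rough sketch against the existing org.apache.kafka.common.metrics Percentiles
stat; the group name, metric names and histogram sizing below are illustrative
only, not the actual Streams sensor registration:

```
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;

public class StalenessMetricsSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();
        String group = "stream-processor-node-metrics";   // illustrative group name

        Sensor staleness = metrics.sensor("record-staleness");
        staleness.add(new Percentiles(
            30 * 1024,                                     // bytes backing the histogram
            24 * 60 * 60 * 1000d,                          // max expected staleness in ms
            Percentiles.BucketSizing.LINEAR,
            new Percentile(metrics.metricName("record-staleness-p99", group), 99),
            new Percentile(metrics.metricName("record-staleness-p75", group), 75)));
        staleness.add(metrics.metricName("record-staleness-max", group), new Max());
        staleness.add(metrics.metricName("record-staleness-avg", group), new Avg());
        staleness.add(metrics.metricName("record-staleness-min", group), new Min());

        // Staleness is the task-level system time minus the record timestamp.
        long taskSystemTimeMs = System.currentTimeMillis();
        long recordTimestampMs = taskSystemTimeMs - 42;    // stand-in for record.timestamp()
        staleness.record(taskSystemTimeMs - recordTimestampMs, taskSystemTimeMs);
    }
}
```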

On Thu, May 14, 2020 at 3:04 AM Bruno Cadonna  wrote:

> Hi Sophie,
>
> Thank you for the KIP.
>
> The KIP looks good to me.
>
> 50th percentile:
> I think we do not need it now. If we need it, we can add it. Here the
> old truism applies: Adding is always easier than removing.
>
> processor-node-level metrics:
> I think it is good to have the staleness metrics also on
> processor-node-level. If we do not want to record them on all
> processor nodes, you could restrict the recording to stateful
> processor-nodes, since those are the ones that would benefit most from
> the staleness metrics.
>
> Best,
> Bruno
>
> On Thu, May 14, 2020 at 4:15 AM Sophie Blee-Goldman 
> wrote:
> >
> > Yeah, the specific reason was just to align with the current metrics.
> >
> > Is it better to conform than to be right? History has a lot to say on
> that
> > matter
> > but I'm not sure how much of it applies to the fine details of metrics
> > naming :P
> >
> > More seriously, I figured if people are looking at this metric they're
> > likely to
> > be looking at all the others. Then naming this one "-mean" would probably
> > lead some to conclude that the "-avg" suffix in the other metrics has a
> > different meaning.
> >
> > As for the percentiles, I actually like p99 (and p75) better. I'll swap
> > that out
> >
> > On Wed, May 13, 2020 at 7:07 PM John Roesler 
> wrote:
> >
> > > Thanks Sophie,
> > >
> > > I hope this isn't too nit-picky, but is there a reason to choose "avg"
> > > instead
> > > of "mean"? Maybe this is too paranoid, and I might be oversensitive
> because
> > > of the mistake I just made earlier, but it strikes me that "avg" is
> > > actually
> > > ambiguous, as it refers to a family of statistics, whereas "mean" is
> > > specific.
> > > I see other Kafka metrics with "avg", but none with "mean"; was that
> the
> > > reason? If so, I'm +1.
> > >
> > > Regarding the names of the percentile, I actually couldn't find _any_
> other
> > > metrics that use percentile. Was there a reason to choose "99th" as
> opposed
> > > to "p99" or any other scheme? This is not a criticism, I'm just
> primarily
> > > asking
> > > for consistency's sake.
> > >
> > > Thanks again,
> > > -John
> > >
> > > On Wed, May 13, 2020, at 19:19, Sophie Blee-Goldman wrote:
> > > > Alright, I can get behind adding the min metric for the sake of
> pretty
> > > > graphs
> > > > (and trivial computation).
> > > >
> > > > I'm still on the fence regarding the mean (or 50th percentile) but I
> can
> > > see
> > > > how users might expect it and find it a bit disorienting not to
> have. So
> > > the
> > > > updated proposed metrics are
> > > >
> > > >
> > > >- record-staleness-max [ms]
> > > >- record-staleness-99th [ms] *(99th percentile)*
> > > >- record-staleness-75th [ms] *(75th percentile)*
> > > >- record-staleness-avg [ms] *(mean)*
> > > >- record-staleness-min [ms]
> > > >
> > > >
> > > > On Wed, May 13, 2020 at 4:42 PM John Roesler 
> > > wrote:
> > > >
> > > > > Oh boy, I never miss an opportunity to embarrass myself. I guess
> the
> > > mean
> > > > > seems more interesting to me than the median, but neither are as
> > > > > interesting as the higher percentiles (99th and max).
> > > > >
> > > > > Min isn’t really important for any SLAs, but it does round out the
> > > mental
> > > > > picture of the distribution. I’ve always graphed min along with the
> > > other
> > > > > metrics to help me understand how fast the system can be, which
> helps
> > > in
> > > > > optimization decisions. It’s also a relatively inexpensive metric
> to
> > > > > compute, so it might be nice to just throw it in.
> > > > >
> > > > > On Wed, May 13, 2020, at 18:18,

[jira] [Created] (KAFKA-9997) upgrade log4j lib to address CVE-2020-9488

2020-05-14 Thread Emanuele Maccherani (Jira)
Emanuele Maccherani created KAFKA-9997:
--

 Summary: upgrade log4j lib to address CVE-2020-9488
 Key: KAFKA-9997
 URL: https://issues.apache.org/jira/browse/KAFKA-9997
 Project: Kafka
  Issue Type: Bug
  Components: packaging
Affects Versions: 2.5.0
Reporter: Emanuele Maccherani


The latest Kafka version uses log4j 1.2.17, which is affected by CVE-2020-9488.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-trunk-jdk11 #1455

2020-05-14 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9650: include human readable units in ms and bytes configs 
(#8222)


--
Started by an SCM change
Running as SYSTEM
[EnvInject] - Loading node environment variables.
Building remotely on H30 (ubuntu) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
[WS-CLEANUP] Done
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress -- https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress -- https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision 873e9446ef8426061e2f1b6cd21815b270e27f03 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 873e9446ef8426061e2f1b6cd21815b270e27f03
Commit message: "KAFKA-9650: include human readable units in ms and bytes 
configs (#8222)"
 > git rev-list --no-walk d62f6ebdfe38adf894187e76546eedf13ee98432 # timeout=10
ERROR: No tool found matching GRADLE_4_10_2_HOME
Setting GRADLE_4_10_3_HOME=/home/jenkins/tools/gradle/4.10.3
[kafka-trunk-jdk11] $ /bin/bash -xe /tmp/jenkins2876915775957114976.sh
+ rm -rf 
ERROR: No tool found matching GRADLE_4_10_2_HOME
Setting GRADLE_4_10_3_HOME=/home/jenkins/tools/gradle/4.10.3
[kafka-trunk-jdk11] $ /bin/bash -xe /tmp/jenkins8054929188848912288.sh
+ export GRADLE_OPTS=-Xmx1024m
+ GRADLE_OPTS=-Xmx1024m
+ ./gradlew --no-daemon --continue -PmaxParallelForks=2 
-PtestLoggingEvents=started,passed,skipped,failed -PxmlFindBugsReport=true 
clean test -PscalaVersion=2.12
[0.068s][warning][os,thread] Failed to start thread - pthread_create failed 
(EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
Error occurred during initialization of VM
java.lang.OutOfMemoryError: unable to create native thread: possibly out of 
memory or process/resource limits reached
Build step 'Execute shell' marked build as failure
FATAL: Remote call on H30 failed
Also:   hudson.remoting.Channel$CallSiteStackTrace: Remote call to H30
at 
hudson.remoting.Channel.attachCallSiteStackTrace(Channel.java:1743)
at 
hudson.remoting.UserRequest$ExceptionResponse.retrieve(UserRequest.java:357)
at hudson.remoting.Channel.call(Channel.java:957)
at hudson.Launcher$RemoteLauncher.kill(Launcher.java:1086)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:510)
at hudson.model.Run.execute(Run.java:1815)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at 
hudson.model.ResourceController.execute(ResourceController.java:97)
at hudson.model.Executor.run(Executor.java:429)
java.lang.NoClassDefFoundError: Could not initialize class 
hudson.slaves.SlaveComputer
at hudson.util.ProcessTree.get(ProcessTree.java:414)
at hudson.Launcher$RemoteLauncher$KillTask.call(Launcher.java:1103)
at hudson.Launcher$RemoteLauncher$KillTask.call(Launcher.java:1094)
at hudson.remoting.UserRequest.perform(UserRequest.java:212)
at hudson.remoting.UserRequest.perform(UserRequest.java:54)
at hudson.remoting.Request$2.run(Request.java:369)
at 
hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused: java.io.IOException: Remote call on H30 failed
at hudson.remoting.Channel.call(Channel.java:963)
at hudson.Launcher$RemoteLauncher.kill(Launcher.java:1086)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:510)
at hudson.model.Run.execute(Run.java:1815)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at hudson.model.

[jira] [Created] (KAFKA-9998) KafkaProducer.close(timeout) still may block indefinitely

2020-05-14 Thread radai rosenblatt (Jira)
radai rosenblatt created KAFKA-9998:
---

 Summary: KafkaProducer.close(timeout) still may block indefinitely
 Key: KAFKA-9998
 URL: https://issues.apache.org/jira/browse/KAFKA-9998
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.1
Reporter: radai rosenblatt


looking at KafkaProducer.close(timeout), we have this:
{code:java}
private void close(Duration timeout, boolean swallowException) {
long timeoutMs = timeout.toMillis();
if (timeoutMs < 0)
throw new IllegalArgumentException("The timeout cannot be negative.");
log.info("Closing the Kafka producer with timeoutMillis = {} ms.", 
timeoutMs);

// this will keep track of the first encountered exception
AtomicReference<Throwable> firstException = new AtomicReference<>();
boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
if (timeoutMs > 0) {
if (invokedFromCallback) {
log.warn("Overriding close timeout {} ms to 0 ms in order to 
prevent useless blocking due to self-join. " +
"This means you have incorrectly invoked close with a 
non-zero timeout from the producer call-back.",
timeoutMs);
} else {
// Try to close gracefully.
if (this.sender != null)
this.sender.initiateClose();
if (this.ioThread != null) {
try {
this.ioThread.join(timeoutMs);< GRACEFUL JOIN
} catch (InterruptedException t) {
firstException.compareAndSet(null, new 
InterruptException(t));
log.error("Interrupted while joining ioThread", t);
}
}
}
}

if (this.sender != null && this.ioThread != null && 
this.ioThread.isAlive()) {
log.info("Proceeding to force close the producer since pending requests 
could not be completed " +
"within timeout {} ms.", timeoutMs);
this.sender.forceClose();
// Only join the sender thread when not calling from callback.
if (!invokedFromCallback) {
try {
this.ioThread.join();   <- UNBOUNDED JOIN
} catch (InterruptedException e) {
firstException.compareAndSet(null, new InterruptException(e));
}
}
}
...
}

{code}
Specifically, in our case the ioThread was running a (very) long-running
user-provided callback which was preventing the producer from closing within
the given timeout.

I think the 2nd join() call should either be _VERY_ short (since we're already
past the timeout at that stage) or should not happen at all.
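
A rough sketch of the first option, bounding the second join by whatever is
left of the caller's budget; this is only an illustration of the suggestion,
not the actual Kafka change, and deadlineMs is assumed to be computed as
start-of-close time plus timeoutMs:

{code:java}
// Illustrative sketch only -- not the actual KafkaProducer code.
if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
    this.sender.forceClose();
    // Only join the sender thread when not calling from callback.
    if (!invokedFromCallback) {
        // Bound the second join by the time remaining until the caller's deadline.
        long remainingMs = Math.max(1L, deadlineMs - System.currentTimeMillis());
        try {
            this.ioThread.join(remainingMs);   // bounded, instead of join()
        } catch (InterruptedException e) {
            firstException.compareAndSet(null, new InterruptException(e));
        }
    }
}
{code}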



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9999) Topic description should be triggered after each failed topic creation iteration

2020-05-14 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-:
--

 Summary: Topic description should be triggered after each failed 
topic creation iteration 
 Key: KAFKA-
 URL: https://issues.apache.org/jira/browse/KAFKA-
 Project: Kafka
  Issue Type: Bug
  Components: admin, streams
Affects Versions: 2.4.0
Reporter: Boyang Chen


We spotted a case in a system test failure where the topic already exists but the
admin client still attempts to recreate it:

 
{code:java}
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic 
SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog. Topic is probably 
marked for deletion (number of partitions is unknown).
Will retry to create this topic in 100 ms (to let broker finish async delete 
operation first).
Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 
'SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog' already exists. 
(org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic 
SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number of 
partitions is unknown).
Will retry to create this topic in 100 ms (to let broker finish async delete 
operation first).
Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 
'SmokeTest-uwin-cnt-changelog' already exists. 
(org.apache.kafka.streams.processor.internals.InternalTopicManager) 
[2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic 
SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number of 
partitions is unknown).
Will retry to create this topic in 100 ms (to let broker finish async delete 
operation first).
Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 
'SmokeTest-cntByCnt-changelog' already exists. 
(org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,120] INFO stream-thread [main] Topics 
[SmokeTest-KSTREAM-REDUCE-STATE-STORE-20-changelog, 
SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made 
ready with 5 retries left 
(org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics 
after 5 retries. This can happen if the Kafka cluster is temporary not 
available. You can increase admin client config `retries` to be resilient 
against this error. 
(org.apache.kafka.streams.processor.internals.InternalTopicManager)
[2020-05-14 09:56:40,221] ERROR stream-thread 
[SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered the 
following unexpected Kafka exception during processing, this usually indicate 
Streams internal errors: 
(org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.StreamsException: Could not create topics after 
5 retries. This can happen if the Kafka cluster is temporary not available. You 
can increase admin client config `retries` to be resilient against this error.
        at 
org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171)
        at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
        at 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588)
 
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
 
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
        at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
        at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
        at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
        at 
org.apache.kafka.clients.con
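
A minimal sketch of the idea in the summary, illustrative only and not the
actual InternalTopicManager code: after a TopicExistsException, describe the
topic before the next retry so an already-existing topic is treated as created
instead of being re-created.

{code:java}
// Illustrative sketch -- names and flow are assumptions, not Kafka internals.
private boolean createOrValidate(Admin admin, NewTopic newTopic) throws Exception {
    try {
        admin.createTopics(Collections.singleton(newTopic)).all().get();
        return true;
    } catch (ExecutionException e) {
        if (e.getCause() instanceof TopicExistsException) {
            // Re-describe the topic instead of blindly retrying creation.
            TopicDescription existing =
                admin.describeTopics(Collections.singleton(newTopic.name()))
                     .all().get().get(newTopic.name());
            // If it already exists with the expected partition count, nothing is left to do.
            return existing.partitions().size() == newTopic.numPartitions();
        }
        throw e;
    }
}
{code}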

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-05-14 Thread Sophie Blee-Goldman
Whoops, I guess I didn't finish reading the KIP all the way to the end
earlier. Thanks
for including the link to the RocksDB PR in the KIP!

I have one additional question about the proposal: do you plan to also add
this
prefix seek API to the dual column family iterators? These are used by
RocksDBTimestampedStore (which extends RocksDBStore), for example the
*RocksDBDualCFRangeIterator*

Thanks for the KIP!

On Thu, May 14, 2020 at 10:50 AM Sagar  wrote:

> Hey @Adam,
>
> Thanks for sharing your experience with using prefix seek. I did look at
> your code for RocksDBPrefixIterator; in fact, I have repurposed that class
> itself since it wasn't being used elsewhere. Regarding how I plan to
> expose this throughout the state stores, what I have tried to do is add it
> as a separate interface. So, basically, it is not at the same level as the
> *range* function, so to speak. The reason I did that is that currently I feel not
> all state stores are a natural fit for prefix seek. As I mentioned in the
> KIP as well, the current equivalent to it could be BulkLoadingStore (not in
> terms of functionality but in terms of how it is also not implemented by
> all of them). So, that way I am not needing to stub it across all the
> state stores and we can implement it only where needed. For example, in the
> PR that I have put for reference in the KIP, you can see that I have it
> implemented only for RocksDB.
>
> @Guozhang,
>
> Thanks for the feedback. Those are very interesting questions and I will
> try my best to answer based upon whatever limited understanding I have
> developed so far :)
>
> 1) Regarding the usage of useFixedLengthPrefixExtractor, honestly, I hadn't
> looked at that config. I did look it up after you pointed it out and it seems
> it's more for hash-based memtables? I may be wrong though. But what I would
> say is that the changes I had made were not exactly from a correctness
> standpoint but more from trying to showcase how we can implement these
> changes. The idea was that once we see the merit in this approach then we
> can add some of the tunings (and I would need your team's assistance there
> :D).
>
> 2) Regarding the similarity of `RocksDBPrefixIterator` and
> `RocksDBRangeIterator`, yes, the implementations look more or less similar,
> so in terms of performance they might be comparable. But semantically, they
> can solve 2 different use-cases. The range seek is useful when we know both
> from and to. But if we consider use-cases where we want to find keys with a
> certain prefix, but we don't know what their start and end are, then
> prefix seek would come in more handy. The point that I am trying to make is
> that it can extend the scope of state stores from just point lookups to
> somewhat speculative queries whereby users can check whether a
> certain pattern exists. I can vouch for this personally because I wanted to
> use state stores for one such use case and, since this option wasn't there,
> I had to do some other things. An equivalent to this could be the SCAN operator
> in Redis. (Not trying to compare Redis and state stores, just trying to
> give some context.)
>
> Regarding the point on bloom filters, I think there are certain
> optimisations being talked about for prefix seek here:
>
>
> https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter#prefix-vs-whole-key
> Again, this isn't something that I have explored fully. Also, on the prefix
> seek page of the RocksDB wiki they mention that there's a prefix iterating
> technique called the Prefix Bloom Filter.
>
> 3) Regarding the question on the length of bytes for seek v/s prefix seek, I am
> not entirely sure about that scenario. What I have understood is that, at
> least for RocksDB, it is more performant for short iterator queries than for
> longer ones.
>
> 4) Regarding the last question on placing it within Segment, the reason I
> didn't do it that way is that I thought we shouldn't tie this feature only to
> RocksDB. I agree that I got this idea while looking/reading about RocksDB,
> but if we keep it outside the purview of RocksDB and keep it as a pluggable
> entity, then a) it remains generic by not being tied to any specific store
> and b) no change is needed at all for any of the other stores which haven't
> implemented it.
>
> I am not sure if all of the above points make sense but, as I said, this is
> based on my limited understanding of the codebase, so please pardon any
> incorrect/illogical statements!
>
> @Sophie,
>
> Thanks for bringing that point up! I have mentioned about that PR in the
> KIP under a section called Other considerations. Nonetheless, thanks for
> pointing it out!
>
> Thanks!
> Sagar.
>
>
> On Thu, May 14, 2020 at 5:17 AM Sophie Blee-Goldman 
> wrote:
>
> > Not to derail this KIP discussion, but to leave a few notes on some of
> the
> > RocksDB points that have come up:
> >
> > Someone actually merged some long overdue performance improvements to
> > the RocksJava implementation (the PR wa

[jira] [Resolved] (KAFKA-9984) Should fail the subscription when pattern is empty

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-9984.

Fix Version/s: 2.6.0
   Resolution: Fixed

> Should fail the subscription when pattern is empty
> --
>
> Key: KAFKA-9984
> URL: https://issues.apache.org/jira/browse/KAFKA-9984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Boyang Chen
>Assignee: HaiyuanZhao
>Priority: Major
> Fix For: 2.6.0
>
>
> We have seen a case where the consumer subscribes to an empty string pattern:
> ```
> [Consumer ...  ] Subscribed to pattern:  ''
> ```
> which doesn't make any sense and usually indicates a configuration error. The 
> `consumer.subscribe(pattern)` call should fail with an IllegalArgumentException for this 
> case.
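
A minimal sketch of the guard described above; the exact message wording is an
assumption, not necessarily the merged fix:

{code:java}
// Illustrative argument check for KafkaConsumer#subscribe(Pattern, ...).
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
    if (pattern == null || pattern.toString().isEmpty())
        throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " +
            (pattern == null ? "null" : "empty"));
    // ... existing subscription logic ...
}
{code}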



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: kafka-trunk-jdk11 #1456

2020-05-14 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: skip listOffsets request for newly created changelog topics

[github] KAFKA-9984 Should fail the subscription when pattern is empty (#8665)


--
Started by an SCM change
Running as SYSTEM
[EnvInject] - Loading node environment variables.
Building remotely on H30 (ubuntu) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
[WS-CLEANUP] Done
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress -- https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress -- https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision f50bd5f80e33598e7fd70290f0c0e809443bc9a0 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f f50bd5f80e33598e7fd70290f0c0e809443bc9a0
Commit message: "KAFKA-9984 Should fail the subscription when pattern is empty 
(#8665)"
 > git rev-list --no-walk 873e9446ef8426061e2f1b6cd21815b270e27f03 # timeout=10
ERROR: No tool found matching GRADLE_4_10_2_HOME
Setting GRADLE_4_10_3_HOME=/home/jenkins/tools/gradle/4.10.3
[kafka-trunk-jdk11] $ /bin/bash -xe /tmp/jenkins5780199165877269981.sh
+ rm -rf 
ERROR: No tool found matching GRADLE_4_10_2_HOME
Setting GRADLE_4_10_3_HOME=/home/jenkins/tools/gradle/4.10.3
[kafka-trunk-jdk11] $ /bin/bash -xe /tmp/jenkins2091690435836903399.sh
+ export GRADLE_OPTS=-Xmx1024m
+ GRADLE_OPTS=-Xmx1024m
+ ./gradlew --no-daemon --continue -PmaxParallelForks=2 
-PtestLoggingEvents=started,passed,skipped,failed -PxmlFindBugsReport=true 
clean test -PscalaVersion=2.12
[0.073s][warning][os,thread] Failed to start thread - pthread_create failed 
(EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
Error occurred during initialization of VM
java.lang.OutOfMemoryError: unable to create native thread: possibly out of 
memory or process/resource limits reached
Build step 'Execute shell' marked build as failure
FATAL: Remote call on H30 failed
Also:   hudson.remoting.Channel$CallSiteStackTrace: Remote call to H30
at 
hudson.remoting.Channel.attachCallSiteStackTrace(Channel.java:1743)
at 
hudson.remoting.UserRequest$ExceptionResponse.retrieve(UserRequest.java:357)
at hudson.remoting.Channel.call(Channel.java:957)
at hudson.Launcher$RemoteLauncher.kill(Launcher.java:1086)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:510)
at hudson.model.Run.execute(Run.java:1815)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at 
hudson.model.ResourceController.execute(ResourceController.java:97)
at hudson.model.Executor.run(Executor.java:429)
java.lang.NoClassDefFoundError: Could not initialize class 
hudson.slaves.SlaveComputer
at hudson.util.ProcessTree.get(ProcessTree.java:414)
at hudson.Launcher$RemoteLauncher$KillTask.call(Launcher.java:1103)
at hudson.Launcher$RemoteLauncher$KillTask.call(Launcher.java:1094)
at hudson.remoting.UserRequest.perform(UserRequest.java:212)
at hudson.remoting.UserRequest.perform(UserRequest.java:54)
at hudson.remoting.Request$2.run(Request.java:369)
at 
hudson.remoting.InterceptingExecutorService$1.call(InterceptingExecutorService.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused: java.io.IOException: Remote call on H30 failed
at hudson.remoting.Channel.call(Channel.java:963)
at hudson.Launcher$RemoteLauncher.kill(Launcher.java:1086)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:510)
at hudson.model.Run.execute(Run.java:1815)
at hudson.mod

Re: [DISCUSS] KIP-554: Add Broker-side SCRAM Config API

2020-05-14 Thread Colin McCabe
On Tue, May 12, 2020, at 06:43, Tom Bentley wrote:
> Hi Colin,
> 
> It's not clear whether users of the Java API would need to supply the salt
> and salted password directly, or whether the constructor of ScramCredential
> would take the password and perform the hashing itself.
> 

Hi Tom,

The AdminClient should do the hashing, right?  I don't see any advantage to 
doing it externally.  I do think we should support setting the salt explicitly, 
but really only for testing purposes.  Normally, it should be randomized.
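
For context, here is a rough standalone sketch of the RFC 5802 derivation the
AdminClient would perform for SCRAM-SHA-256; the class and variable names are
illustrative, and only standard javax.crypto primitives are assumed:

```
import javax.crypto.Mac;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.SecureRandom;

public final class ScramCredentialSketch {
    public static void main(String[] args) throws Exception {
        char[] password = "alice-secret".toCharArray();
        int iterations = 4096;
        byte[] salt = new byte[16];
        new SecureRandom().nextBytes(salt);   // randomized; only overridable for tests

        // SaltedPassword := Hi(Normalize(password), salt, i), where Hi is PBKDF2 with HMAC-SHA-256
        byte[] saltedPassword = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
            .generateSecret(new PBEKeySpec(password, salt, iterations, 256)).getEncoded();

        byte[] clientKey = hmac(saltedPassword, "Client Key");
        byte[] storedKey = MessageDigest.getInstance("SHA-256").digest(clientKey);
        byte[] serverKey = hmac(saltedPassword, "Server Key");
        // salt, storedKey, serverKey and iterations are what ends up persisted on the broker side.
    }

    private static byte[] hmac(byte[] key, String message) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        return mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
    }
}
```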

> I also wonder a little about consistency with the other APIs which have
> separate create/alter/delete methods. I imagine you considered exposing
> separate methods in the Java API,  implementing them using the same RPC,
> but can you share your rationale?

I wanted this to match up with the command-line API, which doesn't distinguish 
between create and alter.

best,
Colin

> 
> Kind regards,
> 
> Tom
> 
> On Mon, May 11, 2020 at 6:48 AM Cheng Tan  wrote:
> 
> > Hi Colin,
> >
> >
> > If I understood correctly, in your design, listScramUsers will return the
> > mechanism and iteration. Let’s use the field naming of RFC 5802 for this
> > discussion:
> >
> >  SaltedPassword  := Hi(Normalize(password), salt, i)
> >  ClientKey   := HMAC(SaltedPassword, "Client Key")
> >  StoredKey   := H(ClientKey)
> >  AuthMessage := client-first-message-bare + "," +
> > server-first-message + "," +
> > client-final-message-without-proof
> >  ClientSignature := HMAC(StoredKey, AuthMessage)
> >  ClientProof := ClientKey XOR ClientSignature
> >  ServerKey   := HMAC(SaltedPassword, "Server Key")
> >  ServerSignature := HMAC(ServerKey, AuthMessage)
> >
> > I think it’s also safe and useful for listScramUsers to return salt and
> > ServerKey. The current practice of —describe with —zookeeper is returning
> > these two fields (KIP-84)
> >
> > bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type
> > users --entity-name alice
> > Configs for user-principal 'alice' are
> > SCRAM-SHA-512=[salt=djR5dXdtZGNqamVpeml6NGhiZmMwY3hrbg==,stored_key=sb5jkqStV9RwPVTGxG1ZJHxF89bqjsD1jT4SFDK4An2goSnWpbNdY0nkq0fNV8xFcZqb7MVMJ1tyEgif5OXKDQ==,
> > server_key=3EfuHB4LPOcjDH0O5AysSSPiLskQfM5K9+mOzGmkixasmWEGJWZv7svtgkP+acO2Q9ms9WQQ9EndAJCvKHmjjg==,iterations=4096],SCRAM-SHA-256=[salt=10ibs0z7xzlu6w5ns0n188sis5,stored_key=+Acl/wi1vLZ95Uqj8rRHVcSp6qrdfQIwZbaZBwM0yvo=,server_key=nN+fZauE6vG0hmFAEj/49+2yk0803y67WSXMYkgh77k=,iterations=4096]
> >
> >
> > Please let me know what you think.
> >
> > Best, - Cheng Tan
> >
> > > On Apr 30, 2020, at 11:16 PM, Colin McCabe  wrote:
> > >
> > >
> >
> >
>


Re: [DISCUSS] KIP-554: Add Broker-side SCRAM Config API

2020-05-14 Thread Colin McCabe
Hi Cheng,

Good point.  I updated the KIP to include the same information that is 
currently returned.

best,
Colin


On Sun, May 10, 2020, at 22:40, Cheng Tan wrote:
> Hi Colin,
> 
> 
> If I understood correctly, in your design, listScramUsers will return 
> the mechanism and iteration. Let’s use the field naming of RFC 5802 for 
> this discussion:
> 
>  SaltedPassword  := Hi(Normalize(password), salt, i)
>  ClientKey   := HMAC(SaltedPassword, "Client Key")
>  StoredKey   := H(ClientKey)
>  AuthMessage := client-first-message-bare + "," +
> server-first-message + "," +
> client-final-message-without-proof
>  ClientSignature := HMAC(StoredKey, AuthMessage)
>  ClientProof := ClientKey XOR ClientSignature
>  ServerKey   := HMAC(SaltedPassword, "Server Key")
>  ServerSignature := HMAC(ServerKey, AuthMessage)
> 
> I think it’s also safe and useful for listScramUsers to return salt and 
> ServerKey. The current practice of —describe with —zookeeper is 
> returning these two fields (KIP-84)
> 
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe 
> --entity-type users --entity-name alice
> Configs for user-principal 'alice' are 
> SCRAM-SHA-512=[salt=djR5dXdtZGNqamVpeml6NGhiZmMwY3hrbg==,stored_key=sb5jkqStV9RwPVTGxG1ZJHxF89bqjsD1jT4SFDK4An2goSnWpbNdY0nkq0fNV8xFcZqb7MVMJ1tyEgif5OXKDQ==,
>  
> server_key=3EfuHB4LPOcjDH0O5AysSSPiLskQfM5K9+mOzGmkixasmWEGJWZv7svtgkP+acO2Q9ms9WQQ9EndAJCvKHmjjg==,iterations=4096],SCRAM-SHA-256=[salt=10ibs0z7xzlu6w5ns0n188sis5,stored_key=+Acl/wi1vLZ95Uqj8rRHVcSp6qrdfQIwZbaZBwM0yvo=,server_key=nN+fZauE6vG0hmFAEj/49+2yk0803y67WSXMYkgh77k=,iterations=4096]
> 
> 
> Please let me know what you think.
> 
> Best, - Cheng Tan
> 
> > On Apr 30, 2020, at 11:16 PM, Colin McCabe  wrote:
> > 
> > 
> 
>


Re: [VOTE] KIP-606: Add Metadata Context to MetricsReporter

2020-05-14 Thread Randall Hauch
+1 (binding)

Thanks for the proposal, Xavier.

On Wed, May 13, 2020 at 4:10 PM Gwen Shapira  wrote:

> +1 (binding)
> Thanks for the proposal, Xavier.
>
> On Wed, May 13, 2020 at 11:54 AM Xavier Léauté  wrote:
>
> > Hi everyone,
> >
> > Folks seem happy with the state of the KIP, so I'd like to start the vote
> > for KIP-606
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-606%3A+Add+Metadata+Context+to+MetricsReporter
> >
> > - Xavier
> >
>
>
> --
> Gwen Shapira
>


Re: [DISCUSS] KIP-596 Safely abort Producer transactions during application shutdown

2020-05-14 Thread Boyang Chen
Hey Xiang,

since Matthias is agreeing, do you want to kick off the vote?

Boyang

On Fri, May 8, 2020 at 4:20 PM Matthias J. Sax  wrote:

> >> I'm not sure I follow your question, the producer will not automatically
> >> close when there is fatal error IIUC.
>
> Correct. I found the code example just a little bit confusing, because
> the producer is only closed in case of an error and only if the exception
> is fatal.
>
> Overall, the code example might be better like this:
>
>
>
> try (Producer<String, String> producer = new KafkaProducer<>(props, new
> StringSerializer(), new StringSerializer())) {
>
>   producer.initTransactions();
>
>   try {
> producer.beginTransaction();
> for (int i = 0; i < 100; i++)
>   producer.send(new ProducerRecord<>("my-topic",
> Integer.toString(i), Integer.toString(i)));
> producer.commitTransaction();
>   } catch (Exception e) {
> producer.abortTransaction();
> if(e instanceof IllegalStateException ||
> e instanceof ProducerFencedException ||
> e instanceof UnsupportedVersionException ||
> e instanceof AuthorizationException) {
>   throw e;
> }
>   }
> }
>
>
>
> Now the producer is closed via try-with-resources for all cases.
>
>
>
> @Xiang: feel free to start a vote thread.
>
>
> -Matthias
>
> On 5/5/20 6:23 PM, Boyang Chen wrote:
> > Hey Matthias,
> >
> > I'm not sure I follow your question, the producer will not automatically
> > close when there is fatal error IIUC.
> >
> > Boyang
> >
> > On Tue, May 5, 2020 at 6:16 PM 张祥  wrote:
> >
> >> Thanks for the comment Matthias.
> >>
> >> In fact, I cannot think of why we cannot close the producer no matter
> what.
> >> On the other hand, it is also okay to reuse the producer when the error
> is
> >> not fatal. @Guozhang Wang   @Boyang Chen
> >> 
> >>
> >> Matthias J. Sax  于2020年5月1日周五 上午7:52写道:
> >>
> >>> Thanks for the KIP. Make sense to me. I think you can start a vote.
> >>>
> >>> One minor comment about the code example: From my understanding, a
> >>> producer should always be closed (independent if there was no error, a
> >>> transient error, or a fatal error). If that is correct, than the code
> >>> example seems to be miss-leading?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>> On 4/25/20 6:08 PM, 张祥 wrote:
>  Sorry, but this KIP is still open to discussion, any comments and
> ideas
>  would be appreciated, Thanks.
> 
>  张祥  于2020年4月17日周五 下午1:04写道:
> 
> > Guozhang, thanks for the valuable suggestion.
> >
> > A new part called "suggested coding pattern" has been added and I
> copy
> >>> the
> > core code here:
> >
> > try {
> > producer.beginTransaction();
> > for (int i = 0; i < 100; i++)
> > producer.send(new ProducerRecord<>("my-topic",
> > Integer.toString(i), Integer.toString(i)));
> > producer.commitTransaction();
> > } catch (Exception e) {
> > producer.abortTransaction();
> > if(e instanceof IllegalStateException ||
> > e instanceof ProducerFencedException ||
> > e instanceof UnsupportedVersionException ||
> > e instanceof AuthorizationException ||
> > e instanceof OutOfOrderSequenceException) {
> > producer.close();
> > }
> > }
> >
> > As you can see, in the catch block,  all fatal exceptions need to be
> > listed, I am not sure I have listed all of them and I wonder if there
> >>> is a
> > better way to do this.
> >
> >
> > Guozhang Wang  于2020年4月17日周五 上午8:50写道:
> >
> >> Xiang, thanks for the written KIP. I just have one meta comment and
> >> otherwise it looks good to me: could you also add a section about
> >> suggested
> >> coding patterns (especially how try - catch should be implemented)
> as
> >>> we
> >> discussed on the JIRA to the wiki page as well?
> >>
> >> And please also note that besides the javadoc of the function, on
> top
> >>> of
> >> the KafkaProducer class there are also comments regarding example
> >>> snippet:
> >>
> >> ```
> >>
> >> * 
> >> * {@code
> >> * Properties props = new Properties();
> >> * props.put("bootstrap.servers", "localhost:9092");
> >> * props.put("transactional.id", "my-transactional-id");
> >> * Producer<String, String> producer = new KafkaProducer<>(props, new
> >> StringSerializer(), new StringSerializer());
> >> *
> >> * producer.initTransactions();
> >> *
> >> * try {
> >> * producer.beginTransaction();
> >> * for (int i = 0; i < 100; i++)
> >> * producer.send(new ProducerRecord<>("my-topic",
> >> Integer.toString(i), Integer.toString(i)));
> >> * producer.commitTransaction();
> >> * } catch (ProducerFencedException | OutOfOrderSequenceException |
> >> AuthorizationException e) {
> >> * // We can't recover from these exceptions, so our only option
> >> is
> >> to close the producer and exit.
> 

Re: [VOTE]: KIP-604: Remove ZooKeeper Flags from the Administrative Tools

2020-05-14 Thread Guozhang Wang
+1.

Thanks Colin!

Guozhang

On Tue, May 12, 2020 at 3:45 PM Colin McCabe  wrote:

> Hi all,
>
> I'd like to start a vote on KIP-604: Remove ZooKeeper Flags from the
> Administrative Tools.
>
> As a reminder, this KIP is for the next major release of Kafka, the 3.0
> release.   So it won't go into the upcoming 2.6 release.  It's a pretty
> small change that just removes the --zookeeper flags from some tools and
> removes a deprecated tool.  We haven't decided exactly when we'll do 3.0
> but I believe we will certainly want this change in that release.
>
> The KIP does contain one small change relevant to Kafka 2.6: adding
> support for --if-exists and --if-not-exists in combination with the
> --bootstrap-server flag.
>
> best,
> Colin
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-14 Thread Randall Hauch
Hi, Aakash.

Thanks for the KIP. Connect does need an improved ability for sink
connectors to report individual records as being problematic, and this
integrates nicely with the existing DLQ feature.

I also appreciate the desire to maintain compatibility so that connectors
can take advantage of this feature when deployed in a runtime that supports
this feature, but can safely and easily do without the feature when
deployed to an older runtime. But I do understand Andrew's concern about
the aesthetics. Have you considered overloading the `put(...)` method and
adding the `reporter` as a second parameter? Essentially it would add the
one method (with proper JavaDoc) to `SinkTask` only:

```
public void put(Collection<SinkRecord> records, BiFunction reporter) {
    put(records);
}
```
and the WorkerSinkTask would be changed to call `put(Collection,
BiFunction)` instead.

Sink connector implementations that don't do anything different can still
override `put(Collection)`, and it still works as before. Developers that
want to change their sink connector implementations to support this new
feature would do the following, which would work in older and newer Connect
runtimes:
```
public void put(Collection<SinkRecord> records) {
    put(records, null);
}
public void put(Collection<SinkRecord> records, BiFunction reporter) {
    // the normal `put(Collection)` logic goes here, but can optionally
    // use `reporter` if non-null
}
```

I think this has all the same benefits of the current KIP, but
it's noticeably simpler and hopefully more aesthetically pleasing.

As for Andrew's second concern about "the task can send errant records to
it within put(...)" being too restrictive. My guess is that this was more
an attempt at describing the basic behavior, and less about requiring the
reporter only being called within the `put(...)` method and not by methods
to which `put(...)` synchronously or asynchronously delegates. Can you
confirm whether my assumption is correct? If so, then perhaps my suggestion
helps work around this issue as well, since there would be no restriction
on when the reporter is called, and the whole "Order of Operations" section
could potentially be removed.

Third, it's not clear to me why the "Error Reporter Object" subsection in
the "Proposal" section lists the worker configuration properties that were
previously introduced with
https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect.
Maybe it's worth mentioning that the error reporter functionality will
reuse or build upon KIP-298, including reusing the configuration properties
defined in KIP-298. But IIUC, the KIP does not propose changing any
technical or semantic aspect of these configuration properties, and
therefore the KIP would be more clear and succinct without them. *That* the
error reporter will use these properties is part of the UX and therefore
necessary to mention, but *how* it uses those properties is really up to
the implementation.

Fourth, the "Synchrony" section has a sentence that is confusing, or not as
clear as it could be.

"If a record is sent to the error reporter, processing of the next
errant record in accept(...) will not begin until the producer successfully
sends the errant record to Kafka."

This sentence is a bit difficult to understand, but IIUC this really just
means that "accept(...)" will be synchronous and will block until the
errant record has been successfully written to Kafka. If so, let's say
that. The rest of the paragraph is fine.

Finally, is this KIP proposing new metrics, or that existing metrics would
be used to track the error reporter usage? If the former, then please
fully-specify what these metrics will be, similarly to how metrics are
specified in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework
.

Thoughts?

Best regards,

Randall

On Mon, May 11, 2020 at 4:49 PM Andrew Schofield 
wrote:

> Hi Aakash,
> Thanks for sorting out the replies to the mailing list.
>
> First, I do like the idea of improving error reporting in sink connectors.
> I'd like a simple
> way to put bad records onto the DLQ.
>
> I think this KIP is considerably more complicated than it seems. The
> guidance on the
> SinkTask.put() method is that it should send the records asynchronously
> and immediately
> return, so the task is likely to want to report errors asynchronously
> too.  Currently the KIP
> states that "the task can send errant records to it within put(...)" and
> that's too restrictive.
> The task ought to be able to report any unflushed records, but the
> synchronisation of this is going
> to be tricky. I suppose the connector author needs to make sure that all
> errant records have
> been reported before returning control from SinkTask.flush(...) or perhaps
> SinkTask.preCommit(...).
>
> I think the interface is a little strange too. I can see that this was
> done so it's possible to deliver a connector
> that supports error repor

[jira] [Created] (KAFKA-10000) Atomic commit of source connector records and offsets

2020-05-14 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-1:
-

 Summary: Atomic commit of source connector records and offsets
 Key: KAFKA-1
 URL: https://issues.apache.org/jira/browse/KAFKA-1
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Reporter: Chris Egerton


It'd be nice to be able to configure source connectors such that their offsets 
are committed if and only if all records up to that point have been ack'd by 
the producer. This would go a long way towards EOS for source connectors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-14 Thread Christopher Egerton
Hi Randall,

I think I prefer the method originally specified in the KIP. A separate
method can come with a contract about if/when it's called so that tasks can
assume that it's only invoked once over their lifetime, and allows
connector developers to separate the logic for storing (and possibly doing
other things with) the errant record reporter from processing records,
which are generally accomplished differently. It also seems strange to
repeatedly pass in the same argument to that method; there's never going to
be a case where the framework might want to change what it passes to the
task for the errant record reporter, is there? Oh, and connector developers
would still have to manually implement the now-deprecated variant of
"put(Collection records)" to call something like "put(records,
null)", or there would be compatibility risks on older workers if they only
implemented the non-deprecated method.

That said, if others like this idea I think it works well enough to not
block the rest of the KIP.

Cheers,

Chris

On Thu, May 14, 2020 at 3:52 PM Randall Hauch  wrote:

> Hi, Aakash.
>
> Thanks for the KIP. Connect does need an improved ability for sink
> connectors to report individual records as being problematic, and this
> integrates nicely with the existing DLQ feature.
>
> I also appreciate the desire to maintain compatibility so that connectors
> can take advantage of this feature when deployed in a runtime that supports
> this feature, but can safely and easily do without the feature when
> deployed to an older runtime. But I do understand Andrew's concern about
> the aesthetics. Have you considered overloading the `put(...)` method and
> adding the `reporter` as a second parameter? Essentially it would add the
> one method (with proper JavaDoc) to `SinkTask` only:
>
> ```
> public void put(Collection records, BiFunction Throwable> reporter) {
> put(records);
> }
> ```
> and the WorkerSinkTask would be changed to call `put(Collection,
> BiFunction)` instead.
>
> Sink connector implementations that don't do anything different can still
> override `put(Collection)`, and it still works as before. Developers that
> want to change their sink connector implementations to support this new
> feature would do the following, which would work in older and newer Connect
> runtimes:
> ```
> public void put(Collection records) {
> put(records, null);
> }
> public void put(Collection records, BiFunction Throwable> reporter) {
> // the normal `put(Collection)` logic goes here, but can optionally
> use `reporter` if non-null
> }
> ```
>
> I think this has all the same benefits of the current KIP, but
> it's noticeably simpler and hopefully more aesthetically pleasing.
>
> As for Andrew's second concern about "the task can send errant records to
> it within put(...)" being too restrictive. My guess is that this was more
> an attempt at describing the basic behavior, and less about requiring the
> reporter only being called within the `put(...)` method and not by methods
> to which `put(...)` synchronously or asynchronously delegates. Can you
> confirm whether my assumption is correct? If so, then perhaps my suggestion
> helps work around this issue as well, since there would be no restriction
> on when the reporter is called, and the whole "Order of Operations" section
> could potentially be removed.
>
> Third, it's not clear to me why the "Error Reporter Object" subsection in
> the "Proposal" section lists the worker configuration properties that were
> previously introduced with
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
> .
> Maybe it's worth mentioning that the error reporter functionality will
> reuse or build upon KIP-298, including reusing the configuration properties
> defined in KIP-298. But IIUC, the KIP does not propose changing any
> technical or semantic aspect of these configuration properties, and
> therefore the KIP would be more clear and succinct without them. *That* the
> error reporter will use these properties is part of the UX and therefore
> necessary to mention, but *how* it uses those properties is really up to
> the implementation.
>
> Fourth, the "Synchrony" section has a sentence that is confusing, or not as
> clear as it could be.
>
> "If a record is sent to the error reporter, processing of the next
> errant record in accept(...) will not begin until the producer successfully
> sends the errant record to Kafka."
>
> This sentence is a bit difficult to understand, but IIUC this really just
> means that "accept(...)" will be synchronous and will block until the
> errant record has been successfully written to Kafka. If so, let's say
> that. The rest of the paragraph is fine.
>
> Finally, is this KIP proposing new metrics, or that existing metrics would
> be used to track the error reporter usage? If the former, then please
> fully-specify what these metrics will be, similarly to how metri

Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-14 Thread Christopher Egerton
Hi Aakash,

Responding to your question from a separate KIP-610 discussion thread here,
with my comments:

> 4. Great point about the addition of the extra configuration properties.
By
"If we decide to include these properties, we should also update the
"Synchrony" section to be agnostic about what the error reporter is doing
under the hood since there won't necessarily be a Kafka producer involved
in handling records given to the error reporter," are you referring to the
fact that if people only choose to enable logging and not sending to a DLQ,
there won't necessarily be a producer involved?

Yep, exactly :)

Cheers,

Chris

On Thu, May 14, 2020 at 4:34 PM Christopher Egerton 
wrote:

> Hi Randall,
>
> I think I prefer the method originally specified in the KIP. A separate
> method can come with a contract about if/when it's called so that tasks can
> assume that it's only invoked once over their lifetime, and allows
> connector developers to separate the logic for storing (and possibly doing
> other things with) the errant record reporter from processing records,
> which are generally accomplished differently. It also seems strange to
> repeatedly pass in the same argument to that method; there's never going to
> be a case where the framework might want to change what it passes to the
> task for the errant record reporter, is there? Oh, and connector developers
> would still have to manually implement the now-deprecated variant of
> "put(Collection records)" to call something like "put(records,
> null)", or there would be compatibility risks on older workers if they only
> implemented the non-deprecated method.
>
> That said, if others like this idea I think it works well enough to not
> block the rest of the KIP.
>
> Cheers,
>
> Chris
>
> On Thu, May 14, 2020 at 3:52 PM Randall Hauch  wrote:
>
>> Hi, Aakash.
>>
>> Thanks for the KIP. Connect does need an improved ability for sink
>> connectors to report individual records as being problematic, and this
>> integrates nicely with the existing DLQ feature.
>>
>> I also appreciate the desire to maintain compatibility so that connectors
>> can take advantage of this feature when deployed in a runtime that
>> supports
>> this feature, but can safely and easily do without the feature when
>> deployed to an older runtime. But I do understand Andrew's concern about
>> the aesthetics. Have you considered overloading the `put(...)` method and
>> adding the `reporter` as a second parameter? Essentially it would add the
>> one method (with proper JavaDoc) to `SinkTask` only:
>>
>> ```
>> public void put(Collection records, BiFunction> Throwable> reporter) {
>> put(records);
>> }
>> ```
>> and the WorkerSinkTask would be changed to call `put(Collection,
>> BiFunction)` instead.
>>
>> Sink connector implementations that don't do anything different can still
>> override `put(Collection)`, and it still works as before. Developers that
>> want to change their sink connector implementations to support this new
>> feature would do the following, which would work in older and newer
>> Connect
>> runtimes:
>> ```
>> public void put(Collection records) {
>> put(records, null);
>> }
>> public void put(Collection records, BiFunction> Throwable> reporter) {
>> // the normal `put(Collection)` logic goes here, but can
>> optionally
>> use `reporter` if non-null
>> }
>> ```
>>
>> I think this has all the same benefits of the current KIP, but
>> it's noticeably simpler and hopefully more aesthetically pleasing.
>>
>> As for Andrew's second concern about "the task can send errant records to
>> it within put(...)" being too restrictive. My guess is that this was more
>> an attempt at describing the basic behavior, and less about requiring the
>> reporter only being called within the `put(...)` method and not by methods
>> to which `put(...)` synchronously or asynchronously delegates. Can you
>> confirm whether my assumption is correct? If so, then perhaps my
>> suggestion
>> helps work around this issue as well, since there would be no restriction
>> on when the reporter is called, and the whole "Order of Operations"
>> section
>> could potentially be removed.
>>
>> Third, it's not clear to me why the "Error Reporter Object" subsection in
>> the "Proposal" section lists the worker configuration properties that were
>> previously introduced with
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
>> .
>> Maybe it's worth mentioning that the error reporter functionality will
>> reuse or build upon KIP-298, including reusing the configuration
>> properties
>> defined in KIP-298. But IIUC, the KIP does not propose changing any
>> technical or semantic aspect of these configuration properties, and
>> therefore the KIP would be more clear and succinct without them. *That*
>> the
>> error reporter will use these properties is part of the UX and therefore
>> necessary to mention, but *how* it uses those 

Need help regarding missing kafka producer metrics while upgrade from 1.1.0 to 2.0.0

2020-05-14 Thread Rajkumar Natarajan
Hi Kafka community,

we are currently using Kafka client version 1.1.0 in our production. I'm
working on upgrading it to Kafka version 2.0.0.
I see that some metrics which are present in version 1.1.1 have been removed
in Kafka version 2.0.0.

The following producer metrics exist in 1.1.1 but appear to be missing in 2.0.0:

1. kafka.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs
2. kafka.producer:type=ProducerRequestMetrics,name=ProducerRequestSize
3. kafka.producer:type=ProducerRequestMetrics,name=ProducerRequestThrottleRateAndTimeMs
4. kafka.producer:type=ProducerTopicMetrics,name=DroppedMessagesPerSec
5. kafka.producer.async:type=ProducerSendThread,name=ProducerQueueSize

Could you please point me to the Kafka metrics documentation or the KIP
documentation that covers the above producer metrics in Kafka 2.0.0?
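
For reference, one way to see exactly which producer MBeans a given client
version registers is to list them from inside the application's JVM, e.g. with
a small helper like this sketch:

```
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;

public class ProducerMetricsLister {
    // Call this from the application that owns the producer; it prints every MBean
    // registered under the kafka.producer domain, which makes it easy to diff the
    // metric sets exposed by 1.1.x and 2.0.0.
    public static void printProducerMBeans() throws MalformedObjectNameException {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        for (ObjectName name : server.queryNames(new ObjectName("kafka.producer:*"), null)) {
            System.out.println(name);
        }
    }
}
```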


Re: [VOTE] KIP-586: Deprecate commit records without record metadata

2020-05-14 Thread Gwen Shapira
+1 (binding)

Makes a lot of sense :) Thanks, Mario.

On Mon, Apr 27, 2020 at 12:05 PM Mario Molina  wrote:

> Hi all,
>
> I'd like to start a vote for KIP-586. You can find the link for this KIP
> here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-586%3A+Deprecate+commit+records+without+record+metadata
>
> Thanks!
> Mario
>


-- 
Gwen Shapira
Engineering Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-14 Thread Aakash Shah
Hi Andrew,

Thanks for your comments.

Based on my understanding from
https://kafka.apache.org/25/javadoc/org/apache/kafka/connect/sink/SinkTask.html,
put(...) can be asynchronous but does not necessarily have to be, specifically
per: "As records are fetched from Kafka, they will be passed to the sink
task using the put(Collection) API, which should either write them to the
downstream system or batch them for later writing. Periodically, Connect
will call flush(Map) to ensure that batched records are actually pushed to
the downstream system."
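
For context, this is the batching pattern that javadoc describes (a minimal,
illustrative sketch only; the class name and the downstream write are
placeholders, not taken from any real connector):

```
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class BatchingSinkTask extends SinkTask {
    private final List<SinkRecord> buffer = new ArrayList<>();

    @Override
    public void start(Map<String, String> props) {
        // connector-specific setup would go here
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // put() may simply batch the records for later writing...
        buffer.addAll(records);
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // ...and flush() is where the batch actually has to reach the downstream system
        writeToDownstreamSystem(buffer);
        buffer.clear();
    }

    private void writeToDownstreamSystem(List<SinkRecord> batch) {
        // placeholder for the connector-specific write
    }

    @Override
    public void stop() {
    }

    @Override
    public String version() {
        return "0.0.0";
    }
}
```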

With respect to sending errant records within put(...) being too
restrictive, as Randall pointed out, this was more of an attempt at
describing the basic behavior rather than requiring the reporter only be
called within put(...).

I see your point about the previous pattern of adding methods that the task
uses to SinkTaskContext. The main concerns I have with this procedure are:

1. The added effort for a connector developer who wants to use this
functionality: they would have to check whether the method exists at runtime
and implement that check across all of their connectors (see the sketch after
this list)
2. Since the addition of that method two years ago, the Connect ecosystem has
grown a lot, which creates additional compatibility concerns and inertia
against upgrading the version of Connect
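
To make point 1 concrete, the compatibility check would have to look roughly
like the following (purely a sketch; errantRecordReporter() is a hypothetical
method name standing in for whatever accessor would be added to
SinkTaskContext):

```
import java.lang.reflect.Method;
import org.apache.kafka.connect.sink.SinkTaskContext;

public final class ReporterCompat {
    // Returns the runtime's error reporter if the (hypothetical) accessor exists,
    // or null when the connector is running on an older Connect runtime.
    public static Object lookupReporter(SinkTaskContext context) {
        try {
            Method accessor = context.getClass().getMethod("errantRecordReporter");
            return accessor.invoke(context);
        } catch (NoSuchMethodException e) {
            return null; // feature not available on this runtime
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Failed to look up the error reporter", e);
        }
    }
}
```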

Please let me know what you think.

Thanks,
Aakash

On Mon, May 11, 2020 at 2:49 PM Andrew Schofield 
wrote:

> Hi Aakash,
> Thanks for sorting out the replies to the mailing list.
>
> First, I do like the idea of improving error reporting in sink connectors.
> I'd like a simple
> way to put bad records onto the DLQ.
>
> I think this KIP is considerably more complicated than it seems. The
> guidance on the
> SinkTask.put() method is that it should send the records asynchronously
> and immediately
> return, so the task is likely to want to report errors asynchronously
> too.  Currently the KIP
> states that "the task can send errant records to it within put(...)" and
> that's too restrictive.
> The task ought to be able to report any unflushed records, but the
> synchronisation of this is going
> to be tricky. I suppose the connector author needs to make sure that all
> errant records have
> been reported before returning control from SinkTask.flush(...) or perhaps
> SinkTask.preCommit(...).
>
> I think the interface is a little strange too. I can see that this was
> done so it's possible to deliver a connector
> that supports error reporting but it can also work in earlier versions of
> the KC runtime. But, the
> pattern so far is that the task uses the methods of SinkTaskContext to
> access utilities in the Kafka
> Connect runtime, and I suggest that reporting a bad record is such a
> utility. SinkTaskContext has
> changed before when the configs() method was added, so I think there is
> precedent for adding a method.
> The way the KIP adds a method to SinkTask that the KC runtime calls to
> provide the error reporting utility
> seems not to match what has gone before.
>
> Thanks,
> Andrew
>
> On 11/05/2020, 19:05, "Aakash Shah"  wrote:
>
> I wasn't previously added to the dev mailing list, so I'd like to post
> my
> discussion with Andrew Schofield below for visibility and further
> discussion:
>
> Hi Andrew,
>
> Thanks for the reply. The main concern with this approach would be its
> backward compatibility. I’ve highlighted the thoughts around the
> backwards
> compatibility of the initial approach, please let me know what you
> think.
>
> Thanks,
> Aakash
>
>
> 
>
> Hi,
> By adding a new method to the SinkContext interface in say Kafka 2.6, a
> connector that calls it would require a Kafka 2.6 connect runtime. I
> don't
> quite see how that's a backward compatibility problem. It's just that
> new
> connectors need the latest interface. I might not quite be
> understanding,
> but I think it would be fine.
>
> Thanks,
> Andrew
>
>
> 
>
> Hi Andrew,
>
> I apologize for the way the reply was sent. I just subscribed to the
> dev
> mailing list so it should be resolved now.
>
> You are correct, new connectors would simply require the latest
> interface.
> However, we want to remove that requirement - in other words, we want
> to
> allow the possibility that someone wants the latest connector/to
> upgrade to
> the latest version, but deploys it on an older version of AK.
> Basically, we
> don't want to enforce the necessity of upgrading AK to get the latest
> interface. In the current approach, there would be no issue of
> deploying a
> new connector on an older version of AK, as the Connect framework would
> simply not invoke the new method.
>
> Please let me know what you think and if I need to clari

Build failed in Jenkins: kafka-trunk-jdk8 #4530

2020-05-14 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: skip listOffsets request for newly created changelog topics

[github] KAFKA-9984 Should fail the subscription when pattern is empty (#8665)


--
[...truncated 1.96 MB...]

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullProducedOnToWithTopicName STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullProducedOnToWithTopicName PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullNamedOnFilter STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullNamedOnFilter PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullMapperOnMapValuesWithKeyWithNamed STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullMapperOnMapValuesWithKeyWithNamed PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullJoinWindowsOnJoinWithStreamJoined STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullJoinWindowsOnJoinWithStreamJoined PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullMapperOnFlatMapValues STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullMapperOnFlatMapValues PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullNamedOnFlatMap STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullNamedOnFlatMap PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullValueTransformerSupplierOnTransformValues STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullValueTransformerSupplierOnTransformValues PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldSupportGroupByCountWithKStreamToKTable STARTED

org.apache.kafka.streams.kstream.internals.KGroupedTableImplTest > shouldReduce 
PASSED

org.apache.kafka.streams.kstream.internals.KGroupedTableImplTest > 
shouldThrowNullPointOnCountWhenMaterializedIsNull STARTED

org.apache.kafka.streams.kstream.internals.KGroupedTableImplTest > 
shouldThrowNullPointOnCountWhenMaterializedIsNull PASSED

org.apache.kafka.streams.kstream.internals.KGroupedTableImplTest > 
shouldThrowNullPointerOnReduceWhenAdderIsNull STARTED

org.apache.kafka.streams.kstream.internals.KGroupedTableImplTest > 
shouldThrowNullPointerOnReduceWhenAdderIsNull PASSED

org.apache.kafka.streams.kstream.internals.KGroupedTableImplTest > 
shouldNotAllowNullAdderOnReduce STARTED

org.apache.kafka.streams.kstream.internals.KGroupedTableImplTest > 
shouldNotAllowNullAdderOnReduce PASSED

org.apache.kafka.streams.kstream.internals.KGroupedTableImplTest > 
shouldThrowNullPointerOnAggregateWhenMaterializedIsNull STARTED

org.apache.kafka.streams.kstream.internals.KGroupedTableImplTest > 
shouldThrowNullPointerOnAggregateWhenMaterializedIsNull PASSED

org.apache.kafka.streams.kstream.internals.KGroupedTableImplTest > 
shouldAggregateAndMaterializeResults STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldSupportGroupByCountWithKStreamToKTable PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullJoinWindowsOnLeftJoinWithStreamJoined STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullJoinWindowsOnLeftJoinWithStreamJoined PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullProcessSupplierOnProcessWithNamedAndStores STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullProcessSupplierOnProcessWithNamedAndStores PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullMapperOnSelectKey STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullMapperOnSelectKey PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullNamedOnForEach STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullNamedOnForEach PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullNamedOnFlatTransformValuesWithFlatValueSupplier STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullNamedOnFlatTransformValuesWithFlatValueSupplier PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldCantHaveNullPredicateWithNamed STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldCantHaveNullPredicateWithNamed PASSED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullStoreNameOnTransformWithNamed STARTED

org.apache.kafka.streams.kstream.internals.KStreamImplTest > 
shouldNotAllowNullStoreNameOnTransformW

Re: [DISCUSS] KIP-601: Configurable socket connection timeout

2020-05-14 Thread Cheng Tan
Hi Rajini,

Thanks for the reply. 

> Not sure 10s is a good default because it unnecessarily times out
> connections, only to attempt re-connecting to the same broker (except in
> the leastLoadedNode case where it would be useful to have a lower timeout).


The underlying logic for a connection to turn from "connecting" to "connected" is
finishing the TCP three-way handshake (done by socketChannel.connect()). So we
can say the 10 seconds timeout is for the transport-layer three-way
handshake.
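
To illustrate what that state transition means at the socket level, here is a
minimal, non-Kafka NIO sketch (host, port, and the 10 second value are
illustrative):

```
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public class ConnectionSetupSketch {
    public static void main(String[] args) throws Exception {
        long timeoutMs = 10_000L; // stand-in for the proposed socket.connection.setup.timeout.ms
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        long start = System.currentTimeMillis();
        channel.connect(new InetSocketAddress("broker-1.example.com", 9092)); // starts the TCP handshake
        while (!channel.finishConnect()) { // "connecting" until the three-way handshake completes
            if (System.currentTimeMillis() - start > timeoutMs) {
                channel.close(); // give up on this node; the client can try another one
                throw new IOException("connection setup timed out");
            }
            Thread.sleep(10);
        }
        // Only now is the channel "connected"; authentication and actual requests
        // (covered by request.timeout.ms) happen after this point.
        channel.close();
    }
}
```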

The request timeout includes more besides the handshakes, such as 
authentication, server processing the request, and server sending back the 
response. 

Thus I think setting the default of socket.connection.setup.timeout.ms to a
smaller value than request.timeout.ms is reasonable. Do you have any specific
reason in mind why 10s is too short?

Best, -Chang Tan 


> On May 14, 2020, at 6:44 AM, Rajini Sivaram  wrote:
> 
> Hi Cheng,
> 
> 1) Thanks for the update, the KIP now says
> `socket.connections.setup.timeout.ms`, which sounds good.
> 
> 2) Not sure 10s is a good default because it unnecessarily times out
> connections, only to attempt re-connecting to the same broker (except in
> the leastLoadedNode case where it would be useful to have a lower timeout).
> Couldn't we just use 30s (request timeout) as the default value, retaining
> the current behaviour? Or alternatively, only apply the timeout where it
> makes sense to have a lower timeout (i.e. a timeout for leastLoadedNode)?
> 
> Regards,
> 
> Rajini
> 
>> On Thu, May 14, 2020 at 5:00 AM Cheng Tan  wrote:
>> 
>> Hi Rajini,
>> 
>> Thanks for the comments.
>> 
>>> I think
>>> they started off as connection timeouts but now include authentication
>> time
>>> as well. Have we considered using similar configs for this case?
>> 
>> 
>> The new config I proposed focuses on connections to unreachable
>> servers. The timeout does not cover authentication. I think the
>> “socket.” prefix makes sense. I’ll change it. Colin mentioned that adding
>> the key word “setup” can help people understand that this config only
>> applies to the connection setup. What about “socket.connection.setup.ms”?
>> 
>>> The KIP proposes 10s as the default. What does this mean for typical
>>> connections like a produce request going to the leader?
>> 
>> The new timeout config applies to each connection. It’s at the
>> NetworkClient level and doesn't consider the underlying connection logic.
>> Specifically, by default, every connection will have 10 seconds to go from
>> “connecting” to “connected”, which means the corresponding socket
>> channel is now connected (SocketChannel.finishConnect() returns true), no
>> matter what the request logic and abstraction is.
>> 
>> Please let me know if these make sense to you or if you have more
>> thoughts. Thank you.
>> 
>> Best, -Cheng
>> 
On May 13, 2020, at 7:11 AM, Rajini Sivaram  wrote:
>>> 
>>> Hi Cheng,
>>> 
>>> Thanks for the KIP, sounds like a good improvement. A couple of comments:
>>> 
>>> 1) We currently have client connection timeouts on the broker with
>> configs
>>> named `xxx.socket.timeout.ms` (e.g. controller.socket.timeout.ms). I
>> think
>>> they started off as connection timeouts but now include authentication
>> time
>>> as well. Have we considered using similar configs for this case? We may
>>> want to prefix the new config with `socket.` anyway - something along the
>>> lines of `socket.connection.timeout.ms` if it is just the connection
>> time.
>>> 
>>> 2) The KIP proposes 10s as the default. What does this mean for typical
>>> connections like a produce request going to the leader? Instead of one
>>> connection attempt to the leader, we want three separate connection
>>> attempts within the request timeout to the leader?
>>> 
>>> Regards,
>>> 
>>> Rajini
>>> 
>>> 
 On Thu, May 7, 2020 at 11:51 PM Jose Garcia Sancio <
>> jsan...@confluent.io>
 wrote:
 Cheng,
 Thanks for the KIP and the detailed proposal section. LGTM!
>> On Thu, May 7, 2020 at 3:38 PM Cheng Tan  wrote:
>> I think more about the potential wider use cases, I modified the
 proposal to target all the connection. Thanks.
> - Best, - Cheng Tan
>> On May 7, 2020, at 1:41 AM, Cheng Tan  wrote:
>> Hi Colin,
>> Sorry for the confusion. I’m proposing to implement timeout in the
 NetworkClient.leastLoadedNode() when iterating all the cached node. The
 alternative I can think is to implement the timeout in
>> NetworkClient.poll()
>> I’d prefer to implement in the leastLoadedNode(). Here’re the reasons:
>> Usually when clients send a request, they will ask the network
 client to send the request to a specific node. In this case, the
 connection.setup.timeout won’t matter too much because the client
>> doesn’t
 want to try other nodes for that specific request. The request level
 timeout would be enough. The metadata fetcher fetches the nodes status
 periodically so the clients can r

[jira] [Created] (KAFKA-10001) Store's own restore listener should be triggered in store changelog reader

2020-05-14 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10001:
-

 Summary: Store's own restore listener should be triggered in store 
changelog reader
 Key: KAFKA-10001
 URL: https://issues.apache.org/jira/browse/KAFKA-10001
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-610: Error Reporting in Sink Connectors

2020-05-14 Thread Aakash Shah
Hi Randall,

Thanks for the feedback.

1. This is a great suggestion, but I find that adding an overloaded
put(...), which essentially relegates the old put(...) to being used only when
a connector is deployed on older versions of Connect, adds enough
complication that connectors could break if the old put(...)
doesn't correctly invoke the overloaded put(...); either that, or it will
duplicate functionality across the two put(...) methods. I think
the older method simplifies things with the idea that a DLQ/error reporter
will or will not be passed into the method depending on the version of AK.
However, I also understand the aesthetic advantage of this method vs the
setter method, so I am okay with going in this direction if others agree
with adding the overloaded put(...).

2. Yes, your assumption is correct. Yes, we can remove the "Order of
Operations" if we go with the overloaded put(...) direction.

3. Great point, I will remove them from the KIP.

4. Yeah, accept(...) will be synchronous. I will change it to be clearer,
thanks.

5. This KIP will use existing metrics as well introduce new metrics. I will
update this section to fully specify the metrics.

Please let me know what you think.

Thanks,
Aakash

On Thu, May 14, 2020 at 3:52 PM Randall Hauch  wrote:

> Hi, Aakash.
>
> Thanks for the KIP. Connect does need an improved ability for sink
> connectors to report individual records as being problematic, and this
> integrates nicely with the existing DLQ feature.
>
> I also appreciate the desire to maintain compatibility so that connectors
> can take advantage of this feature when deployed in a runtime that supports
> this feature, but can safely and easily do without the feature when
> deployed to an older runtime. But I do understand Andrew's concern about
> the aesthetics. Have you considered overloading the `put(...)` method and
> adding the `reporter` as a second parameter? Essentially it would add the
> one method (with proper JavaDoc) to `SinkTask` only:
>
> ```
> public void put(Collection<SinkRecord> records, BiFunction<SinkRecord, Throwable> reporter) {
> put(records);
> }
> ```
> and the WorkerSinkTask would be changed to call `put(Collection,
> BiFunction)` instead.
>
> Sink connector implementations that don't do anything different can still
> override `put(Collection)`, and it still works as before. Developers that
> want to change their sink connector implementations to support this new
> feature would do the following, which would work in older and newer Connect
> runtimes:
> ```
> public void put(Collection<SinkRecord> records) {
> put(records, null);
> }
> public void put(Collection<SinkRecord> records, BiFunction<SinkRecord, Throwable> reporter) {
> // the normal `put(Collection)` logic goes here, but can optionally use `reporter` if non-null
> }
> ```
>
> I think this has all the same benefits of the current KIP, but
> it's noticeably simpler and hopefully more aesthetically pleasing.
>
> As for Andrew's second concern about "the task can send errant records to
> it within put(...)" being too restrictive. My guess is that this was more
> an attempt at describing the basic behavior, and less about requiring the
> reporter only being called within the `put(...)` method and not by methods
> to which `put(...)` synchronously or asynchronously delegates. Can you
> confirm whether my assumption is correct? If so, then perhaps my suggestion
> helps work around this issue as well, since there would be no restriction
> on when the reporter is called, and the whole "Order of Operations" section
> could potentially be removed.
>
> Third, it's not clear to me why the "Error Reporter Object" subsection in
> the "Proposal" section lists the worker configuration properties that were
> previously introduced with
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
> .
> Maybe it's worth mentioning that the error reporter functionality will
> reuse or build upon KIP-298, including reusing the configuration properties
> defined in KIP-298. But IIUC, the KIP does not propose changing any
> technical or semantic aspect of these configuration properties, and
> therefore the KIP would be more clear and succinct without them. *That* the
> error reporter will use these properties is part of the UX and therefore
> necessary to mention, but *how* it uses those properties is really up to
> the implementation.
>
> Fourth, the "Synchrony" section has a sentence that is confusing, or not as
> clear as it could be.
>
> "If a record is sent to the error reporter, processing of the next
> errant record in accept(...) will not begin until the producer successfully
> sends the errant record to Kafka."
>
> This sentence is a bit difficult to understand, but IIUC this really just
> means that "accept(...)" will be synchronous and will block until the
> errant record has been successfully written to Kafka. If so, let's say
> that. The rest of the paragraph is fine.
>
>

[jira] [Resolved] (KAFKA-10) Kafka deployment on EC2 should be WHIRR based, instead of current contrib/deploy code based solution

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10.
--
Resolution: Won't Fix  (was: Abandoned)

Apache Whirr has been retired.

> Kafka deployment on EC2 should be WHIRR based, instead of current 
> contrib/deploy code based solution
> 
>
> Key: KAFKA-10
> URL: https://issues.apache.org/jira/browse/KAFKA-10
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Affects Versions: 0.6
>Priority: Major
>
> Apache Whirr is a set of libraries for running cloud services 
> http://incubator.apache.org/whirr/ 
> It is desirable that Kafka's integration with EC2 be Whirr based, rather than 
> the code based solution we currently have in contrib/deploy. 
> The code in contrib/deploy will be deleted in 0.6 release



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: KIP-560 Discuss

2020-05-14 Thread Boyang Chen
Hey Sang, seems this thread has been quiet, are you still working on this
KIP?

On Sat, Mar 7, 2020 at 3:54 PM Matthias J. Sax 
wrote:

> Thanks for the KIP Sang!
>
> I have a couple of more comments about the wiki page:
>
> (1) The "Public Interface" section should only list the new stuff. This
> KIP does not change anything with regard to the existing options
> `--input-topic` or `--intermediate-topic` and thus it's just "noise" to
> have them in this section. Only list the new option `allInputTopicsOption`.
>
> (2) Don't post code, ie, the implementation of private methods. KIPs
> should only describe public interface changes.
>
> (3) The KIP should describe that we intend to use
> `describeConsumerGroups` calls to discover the topic names -- atm, it's
> unclear from the KIP how the new feature actually works.
>
> (4) If the new flag is used, we will discover input and intermediate
> topics. Hence, the name is misleading. We could call it
> `--all-user-topics` and explain in the description that "user topics"
> are input and intermediate topics for this case (in general, also output
> topics are "user topics" but there is nothing to be done for output
> topics). Thoughts?
>
>
> -Matthias
>
> On 1/27/20 6:35 AM, Sang wn Lee wrote:
> > thank you John Roesler
> >
> > It is a good idea
> > "—all-input-topics"
> >
> > I agree with you
> >
> > I'll update right away
> >
> >
> > On 2020/01/24 14:14:17, "John Roesler"  wrote:
> >> Hi all, thanks for the explanation. I was also not sure how the kip
> would be possible to implement.
> >>
> >> Now that it does seem plausible, my only feedback is that the command
> line option could align better with the existing one. That is, the existing
> option is called “—input-topics”, so it seems like the new one should be
> called “—all-input-topics”.
> >>
> >> Thanks,
> >> John
> >>
> >> On Fri, Jan 24, 2020, at 01:42, Boyang Chen wrote:
> >>> Thanks Sophie for the explanation! I read Sang's PR and basically he
> did
> >>> exactly what you proposed (check it here
> >>>  in case I'm wrong).
> >>>
> >>> I think Sophie's response answers Gwen's question already, while in the
> >>> meantime for a KIP itself we are not required to mention all the
> internal
> >>> details about how to make the changes happen (like how to actually get
> the
> >>> external topics), considering the change scope is pretty small as
> well. But
> >>> again, it would do no harm if we mention it inside Proposed Change
> session
> >>> specifically so that people won't get confused about how.
> >>>
> >>>
> >>> On Thu, Jan 23, 2020 at 8:26 PM Sophie Blee-Goldman <
> sop...@confluent.io>
> >>> wrote:
> >>>
>  Hi all,
> 
>  I think what Gwen is trying to ask (correct me if I'm wrong) is how
> we can
>  infer which topics are associated with
>  Streams from the admin client's topic list. I agree that this doesn't
> seem
>  possible, since as she pointed out the
>  topics list (or even description) lacks the specific information we
> need.
> 
>  What we could do instead is use the admin client's
>  `describeConsumerGroups` API to get the information
>  on the Streams app's consumer group specifically -- note that the
> Streams
>  application.id config is also used
>  as the consumer group id, so each app forms a group to read from the
> input
>  topics. We could compile a list
>  of these topics just by looking at each member's assignment (and even
> check
>  for a StreamsPartitionAssignor
>  to verify that this is indeed a Streams app group, if we're being
>  paranoid).
> 
>  The reset tool actually already gets the consumer group description,
> in
>  order to validate there are no active
>  consumers in the group. We may as well grab the list of topics from it
>  while it's there. Or did you have something
>  else in mind?
> 
>  On Sat, Jan 18, 2020 at 6:17 PM Sang wn Lee 
> wrote:
> 
> > Thank you
> >
> > I understand you
> >
> > 1. admin client has topic list
> > 2. applicationId can only have one stream, so It won't be a problem!
> > 3. For example, --input-topic [reg]
> > Allowing reg solves some inconvenience
> >
> >
> > On 2020/01/18 18:15:23, Gwen Shapira  wrote:
> >> I am not sure I follow. Afaik:
> >>
> >> 1. Topics don't include client ID information
> >> 2. Even if you did, the same ID could be used for topics that are
> not
> > Kafka
> >> Streams input
> >>
> >> The regex idea sounds doable, but I'm not sure it solves much?
> >>
> >>
> >> On Sat, Jan 18, 2020, 7:12 AM Sang wn Lee 
>  wrote:
> >>
> >>> Thank you
> >>> Gwen Shapira!
> >>> We'll add a flag to clear all topics by clientId
> >>> It is ‘reset-all-external-topics’
> >>>
> >>> I also want to use regex on the input topic flag to clear all
>  matching
> >>> to

[jira] [Resolved] (KAFKA-3779) Add the LRU cache for KTable.to() operator

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-3779.

Resolution: Won't Fix

`KTable#to()` was removed from the API.

> Add the LRU cache for KTable.to() operator
> --
>
> Key: KAFKA-3779
> URL: https://issues.apache.org/jira/browse/KAFKA-3779
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
>Priority: Major
>
> The KTable.to operator currently does not use a cache. We can add a cache to 
> this operator to deduplicate and reduce data traffic as well. This is to be 
> done after KAFKA-3777.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7566) Add sidecar job to leader (or a random single follower) only

2020-05-14 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-7566.

Resolution: Won't Fix

> Add sidecar job to leader (or a random single follower) only
> 
>
> Key: KAFKA-7566
> URL: https://issues.apache.org/jira/browse/KAFKA-7566
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Priority: Minor
>
> Hey there,
> recently we needed to add an archive job to a streaming application. The caveat 
> is that we need to make sure only one instance is doing this task to avoid a 
> potential race condition, and we also don't want to schedule it as a regular 
> stream task so that we won't be blocking normal streaming operation. 
> Although we could do so with a zk lease, I'm raising the case here since 
> this could be a potential use case for streaming jobs as well. For example, 
> there are some `leader specific` operations we could schedule in the DSL instead 
> of in an ad hoc manner.
> Let me know if you think this makes sense to you, thank you!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-4697) Simplify Streams Reset Tool

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4697.

Resolution: Abandoned

This ticket only affects the 0.10.0.x and 0.10.1.x versions and it's very unlikely 
that we will do a new bug-fix release.

> Simplify Streams Reset Tool
> ---
>
> Key: KAFKA-4697
> URL: https://issues.apache.org/jira/browse/KAFKA-4697
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Currently, deleting topics does not delete committed offsets. Thus, the reset 
> tool needs to modify offsets of internal repartitioning topics that are 
> actually deleted.
> With KAFKA-2000 in place, this is not required anymore (ie, this ticket does 
> not apply to {{0.10.2.0}}.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-4742) ResetTool does not commit offsets correclty

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4742.

Resolution: Abandoned

This ticket only affects 0.10.0.x and 0.10.1.x and it's very unlikely that a new 
bug-fix release will be done.

> ResetTool does not commit offsets correclty
> ---
>
> Key: KAFKA-4742
> URL: https://issues.apache.org/jira/browse/KAFKA-4742
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>Reporter: Matthias J. Sax
>Priority: Minor
>
> {{StreamsResetter}} has two issues with regard to offset manipulation
>  - it should commit offset "zero" for internal repartitioning topics (instead 
> of using `seekToBeginning()`, which is only correct for user input topics)
>  - it should not commit offsets for internal changelog topic
> Does not affect {{0.10.2}} because internal topics get deleted and as of 
> {{0.10.2}} any committed offsets get deleted automatically on topic deletion, 
> too.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-4778) OOM on kafka-streams instances with high numbers of unreaped Record classes

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4778.

Resolution: Abandoned

Does not seem to be an issue in newer releases any more?

> OOM on kafka-streams instances with high numbers of unreaped Record classes
> ---
>
> Key: KAFKA-4778
> URL: https://issues.apache.org/jira/browse/KAFKA-4778
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, streams
>Affects Versions: 0.10.1.1
> Environment: AWS m3.large Ubuntu 16.04.1 LTS.  rocksDB on local SSD.  
> Kafka has 3 zk, 5 brokers.  
> stream processors are run with:
> -XX:+UseG1GC -XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails
> Java(TM) SE Runtime Environment (build 1.8.0_101-b13)
> Java HotSpot(TM) 64-Bit Server VM (build 25.101-b13, mixed mode)
> Stream processors written in scala 2.11.8
>Reporter: Dave Thomas
>Priority: Major
> Attachments: oom-killer.txt
>
>
> We have a stream processing app with ~8 source/sink stages operating roughly 
> at the rate of 500k messages ingested/day (~4M across the 8 stages).
> We get OOM eruptions once every ~18 hours. Note it is Linux triggering the 
> OOM-killer, not the JVM terminating itself. 
> It may be worth noting that stream processing uses ~50 mbytes while 
> processing normally for hours on end, until the problem surfaces; then in 
> ~20-30 sec memory grows suddenly from under 100 mbytes to >1 gig and does not 
> shrink until the process is killed.
> We are using supervisor to restart the instances.  Sometimes, the process 
> dies immediately once stream processing resumes for the same reason, a 
> process which could continue for minutes or hours.  This extended window has 
> enabled us to capture a heap dump using jmap.
> jhat's histogram feature reveals the following top objects in memory:
> Class Instance Count  Total Size
> class [B  4070487 867857833
> class [Ljava.lang.Object; 2066986 268036184
> class [C  539519  92010932
> class [S  1003290 80263028
> class [I  508208  77821516
> class java.nio.HeapByteBuffer 1506943 58770777
> class org.apache.kafka.common.record.Record   1506783 36162792
> class org.apache.kafka.clients.consumer.ConsumerRecord  528652  35948336
> class org.apache.kafka.common.record.MemoryRecords$RecordsIterator  501742  32613230
> class org.apache.kafka.common.record.LogEntry 2009373 32149968
> class org.xerial.snappy.SnappyInputStream 501600  20565600
> class java.io.DataInputStream 501742  20069680
> class java.io.EOFException501606  20064240
> class java.util.ArrayDeque501941  8031056
> class java.lang.Long  516463  4131704
> Note high on the list include org.apache.kafka.common.record.Record, 
> org.apache.kafka.clients.consumer.ConsumerRecord,
> org.apache.kafka.common.record.MemoryRecords$RecordsIterator,
> org.apache.kafka.common.record.LogEntry
> All of these contain 500k-1.5M instances.
> There is nothing in stream processing logs that is distinctive (log levels 
> are still at default).
> Could it be references (weak, phantom, etc) causing these instances to not be 
> garbage-collected?
> Edit: to request a full heap dump (created using `jmap 
> -dump:format=b,file=`), contact me directly at opensou...@peoplemerge.com.  
> It is 2G.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-4887) Enabling caching on a persistent window store breaks support for duplicate insertion

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4887.

Resolution: Fixed

> Enabling caching on a persistent window store breaks support for duplicate 
> insertion
> 
>
> Key: KAFKA-4887
> URL: https://issues.apache.org/jira/browse/KAFKA-4887
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Elias Levy
>Priority: Major
>
> {{CachingWindowStore}} and {{RocksDBWindowStore}} interact badly when 
> duplicate insertion support is enabled by passing {{true}} as the fourth 
> argument to {{windowed}} in the state store supplier.
> When the feature is enabled, {{RocksDBWindowStore}} correctly handles 
> duplicates by assigning a unique sequence number to each element on insertion 
> and using the number within the key.
> When caching is enabled by calling {{enableCaching}} on the supplier, 
> {{CachingWindowStore}} fails to do the same.  Thus, of multiple values 
> inserted with the same key, only the last one survives.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-4934) Add streams test with RocksDb failures

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4934.

Resolution: Abandoned

> Add streams test with RocksDb failures
> --
>
> Key: KAFKA-4934
> URL: https://issues.apache.org/jira/browse/KAFKA-4934
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Priority: Major
>
> We need to add either integration or system tests with RocksDb failing 
> underneath and fix any problems that occur, including deadlocks, wrong 
> exceptions, etc.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-5071) ERROR StreamThread:783 StreamThread-128 - stream-thread [StreamThread-128] Failed to commit StreamTask 0_304 state: org.apache.kafka.streams.errors.ProcessorStateExcept

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5071.

Resolution: Abandoned

> ERROR StreamThread:783 StreamThread-128 - stream-thread [StreamThread-128] 
> Failed to commit StreamTask 0_304 state:  
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_304] Failed 
> to flush state store fmdbt 
> -
>
> Key: KAFKA-5071
> URL: https://issues.apache.org/jira/browse/KAFKA-5071
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
> Environment: Linux
>Reporter: Dhana
>Priority: Major
> Attachments: RocksDB_Issue_commitFailedonFlush.7z
>
>
> Scenario: we use two consumer(applicaion -puse10) in different machine.
> using 400 partitions, 200 streams/consumer.
> config:
> bootstrap.servers=10.16.34.29:9092,10.16.35.134:9092,10.16.38.27:9092
> zookeeper.connect=10.16.34.29:2181,10.16.35.134:2181,10.16.38.27:2181
> num.stream.threads=200
> pulse.per.pdid.count.enable=false
> replication.factor=2
> state.dir=/opt/rocksdb
> max.poll.records=50
> session.timeout.ms=18
> request.timeout.ms=502
> max.poll.interval.ms=500
> fetch.max.bytes=102400
> max.partition.fetch.bytes=102400
> heartbeat.interval.ms = 6
> Logs - attached.
> Error:
> 2017-04-11 18:18:45.170 INFO  VehicleEventsStreamProcessor:219 
> StreamThread-32 - Current size of Treemap is 4 for pdid 
> skga11041730gedvcl2pdid2236
> 2017-04-11 18:18:45.170 INFO  VehicleEventsStreamProcessor:245 
> StreamThread-32 - GE to be processed pdid skga11041730gedvcl2pdid2236 and 
> uploadTimeStamp 2017-04-11 17:46:06.883
> 2017-04-11 18:18:45.175 INFO  VehicleEventsStreamProcessor:179 
> StreamThread-47 - Arrived GE uploadTimestamp 2017-04-11 17:46:10.911 pdid 
> skga11041730gedvcl2pdid2290
> 2017-04-11 18:18:45.176 INFO  VehicleEventsStreamProcessor:219 
> StreamThread-47 - Current size of Treemap is 4 for pdid 
> skga11041730gedvcl2pdid2290
> 2017-04-11 18:18:45.176 INFO  VehicleEventsStreamProcessor:245 
> StreamThread-47 - GE to be processed pdid skga11041730gedvcl2pdid2290 and 
> uploadTimeStamp 2017-04-11 17:46:06.911
> 2017-04-11 18:18:45.571 INFO  StreamThread:737 StreamThread-128 - 
> stream-thread [StreamThread-128] Committing all tasks because the commit 
> interval 3ms has elapsed
> 2017-04-11 18:18:45.571 INFO  StreamThread:775 StreamThread-128 - 
> stream-thread [StreamThread-128] Committing task StreamTask 0_304
> 2017-04-11 18:18:45.574 ERROR StreamThread:783 StreamThread-128 - 
> stream-thread [StreamThread-128] Failed to commit StreamTask 0_304 state: 
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_304] Failed 
> to flush state store fmdbt
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:325)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:777)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:764)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:739)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:661)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error 
> while executing flush from store fmdbt
>   at 
> com.harman.analytics.stream.base.stores.HarmanRocksDBStore.flushInternal(HarmanRocksDBStore.java:353)
>   at 
> com.harman.analytics.stream.base.stores.HarmanRocksDBStore.flush(HarmanRocksDBStore.java:342)
>   at 
> com.harman.analytics.stream.base.stores.HarmanPersistentKVStore.flush(HarmanPersistentKVStore.java:72)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:323)
>   ... 8 more
> Caused by: org.rocksdb.RocksDBException: N
>   at org.rocksdb.RocksDB.flush(Native Method)
>   at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
>   at 
> com.harman.analytics.stream.base.stores.HarmanRocksDBStore.flushInternal(HarmanRocksDBStore.java:351)
>   ... 11 more
> 2017-04-11 18:18:45.583 INFO  StreamThread:397 StreamThr

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-05-14 Thread John Roesler
Hi, Sagar!

Thanks for this KIP. I'm sorry it took me so long to reply. I'll number my
points differently to avoid confusion.

I can provide some additional context on the difficulties we previously
faced in KIP-213 (which you and Adam have already discussed).

J1) In your KIP, you propose the following interface:

public interface PrefixSeekableStore<K, V> {
    KeyValueIterator<K, V> prefixSeek(K prefix);
}

This is roughly the same thing that Adam and I were considering
before. It has a hidden problem, that it assumes that prefixes of
keys in the key space are also in the key space. In other words, this
is a store with key type K, and the API assumes that prefixes are also
of type K. This is true for some key types, like String or Bytes, but not
for others.

For example, if the keys are UUIDs, then no prefix is also a UUID. If the
key is a complex data type, like Windowed in our own DSL, then
we would absolutely want to query all keys with the same record key
(the K part), or the same window start time, but in neither case is the 
prefix actually a Windowed. 

You can skirt the issue by defining a third type parameter, maybe KP, that
is the "prefix" type, but this would also be awkward for many usages.

J2) There is a related problem with serialization. Whether something
is a prefix or not depends not on the Java key (K), but on the binary
format that is produced when you use a serde on the key. Whether
we say that the prefix must also be a K or whether it gets its own type,
KP, there are problems.

In the latter case, we must additionally require a second set of serdes
for the prefixes, but there's no obvious way to incorporate this in the
API, especially not in the DSL.

In either case, for the API to actually work, we need to know ahead
of time that the Serde will produce a binary key that starts with the
part that we wish to use as a prefix. For example, what we were doing
briefly in KIP-213 (where we had complex keys, similar to Windowed)
was to define "dummy" values that indicate that a Windowed is actually
just a prefix key, not a real key. Maybe the window start time would be
null or the key part would be null. But we also had to define a serde
that would very specifically anticipate which component of the complex
key would need to be used in a prefix key. Having to bring all these
parts together in a reliable, easy-to-debug, fashion gives me some doubt
that people would actually be able to use this feature in complicated
programs without driving themselves crazy.

J3) Thanks so much for including benchmarks and tests! Unfortunately,
these don't include everything you need to really plug into the Streams
API. I think when you push it a little farther, you'll realize what Adam
was talking about wrt the interface difficulties.

In your benchmark and tests, you directly construct the store and then
use it, but in a real Streams application, you can only provide your
implementation in a StoreSupplier, for example via the Materialized
parameter. Then, to use the store from inside a Processor, you'd have
to get it by name from the ProcessorContext, and then cast it to one of
the pre-defined store types, KeyValueStore, WindowedStore, or 
SessionStore. It won't work to "mix in" your interface because the
processor gets a store that's wrapped in layers that handle serialization,
change-logging, recording metrics, and caching. 
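
For illustration, this is roughly what a processor has to do today (a minimal
sketch against the existing Processor API; the store name "my-store" and the
types are made up):

```
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class StoreLookupProcessor implements Processor<String, String> {
    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        // What comes back here is the metered/caching/logging wrapper, so the only
        // safe casts are to the built-in store types (KeyValueStore, WindowStore, SessionStore).
        store = (KeyValueStore<String, Long>) context.getStateStore("my-store");
        // Casting to a custom PrefixSeekableStore would fail with a ClassCastException,
        // because none of the wrapper layers implement it.
    }

    @Override
    public void process(final String key, final String value) {
        // look up / update the store here
    }

    @Override
    public void close() {
    }
}
```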

To use the store through IQ, you have to provide a QueriableStoreType
to KafkaStreams#store, and you get back a similarly wrapped store.

I think our only choices to add an interface like yours is either to add
it to one of the existing store types, like KeyValueStore or
WindowedStore, or to define a completely new store hierarchy, meaning
you have to duplicate all the "wrapper" layers in Streams.

I think if you write an "end-to-end" test, where you write a Streams app,
provide your store, and then use it in a Processor and through IQ,
you'll see what I'm talking about.

IIRC, those three points were the ones that ultimately led us to abandon
the whole idea last time and just register the stores with key type Bytes.
I think some creative solutions may yet be possible, but it'll take some
more design work to get there.

Can I ask what your motivation is, exactly, for proposing this feature?
The motivation just says "some users may want to do it", which has
the advantage that it's impossible to disagree with, but doesn't provide
a lot of concrete detail ;)

Specifically, what I'm wondering is whether you wanted to use this as
part of a KayValue store, which might be a challenge, or whether you
wanted to use it for more efficient scans in a WindowedStore, like
Guozhang.

Thanks again for the KIP! I hope my response isn't too discouraging;
I just wanted to convey the challenges we faced last time, since they
are all not obvious up front.

Best regards,
-John


On Thu, May 14, 2020, at 16:17, Sophie Blee-Goldman wrote:
> Whoops, I guess I didn't finish reading the KIP all the way to the end
> ear

[jira] [Resolved] (KAFKA-5240) Make persistent checkpointedOffsets optionable

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5240.

Resolution: Fixed

> Make persistent checkpointedOffsets optionable
> --
>
> Key: KAFKA-5240
> URL: https://issues.apache.org/jira/browse/KAFKA-5240
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Lukas Gemela
>Priority: Minor
>
> By looking at the ProcessorStateManager class kafka streams library tries to 
> persist current offset for each partition to the file. It has to use locking 
> mechanism to be sure that there is no other thread/task running which is 
> modifying the .checkpoint file. It does that even if you don't use persistent 
> store in your topology (which is a bit confusing)
> From my understanding this is because you want to make active state 
> restorations faster and not to seek from the beginning 
> (ProcessorStateManager:217)
> We actually run everything in docker environment and we don't restart our 
> microservices - we just run another docker container and delete the old one. 
> We don't use persistent stores and we don't want to have our microservices to 
> write anything to the filesystem. 
> We always set aggressive [compact. delete] policy to get the kafka streams 
> internal topics to have them compacted as much as possible and therefore we 
> don't need a fast recovery either - we always have to replay whole topic no 
> matter what. 
> Would it be possible to make writing to the file system optionable?
> Thanks!
> L.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-5393) in-memory state store memory issue

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5393.

Resolution: Abandoned

> in-memory state store memory issue 
> ---
>
> Key: KAFKA-5393
> URL: https://issues.apache.org/jira/browse/KAFKA-5393
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Kevin Chen
>Priority: Major
> Attachments: Screen Shot 2017-06-06 at 9.45.42 AM.png
>
>
> We are running 2 kafka streams instances that use the low level Processor API, and 
> we are using the in-memory state store.
> When we upgrade instance A, all the tasks move to instance B, and the two 
> re-balance after A is back up and running. 
> But the problem is that, even after the re-balance, the memory in instance A 
> did not drop to the previous level (the load is about the same).
> See the screen shot below.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-5581) Avoid creating changelog topics for state stores that are materialized from a source topic

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5581.

Resolution: Fixed

> Avoid creating changelog topics for state stores that are materialized from a 
> source topic
> --
>
> Key: KAFKA-5581
> URL: https://issues.apache.org/jira/browse/KAFKA-5581
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Jeyhun Karimov
>Priority: Major
>  Labels: architecture, performance
>
> Today Streams makes all state stores backed by a changelog topic by 
> default unless users override it via {{disableLogging}} when creating the 
> state store / materializing the KTable. However there are a few cases where a 
> separate changelog topic would not be required as we can re-use an existing 
> topic for that. A few examples:
> There are a few places where the materialized store does not need a separate 
> changelog topic. This issue summarizes a specific case:
> 1) If a KTable is read directly from a source topic, and is materialized i.e. 
> {code}
> table1 = builder.table("topic1", "store1");
> {code}
> In this case {{table1}}'s changelog topic can just be {{topic1}}, and we do 
> not need to create a separate {{table1-changelog}} topic.
> 2) if a KStream is materialized for joins where the streams are directly from 
> a topic, e.g.:
> {code}
> stream1 = builder.stream("topic1");
> stream2 = builder.stream("topic2");
> stream3 = stream1.join(stream2, windows);  // stream1 and stream2 are 
> materialized with a changelog topic
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-5625) Invalid subscription data may cause streams app to throw BufferUnderflowException

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5625.

Resolution: Abandoned

> Invalid subscription data may cause streams app to throw 
> BufferUnderflowException
> -
>
> Key: KAFKA-5625
> URL: https://issues.apache.org/jira/browse/KAFKA-5625
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Xavier Léauté
>Priority: Minor
>
> I was able to cause my streams app to crash with the following error when 
> attempting to join the same consumer group with a rogue client.
> At the very least I would expect streams to throw a 
> {{TaskAssignmentException}} to indicate invalid subscription data.
> {code}
> java.nio.BufferUnderflowException
> at java.nio.Buffer.nextGetIndex(Buffer.java:506)
> at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:361)
> at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:97)
> at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:302)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:365)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:512)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:462)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:445)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:798)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:778)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:358)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:574)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:545)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-6000) streams 0.10.2.1 - kafka 0.11.0.1 state restore not working

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6000.

Resolution: Abandoned

> streams 0.10.2.1 - kafka 0.11.0.1 state restore not working
> ---
>
> Key: KAFKA-6000
> URL: https://issues.apache.org/jira/browse/KAFKA-6000
> Project: Kafka
>  Issue Type: Bug
>  Components: core, streams
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Bart Vercammen
>Priority: Blocker
> Attachments: correct-restore.log, failed-restore.log
>
>
> Potential interop issue between Kafka Streams (0.10.2.1) and Kafka (0.11.0.1)
> {noformat}
> 11:24:16.416 [StreamThread-3] DEBUG rocessorStateManager - task [0_2] 
> Registering state store lateststate to its state manager 
> 11:24:16.472 [StreamThread-3] TRACE rocessorStateManager - task [0_2] 
> Restoring state store lateststate from changelog topic 
> scratch.lateststate.dsh 
> 11:24:16.472 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Resetting offset 
> for partition scratch.lateststate.dsh-2 to latest offset. 
> 11:24:16.472 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Partition 
> scratch.lateststate.dsh-2 is unknown for fetching offset, wait for metadata 
> refresh 
> 11:24:16.474 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Sending 
> ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, 
> partitionTimestamps={scratch.lateststate.dsh-2=-1}, minVersion=0) to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.476 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 
> ListOffsetResponse 
> {responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=1773763}]}]}
>  from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.476 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Handling 
> ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 
> 1773763, timestamp -1 
> 11:24:16.477 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Resetting offset 
> for partition scratch.lateststate.dsh-2 to earliest offset. 
> 11:24:16.478 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Sending 
> ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, 
> partitionTimestamps={scratch.lateststate.dsh-2=-2}, minVersion=0) to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.480 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 
> ListOffsetResponse 
> {responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=0}]}]}
>  from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.481 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Handling 
> ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 0, 
> timestamp -1 
> 11:24:16.483 [StreamThread-3] DEBUG rocessorStateManager - restoring 
> partition scratch.lateststate.dsh-2 from offset 0 to endOffset 1773763 
> 11:24:16.484 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch 
> request for partition scratch.lateststate.dsh-2 at offset 0 to node 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.485 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
> partitions [scratch.lateststate.dsh-2] to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.486 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
> partition scratch.lateststate.dsh-2 because there is an in-flight request to 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.490 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
> record for partition scratch.lateststate.dsh-2 with offset 0 to buffered 
> record list 
> 11:24:16.492 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 3 records 
> in fetch response for partition scratch.lateststate.dsh-2 with offset 0 
> 11:24:16.493 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Returning fetched 
> records at offset 0 for assigned partition scratch.lateststate.dsh-2 and 
> update position to 1586527 
> 11:24:16.494 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Ignoring fetched 
> records for scratch.lateststate.dsh-2 at offset 0 since the current position 
> is 1586527 
> 11:24:16.496 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch 
> request for partition scratch.lateststate.dsh-2 at offset 1586527 to node 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.496 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
> partitions [scratch.lateststate.dsh-2] to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.498 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
> partition scratch.latest

Re: [DISCUSS] KIP-613: Add end-to-end latency metrics to Streams

2020-05-14 Thread John Roesler
Hi Sophie,

It seems like there would still be plenty of use cases for recording
this metric at all processors and not just stateful ones, but I'm happy
to suspend my arguments for now. Since you're proposing to keep
them at the processor-node level, it will be seamless later to add
in the stateless processors if we want. As a wise man once said,
"Adding is always easier than removing."

Regarding the time measurement, it's an implementation detail
we don't need to consider in the KIP. Nevertheless, I'd greatly
prefer to measure the system time again when recording the
metric. I don't think we've seen any evidence that proves this
would harm performance, and the amount of inaccuracy that using
the cached system time could incur is actually substantial. But,
if you want to just "not mention this" in the KIP, we can defer to
the actual PR discussion, at which time we're in a better position
to use benchmarks, etc., to make the call.

Along the lines of the measurement accuracy discussion, one
minor thought I had is that maybe we should consider measuring
the task staleness metric at the sink, rather than the source, so that
it includes the processing latency of the task itself, not just the latency
of everything up to, but not including, the task (which seems confusing
for users). I guess this could also be an implementation detail, though.
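
To make that concrete, here's a tiny, purely illustrative sketch (not the
KIP's API; all names are made up) of the difference between the two
measurement points:

    // staleness = wall-clock time at the measuring node minus the record's timestamp
    public final class StalenessSketch {
        static long stalenessMs(final long recordTimestampMs, final long wallClockMs) {
            return wallClockMs - recordTimestampMs;
        }

        public static void main(final String[] args) throws InterruptedException {
            final long recordTs = System.currentTimeMillis();
            final long atSource = stalenessMs(recordTs, System.currentTimeMillis()); // excludes this task's work
            Thread.sleep(5);                                                         // stand-in for the task's processing
            final long atSink = stalenessMs(recordTs, System.currentTimeMillis());   // includes it
            System.out.println(atSource + " vs " + atSink);
        }
    }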

Thanks for the update,
-John

On Thu, May 14, 2020, at 13:31, Sophie Blee-Goldman wrote:
> Hey all,
> 
> After discussing with Bruno I'd like to propose a small amendment,
> which is to record the processor-node-level metrics only for *stateful*
> *operators*. They would still be considered a "processor-node-level"
> metric and not a "state-store-level" metric as the staleness is still
> a property of the node rather than of the state itself. However, it seems
> that this information is primarily useful for stateful operators that might
> be exposing state via IQ or are otherwise dependent on the record time,
> unlike a stateless operator.
> 
> It's worth calling out that recent performance improvements to the metrics
> framework mean that we no longer fetch the system time at the operator
> level, but only once per task. In other words the system time is not updated
> between each process as a record flows through the subtopology, so
> debugging the processor-level latency via the staleness will not be
> possible. Note that this doesn't mean the operator-level metrics are not
> *useful* relative to the task-level metric. Upstream caching and/or
> suppression
> can still cause a record's staleness at some downstream stateful operator
> to deviate from the task-level staleness (recorded at the source node).
> 
> Please let me know if you have any concerns about this change. The
> KIP has been updated with the new proposal
> 
> On Thu, May 14, 2020 at 3:04 AM Bruno Cadonna  wrote:
> 
> > Hi Sophie,
> >
> > Thank you for the KIP.
> >
> > The KIP looks good to me.
> >
> > 50th percentile:
> > I think we do not need it now. If we need it, we can add it. Here the
> > old truism applies: Adding is always easier than removing.
> >
> > processor-node-level metrics:
> > I think it is good to have the staleness metrics also on
> > processor-node-level. If we do not want to record them on all
> > processor nodes, you could restrict the recording to stateful
> > processor-nodes, since those are the ones that would benefit most from
> > the staleness metrics.
> >
> > Best,
> > Bruno
> >
> > On Thu, May 14, 2020 at 4:15 AM Sophie Blee-Goldman 
> > wrote:
> > >
> > > Yeah, the specific reason was just to align with the current metrics.
> > >
> > > Is it better to conform than to be right? History has a lot to say on
> > that
> > > matter
> > > but I'm not sure how much of it applies to the fine details of metrics
> > > naming :P
> > >
> > > More seriously, I figured if people are looking at this metric they're
> > > likely to
> > > be looking at all the others. Then naming this one "-mean" would probably
> > > lead some to conclude that the "-avg" suffix in the other metrics has a
> > > different meaning.
> > >
> > > As for the percentiles, I actually like p99 (and p75) better. I'll swap
> > > that out
> > >
> > > On Wed, May 13, 2020 at 7:07 PM John Roesler 
> > wrote:
> > >
> > > > Thanks Sophie,
> > > >
> > > > I hope this isn't too nit-picky, but is there a reason to choose "avg"
> > > > instead
> > > > of "mean"? Maybe this is too paranoid, and I might be oversensitive
> > because
> > > > of the mistake I just made earlier, but it strikes me that "avg" is
> > > > actually
> > > > ambiguous, as it refers to a family of statistics, whereas "mean" is
> > > > specific.
> > > > I see other Kafka metrics with "avg", but none with "mean"; was that
> > the
> > > > reason? If so, I'm +1.
> > > >
> > > > Regarding the names of the percentile, I actually couldn't find _any_
> > other
> > > > metrics that use percentile. Was there a reason to choose "99th" as
> > opposed
> > > > to "p99" or any other s

[jira] [Resolved] (KAFKA-6124) Revisit default config for internal client with regard to resilience

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6124?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6124.

Resolution: Abandoned

We changed a couple of default values (also in the consumer/producer/admin clients). 
This ticket is not needed any longer.

> Revisit default config for internal client with regard to resilience
> 
>
> Key: KAFKA-6124
> URL: https://issues.apache.org/jira/browse/KAFKA-6124
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip, streams-resilience
>
> We should reevaluate the default config of our internally used clients, to 
> update them to make Streams more resilient out-of-the-box.
> For example:
>  - increase producer "retries"
>  - increase producer "max.block.ms"
>  - consider the impact on max.poll.interval.ms (should we keep it at 
> Integer.MAX_VALUE -- note that KAFKA-5152 resolved the issue that originally made us set 
> it to infinity)
>  - double check all other defaults, including {{KafkaAdminClient}}
> We should also document all findings in the docs and explain how users can 
> configure their application to be more resilient if they want to.
> This Jira requires a KIP.
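> For illustration only (the concrete values below are hypothetical, not agreed-upon 
> defaults), this is the kind of override a user can already apply today while better 
> built-in defaults are worked out:
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.streams.StreamsConfig;
>
> // Hypothetical user-side overrides; values are illustrative, not recommendations.
> public class ResilientStreamsConfig {
>     public static Properties build() {
>         final Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
>         // let the embedded producer retry longer instead of failing fast
>         props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), Integer.MAX_VALUE);
>         props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120000);
>         // keep the consumer in the group during long processing or restoration pauses
>         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Integer.MAX_VALUE);
>         return props;
>     }
> }
> {code}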



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-6125) Avoid third party exception to flow through streams code base

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6125.

Resolution: Not A Problem

> Avoid third party exception to flow through streams code base
> -
>
> Key: KAFKA-6125
> URL: https://issues.apache.org/jira/browse/KAFKA-6125
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: streams-exception-handling
>
> Streams uses multiple internal clients that might throw fatal exceptions (some 
> should actually never occur, and if they do, this would indicate a bug).
> We should wrap all calls to the used clients with a {{try-catch}}, and log 
> those exceptions as ERRORs immediately. For exceptions that can only occur 
> due to a bug (e.g., IllegalStateException, IllegalArgumentException, 
> WakeupException, InterruptException) we should ask users in the log message 
> to report this as a bug.
> Last, we rethrow all those exceptions as {{StreamsException}} (so that a 
> standard library exception cannot be caught by accident somewhere else in 
> our code base).
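> A minimal sketch of the intended pattern (purely illustrative, not the actual Streams code):
> {code:java}
> import java.util.Map;
> import org.apache.kafka.clients.consumer.Consumer;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.common.KafkaException;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.streams.errors.StreamsException;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> // Illustrative wrapper: log internal client exceptions as ERROR and rethrow as StreamsException.
> final class GuardedCommit {
>     private static final Logger log = LoggerFactory.getLogger(GuardedCommit.class);
>
>     static void commit(final Consumer<?, ?> consumer,
>                        final Map<TopicPartition, OffsetAndMetadata> offsets) {
>         try {
>             consumer.commitSync(offsets);
>         } catch (final IllegalStateException | IllegalArgumentException fatal) {
>             log.error("Unexpected client exception. This indicates a bug -- please report it.", fatal);
>             throw new StreamsException(fatal);
>         } catch (final KafkaException e) {
>             log.error("Commit failed.", e);
>             throw new StreamsException(e);
>         }
>     }
> }
> {code}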



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-6182) Automatic co-partitioning of topics via automatic intermediate topic with matching partitions

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6182.

Resolution: Fixed

KIP-221 is implemented now. Closing this ticket.

> Automatic co-partitioning of topics via automatic intermediate topic with 
> matching partitions
> -
>
> Key: KAFKA-6182
> URL: https://issues.apache.org/jira/browse/KAFKA-6182
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Antony Stubbs
>Priority: Major
>
> Currently it is up to the user to ensure that two input topics for a join 
> have the same number of partitions, and if they don't, manually create an 
> intermediate topic, send the stream #through that topic first, and then 
> perform the join.
> It would be great to have Kafka Streams detect this and at least give the 
> user the option to create an intermediate topic automatically with the same 
> number of partitions as the topic being joined with.
> See 
> https://docs.confluent.io/current/streams/developer-guide.html#joins-require-co-partitioning-of-the-input-data
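> For reference, the manual workaround looks roughly like this (topic names and 
> partition counts are illustrative; the intermediate topic has to be created up front 
> with the matching partition count):
> {code:java}
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.KTable;
>
> // Illustrative sketch: "left-topic" has 4 partitions, "right-topic" has 8, so the left
> // side is sent through a pre-created 8-partition topic before the join.
> public class CoPartitionExample {
>     public static Topology build() {
>         final StreamsBuilder builder = new StreamsBuilder();
>         final KStream<String, String> left = builder.stream("left-topic");
>         final KTable<String, String> right = builder.table("right-topic");
>         final KStream<String, String> repartitioned = left.through("left-repartitioned-8p");
>         final KStream<String, String> joined = repartitioned.join(right, (l, r) -> l + "," + r);
>         joined.to("joined-output");
>         return builder.build();
>     }
> }
> {code}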



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-6237) stream stopped working after exception: Cannot execute transactional method because we are in an error state

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6237.

Resolution: Cannot Reproduce

There was a lot of EOS work recently. Closing this ticket. Should not be an 
issue any longer.

> stream stopped working after exception: Cannot execute transactional method 
> because we are in an error state
> 
>
> Key: KAFKA-6237
> URL: https://issues.apache.org/jira/browse/KAFKA-6237
> Project: Kafka
>  Issue Type: Bug
>  Components: core, streams
>Reporter: DHRUV BANSAL
>Priority: Critical
>  Labels: exactly-once
> Attachments: nohup.out
>
>
> 017-11-19 07:52:44,673 
> [project_logs_stream-a30ea242-3c9f-46a9-a01c-51903bd40ca5-StreamThread-1] 
> ERROR: org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [orion_logs_stream-a30ea242-3c9f-46a9-a01c-51903bd40ca5-StreamThread-1] 
> Failed while closing StreamTask 0_1:
> org.apache.kafka.common.KafkaException: Cannot execute transactional method 
> because we are in an error state
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:524)
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:198)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:598)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:434)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:1086)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:1041)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:538)
> Caused by: org.apache.kafka.common.KafkaException: Unexpected error in 
> AddOffsetsToTxnResponse: The server experienced an unexpected error when 
> processing the request
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager$AddOffsetsToTxnHandler.handleResponse(TransactionManager.java:978)
>   at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:648)
>   at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
>   at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:454)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:446)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:206)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
>   at java.lang.Thread.run(Thread.java:745)
> Also when I see the state of the corresponding consumer group it is saying:
> +Warning: Consumer group  is rebalancing.+



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-6461) TableTableJoinIntegrationTest is unstable if caching is enabled

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6461.

Resolution: Cannot Reproduce

> TableTableJoinIntegrationTest is unstable if caching is enabled
> ---
>
> Key: KAFKA-6461
> URL: https://issues.apache.org/jira/browse/KAFKA-6461
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: flaky-test
> Fix For: 1.1.0
>
>
> {noformat}
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
> testLeftInner[caching enabled = true] FAILED
> 20:41:05 java.lang.AssertionError: Condition not met within timeout 
> 15000. Never received expected final result.
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
> 20:41:05 at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:248)
> 20:41:05 at 
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftInner(TableTableJoinIntegrationTest.java:313){noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-6804) Event values should not be included in log messages

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6804.

Resolution: Fixed

I believe this is fixed via KAFKA-8501.

> Event values should not be included in log messages
> ---
>
> Key: KAFKA-6804
> URL: https://issues.apache.org/jira/browse/KAFKA-6804
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Jussi Lyytinen
>Priority: Major
>
> In certain error situations, event values are included in log messages:
> {code:java}
> 2018-04-19 08:00:28 
> [my-component-37670563-39be-4fcb-92e1-02f115aca43c-StreamThread-2] ERROR 
> o.a.k.s.p.i.AssignedTasks - stream-thread 
> [my-component-37670563-39be-4fcb-92e1-02f115aca43c-StreamThread-2] Failed to 
> commit stream task 1_1 due to the following error:
> org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending 
> since an error caught with a previous record (key [my-key] value [my-value] 
> ...
> {code}
> In some environments, this is highly undesired behavior since the values can 
> contain sensitive information. Error logs are usually collected to separate 
> systems not meant for storing such information (e.g. patient or financial 
> data).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-6821) The producer attempted to use a producer id which is not currently assigned to its transactional id

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6821.

Resolution: Cannot Reproduce

A lot of work on EOS was done recently. Closing this ticket as it should not be 
an issue any longer.

> The producer attempted to use a producer id which is not currently assigned 
> to its transactional id 
> 
>
> Key: KAFKA-6821
> URL: https://issues.apache.org/jira/browse/KAFKA-6821
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Centos 7
>Reporter: RandySun
>Priority: Trivial
>  Labels: eos
>
> I use Kafka Streams to join two KStreams; however, I found an error stack trace 
> in my application log, as below:
> {code:java}
> Aborting producer batches due to fatal error 
> org.apache.kafka.common.KafkaException: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id 
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1037)
>  
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:905)
>  
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) 
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>  
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) 
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) 
> at java.lang.Thread.run(Thread.java:745) 
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
> producer attempted to use a producer id which is not currently assigned to 
> its transactional id 
> {code}
>  
> There is no evidence of any other error in my application, and the Kafka 
> controller.log says:
>  
>  
> {code:java}
> [2018-04-17 17:54:33,212] DEBUG [Controller id=2] Preferred replicas by 
> broker Map(2 -> Map(__consumer_offsets-19 -> Vector(2, 3, 1), 
> com.randy.Demo1-KSTREAM-FLATMAP-01-repartition-1 -> Vector(2)
> [2018-04-17 17:54:33,212] DEBUG [Controller id=2] Topics not in preferred 
> replica Map() (kafka.controller.KafkaController)
> [2018-04-17 17:54:33,212] TRACE [Controller id=2] Leader imbalance ratio for 
> broker 2 is 0.0 (kafka.controller.KafkaController)
> [2018-04-17 17:54:33,212] DEBUG [Controller id=2] Topics not in preferred 
> replica Map() (kafka.controller.KafkaController)
> [2018-04-17 17:54:33,212] TRACE [Controller id=2] Leader imbalance ratio for 
> broker 1 is 0.0 (kafka.controller.KafkaController)
> [2018-04-17 17:54:33,212] DEBUG [Controller id=2] Topics not in preferred 
> replica Map() (kafka.controller.KafkaController)
> [2018-04-17 17:54:33,212] TRACE [Controller id=2] Leader imbalance ratio for 
> broker 3 is 0.0 (kafka.controller.KafkaController)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-6888) java.lang.UnsatisfiedLinkError: librocksdbjni4271925274440341234.dll: âÑX

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6888.

Resolution: Abandoned

> java.lang.UnsatisfiedLinkError: librocksdbjni4271925274440341234.dll:  âÑX
> --
>
> Key: KAFKA-6888
> URL: https://issues.apache.org/jira/browse/KAFKA-6888
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 1.1.0
> Environment: Windows 7, Java 8, Kafka-Streams 1.1.0 and 0.11.0.0 
> versions
>Reporter: Sandeep Kapoor
>Priority: Major
>
> Hi Team,
> I am a Windows 7 user and am using the *kafka-streams* 1.1.0 (latest) version. When I 
> execute my code, I am getting the issue "StreamThread-1" 
> **java.lang.UnsatisfiedLinkError:** 
> *C:\Users\sandeep\AppData\Local\Temp\librocksdbjni4271925274440341234.dll:  
> âÑX*". Earlier I was trying my code with 0.11.0.0 version, and was getting 
> "*RocksDB on windows (librocksdbjni-win64.dll)- Can't find dependent 
> libraries*". Then as per ([https://github.com/facebook/rocksdb/issues/1302)] 
> thread I downloaded and installed **Visual C++ runtime for Visual Studio 
> 2015** and upgraded  the kafka-stream version from 0.11.0.0 to 1.1.0. But 
> still I am getting the same exception (java.lang.UnsatisfiedLinkError). Only 
> thing changed is that instead of "Can't find dependent libraries" now I am 
> getting  "âÑX" ( a strange expression).
> Also I checked that in my maven dependencies I am using RocksDbJni version 
> 5.7.3 and it contains "librocksdbjni-win64.dll" file as well.
> *Please find below the exception, I am getting (with kafka-streams version 
> 1.1.0) :*  -
> Exception in thread 
> "streams-starter-app-862326ca-30c0-468d-898c-e40d4578f1c7-StreamThread-1" 
> java.lang.UnsatisfiedLinkError: 
> C:\Users\sandeep\AppData\Local\Temp\librocksdbjni4271925274440341234.dll:  âÑX
>      at java.lang.ClassLoader$NativeLibrary.load(Native Method)
>      at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1929)
>      at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1814)
>      at java.lang.Runtime.load0(Runtime.java:809)
>      at java.lang.System.load(System.java:1083)
>      at 
> org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
>      at 
> org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
>      at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
>      at org.rocksdb.RocksDB.(RocksDB.java:35)
>      at org.rocksdb.Options.(Options.java:25)
>      at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:116)
>      at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:167)
>      at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
>      at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
>      at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.init(InnerMeteredKeyValueStore.java:160)
>      at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.init(MeteredKeyValueBytesStore.java:102)
>      at 
> org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:225)
>      at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:162)
>      at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:88)
>      at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:316)
>      at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789)
>      at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>      at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
>  INFO stream-client 
> [streams-starter-app-862326ca-30c0-468d-898c-e40d4578f1c7] State transition 
> from ERROR to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:261)
> *Please find below the exception, (with kafka-streams version 0.11.0.0 and 
> prior to installing "Visual C++ runtime for Visual Studio 2015") : -*
> WARN stream-thread 
> [streams-starter-app-4be266bc-fcc9-4c1e-93be-807e3736d6cb-StreamThread-1] 
> Unexpected state transition from ASSIGNING_PARTITIONS to DEAD. 
> (org.apache.kafka.streams.processor.internals.StreamThread:978) 
>  INFO stream-client 
> [streams-starter-app-4be266bc-fcc9-4c1e-93be-807e3736d6cb] State transition 
> from REBALANCING to PENDING_SHUTDOWN. 
> (org.apache.kafka.streams.KafkaStreams:229) 
>  java.lang.UnsatisfiedLinkError: 
> C:\Users\sandeep\AppData\Local\Temp\librocksdbjni662096

[jira] [Resolved] (KAFKA-6822) Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6822.

Resolution: Abandoned

> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> ---
>
> Key: KAFKA-6822
> URL: https://issues.apache.org/jira/browse/KAFKA-6822
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, streams
>Reporter: Phil Mikhailov
>Priority: Major
>
> We have microservices that use Kafka Streams which got stuck in the initialization 
> of the stream topology while filling a StateStore from Kafka using KafkaConsumer. 
> The microservices are built with Kafka Streams 0.10.2.1-cp1 (Confluent 3.2.1), but the 
> environment runs a Kafka cluster 1.0.0 (Confluent 4.0.0). 
> We reproduced this problem several times by restarting the microservices and 
> eventually had to reset the stream offsets to the beginning in order to unblock the 
> microservices.
> While investigating this problem more deeply, we found out that the StateStore 
> (0.10.2.1) got stuck loading data using {{ProcessorStateManager}}. It uses a 
> KafkaConsumer (0.10.2.1) to fill the store, and the consumer calculates offsets like 
> this:
>  Fetcher:524
> {code:java}
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> {code}
> It takes the latest offset from the records returned by {{poll}} and adds 1.
>  So the next offset is estimated.
> In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
> {code:java}
> long nextOffset = partitionRecords.nextFetchOffset;
> {code}
> It returns the actual next offset but not estimated.
> That said, we had a situation where the StateStore (0.10.2.1) got stuck loading 
> data. The reason was in {{ProcessorStateManager.restoreActiveState:245}}, 
> which kept spinning in the consumer loop because the following condition never 
> became true:
> {code:java}
>    } else if (restoreConsumer.position(storePartition) == endOffset) {
>    break;
>    }
> {code}
>  
> We assume that the 0.10.2.1 consumer estimates the end offset incorrectly in the case of 
> compaction. 
>  Or there is an inconsistency in offset calculation between 0.10.2.1 and 
> 1.0.0 which does not allow using a 0.10.2.1 consumer with a 1.0.0 broker.
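> (For what it's worth, a more defensive completion check would not rely on the position 
> landing exactly on the captured end offset -- illustrative sketch only:)
> {code:java}
> // Illustrative: treat restoration as done once the position has reached *or passed* endOffset.
> if (restoreConsumer.position(storePartition) >= endOffset) {
>     break;
> }
> {code}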



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6977.

Resolution: Abandoned

>  Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 
> while fetching data
> -
>
> Key: KAFKA-6977
> URL: https://issues.apache.org/jira/browse/KAFKA-6977
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>  Labels: streams
>
> We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and 
> constantly run into the following exception: 
> {code:java}
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> partition assignment took 40 ms.
> current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, 
> 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, 
> 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31]
> current standby tasks: []
> previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 
> 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28]
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> State transition from PARTITIONS_ASSIGNED to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.KafkaStreams - stream-client 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State 
> transition from REBALANCING to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> ERROR org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> Encountered the following error during processing:
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> Shutting down
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> State transition from RUNNING to PENDING_SHUTDOWN.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka 
> producer with timeoutMillis = 9223372036854775807 ms.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> Stream thread shutdown complete
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> State transition from PENDING_SHUTDOWN to DEAD.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.KafkaStreams - stream-client 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4] State 
> transition from RUNNING to ERROR.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> WARN org.apache.kafka.streams.KafkaStreams - stream-client 
> [visitstats-mongowriter-3727734b-3c2f-4775-8

[jira] [Resolved] (KAFKA-7004) Support configurable restore consumer poll timeout

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7004.

Resolution: Fixed

> Support configurable restore consumer poll timeout
> --
>
> Key: KAFKA-7004
> URL: https://issues.apache.org/jira/browse/KAFKA-7004
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Shawn Nguyen
>Priority: Minor
>  Labels: Needs-kip
>
> In the StoreChangelogReader, the restore consumer is currently hard-coded to 
> poll for 10ms at most per call.
> {code:java}
>     public Collection<TopicPartition> restore(final RestoringTasks active) {
>         if (!needsInitializing.isEmpty()) {
>             initialize();
>         }
>         if (needsRestoring.isEmpty()) {
>             restoreConsumer.unsubscribe();
>             return completed();
>         }
>         final Set<TopicPartition> restoringPartitions = new HashSet<>(needsRestoring.keySet());
>         try {
>             final ConsumerRecords<byte[], byte[]> allRecords = restoreConsumer.poll(10);
>             for (final TopicPartition partition : restoringPartitions) {
>                 restorePartition(allRecords, partition, active.restoringTaskFor(partition));
>             }
>             ...{code}
>  
> It'd be nice to be able to configure the restore consumer to poll for a 
> larger timeout (e.g. 500ms) to give it more time to accumulate records for 
> the restoration task. In the main event loop for polling in 
> StreamThread.java, the main consumer uses the POLL_MS_CONFIG set in 
> StreamsConfig.java to configure the max poll timeout. We could construct a 
> similar config in the StreamsConfig class, but prefixed with the processing 
> type (restore in this case). Let me know if this sounds reasonable, and I'll 
> create a KIP and PR.
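> Sketch of what the knob could look like from the user side (the config name below is 
> hypothetical and would be settled in the KIP):
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.streams.StreamsConfig;
>
> public class RestorePollConfigExample {
>     public static Properties build() {
>         final Properties props = new Properties();
>         // existing knob for the main consumer loop
>         props.put(StreamsConfig.POLL_MS_CONFIG, 100L);
>         // proposed (hypothetical name): poll timeout used by the restore consumer
>         props.put("restore.consumer.poll.ms", 500L);
>         return props;
>     }
> }
> {code}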



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7015) Enhance RecordCollectorImpl exceptions with more context information

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7015.

Resolution: Not A Problem

We should not log sensitive information like keys and values, and we actually 
removed some log statements that may leak key/value information. 
Hence, closing this ticket.

> Enhance RecordCollectorImpl exceptions with more context information  
> -
>
> Key: KAFKA-7015
> URL: https://issues.apache.org/jira/browse/KAFKA-7015
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Priority: Minor
>
> In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and 
> only have concrete key/value types on the outer layers/wrappers of the stores.
> For this reason, the innermost {{RocksDBStore}} cannot provide useful error 
> messages anymore if a put/get/delete operation fails as it only handles plain 
> bytes.
> In addition, the corresponding calls to record changelog records to record 
> collectors will also be sending byte arrays only, and hence when there is an 
> error happening, the record collector can only display the key but not the 
> value since it is all bytes:
> {code:java}
> [ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl   -
> task [2_2] Error sending record (key {"eventId":XXX,"version":123}
> value [] timestamp YYY) to topic TTT
> due to ...
> {code}
> The store exceptions got fixed via KAFKA-6538.
> This Jira is to track the fix for RecordCollectorImpl.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7035) Kafka Processor's init() method sometimes is not called

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7035.

Resolution: Abandoned

> Kafka Processor's init() method sometimes is not called
> ---
>
> Key: KAFKA-7035
> URL: https://issues.apache.org/jira/browse/KAFKA-7035
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Oleksandr Konopko
>Priority: Critical
> Attachments: TransformProcessor.java
>
>
> Scenario:
> 1. We have processing of a Kafka topic which is implemented with the Processor API
>  2. We want to collect metrics (let's say just counting the number of processed 
> entities, for simplicity)
> 3. How we tried to organize this
>  * process data with process() method and send it down the stream with context
>  * on each call of process() method update the counter
>  * schedule a punctuate function which will send a metric to a special topic. The metric 
> is built from the counter
> You can find the code (we removed all business-sensitive code from it, so 
> it should be easy to read) in the attachment
>  
> Problematic Kafka Streams behaviour that i can see by logging every step:
> 1. We have 80 messages in the input topic
> 2. Kafka Streams creates 4 Processor instances. Lets name them: ProcessorA, 
> ProcessorB, ProcessorC and ProcessorD
> 3. ProcessorA and ProcessorB receive 1-5% of the data. The data is processed 
> correctly and results are sent down the stream. The counter is updated
> 4. init() method was not called for ProcessorA and ProcessorB
> 5. ProcessorC and ProcessorD are created and they start to receive all the 
> rest of data. 95-99%
> 6. The init() method is called for both ProcessorC and ProcessorD. It initiates 
> punctuation, which causes a Metrics message to be created and sent down the metrics 
> stream periodically
> 7. ProcessorA and ProcessorB are closed. init() was never called for them, so 
> their Metric entity was never sent to the metrics topic
> 8. Processing is finished.
>  
> In the end:
> Expected:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to 80
> Actual results:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to some number 3-6% less 
> than 80, for example 786543
>  
> Problem:
>  * init() method call is not guaranteed
>  * there is no way to guarantee that all work was done by punctuate method 
> before close()
>  
>  
>  
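> For reference, the setup described above boils down to roughly the following (simplified 
> sketch, not the attached TransformProcessor; the 10s interval and the forwarded metric 
> format are made up):
> {code:java}
> import org.apache.kafka.streams.processor.AbstractProcessor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.processor.PunctuationType;
>
> // Simplified sketch of the reported setup: count in process(), emit the count from a
> // scheduled punctuation. If init() is never called, the punctuation is never scheduled
> // and the counter is never reported -- which is exactly the symptom described.
> class CountingProcessor extends AbstractProcessor<String, String> {
>     private long counter = 0;
>
>     @Override
>     public void init(final ProcessorContext context) {
>         super.init(context);
>         context.schedule(10000L, PunctuationType.WALL_CLOCK_TIME,
>             timestamp -> context.forward("metrics", Long.toString(counter)));
>     }
>
>     @Override
>     public void process(final String key, final String value) {
>         counter++;
>         context().forward(key, value);
>     }
> }
> {code}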



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7133) DisconnectException every 5 minutes in single restore consumer thread

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7133.

Resolution: Abandoned

> DisconnectException every 5 minutes in single restore consumer thread
> -
>
> Key: KAFKA-7133
> URL: https://issues.apache.org/jira/browse/KAFKA-7133
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
> Environment: Kafka Streams application in Kubernetes.
> Kafka Server in Docker on machine in host mode
>Reporter: Chris Schwarzfischer
>Priority: Major
>
> One of our streams applications (and only this one) gets a 
> {{org.apache.kafka.common.errors.DisconnectException}} almost exactly every 5 
> minutes.
> The application has two of
> KStream -> KGroupedStream -> KTable -> KGroupedTable -> KTable
> aggregations.
> Relevant config is in Streams:
> {code:java}
> this.properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.AT_LEAST_ONCE);
> //...
> this.properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
> this.properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 
> 1024 * 500 /* 500 MB */ );
> this.properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 
> 1024 * 100 /* 100 MB */);
> this.properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1024 * 1024 * 50 
> /* 50 MB */);
> {code}
> On the broker:
> {noformat}
> KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
> KAFKA_OFFSETS_RETENTION_MINUTES: 108000
> KAFKA_MIN_INSYNC_REPLICAS: 2
> KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
> KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
> KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 2147483000
> KAFKA_LOG_RETENTION_HOURS: 2688
> KAFKA_OFFSETS_RETENTION_CHECK_INTERVAL_MS: 120
> KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 12000
> {noformat}
> Logging gives us a single restore consumer thread that throws exceptions 
> every 5 mins:
>  
> {noformat}
> July 4th 2018, 15:38:51.560   dockertest032018-07-04T13:38:51,559Z INFO  
> : 
> [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
>  FetchSessionHandler::handleError:440 - [Consumer 
> clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
>  groupId=] Error sending fetch request (sessionId=317141939, epoch=INITIAL) 
> to node 1: org.apache.kafka.common.errors.DisconnectException.
> July 4th 2018, 15:37:54.833   dockertest032018-07-04T13:37:54,832Z INFO  
> : 
> [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
>  FetchSessionHandler::handleError:440 - [Consumer 
> clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
>  groupId=] Error sending fetch request (sessionId=2064325970, epoch=INITIAL) 
> to node 3: org.apache.kafka.common.errors.DisconnectException.
> July 4th 2018, 15:37:54.833   dockertest032018-07-04T13:37:54,832Z INFO  
> : 
> [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
>  FetchSessionHandler::handleError:440 - [Consumer 
> clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
>  groupId=] Error sending fetch request (sessionId=1735432619, epoch=INITIAL) 
> to node 2: org.apache.kafka.common.errors.DisconnectException.
> July 4th 2018, 15:32:26.379   dockertest032018-07-04T13:32:26,378Z INFO  
> : 
> [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
>  FetchSessionHandler::handleError:440 - [Consumer 
> clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
>  groupId=] Error sending fetch request (sessionId=317141939, epoch=INITIAL) 
> to node 1: org.apache.kafka.common.errors.DisconnectException.
> July 4th 2018, 15:32:01.926   dockertest032018-07-04T13:32:01,925Z INFO  
> : 
> [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
>  FetchSessionHandler::handleError:440 - [Consumer 
> clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
>  groupId=] Error sending fetch request (sessionId=1735432619, epoch=INITIAL) 
> to node 2: org.apache.kafka.common.errors.DisconnectException.
> July 4th 2018, 15:32:01.926   dockertest032018-07-04T13:32:01,925Z INFO  
> : 
> [testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2][]:
>  FetchSessionHandler::handleError:440 - [Consumer 
> clientId=testdev-cs9-test-aggregate-udrs-e39ef2d4-452b-4697-b031-26fc1bac8831-StreamThread-2-restore-consumer,
>  groupId=] Error sending fetch request (sessionId=2064325970, epoch=INITIAL) 
> to node 3:

[jira] [Resolved] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7209.

Resolution: Abandoned

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>
> I use Kafka Streams 0.11.0.1, session timeout 6, retries set to Integer.MAX_VALUE, and 
> the default backoff time
>  
> I have 3 nodes running a Kafka cluster of 3 brokers,
> and I am running 3 Kafka Streams instances with the same application.id;
> each node has one broker and one Kafka Streams application.
> Everything works fine during setup.
> I bring down one node, so one Kafka broker and one streaming app are down.
> Now I see exceptions in the other two streaming apps and the group never gets 
> rebalanced; I waited for hours and it never comes back to normal.
> Is there anything I am missing?
> I also tried, when one broker is down, calling stream.close, cleanup 
> and restart; this also doesn't help.
> Can anyone help me?
>  
>  
>  
>  One thing I observed lately is that Kafka topics with one partition get 
> reassigned, but I have topics with 16 partitions and replication factor 3. It 
> never settles.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7203) Improve Streams StickyTaskAssingor

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7203.

Resolution: Fixed

> Improve Streams StickyTaskAssingor
> --
>
> Key: KAFKA-7203
> URL: https://issues.apache.org/jira/browse/KAFKA-7203
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> This discussion was inspired by trying to fix KAFKA-7144.
> Currently we are not striking a very good sweet spot in the trade-off between 
> stickiness and workload balance: we are honoring the former more than the 
> latter. One idea to improve on this is the following:
> {code}
> I'd like to propose a slightly different approach to fix 7114 while making 
> no worse tradeoffs between stickiness and sub-topology balance. The key idea 
> is to try to adjust the assignment to get the distribution as close as possible to 
> the sub-topologies' num.tasks distribution.
> Here is a detailed workflow:
> 1. at the beginning, we first calculate for each client C how many tasks 
> it should ideally be assigned, as num.total_tasks / num.total_capacity * 
> C_capacity rounded down; call it C_a. Note that since we round down this 
> number, the sum of C_a across all C would be <= num.total_tasks, but this 
> does not matter.
> 2. and then for each client C, based on its number of previously assigned tasks C_p, 
> we calculate how many tasks it should take over or give up as C_a - C_p (if 
> it is positive, it should take over some, otherwise it should give up some).
> Note that because of the rounding down, when we calculate C_a - C_p for each 
> client, we need to make sure that the total number of give-ups and the total 
> number of take-overs are equal; some ad-hoc heuristics can be used.
> 3. then we calculate the task distribution across the sub-topologies as a 
> whole. For example, if we have three sub-topologies st0, st1 and st2, where st0 has 
> 4 total tasks, st1 has 4 total tasks, and st2 has 8 total tasks, then the 
> distribution between st0, st1 and st2 should be 1:1:2. Let's call it the 
> global distribution, and note that currently, since num.tasks per sub-topology 
> never changes, this distribution should NEVER change.
> 4. then for each client that should give up some, we decide which tasks it 
> should give up so that the remaining task distribution is proportional to 
> the above global distribution.
> For example, if a client previously owned 4 tasks of st0, no tasks of st1, and 
> 2 tasks of st2, and now it needs to give up 3 tasks, it should then give up 2 
> of st0 and 1 of st1, so that the remaining distribution is closer to 1:1:2.
> 5. now that we've collected a list of given-up tasks plus the ones that do not 
> have any previous active assignment (in normal operations this should not happen 
> since all tasks should have been created since day one), we migrate them 
> to the clients that need to take over some, again proportionally to the global 
> distribution.
> For example, if a client previously owned 1 task of st0 and nothing of st1 and 
> st2, and now it needs to take over 3 tasks, we would try to give it 1 task of 
> st1 and 2 tasks of st2, so that the resulting distribution becomes 1:1:2. And 
> we ONLY consider prev-standby tasks when we decide which ones of st1 / st2 
> we should pick for that client.
> Now, consider the following scenarios:
> a) this is a clean start and there is no prev-assignment at all: step 4 would 
> be a no-op; the result should still be fine.
> b) a client leaves the group: no client needs to give up and all clients may 
> need to take over some, so step 4 is a no-op, and the accumulated step 5 only 
> contains the tasks of the departed client.
> c) a new client joins the group: all clients need to give up some, and only 
> the new client needs to take over all the given-up ones. Hence step 5 is 
> straightforward.
> {code}
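> (The per-client quota from step 1 is, for illustration, just:)
> {code:java}
> // Step 1, illustrative: ideal number of tasks per client, rounded down.
> public class QuotaExample {
>     static int idealTasks(final int totalTasks, final int totalCapacity, final int clientCapacity) {
>         return (int) Math.floor((double) totalTasks / totalCapacity * clientCapacity);
>     }
>
>     public static void main(final String[] args) {
>         // e.g. 16 tasks, total capacity of 3 threads, a client with 1 thread -> 5
>         System.out.println(idealTasks(16, 3, 1));
>     }
> }
> {code}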



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7445) Branch one Stream in multiple Streams

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7445.

Resolution: Not A Problem

> Branch one Stream in multiple Streams
> -
>
> Key: KAFKA-7445
> URL: https://issues.apache.org/jira/browse/KAFKA-7445
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Dennis Reiter
>Priority: Minor
>
> Hi,
> I need to branch/split a KStream into multiple independent KStreams. I thought 
> {{org.apache.kafka.streams.kstream.internals.KStreamImpl#branch}} was the 
> right one, but in fact it's designed for another purpose.
> In contrast to {{branch}} I need to assign the record to *all* matching 
> streams, not only one stream.
> Speaking in code 
> ({{org.apache.kafka.streams.kstream.internals.KStreamBranch}}):
> {code:java}
> if (predicates[i].test(key, value)) {
>// use forward with childIndex here
>// and pipe the record to multiple streams
>context().forward(key, value, i);
> }
> {code}
> My question: is this still possible with features already included in 
> Streams? Or shall I propose a change?
> Thanks in advance
> Dennis
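
For the record, assigning a record to all matching streams already works by calling 
{{filter}} once per predicate on the same parent KStream; each filter sees every record, 
so a record that matches several predicates ends up in several streams. A minimal sketch 
(topic and predicate names are illustrative):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class MultiBranchExample {
    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> source = builder.stream("input");
        // each filter() receives every record independently
        final KStream<String, String> matchesA = source.filter((k, v) -> v.contains("a"));
        final KStream<String, String> matchesB = source.filter((k, v) -> v.contains("b"));
        matchesA.to("output-a");
        matchesB.to("output-b");   // a record with value "ab" lands in both output topics
        return builder.build();
    }
}
{code}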



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7708) [kafka-streams-scala] Invalid signature for KTable join in 2.12

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7708.

Resolution: Abandoned

> [kafka-streams-scala] Invalid signature for KTable join in 2.12
> ---
>
> Key: KAFKA-7708
> URL: https://issues.apache.org/jira/browse/KAFKA-7708
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Edmondo Porcu
>Priority: Major
>  Labels: scala
>
> The signature in Scala 2.12 for the join in the 
> org.kafka.streams.scala.streams.KTable cannot be resolved by the compiler, 
> probably due to the way parameter lists are handled by the compiler.
> See:
>  
> [https://github.com/scala/bug/issues/11288]
> [https://stackoverflow.com/questions/53615950/scalac-2-12-fails-to-resolve-overloaded-methods-with-multiple-argument-lists-whe]
>  
> We are wondering how this is not caught by the current build of Kafka; we 
> are building on 2.12.7 as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7985) Cleanup AssignedTasks / AbstractTask logic

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7985.

Resolution: Abandoned

Given that this ticket is old and the code was refactored multiple times 
already, I am closing this ticket.

> Cleanup AssignedTasks / AbstractTask logic
> --
>
> Key: KAFKA-7985
> URL: https://issues.apache.org/jira/browse/KAFKA-7985
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Today the life time of a task is:
> created -> [initializeStateStores] -> 
> restoring (writes to the initialized state stores) -> [initializeTopology] -> 
> running -> [closeTopology] -> 
> suspended -> [closeStateManager] -> 
> dead
> And hence the assigned tasks contain the following non-overlapping sets: 
> created, restoring, running, suspended (dead tasks do not need to be 
> maintained). Normally `created` should be empty since once a task is created 
> it should immediately transition to either restoring or running. So 
> whenever we are suspending tasks, we should go through these sets and act 
> accordingly:
> 1. `created` and `suspended`: just check these two sets are always empty.
> 2. `running`: transit to `suspended`.
> 3. `restoring`: transition to `suspended`. But the difference here is that we 
> do not need to close topology since it was not created yet at all; we just 
> need to remember the restored position, and keep the restorers on hold 
> instead of clearing all of them.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


KAFKA-10000

2020-05-14 Thread Matthias J. Sax
Hi,

today, KAFKA-10000 was created, marking quite a milestone for the
project. This milestone made me look into our backlog and there are many
old and outdated tickets. The oldest ticket I found was KAFKA-10 (closed
it already)...

Cleaning up the backlog is quite tedious, and I took the liberty to go
over Kafka Streams related tickets and close many of them.

However, to really cleanup the backlog, I would like to call for help.
No single person has enough context to know which tickets are still
valid and which are outdated. It would be great if some people (I guess
especially committers :D) would spend some time to go over the backlog
and close tickets.

Thanks a lot!


-Matthias




signature.asc
Description: OpenPGP digital signature


Re: KAFKA-10000

2020-05-14 Thread Boyang Chen
Thanks a lot for starting this initiative, Matthias! I guess I will start
by closing my open tickets first :)

On Thu, May 14, 2020 at 9:02 PM Matthias J. Sax  wrote:

> Hi,
>
> today, KAFKA-10000 was created, marking quite a milestone for the
> project. This milestone made me look into our backlog and there are many
> old and outdated tickets. The oldest ticket I found was KAFKA-10 (closed
> it already)...
>
> Cleaning up the backlog is quite tedious, and I took the liberty to go
> over Kafka Streams related tickets and close many of them.
>
> However, to really cleanup the backlog, I would like to call for help.
> No single person has enough context to know which tickets are still
> valid and which are outdated. It would be great if some people (I guess
> especially committers :D) would spend some time to go over the backlog
> and close tickets.
>
> Thanks a lot!
>
>
> -Matthias
>
>
>


Re: KAFKA-10000

2020-05-14 Thread Guozhang Wang
Hey Matthias, thanks for calling this out in the community.

Maybe one way to do this is to encourage everyone who reads this thread to
browse over "tickets created by me" and see if some of those are already
fixed / not an issue any more :)

https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20reporter%20in%20(currentUser())%20ORDER%20BY%20priority%20DESC


Guozhang


On Thu, May 14, 2020 at 9:02 PM Matthias J. Sax  wrote:

> Hi,
>
> today, KAFKA-10000 was created, marking quite a milestone for the
> project. This milestone made me look into our backlog and there are many
> old and outdated tickets. The oldest ticket I found was KAFKA-10 (closed
> it already)...
>
> Cleaning up the backlog is quite tedious, and I took the liberty to go
> over Kafka Streams related tickets and close many of them.
>
> However, to really cleanup the backlog, I would like to call for help.
> No single person has enough context to know which tickets are still
> valid and which are outdated. It would be great if some people (I guess
> especially committer :D) would spent some time to go over the backlog
> and close tickets.
>
> Thanks a lot!
>
>
> -Matthias
>
>
>

-- 
-- Guozhang


Build failed in Jenkins: kafka-trunk-jdk8 #4531

2020-05-14 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Handle task migrated inside corruption path (#8667)


--
[...truncated 6.14 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis STARTED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis PASSED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime STARTED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime PASSED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep PASSED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testToString STARTED

org.apache.kafka.streams.test.TestRecordTest > testToString PASSED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords STARTED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords PASSED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
STARTED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher STARTED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher PASSED

org.apache.kafka.streams.test.TestRecordTest > testFields STARTED

org.apache.kafka.streams.test.TestRecordTest > testFields PASSED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode STARTED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode PASSED

> Task :streams:upgrade-system-tests-0102:processTestResources NO-SOURCE
> Task :streams:upgrade-system-tests-0102:testClasses
> Task :streams:upgrade-system-tests-0102:che

Build failed in Jenkins: kafka-trunk-jdk14 #84

2020-05-14 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: Handle task migrated inside corruption path (#8667)


--
[...truncated 3.09 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureSinkTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureSinkTopicNamesIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInt

Re: [DISCUSS] KIP-613: Add end-to-end latency metrics to Streams

2020-05-14 Thread Matthias J. Sax
Thanks for the KIP Sophie.

I think it's not useful to record the avg/mean; it's sensitive to
outliers. We should rather track the median (50th percentile).

Not sure if tracking min is useful, but I am also ok to track it.

However, I find it odd to track the 75th percentile. Standard measures would
be the 90th or 95th -- I guess we don't need both, so maybe picking the 90th
might be more useful?

About the name: "staleness" sounds really odd, and in fact the metric
does capture "latency", so we should call it "latency". I understand the
issue that we already have a latency metric. So maybe we could call it
`record-e2e-latency-*`?

While I agree that we should include out-of-order data (the KIP should
talk about `out-of-order` data, not `late` data; data is only `late` if
it's out-of-order and if it's dropped), I don't really understand why
the new metric would help to configure grace period or retention time?
As you mention in the KIP, both are defined as the max difference of
`event-time - stream-time`, and thus the new metric, which takes
system-/wallclock-time into account, does not seem to help at all.


Btw: there is a great talk about "How NOT to Measure Latency" by Gil
Tene: https://www.youtube.com/watch?v=lJ8ydIuPFeU
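
For illustration only (my own sketch, not text from the KIP), the per-record
value behind such a `record-e2e-latency` metric would simply be the wall-clock
time observed at the node minus the record's timestamp:

public class RecordE2eLatencySketch {

    static long recordE2eLatencyMs(final long recordTimestampMs, final long wallClockMs) {
        // Out-of-order records are included as-is; a negative value means the
        // record timestamp lies ahead of the wall clock.
        return wallClockMs - recordTimestampMs;
    }

    public static void main(final String[] args) {
        final long recordTimestamp = System.currentTimeMillis() - 250L; // pretend the record is 250 ms old
        final long latencyMs = recordE2eLatencyMs(recordTimestamp, System.currentTimeMillis());
        System.out.println("record-e2e-latency (ms): " + latencyMs);
        // min and a high percentile (90th or 99th) over these per-record values
        // would then be exposed as task-level metrics.
    }
}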


-Matthias


On 5/14/20 7:17 PM, John Roesler wrote:
> Hi Sophie,
> 
> It seems like there would still be plenty of use cases for recording
> this metric at all processors and not just stateful ones, but I'm happy
> to suspend my arguments for now. Since you're proposing to keep
> them at the processor-node level, it will be seamless later to add
> in the stateless processors if we want. As a wise man once said,
> "Adding is always easier than removing."
> 
> Regarding the time measurement, it's an implementation detail
> we don't need to consider in the KIP. Nevertheless, I'd greatly
> prefer to measure the system time again when recording the
> metric. I don't think we've seen any evidence that proves this
> would harm performance, and the amount of inaccuracy that using
> the cached system time could incur is actually substantial. But,
> if you want to just "not mention this" in the KIP, we can defer to
> the actual PR discussion, at which time we're in a better position
> to use benchmarks, etc., to make the call.
> 
> Along the lines of the measurement accuracy discussion, one
> minor thought I had is that maybe we should consider measuring
> the task staleness metric at the sink, rather than the source, so that
> it includes the processing latency of the task itself, not just the latency
> of everything up to, but not including, the task (which seems confusing
> for users). I guess this could also be an implementation detail, though.
> 
> Thanks for the update,
> -John
> 
> On Thu, May 14, 2020, at 13:31, Sophie Blee-Goldman wrote:
>> Hey all,
>>
>> After discussing with Bruno I'd like to propose a small amendment,
>> which is to record the processor-node-level metrics only for *stateful*
>> *operators*. They would still be considered a "processor-node-level"
>> metric and not a "state-store-level" metric as the staleness is still
>> a property of the node rather than of the state itself. However, it seems
>> that this information is primarily useful for stateful operators that might
>> be exposing state via IQ or otherwise dependent on the record time
>> unlike a stateless operator.
>>
>> It's worth calling out that recent performance improvements to the metrics
>> framework mean that we no longer fetch the system time at the operator
>> level, but only once per task. In other words the system time is not updated
>> between each process as a record flows through the subtopology, so
>> debugging the processor-level latency via the staleness will not be
>> possible. Note that this doesn't mean the operator-level metrics are not
>> *useful* relative to the task-level metric. Upstream caching and/or
>> suppression
>> can still cause a record's staleness at some downstream stateful operator
>> to deviate from the task-level staleness (recorded at the source node).
>>
>> Please let me know if you have any concerns about this change. The
>> KIP has been updated with the new proposal
>>
>> On Thu, May 14, 2020 at 3:04 AM Bruno Cadonna  wrote:
>>
>>> Hi Sophie,
>>>
>>> Thank you for the KIP.
>>>
>>> The KIP looks good to me.
>>>
>>> 50th percentile:
>>> I think we do not need it now. If we need it, we can add it. Here the
>>> old truism applies: Adding is always easier than removing.
>>>
>>> processor-node-level metrics:
>>> I think it is good to have the staleness metrics also on
>>> processor-node-level. If we do not want to record them on all
>>> processor nodes, you could restrict the recording to stateful
>>> processor-nodes, since those are the ones that would benefit most from
>>> the staleness metrics.
>>>
>>> Best,
>>> Bruno
>>>
>>> On Thu, May 14, 2020 at 4:15 AM Sophie Blee-Goldman 
>>> wrote:

 Yeah, the specific reason was just to align with the current metrics.

 Is it better to co

[jira] [Resolved] (KAFKA-8311) Better consumer timeout exception handling

2020-05-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-8311.

Resolution: Duplicate

Closing this ticket – KIP-572 / KAFKA-9274 does subsume it.

> Better consumer timeout exception handling 
> ---
>
> Key: KAFKA-8311
> URL: https://issues.apache.org/jira/browse/KAFKA-8311
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: WooYoung
>Priority: Major
>  Labels: newbie
>
> When a stream application crashes due to an underlying consumer commit 
> timeout, we have seen the following gaps:
> 1. The current timeout exception doesn't provide meaningful tuning 
> instructions. We should augment the error message to let the user change 
> `default.api.timeout.ms` in order to tolerate longer reaction times.
> 2. Currently we have 3 different types of consumers on KStream: 
> thread-consumer, global-consumer and restore-consumer. Although we don't plan 
> to explicitly handle this consumer timeout on the stream level, we could wrap 
> it with a more meaningful message on either the consumer or the stream level, 
> to let the user know which consumer is having trouble.
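
For illustration of point 1 (a hypothetical sketch, not the actual fix), the
timeout could be wrapped with a more actionable message along these lines:

import org.apache.kafka.common.errors.TimeoutException;

public class TimeoutMessageSketch {

    static RuntimeException enrich(final TimeoutException cause, final String consumerType) {
        // consumerType would be "thread", "global" or "restore", so the user
        // knows which consumer is having trouble.
        return new RuntimeException(
            "The " + consumerType + " consumer timed out. Consider increasing " +
            "`default.api.timeout.ms` to tolerate a longer reaction time.",
            cause);
    }
}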



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[DISCUSS] KIP-609: Use Pre-registration and Blocking Calls for Better Transaction Efficiency

2020-05-14 Thread Boyang Chen
Hey all,

I would like to start the discussion for KIP-609:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-609%3A+Use+Pre-registration+and+Blocking+Calls+for+Better+Transaction+Efficiency

This KIP aims to improve the current EOS semantics, which will make the
processing more efficient and consolidated.

Thanks!


Jenkins build is back to normal : kafka-trunk-jdk11 #1457

2020-05-14 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-572: Improve timeouts and retries in Kafka Streams

2020-05-14 Thread Matthias J. Sax
Thanks for the feedback Bruno. I updated the KIP (I thought the info was
there, but it seems it was too implicit).


1) Each time a client TimeoutException happens, we would log a WARN
message. If `task.timeout.ms` expires, we would rethrow the last client
`TimeoutException` to stop processing.


2) Yes, the first client `TimeoutException` will start the timer, and a
successful retry would reset/disable it.
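
To make that concrete, here is a minimal sketch (a hypothetical helper, not
the actual implementation) of how the timer would behave:

import org.apache.kafka.common.errors.TimeoutException;

public class TaskTimeoutTrackerSketch {

    private final long taskTimeoutMs;
    private long deadlineMs = -1L;               // -1 means no timeout in flight
    private TimeoutException lastException;

    public TaskTimeoutTrackerSketch(final long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    // Called whenever a client call for this task throws a TimeoutException.
    public void onTimeout(final TimeoutException exception, final long nowMs) {
        lastException = exception;
        if (deadlineMs == -1L) {
            deadlineMs = nowMs + taskTimeoutMs;  // the first timeout starts the timer
        }
        // a WARN message would be logged here for every timeout
        if (nowMs >= deadlineMs) {
            throw lastException;                 // task.timeout.ms expired: rethrow to stop processing
        }
    }

    // Called once the task makes progress again; resets/disables the timer.
    public void onSuccess() {
        deadlineMs = -1L;
        lastException = null;
    }
}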


-Matthias

On 5/14/20 9:19 AM, John Roesler wrote:
> Thanks for the update, Matthias!
> 
> Other than Bruno’s good points, this proposal looks good to me. 
> 
> Thanks,
> John
> 
> On Thu, May 14, 2020, at 07:17, Bruno Cadonna wrote:
>> Hi Matthias,
>>
>> Thank you for the KIP. I like your KIP.
>>
>> Here my feedback:
>>
>> 1. The KIP is not clear about what should happen when task.timeout.ms
>> expires. To facilitate the mapping from the error users might
>> encounter due to timeouts to this KIP, it would be good to state the
>> error that will be thrown when task.timeout.ms expires.
>>
>> 2. The KIP also does not clearly state how task.timeout.ms is
>> measured. Does the time start with the first timeout exception and
>> then run until either the timeout expires or the task receives a
>> successful reply? Or is it started each time the task is processed by
>> the stream thread and stopped when its turn is over, with an error
>> thrown once the sum of the individual times without a successful
>> reply reaches the timeout?
>>
>> Best,
>> Bruno
>>
>> On Tue, May 12, 2020 at 10:14 PM Matthias J. Sax  wrote:
>>>
>>> John, Guozhang,
>>>
>>> thanks a lot for your feedback. I updated the KIP from a slightly
>>> different angle: instead of using retries, we should switch to a
>>> timeout-based approach. I also extended the KIP to deprecate the
>>> producer/admin `retries` config.
>>>
>>> I like the proposal to skip a task if a client TimeoutException occurs
>>> and just retry it later; I updated the KIP accordingly. However, I would
>>> not retry forever by default. In general, all points you raised are
>>> valid and the question is just what _default_ we want to have. Given
>>> the issue that tasks might get "out-of-sync" regarding their event-time
>>> progress and that inexperienced users might not do proper monitoring, I
>>> prefer to have a "reasonable" default timeout if a task does not make
>>> progress at all, and to fail for this case.
>>>
>>> I would also argue (following Guozhang) that we don't necessarily need
>>> new metrics. Monitoring the number of alive threads (recently added),
>>> consumer lag, processing rate, etc. should give an operator enough insight
>>> into the application. I don't see the need atm to add specific "task
>>> timeout" metrics.
>>>
>>> For the issue of cascading failures, I would want to exclude it from
>>> this KIP to keep it focused.
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 2/27/20 1:31 PM, Guozhang Wang wrote:
 Hello John,

 I'll make note that you owe me a beer now :)

 I think I'm leaning towards your approach as well based on my observations
 on previously reported timeout exceptions. I once left some
 thoughts on Matthias' PR here
 https://github.com/apache/kafka/pull/8122#pullrequestreview-360749510 and I
 think I can better summarize my thoughts in this thread:

 1) First of all, we need to think from the user's perspective about what they'd
 really want to be notified of:

 a. "If my application cannot make progress temporarily due to various
 transient issues (John had listed several examples already), just handle
 that internally and I do not wanna be notified and worry about how to tune
 my timeout configs at all".
 b. "If my application cannot make progress for a long time, which is
>>> likely
 due to a bad config, a human error, network issues, etc such that I should
 be involved in the loop of trouble shooting, let me know sooner than
>>> later".

 and what they'd not preferred but may happen today:

 c. "one transient error cause a thread to die, but then after tasks
 migrated everything goes to normal; so the application silently lost a
 thread without letting me know"; in fact, in such cases even a cascading
 exception that eventually kills all thread may be better since at
>>> least the
 users would be notified.

 Based on that, instead of retrying immediately at the granularity of each
 blocking call, it should be sufficient to only consider the handling logic
 at the thread level. That is, within an iteration of the thread, it would
 try to:

 * initialize some created tasks;
 * restore some restoring tasks;
 * process some running tasks that have buffered records that are
 processable;
 * commit some tasks.

 In each of these steps, we may need to make some blocking calls in the
 underlying embedded clients, and if any of them times out, we would not
 be able to make progress partially