Kafka Connect Startup Hook

2023-03-20 Thread Jan Baudisch (extern)
Hello,

can someone please give me a hint how to execute two lines of code upon Kafka 
Connect Startup, like:

final JaegerTracer tracer = Configuration.fromEnv().getTracer();
GlobalTracer.register(tracer);

I implemented this using a custom (fake) connector, but that adds a lot of
overhead, because you also need a Task, a Config, etc.

Is there some simpler way, some kind of hook?

Thanks in advance,
Jan



Re: Kafka Connect Startup Hook

2023-03-20 Thread Jan Baudisch (extern)
Hello Jakub,

thank you for your quick answer. We solved it by implementing a ConfigProvider,
as described here:

https://docs.confluent.io/kafka-connectors/self-managed/userguide.html#configprovider-interface

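import io.jaegertracing.Configuration;
import io.jaegertracing.internal.JaegerTracer;
import io.opentracing.util.GlobalTracer;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;

import java.util.Map;
import java.util.Set;

public class TracingConfigProvider implements ConfigProvider {

    // Called when the worker instantiates the provider; used here as the startup hook.
    @Override
    public void configure(Map<String, ?> configs) {
        final JaegerTracer tracer = Configuration.fromEnv().getTracer();
        GlobalTracer.registerIfAbsent(tracer);
    }

    // This provider exists only for its side effect and resolves no config values.
    @Override
    public ConfigData get(String path) {
        return null;
    }

    @Override
    public ConfigData get(String path, Set<String> keys) {
        return null;
    }

    @Override
    public void close() {}

}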


And setting these Environment Variables in Kafka Connect

- CONNECT_CONFIG_PROVIDERS=tracing
- CONNECT_CONFIG_PROVIDERS_TRACING_CLASS=org.example.TracingConfigProvider
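
For readers not using environment variables: the two variables above look like the naming scheme of the Confluent Docker images, where a CONNECT_ prefix is dropped, the rest is lowercased and underscores become dots. That is an assumption about the setup, not something stated in this thread. Under that assumption they correspond to the worker properties config.providers and config.providers.tracing.class. The sketch below shows only the simple case of that mapping; the class and method names are made up for illustration, and names containing dashes or literal underscores need extra escaping rules that are not covered here.

```java
import java.util.Locale;

// Illustration only: ConnectEnvNames and toWorkerProperty are hypothetical names.
final class ConnectEnvNames {
    private static final String PREFIX = "CONNECT_";

    /** Maps e.g. CONNECT_CONFIG_PROVIDERS to config.providers (simple case only). */
    static String toWorkerProperty(String envName) {
        if (!envName.startsWith(PREFIX)) {
            throw new IllegalArgumentException("not a CONNECT_ variable: " + envName);
        }
        // Drop the prefix, lowercase the rest, and turn each underscore into a dot.
        return envName.substring(PREFIX.length())
                .toLowerCase(Locale.ROOT)
                .replace('_', '.');
    }

    public static void main(String[] args) {
        System.out.println(toWorkerProperty("CONNECT_CONFIG_PROVIDERS"));
        System.out.println(toWorkerProperty("CONNECT_CONFIG_PROVIDERS_TRACING_CLASS"));
    }
}
```

In a plain worker properties file the same setting would then be written as config.providers=tracing plus config.providers.tracing.class=org.example.TracingConfigProvider.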

Best regards,
Jan



From: Jakub Scholz 
Date: Monday, 20 March 2023 at 10:23
To: users@kafka.apache.org 
Subject: Re: Kafka Connect Startup Hook
In Strimzi, we use a Java agent to register the tracer (
https://github.com/strimzi/strimzi-kafka-operator/tree/main/tracing-agent/
if you wanna check the source code).

Jakub
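
For comparison, a Java agent in its simplest form is a class with a premain method that the JVM calls before the application's main(). The sketch below is not Strimzi's code; TracingAgent and initOnce are hypothetical names, and the tracer calls are left as comments so the example compiles without Jaeger on the classpath.

```java
import java.lang.instrument.Instrumentation;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical agent class. When packaged as a real agent, declare it public
// in its own source file.
final class TracingAgent {
    private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);

    /** Called by the JVM before main() when it is started with -javaagent. */
    public static void premain(String agentArgs, Instrumentation inst) {
        initOnce(() -> {
            // With Jaeger and OpenTracing available, the two lines from the
            // question would go here:
            //   JaegerTracer tracer = Configuration.fromEnv().getTracer();
            //   GlobalTracer.registerIfAbsent(tracer);
            System.out.println("tracer registered");
        });
    }

    /** Runs the startup action at most once; returns whether it ran. */
    static boolean initOnce(Runnable startupAction) {
        if (!INITIALIZED.compareAndSet(false, true)) {
            return false;
        }
        startupAction.run();
        return true;
    }
}
```

The agent jar needs a Premain-Class entry in its manifest and is attached with -javaagent:/path/to/agent.jar, which for Kafka's start scripts can be passed through the KAFKA_OPTS environment variable. Compared with the fake connector, nothing in the Connect configuration has to change.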

On Mon, Mar 20, 2023 at 9:18 AM Jan Baudisch (extern) <
jan.baudisch.ext...@bdess.com> wrote:

> Hello,
>
> can someone please give me a hint how to execute two lines of code upon
> Kafka Connect Startup, like:
>
> final JaegerTracer tracer = Configuration.fromEnv().getTracer();
> GlobalTracer.register(tracer);
>
> I implemented using a custom (Fake-)Connector, but there is much overhead,
> because you also need a Task, Config etc.
>
> Is there some simpler way, some kind of hook?
>
> Thanks in advance,
> Jan
>
>
>


Re: Request for a Jira account

2023-02-03 Thread Jan Hendriks
Hi,
thank you,
however I did not receive an e-mail with account details or ways to
set up the password. I tried the steps at
https://infra.apache.org/jira-guidelines.html#account but am blocked at the
step "If you need to log in, make sure you are using your ldap credentials
that you use for logging into to other Apache services." as I don't have
such an account (btw. not sure which account is meant here) and I am not
sure this is intended for non-ASF users.

Kind regards,
Jan

On Wed, Feb 1, 2023 at 10:41 PM Bill Bejeck 
wrote:

> Jan,
>
> Your account is created, and you have contributor permissions.
>
> Thanks,
> Bill
>
> On Wed, Feb 1, 2023 at 2:34 PM Jan Hendriks  wrote:
>
> > Hi Bill,
> > thank you! My preferred
> > * username: dahoc (or alternatively: JanHendriks)
> > * display name: Jan
> > * email address: dahoc3...@gmail.com
> > Best,
> > Jan
> >
> > On Wed, Feb 1, 2023 at 4:07 PM Bill Bejeck 
> > wrote:
> >
> > > Hi Jan,
> > >
> > > If you can provide your preferred username, display name, and email
> > > address, we can set up your account.
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Wed, Feb 1, 2023 at 4:44 AM Jan Hendriks 
> wrote:
> > >
> > > > Hi,
> > > > I would hereby like to request a Jira account in order to be able to
> > > file a
> > > > ticket related to dependency convergence errors, see e.g.
> > > > https://github.com/spring-projects/spring-kafka/issues/2561 that
> > > directed
> > > > me to this project.
> > > >
> > > > Kind regards,
> > > > Jan
> > > >
> > >
> >
>


Re: Request for a Jira account

2023-02-01 Thread Jan Hendriks
Hi Bill,
thank you! My preferred
* username: dahoc (or alternatively: JanHendriks)
* display name: Jan
* email address: dahoc3...@gmail.com
Best,
Jan

On Wed, Feb 1, 2023 at 4:07 PM Bill Bejeck 
wrote:

> Hi Jan,
>
> If you can provide your preferred username, display name, and email
> address, we can set up your account.
>
> Thanks,
> Bill
>
> On Wed, Feb 1, 2023 at 4:44 AM Jan Hendriks  wrote:
>
> > Hi,
> > I would hereby like to request a Jira account in order to be able to
> file a
> > ticket related to dependency convergence errors, see e.g.
> > https://github.com/spring-projects/spring-kafka/issues/2561 that
> directed
> > me to this project.
> >
> > Kind regards,
> > Jan
> >
>


Request for a Jira account

2023-02-01 Thread Jan Hendriks
Hi,
I would hereby like to request a Jira account in order to be able to file a
ticket related to dependency convergence errors, see e.g.
https://github.com/spring-projects/spring-kafka/issues/2561 that directed
me to this project.

Kind regards,
Jan



Re: Spring-Kafka and maven enforcer plugin

2023-01-30 Thread Jan Hendriks
Never mind,
https://github.com/spring-projects/spring-kafka is the place to raise this.
BR,
Jan

On Mon, Jan 30, 2023 at 1:33 PM Jan Hendriks  wrote:

> Hi,
> we have issues with dependency convergence with Spring-Kafka-test and the
> maven enforcer plugin.
> A reproducer can be found at
> https://gist.github.com/DaHoC/67daf9a7cb90e8626dbe6a144e93fa16 - simply
> run "mvn clean install" in the folder of the provided pom.xml
> The divergences we encounter (probably not complete):
>
> [INFO] --- maven-enforcer-plugin:3.0.0-M3:enforce (enforce) @ SpringKafkaDependencyIssue ---
> [WARNING]
> Dependency convergence error for org.scala-lang:scala-library:2.13.6 paths to dependency are:
> +-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
>   +-org.springframework.kafka:spring-kafka-test:2.8.11
>     +-org.apache.kafka:kafka_2.13:3.1.2
>       +-org.scala-lang:scala-library:2.13.6
> and
> +-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
>   +-org.springframework.kafka:spring-kafka-test:2.8.11
>     +-org.apache.kafka:kafka_2.13:3.1.2
>       +-com.fasterxml.jackson.module:jackson-module-scala_2.13:2.13.4
>         +-org.scala-lang:scala-library:2.13.6
> and
> +-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
>   +-org.springframework.kafka:spring-kafka-test:2.8.11
>     +-org.apache.kafka:kafka_2.13:3.1.2
>       +-org.scala-lang.modules:scala-collection-compat_2.13:2.4.4
>         +-org.scala-lang:scala-library:2.13.5
> and
> +-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
>   +-org.springframework.kafka:spring-kafka-test:2.8.11
>     +-org.apache.kafka:kafka_2.13:3.1.2
>       +-org.scala-lang.modules:scala-java8-compat_2.13:1.0.0
>         +-org.scala-lang:scala-library:2.13.5
> and
> +-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
>   +-org.springframework.kafka:spring-kafka-test:2.8.11
>     +-org.apache.kafka:kafka_2.13:3.1.2
>       +-org.scala-lang:scala-reflect:2.13.6
>         +-org.scala-lang:scala-library:2.13.6
> and
> +-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
>   +-org.springframework.kafka:spring-kafka-test:2.8.11
>     +-org.apache.kafka:kafka_2.13:3.1.2
>       +-com.typesafe.scala-logging:scala-logging_2.13:3.9.3
>         +-org.scala-lang:scala-library:2.13.4
> and
> +-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
>   +-org.springframework.kafka:spring-kafka-test:2.8.11
>     +-org.apache.kafka:kafka_2.13:3.1.2
>       +-org.scala-lang:scala-library:2.13.6
>
> [WARNING]
> Dependency convergence error for org.scala-lang:scala-reflect:2.13.6 paths to dependency are:
> +-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
>   +-org.springframework.kafka:spring-kafka-test:2.8.11
>     +-org.apache.kafka:kafka_2.13:3.1.2
>       +-org.scala-lang:scala-reflect:2.13.6
> and
> +-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
>   +-org.springframework.kafka:spring-kafka-test:2.8.11
>     +-org.apache.kafka:kafka_2.13:3.1.2
>       +-com.typesafe.scala-logging:scala-logging_2.13:3.9.3
>         +-org.scala-lang:scala-reflect:2.13.4
> and
> +-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
>   +-org.springframework.kafka:spring-kafka-test:2.8.11
>     +-org.apache.kafka:kafka_2.13:3.1.2
>       +-org.scala-lang:scala-reflect:2.13.6
>
>
> and (in another project):
>
> Dependency convergence error for org.apache.kafka:kafka-metadata:3.1.2 paths to dependency are:
> +-org.example:SpringKafkaDependencyIssue:eventing-test:4.39.3-SNAPSHOT
>   +-org.springframework.kafka:spring-kafka-test:2.8.11
>     +-org.apache.kafka:kafka-metadata:3.1.2
> and
> +-org.example:SpringKafkaDependencyIssue:eventing-test:4.39.3-SNAPSHOT
>   +-org.springframework.kafka:spring-kafka-test:2.8.11
>     +-org.apache.kafka:kafka-metadata:3.1.2
>       +-org.apache.kafka:kafka-metadata:2.8.2
> and
> +-org.example:SpringKafkaDependencyIssue:eventing-test:4.39.3-SNAPSHOT
>   +-org.springframework.kafka:spring-kafka-test:2.8.11
>     +-org.apache.kafka:kafka_2.13:3.1.2
>       +-org.apache.kafka:kafka-metadata:3.1.2
> and
> +-org.example:SpringKafkaDependencyIssue:eventing-test:4.39.3-SNAPSHOT
>   +-org.springframework.kafka:spring-kafka-test:2.8.11
>     +-org.apache.kafka:kafka_2.13:3.1.2
>       +-org.apache.kafka:kafka-metadata:3.1.2
>
> As I don't have a Jira account for this project, I cannot open up an
> issue, and I could not find such an issue in the project Jira or user
> mailing list.
> Kind regards,
> Jan
>


Fwd: Spring-Kafka and maven enforcer plugin

2023-01-30 Thread Jan Hendriks
Hi,
we have issues with dependency convergence with Spring-Kafka-test and the
maven enforcer plugin.
A reproducer can be found at
https://gist.github.com/DaHoC/67daf9a7cb90e8626dbe6a144e93fa16 - simply run
"mvn clean install" in the folder of the provided pom.xml
The divergences we encounter (probably not complete):

[INFO] --- maven-enforcer-plugin:3.0.0-M3:enforce (enforce) @ SpringKafkaDependencyIssue ---
[WARNING]
Dependency convergence error for org.scala-lang:scala-library:2.13.6 paths to dependency are:
+-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
  +-org.springframework.kafka:spring-kafka-test:2.8.11
    +-org.apache.kafka:kafka_2.13:3.1.2
      +-org.scala-lang:scala-library:2.13.6
and
+-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
  +-org.springframework.kafka:spring-kafka-test:2.8.11
    +-org.apache.kafka:kafka_2.13:3.1.2
      +-com.fasterxml.jackson.module:jackson-module-scala_2.13:2.13.4
        +-org.scala-lang:scala-library:2.13.6
and
+-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
  +-org.springframework.kafka:spring-kafka-test:2.8.11
    +-org.apache.kafka:kafka_2.13:3.1.2
      +-org.scala-lang.modules:scala-collection-compat_2.13:2.4.4
        +-org.scala-lang:scala-library:2.13.5
and
+-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
  +-org.springframework.kafka:spring-kafka-test:2.8.11
    +-org.apache.kafka:kafka_2.13:3.1.2
      +-org.scala-lang.modules:scala-java8-compat_2.13:1.0.0
        +-org.scala-lang:scala-library:2.13.5
and
+-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
  +-org.springframework.kafka:spring-kafka-test:2.8.11
    +-org.apache.kafka:kafka_2.13:3.1.2
      +-org.scala-lang:scala-reflect:2.13.6
        +-org.scala-lang:scala-library:2.13.6
and
+-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
  +-org.springframework.kafka:spring-kafka-test:2.8.11
    +-org.apache.kafka:kafka_2.13:3.1.2
      +-com.typesafe.scala-logging:scala-logging_2.13:3.9.3
        +-org.scala-lang:scala-library:2.13.4
and
+-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
  +-org.springframework.kafka:spring-kafka-test:2.8.11
    +-org.apache.kafka:kafka_2.13:3.1.2
      +-org.scala-lang:scala-library:2.13.6

[WARNING]
Dependency convergence error for org.scala-lang:scala-reflect:2.13.6 paths to dependency are:
+-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
  +-org.springframework.kafka:spring-kafka-test:2.8.11
    +-org.apache.kafka:kafka_2.13:3.1.2
      +-org.scala-lang:scala-reflect:2.13.6
and
+-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
  +-org.springframework.kafka:spring-kafka-test:2.8.11
    +-org.apache.kafka:kafka_2.13:3.1.2
      +-com.typesafe.scala-logging:scala-logging_2.13:3.9.3
        +-org.scala-lang:scala-reflect:2.13.4
and
+-org.example:SpringKafkaDependencyIssue:1.0-SNAPSHOT
  +-org.springframework.kafka:spring-kafka-test:2.8.11
    +-org.apache.kafka:kafka_2.13:3.1.2
      +-org.scala-lang:scala-reflect:2.13.6


and (in another project):

Dependency convergence error for org.apache.kafka:kafka-metadata:3.1.2 paths to dependency are:
+-org.example:SpringKafkaDependencyIssue:eventing-test:4.39.3-SNAPSHOT
  +-org.springframework.kafka:spring-kafka-test:2.8.11
    +-org.apache.kafka:kafka-metadata:3.1.2
and
+-org.example:SpringKafkaDependencyIssue:eventing-test:4.39.3-SNAPSHOT
  +-org.springframework.kafka:spring-kafka-test:2.8.11
    +-org.apache.kafka:kafka-metadata:3.1.2
      +-org.apache.kafka:kafka-metadata:2.8.2
and
+-org.example:SpringKafkaDependencyIssue:eventing-test:4.39.3-SNAPSHOT
  +-org.springframework.kafka:spring-kafka-test:2.8.11
    +-org.apache.kafka:kafka_2.13:3.1.2
      +-org.apache.kafka:kafka-metadata:3.1.2
and
+-org.example:SpringKafkaDependencyIssue:eventing-test:4.39.3-SNAPSHOT
  +-org.springframework.kafka:spring-kafka-test:2.8.11
    +-org.apache.kafka:kafka_2.13:3.1.2
      +-org.apache.kafka:kafka-metadata:3.1.2
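
To make clear what the enforcer is complaining about: the convergence rule fails when the same groupId:artifactId is reached through different paths with different versions. The sketch below is a simplified stand-in for that check, not the plugin's implementation; ConvergenceCheck and its methods are hypothetical names, and the input is just the last coordinate of each path in the log above.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper, for illustration only.
final class ConvergenceCheck {
    /** Maps "groupId:artifactId" to every version requested for it. */
    static Map<String, Set<String>> versionsByArtifact(List<String> coordinates) {
        Map<String, Set<String>> versions = new TreeMap<>();
        for (String gav : coordinates) {
            int lastColon = gav.lastIndexOf(':');
            String artifact = gav.substring(0, lastColon);
            String version = gav.substring(lastColon + 1);
            versions.computeIfAbsent(artifact, k -> new TreeSet<>()).add(version);
        }
        return versions;
    }

    /** Keeps only the artifacts that were requested in more than one version. */
    static Map<String, Set<String>> divergent(List<String> coordinates) {
        Map<String, Set<String>> result = versionsByArtifact(coordinates);
        result.values().removeIf(v -> v.size() < 2);
        return result;
    }

    public static void main(String[] args) {
        List<String> leaves = List.of(
                "org.scala-lang:scala-library:2.13.6",
                "org.scala-lang:scala-library:2.13.5",
                "org.scala-lang:scala-library:2.13.4",
                "org.scala-lang:scala-reflect:2.13.6",
                "org.scala-lang:scala-reflect:2.13.4",
                "org.apache.kafka:kafka_2.13:3.1.2");
        System.out.println(divergent(leaves));
    }
}
```

On the Maven side the usual remedies are to pin the diverging artifacts to one version in dependencyManagement or to exclude the older transitive copies.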

As I don't have a Jira account for this project, I cannot open up an issue,
and I could not find such an issue in the project Jira or user mailing list.
Kind regards,
Jan


Re: Support for Uni-directional data-diode?

2020-12-24 Thread jan
It might be best to do a web search for companies that know this stuff
and speak to them.

re. kafka over UDP I dunno but perhaps instead do normal kafka talking
to a proxy machine via TCP and have that proxy forward traffic via
UDP.
If that works, would simplify the problem I guess.
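
Kafka clients speak a request/response protocol over TCP, so the one-way leg would have to be something the proxy defines itself. A rough sketch of what that could look like, assuming the proxy has already consumed the records and only needs to push bytes out: since nothing can be re-requested over a diode, each datagram carries a sequence number and a CRC32 so the receiving side can at least detect gaps and corruption. The frame layout and all names below are made up for illustration.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical frame layout: [8-byte sequence][4-byte CRC32 of payload][payload].
final class DiodeFrame {
    static final int HEADER_BYTES = Long.BYTES + Integer.BYTES;

    static byte[] encode(long sequence, byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return ByteBuffer.allocate(HEADER_BYTES + payload.length)
                .putLong(sequence)
                .putInt((int) crc.getValue())
                .put(payload)
                .array();
    }

    /** Returns the payload, or null if the frame is truncated or fails the checksum. */
    static byte[] decode(byte[] frame) {
        if (frame.length < HEADER_BYTES) {
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(frame);
        buf.getLong(); // skip the sequence number; read it with sequenceOf()
        int expectedCrc = buf.getInt();
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return ((int) crc.getValue()) == expectedCrc ? payload : null;
    }

    /** Sequence number of a frame that decode() has already accepted. */
    static long sequenceOf(byte[] frame) {
        return ByteBuffer.wrap(frame).getLong();
    }

    /** Fire-and-forget send over UDP; there is no acknowledgement by design. */
    static void send(DatagramSocket socket, InetSocketAddress target,
                     long sequence, byte[] payload) throws IOException {
        byte[] frame = encode(sequence, payload);
        socket.send(new DatagramPacket(frame, frame.length, target));
    }
}
```

The receiver would track the last sequence number seen and log or alert on gaps. Whether lost data is acceptable is a business decision, which goes back to the question of what exactly is being protected.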

cheers

jan

On 23/12/2020, Danny - Terafence  wrote:
> Thank you Jan,
>
> The aim is to secure the sending side infrastructure and assets. Deny any
> known and unknown attacks from the "outside" while maintaining real-time data
> flowing outbound.
> Data integrity may be maintained in various ways if the forwarded protocol
> has such options.
>
> I wonder if KAFKA can run over UDP... for starters..
>
> Anyone knows?
>
> On Dec 22, 2020 23:25, jan  wrote:
> Dunno if it helps (if in doubt, probably not) but a search for the
> term gets some useful articles (inc.
> <https://en.wikipedia.org/wiki/Unidirectional_network>) and a company
> <https://owlcyberdefense.com/blog/what-is-data-diode-technology-how-does-it-work/>
> who may be worth contacting (I'm not affiliated in any way).
>
> The first question I'd ask myself is, would a burn-to-dvd solution
> work? Failing that, basic stuff like email?
> In any case, what if the data's corrupted, how can the servers detect
> and re-request? What are you protecting against exactly? Stuff like
> that.
>
> jan
>
> On 22/12/2020, Danny - Terafence  wrote:
>> Hello,
>>
>> Merry Christmas,
>>
>> My name is Danny Michaeli, I am Terafence’s Technical Services Manager.
>>
>> One of our customers is using KAFKA to gather ICS SIEM data to collect and
>> forward to AI servers.
>>
>> They have requested us to propose a uni-directional solution to avoid being
>> exposed from the AI server site.
>>
>> Can you please advise whether and how this can be done?
>>
>> B. Regards,
>>
>> Danny Michaeli
>> Technical Services Manager
>> [Logo]
>> Tel.: +972-73-3791191
>> Cell: +972-52-882-3108
>>
>>
>


Re: Support for Uni-directional data-diode?

2020-12-22 Thread jan
Dunno if it helps (if in doubt, probably not) but a search for the
term gets some useful articles (inc.
<https://en.wikipedia.org/wiki/Unidirectional_network>) and a company
<https://owlcyberdefense.com/blog/what-is-data-diode-technology-how-does-it-work/>
who may be worth contacting (I'm not affiliated in any way).

The first question I'd ask myself is, would a burn-to-dvd solution
work? Failing that, basic stuff like email?
In any case, what if the data's corrupted, how can the servers detect
and re-request? What are you protecting against exactly? Stuff like
that.

jan

On 22/12/2020, Danny - Terafence  wrote:
> Hello,
>
> Merry Christmas,
>
> My name is Danny Michaeli, I am Terafence’s Technical Services Manager.
>
> One of our customers is using KAFKA to gather ICS SIEM data to collect and
> forward to AI servers.
>
> They have requested us to propose a uni-directional solution to avoid being
> exposed from the AI server site.
>
> Can you please advise whether and how this can be done?
>
> B. Regards,
>
> Danny Michaeli
> Technical Services Manager
> [Logo]
> Tel.: +972-73-3791191
> Cell: +972-52-882-3108
>
>


Partitioning per team

2020-10-26 Thread Jan Bols
For a kafka-streams application, we keep data per team. Data from 2 teams
never meet but within a team, data is highly integrated. A team has team
members but also has several types of equipment.
A team has a lifespan of about 1-3 days after which the team is removed and
all data relating to that team should be evicted.

How would you partition the data?
- Using the team id as key for all streams seems not ideal b/c this means
all aggregations need to happen per team involving a ser/deser of the
entire team data. Suppose there are 10 team members and only 1 team member is
sending events that need to be aggregated. In this case, we need a
ser/deser of the entire aggregated team data. I'm afraid this would result
in quite a bit of overhead.
- Using the user id or equipment id as key would result in much smaller
aggregations but does mean quite a bit of repartitioning when aggregating
and joining users of the same team.
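
The locality side of this trade-off can be sketched without Kafka. Below, a simple hash stands in for Kafka's partitioner (the real default hashes the serialized key with murmur2, so actual partition numbers will differ), and KeyChoiceDemo with its methods is a hypothetical name. With the team id as key, all of a team's events land on one partition; with the member or equipment id as key they spread out, which is why team-level joins then need a repartition step.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Illustration only; not how Kafka computes partitions.
final class KeyChoiceDemo {
    /** Stand-in partitioner: non-negative hash of the key modulo partition count. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Partitions touched by the events; each event is {teamId, memberId}. */
    static Set<Integer> partitionsUsed(List<String[]> events, boolean keyByTeam,
                                       int numPartitions) {
        Set<Integer> used = new TreeSet<>();
        for (String[] event : events) {
            String key = keyByTeam ? event[0] : event[1];
            used.add(partitionFor(key, numPartitions));
        }
        return used;
    }

    public static void main(String[] args) {
        List<String[]> events = List.of(
                new String[] {"team-1", "a"},
                new String[] {"team-1", "b"});
        System.out.println("keyed by team:   " + partitionsUsed(events, true, 4));
        System.out.println("keyed by member: " + partitionsUsed(events, false, 4));
    }
}
```

Keying by team buys co-location at the price of large per-key aggregates; keying by member keeps aggregates small but moves the cost into repartition topics.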

I ended up using the second approach, but I wonder if that was really a
good idea b/c the entire streaming logic does become quite involved.

What is your experience with this type of data?

Best regards
Jan


Re: Advice for Kafka project in Africa...

2020-09-08 Thread jan
Everything's down to requirements. I'm unclear on yours. I tried to
look at your website to see if I could pick up any clues but upsail.co
(and just in case, upsail.com) don't exist, so I dunno.

Some questions, rhetorical really:

Is there any reason a standard SQL database would not do - what does
kafka offer that you think is useful? I presume you know SQL already.

What are you trying to do, more precisely? Do you have a clear idea of
the inputs and outputs? What is the amount of data you expect to
process? Is there some tight time limit?

What is your uptime requirement? What is your budget?

From my experience, mentioning big data at this point is a major red
flag. Make sure you really can't do it on a laptop or a server (both
in money and business requirements) before you start scaling out
because it gets messy. A decent server can crunch a *lot* of data if
well configured.

Also consider your infrastructure such as power supplies and
networking protection as Africa may not be too stable in those
regards. Also perhaps physical protection.
If you skip your own hardware and go for 'the cloud' then you are
totally in the hands of the networking gods.

These are points to mull over. Doubt I can suggest anything further. Good luck.

jan

On 02/09/2020, cedric sende lubuele  wrote:
> Let me introduce myself, my name is Cedric and I am a network engineer
> passionate about new technologies and as part of my new activity, I am
> interested in Big Data. Currently, I live in Africa (Congo) and as everyone
> knows, Africa is very late in terms of IT infrastructure (the Cloud is
> difficult, we work a lot on premise).
>
> While doing some research, I came across Kai Waehner's article (Kafka
> replace
> database?<https://www.kai-waehner.de/blog/2020/03/12/can-apache-kafka-replace-database-acid-storage-%20transactions-sql-nosql-data-lake%20/>)
> and I would like to be able to get an idea about the possibilities of
> Kafka.
>
> Let me explain, I am working on a project for integrating several databases
> (MySQL, MongoDB, Neo4j, ...) and I have to develop with my team, an alert
> system which must detect anomalies on different criteria linked to a person
> in the various departments of the company.
> Would Kafka be a good solution in order to centralize all the data and
> create several analysis scripts to detect an anomaly and send back an alert
> message such as for example a suspect wanted by the police?
>
> Thank you in advance
>
>
>
> Sende Cedric / Network IT
> sende.ced...@hotmail.com<mailto:sende.ced...@hotmail.com> / 082/8446954
>
> UPSAIL GROUP
> http://upsail.co/<https://htmlsig.com/t/01BFBBXF>
>
>


Re: Get after put in stateStore returns null

2020-04-04 Thread Jan Bols
Ok, Matthias,
thanks for the clarification. This makes sense to me.
Glad I learned something new about kafka-streams. Even if it was the hard
way ;-)

Greetings
Jan

On Wed, Apr 1, 2020 at 11:52 PM Matthias J. Sax  wrote:

> That is expected behavior.
>
> And yes, there is a `Transformer` instance per partition with its own
> store that holds one shard of the overall state. The reason is that you
> could run one KafkaStreams instance per partition on different
> hosts/servers and thus, we need to have a `Transformer` and state-store
> per partition.
>
> It's also by design that `transform()` does not do auto-repartitioning
> because it's Processor API integration, and when using the Processor API
> it's the developer's responsibility to reason about correct data
> partitioning.
>
>
> -Matthias
>
> On 4/1/20 2:05 PM, Jan Bols wrote:
> > Ok, Matthias,
> >
> > thanks for the hint:
> > *Even if any upstream operation was key-changing, no auto-repartition is
> > triggered. If repartitioning is required, a call to through()
> > <
> https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KStream.html#through-java.lang.String-
> >
> > should be performed before flatTransformValues(). *
> >
> > Of course, I didn't call *through* before calling the transformer. As a
> > result some calls were being processed by another instance of the
> > transformer running on a different partition. Calling *store.get(key)* on
> > an instance would then not return any value even though another instance
> > did a *store.put(key, value)* before.  Is this expected behaviour? Is there
> > a transformer for each partition and does it get its own state store?
> >
> > Best regards
> >
> > Jan
> >
> > On Fri, Mar 27, 2020 at 12:59 AM Matthias J. Sax 
> wrote:
> >
> >> Your code looks correct to me. If you write into the store, you should
> >> also be able to read it back from the store.
> >>
> >> Can you reproduce the issue using `TopologyTestDriver`? How many
> >> partitions does your input topic have? Is your stream partitioned by
> >> key? Note that `transform()` does not do auto-repartitioning in contrast
> >> to `groupByKey()`.
> >>
> >>
> >> -Matthias
> >>
> >> On 3/25/20 3:49 AM, Jan Bols wrote:
> >>> Hi all,
> >>> I'm trying to aggregate a stream of messages and return a stream of
> >>> aggregated results using kafka streams.
> >>> At some point, depending on the incoming message, the old aggregate
> needs
> >>> to be closed and a new aggregate needs to be created, just like a
> session
> >>> that is closed due to some close event and at the same time a new
> session
> >>> is started.
> >>>
> >>> For this I'm using transformValues where I store the result of an
> >>> aggregation similar to how a groupByKey().aggregate() is done. When the
> >> old
> >>> session needs to be closed, it's sent first after the new value.
> >>>
> >>> The state store returns null for a given key at first retrieval and the
> >> new
> >>> aggregation result is stored under the same key.
> >>> However, at the second pass, the value for the same key is still null
> >> even
> >>> though it has just been stored before.
> >>>
> >>> How can this be possible?
> >>>
> >>>
> >>>
> >>> I'm using transformValues in the following way:
> >>>
> >>> val storeName = "aggregateOverflow_binReportAgg"
> >>> val store = Stores.keyValueStoreBuilder<K, V>(
> >>>   Stores.persistentKeyValueStore(storeName), serde.serde(), serde.serde())
> >>> streamsBuilder.addStateStore(store)
> >>>
> >>> ...
> >>>
> >>> stream
> >>>   .flatTransformValues(ValueTransformerWithKeySupplier {
> >>>     AggregateOverflow(storeName, transformation) }, storeName)
> >>>
> >>>
> >>> where AggregateOverflow gets the previous value from the state store,
> >>> transforms the result into a AggregateOverflowResult.
> >>> AggregateOverflowResult is a data class containing the current value
> and
> >> an
> >>> optional overflow value like this:
> >>>
> >>> data class AggregateOverflowResult<V>(val current: V, val overflow: V?)
> >>>
> >>> When the overflow value is no

Re: Get after put in stateStore returns null

2020-04-01 Thread Jan Bols
Ok, Matthias,

thanks for the hint:
*Even if any upstream operation was key-changing, no auto-repartition is
triggered. If repartitioning is required, a call to through()
<https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KStream.html#through-java.lang.String->
should be performed before flatTransformValues(). *

Of course, I didn't call *through* before calling the transformer. As a
result some calls were being processed by another instance of the
transformer running on a different partition. Calling *store.get(key)* on
an instance would then not return any value even though another instance
did a *store.put(key, value)* before.  Is this expected behaviour? Is there
a transformer for each partition and does it get its own state store?
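
The effect can be reproduced without Kafka. In the sketch below each partition has its own map, standing in for the local store shard that a transformer instance sees; ShardedStoreDemo and the key and value strings are hypothetical. A put through one partition's shard is invisible to a get through another's, which is exactly the "get after put returns null" symptom.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation only: one in-memory map per partition instead of real state stores.
final class ShardedStoreDemo {
    private final List<Map<String, String>> shards = new ArrayList<>();

    ShardedStoreDemo(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            shards.add(new HashMap<>());
        }
    }

    void put(int partition, String key, String value) {
        shards.get(partition).put(key, value);
    }

    String get(int partition, String key) {
        return shards.get(partition).get(key);
    }

    public static void main(String[] args) {
        ShardedStoreDemo stores = new ShardedStoreDemo(2);
        // The key was changed upstream without repartitioning, so records with
        // the same new key can still arrive on different partitions.
        stores.put(0, "bin-42", "aggregate-v1");
        System.out.println(stores.get(1, "bin-42")); // different shard: null
        System.out.println(stores.get(0, "bin-42")); // same shard: aggregate-v1
    }
}
```

Writing the re-keyed stream through a topic before the transformer (the through() call quoted above, or repartition() in newer Kafka Streams releases) makes all records for a key arrive at the same shard.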

Best regards

Jan

On Fri, Mar 27, 2020 at 12:59 AM Matthias J. Sax  wrote:

> Your code looks correct to me. If you write into the store, you should
> also be able to read it back from the store.
>
> Can you reproduce the issue using `TopologyTestDriver`? How many
> partitions does your input topic have? Is your stream partitioned by
> key? Note that `transform()` does not do auto-repartitioning in contrast
> to `groupByKey()`.
>
>
> -Matthias
>
> On 3/25/20 3:49 AM, Jan Bols wrote:
> > Hi all,
> > I'm trying to aggregate a stream of messages and return a stream of
> > aggregated results using kafka streams.
> > At some point, depending on the incoming message, the old aggregate needs
> > to be closed and a new aggregate needs to be created, just like a session
> > that is closed due to some close event and at the same time a new session
> > is started.
> >
> > For this I'm using transformValues where I store the result of an
> > aggregation similar to how a groupByKey().aggregate() is done. When the old
> > session needs to be closed, it's sent first, followed by the new value.
> >
> > The state store returns null for a given key at first retrieval and the new
> > aggregation result is stored under the same key.
> > However, at the second pass, the value for the same key is still null even
> > though it has just been stored before.
> >
> > How can this be possible?
> >
> >
> >
> > I'm using transformValues in the following way:
> >
> > val storeName = "aggregateOverflow_binReportAgg"
> > val store = Stores.keyValueStoreBuilder<K, V>(
> >   Stores.persistentKeyValueStore(storeName), serde.serde(), serde.serde())
> > streamsBuilder.addStateStore(store)
> >
> > ...
> >
> > stream
> >   .flatTransformValues(ValueTransformerWithKeySupplier {
> >     AggregateOverflow(storeName, transformation) }, storeName)
> >
> >
> > where AggregateOverflow gets the previous value from the state store and
> > transforms the result into an AggregateOverflowResult.
> > AggregateOverflowResult is a data class containing the current value and an
> > optional overflow value like this:
> >
> > data class AggregateOverflowResult<V>(val current: V, val overflow: V?)
> >
> > When the overflow value is not null, it's sent downstream first, followed by
> > the current value. In each case, the current result is stored in the state
> > store for later retrieval like the following:
> >
> > class AggregateOverflow(
> >  private val storeName: String,
> >  private val transformation: (K, V, VR?) ->
> AggregateOverflowResult?) :
> > ValueTransformerWithKey> {
> >  private val logger = KotlinLogging.logger{}
> >  private lateinit var state: KeyValueStore
> >
> >  init {
> >logger.debug { "$storeName: created" }
> >  }
> >
> >  override fun init(context: ProcessorContext) {
> >logger.debug { "$storeName: init called" }
> >this.state = context.getStateStore(storeName) as KeyValueStore;
> >  }
> >
> >  override fun transform(key: K, value: V): Iterable {
> >val acc = state.get(key)
> >if (acc == null) logger.debug { "$storeName: Found empty value for
> $key"
> > }
> >val result = transformation(key, value, acc)
> >state.put(key, result?.current)
> >logger.trace { "$storeName: \n Key: $key\n Value: $value\n aggregate
> > old: $acc\n aggregate new: $result" }
> >return listOfNotNull(result?.overflow, result?.current) //prevAcc will
> > be forwarded first if not null
> >  }
> >
> >  override fun close() {
> >logger.debug { "$storeName: close called" }
> >  }
> > }
> >
> > In the log file you can see that the first invocation is returning an
> empty
> > value for the given key, you can also see that the new value is being
> > serialized in the store.
> > At the second invocation a few seconds later, the value for the same key
> is
> > still null.
> >
> > Any idea's why this is?
> > Best regards
> > Jan
> >
>
>


Get after put in stateStore returns null

2020-03-25 Thread Jan Bols
Hi all,
I'm trying to aggregate a stream of messages and return a stream of
aggregated results using kafka streams.
At some point, depending on the incoming message, the old aggregate needs
to be closed and a new aggregate needs to be created, just like a session
that is closed due to some close event and at the same time a new session
is started.

For this I'm using transformValues, where I store the result of an
aggregation similar to how a groupByKey().aggregate() is done. When the old
session needs to be closed, it's sent first, followed by the new value.

The state store returns null for a given key at first retrieval and the new
aggregation result is stored under the same key.
However, at the second pass, the value for the same key is still null even
though it has just been stored before.

How can this be possible?



I'm using transformValues in the following way:

val storeName = "aggregateOverflow_binReportAgg"
val store = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), serde.serde(), serde.serde())
streamsBuilder.addStateStore(store)

...

stream
   .flatTransformValues(ValueTransformerWithKeySupplier {
AggregateOverflow(storeName, transformation) }, storeName)


where AggregateOverflow gets the previous value from the state store,
transforms the result into a AggregateOverflowResult.
AggregateOverflowResult is a data class containing the current value and an
optional overflow value like this:

data class AggregateOverflowResult<V>(val current: V, val overflow: V?)

When the overflow value is not null, it's sent downstream first, followed by
the current value. In each case, the current result is stored in the state
store for later retrieval, as follows:

class AggregateOverflow<K, V, VR>(
  private val storeName: String,
  private val transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?) :
  ValueTransformerWithKey<K, V, Iterable<VR>> {
  private val logger = KotlinLogging.logger {}
  private lateinit var state: KeyValueStore<K, VR>

  init {
    logger.debug { "$storeName: created" }
  }

  override fun init(context: ProcessorContext) {
    logger.debug { "$storeName: init called" }
    this.state = context.getStateStore(storeName) as KeyValueStore<K, VR>
  }

  override fun transform(key: K, value: V): Iterable<VR> {
    // look up the previous aggregate for this key (null on first sight)
    val acc = state.get(key)
    if (acc == null) logger.debug { "$storeName: Found empty value for $key" }
    val result = transformation(key, value, acc)
    state.put(key, result?.current)
    logger.trace { "$storeName: \n Key: $key\n Value: $value\n aggregate old: $acc\n aggregate new: $result" }
    // prevAcc will be forwarded first if not null
    return listOfNotNull(result?.overflow, result?.current)
  }

  override fun close() {
    logger.debug { "$storeName: close called" }
  }
}

In the log file you can see that the first invocation returns an empty
value for the given key; you can also see that the new value is being
serialized into the store.
At the second invocation a few seconds later, the value for the same key is
still null.

Any ideas why this is?
Best regards
Jan


Get after put in stateStore returns null

2020-03-24 Thread Jan Bols
Hi all,
I'm trying to aggregate a stream of messages and return a stream of
aggregated results using kafka streams.
At some point, depending on the incoming message, the old aggregate needs
to be closed and a new aggregate needs to be created, just like a session
that is closed due to some close event and at the same time a new session
is started.

For this I'm using transformValues, where I store the result of an
aggregation similar to how a groupByKey().aggregate() is done. When the old
session needs to be closed, it's sent first, followed by the new value.

The state store returns null for a given key at first retrieval and the new
aggregation result is stored under the same key.
However, at the second pass, the value for the same key is still null even
though it has just been stored before.

How can this be possible?



I'm using transformValues in the following way:

val storeName = "aggregateOverflow_binReportAgg"
val store = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), serde.serde(), serde.serde())
streamsBuilder.addStateStore(store)

...

stream
   .flatTransformValues(ValueTransformerWithKeySupplier {
AggregateOverflow(storeName, transformation) }, storeName)


where AggregateOverflow gets the previous value from the state store,
transforms the result into a AggregateOverflowResult.
AggregateOverflowResult is a data class containing the current value and an
optional overflow value like this:

data class AggregateOverflowResult<V>(val current: V, val overflow: V?)

When the overflow value is not null, it's sent downstream first, followed by
the current value. In each case, the current result is stored in the state
store for later retrieval, as follows:

class AggregateOverflow<K, V, VR>(
  private val storeName: String,
  private val transformation: (K, V, VR?) -> AggregateOverflowResult<VR>?) :
  ValueTransformerWithKey<K, V, Iterable<VR>> {
  private val logger = KotlinLogging.logger {}
  private lateinit var state: KeyValueStore<K, VR>

  init {
    logger.debug { "$storeName: created" }
  }

  override fun init(context: ProcessorContext) {
    logger.debug { "$storeName: init called" }
    this.state = context.getStateStore(storeName) as KeyValueStore<K, VR>
  }

  override fun transform(key: K, value: V): Iterable<VR> {
    // look up the previous aggregate for this key (null on first sight)
    val acc = state.get(key)
    if (acc == null) logger.debug { "$storeName: Found empty value for $key" }
    val result = transformation(key, value, acc)
    state.put(key, result?.current)
    logger.trace { "$storeName: \n Key: $key\n Value: $value\n aggregate old: $acc\n aggregate new: $result" }
    // prevAcc will be forwarded first if not null
    return listOfNotNull(result?.overflow, result?.current)
  }

  override fun close() {
    logger.debug { "$storeName: close called" }
  }
}

In the log file you can see that the first invocation returns an empty
value for the given key; you can also see that the new value is being
serialized into the store.
At the second invocation a few seconds later, the value for the same key is
still null.

Any ideas why this is?
Best regards
Jan


Re: complicated logic for tombstone records

2020-01-08 Thread Jan Bols
Hi Boyang, Hi Alex,

thank you for your reply. I can't use windowing so currently I'm managing
removals by wrapping messages in a delete-aware wrapper whenever I have to
do aggregation but this has a big impact on all the logic.

For me the ideal situation would be to get a handle on the state stores
that are being used during aggregation and other processors of the streams
DSL and programmatically delete them from the store whenever needed. This
way I can keep the changes to my streaming logic minimal and still delete
parts of it whenever needed.

Is there any way to do that? I know I can get a read-only reference to the
state stores using queryable stores but that won't do.

Jan

On Thu, Jan 2, 2020 at 11:17 PM Alex Brekken  wrote:

> Hi Jan, unfortunately there is no easy or automatic way to do this.
> Publishing null values directly to the changelog topics will remove them
> from the topic, but it won't remove the corresponding row from the RocksDB
> state store.  (though deleting data programmatically from a state-store
> WILL also remove it from the changelog topic)  Given that you want to
> completely remove the data for a given set of keys, your best option might
> be to modify your topology to handle null messages so that they can get
> deleted from your aggregations. (and publish those from an outside app)
> Hopefully this isn't too self-serving, but I actually wrote a blog post
> about managing state-store data not long ago:
>
> https://objectpartners.com/2019/07/31/slimming-down-your-kafka-streams-data/
> .
> Hopefully that might give you some ideas.
>
> Alex
>
> On Thu, Jan 2, 2020 at 4:11 PM Boyang Chen 
> wrote:
>
> > Hey Jan,
> >
> > although I believe your case is much more complicated, but would time
> based
> > retention work for you at all? If yes, time window store is like the best
> > option.
> >
> > If no, streams has no out-of-box solution for invalidating the
> aggregation
> > record. It seems at least we could provide an API to inject
> > tombstone records for aggregation logic
> > so that they don't get ignored eventually. This sounds like a good future
> > work.
> >
> > Boyang
> >
> > On Thu, Jan 2, 2020 at 1:47 PM Jan Bols  wrote:
> >
> > > Hi,
> > > I have a rather complicated kafka streams application involving
> multiple
> > > joins, aggregates, maps etc. At a certain point, parts of the data
> needs
> > to
> > > be removed throughout the entire streams topology, both in the topics,
> > the
> > > changelogs and the rocksdb state stores.
> > >
> > > Managing this requires a lot of effort and things get very complex.
> F.e.
> > > when a KStream has a null value and is aggregated, you first need to
> > > convert it into some optional value instead b/c aggregates ignore
> nulls.
> > >
> > > Is there a better way or a way that does not impact all the existing
> > > streaming logic?
> > >
> > > I was thinking about having an out-of-bound process that sends null
> > values
> > > to all topics with the correct keys. I could then filter out all null
> > > values before doing the rest of the existing stream logic.
> > > Would that make sense?
> > >
> > > I can send null values to all my topics, but how do I get the changelog
> > > topics created by kafka-streams. And what about the state store?
> > >
> > > Best regards
> > > Jan
> > >
> >
>


complicated logic for tombstone records

2020-01-02 Thread Jan Bols
Hi,
I have a rather complicated kafka streams application involving multiple
joins, aggregates, maps etc. At a certain point, parts of the data need to
be removed throughout the entire streams topology: in the topics, the
changelogs and the rocksdb state stores.

Managing this requires a lot of effort and things get very complex. For
example, when a KStream has a null value and is aggregated, you first need to
convert it into some optional value instead, because aggregates ignore nulls.
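
The wrapping step described above can be sketched roughly as follows. This is a minimal illustration only: `DeleteAware`, `aggregate` and the plain `HashMap` standing in for the aggregate's state store are all hypothetical names, not Kafka Streams API, and the sketch deliberately leaves out serdes and the streams topology itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

final class DeleteAwareSketch {

    // Hypothetical wrapper: carries either a value or an explicit delete marker,
    // so a removal is never represented by a bare null.
    static final class DeleteAware<V> {
        final V value;
        final boolean deleted;

        private DeleteAware(V value, boolean deleted) {
            this.value = value;
            this.deleted = deleted;
        }

        static <V> DeleteAware<V> of(V value) {
            return new DeleteAware<>(value, false);
        }

        static <V> DeleteAware<V> tombstone() {
            return new DeleteAware<>(null, true);
        }
    }

    // Plain-map stand-in for an aggregation: a delete marker removes the
    // aggregate for the key, any other update is folded into it.
    static <K, V> void aggregate(Map<K, V> store, K key,
                                 DeleteAware<V> update, BinaryOperator<V> adder) {
        if (update.deleted) {
            store.remove(key);
        } else {
            store.merge(key, update.value, adder);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> totals = new HashMap<>();
        aggregate(totals, "order-1", DeleteAware.of(2), Integer::sum);
        aggregate(totals, "order-1", DeleteAware.of(3), Integer::sum);
        System.out.println(totals);
        aggregate(totals, "order-1", DeleteAware.<Integer>tombstone(), Integer::sum);
        System.out.println(totals);
    }
}
```

The cost mentioned in the message is visible even here: every aggregation has to know about the wrapper type.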

Is there a better way or a way that does not impact all the existing
streaming logic?

I was thinking about having an out-of-band process that sends null values
to all topics with the correct keys. I could then filter out all null
values before doing the rest of the existing stream logic.
Would that make sense?

I can send null values to all my topics, but how do I get the changelog
topics created by kafka-streams? And what about the state store?

Best regards
Jan


Allow keys to specify partitionKey

2019-10-21 Thread Jan Bols
The default partitioner takes a hash of the key of a topic to determine the 
partition number.

It would be useful for a key to be able to specify the object on which the
default partitioner should base its hash. This would allow us to use
different composite keys and still be certain that they arrive on the same
partition.

For example: I have 2 topics. One is keyed with a composite key containing a
student with its classRoom. The other one is keyed with a teacher and its
classRoom.

I want to make sure both end up in the same partition, even though they have 
different keys. However, they share the same classRoom.

I can create a custom partitioner where I specifically partition by classRoom,
but things break down when using kafka-streams because kafka-streams always
uses the default partitioner for internal repartitioning during maps, joins,
etc.

Would it be a useful idea to allow keys to implement a well-known interface
like the following:
interface PartitionKeyAware {
  Object partitionKey();
}

The default partitioner could then look for objects of that type and call
partitionKey() to compute its hash.
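
To make the proposal concrete, here is a minimal sketch of how such a lookup might behave. Everything in it is an assumption for illustration: `PartitionKeyAware` is the proposed interface (it does not exist in Kafka), `StudentKey` and `TeacherKey` are made-up key classes, and `partitionFor` uses `hashCode()` as a stand-in for the real partitioner, which hashes the serialized key bytes.

```java
import java.util.Objects;

// Hypothetical interface from the proposal; it does not exist in Kafka.
interface PartitionKeyAware {
    Object partitionKey();
}

// Composite key of the first topic: a student and its classRoom.
final class StudentKey implements PartitionKeyAware {
    final String student;
    final String classRoom;

    StudentKey(String student, String classRoom) {
        this.student = student;
        this.classRoom = classRoom;
    }

    @Override
    public Object partitionKey() {
        return classRoom;
    }
}

// Composite key of the second topic: a teacher and its classRoom.
final class TeacherKey implements PartitionKeyAware {
    final String teacher;
    final String classRoom;

    TeacherKey(String teacher, String classRoom) {
        this.teacher = teacher;
        this.classRoom = classRoom;
    }

    @Override
    public Object partitionKey() {
        return classRoom;
    }
}

final class PartitionKeyAwareSketch {

    // Stand-in for the partitioner's hashing step: hash the object named by
    // partitionKey() when the key offers one, otherwise hash the key itself.
    static int partitionFor(Object key, int numPartitions) {
        Object basis = key instanceof PartitionKeyAware
                ? ((PartitionKeyAware) key).partitionKey()
                : key;
        return Math.floorMod(Objects.hashCode(basis), numPartitions);
    }

    public static void main(String[] args) {
        int studentPartition = partitionFor(new StudentKey("alice", "room-7"), 12);
        int teacherPartition = partitionFor(new TeacherKey("bob", "room-7"), 12);
        // Both keys are hashed on "room-7", so the partitions match.
        System.out.println(studentPartition == teacherPartition);
    }
}
```

Two different key types land on the same partition as long as they report the same partition key, which is the co-partitioning property the proposal is after.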

Or is this a really bad idea not worth the effort to put it in a KIP?



LogAppendTime handling in consumer

2019-10-09 Thread Jan Hruban
Hi,

I have a Kafka topic configured with:

  message.timestamp.type=LogAppendTime

I'm using the "brod" [1] Kafka client and I have noticed that it
returns the CreateTime instead of the LogAppendTime when fetching
messages.

I have tracked down that the "kafka_protocol" library (used by the brod
client) always uses the firstTimestamp from the Record Batch and
timestampDelta from the Record to compute each record's timestamp [2].
This always gives the CreateTime.

In the official Java client, it looks like when LogAppendTime is in
effect (determined by the attribute timestampType in the Record Batch),
it uses the maxTimestamp from the Record Batch [3] to set the timestamp
in each Record [4].
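
The two behaviours described so far can be put side by side in a small sketch. This is not the Java client's code; `resolveTimestamp` and its parameter names are made up for illustration, and the bit mask assumes the record batch attribute layout from the Kafka protocol documentation, where bit 3 carries the timestamp type.

```java
final class RecordTimestampSketch {

    // Assumed layout of the batch "attributes" field: bit 3 is the timestamp
    // type (0 = CreateTime, 1 = LogAppendTime).
    static final int TIMESTAMP_TYPE_MASK = 0x08;

    // Timestamp a consumer would report for one record of a record batch.
    static long resolveTimestamp(short attributes, long firstTimestamp,
                                 long maxTimestamp, long timestampDelta) {
        boolean logAppendTime = (attributes & TIMESTAMP_TYPE_MASK) != 0;
        if (logAppendTime) {
            // Broker-assigned time: every record in the batch gets maxTimestamp.
            return maxTimestamp;
        }
        // Producer-assigned time: batch base timestamp plus per-record delta.
        return firstTimestamp + timestampDelta;
    }

    public static void main(String[] args) {
        long first = 1_000L;
        long max = 5_000L;
        long delta = 25L;
        System.out.println(resolveTimestamp((short) 0x08, first, max, delta));
        System.out.println(resolveTimestamp((short) 0x00, first, max, delta));
    }
}
```

A client that always takes the second branch reports the CreateTime regardless of the topic setting, which matches the behaviour observed with kafka_protocol.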

Is this the exact behaviour which clients are expected to follow?
I've just come across several resources which gave me a few hints:

  * KIP-32 [5], which just talks about the Message format with magic < 2.

  * KAFKA-5353 [6], which changed the baseTimeStamp to always be the
create timestamp.

On the other hand, the documentation does not give a clue that clients
should use the maxTimestamp when LogAppendTime is in use:

  * The Record Batch documentation [7] does not explain the semantics of
the individual fields.

  * Wiki page "A Guide To The Kafka Protocol" [8] is more detailed on
the FirstTimestamp, TimestampDelta and MaxTimestamp, but does not
mention what implications the timestamp type has for those
fields.

From my point of view, either this is a deficiency in Kafka, which
should instead always provide the correct authoritative timestamp to
consumers, or, if it is indeed expected that this logic is handled by
clients, it should be explicitly written in the official documentation.


For the record, here's a pull request [9] to the kafka_protocol
library.




[1] https://github.com/klarna/brod

[2] 
https://github.com/klarna/kafka_protocol/blob/cc13902191b9ca3970a65388697c1069ae68fd2a/src/kpro_batch.erl#L249

[3] 
https://github.com/apache/kafka/blob/1f1179ea64bbaf068d759aae988bd2a6fe966161/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L558

[4] 
https://github.com/apache/kafka/blob/1f1179ea64bbaf068d759aae988bd2a6fe966161/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L330-L331

[5] 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message

[6] https://issues.apache.org/jira/browse/KAFKA-5353

[7] http://kafka.apache.org/documentation/#recordbatch

[8] 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets

[9] https://github.com/klarna/kafka_protocol/pull/60


-- 
Jan Hruban




Re: Topic gets truncated

2019-06-27 Thread jan . kosecki91



On 2019/06/27 15:39:29, Jan Kosecki  wrote: 
> Hi,
> 
> I have a hyperledger fabric cluster that uses a cluster of 3 kafka nodes +
> 3 zookeepers.
> Fabric doesn't support pruning yet so any change to recorded offsets is
> detected by it and it fails to reconnect to kafka cluster.
> Today morning all kafka nodes have gone offline and then slowly restarted.
> After that fabric doesn't work anymore. After checking the logs I've learnt
> that fabric has recorded last offset of 13356
> 
> 2019-06-27 10:34:51.325 UTC [orderer/consensus/kafka] newChain -> INFO 144
> [channel: audit] Starting chain with last persisted offset 13356 and last
> recorded block 2881
> 
> ,however when I compared it with kafka logs of the first node that started
> (kafka-0), I've found out that, although initially highwater mark was
> around this value, for some reason, during reboot, the node truncated the
> topic and I've lost all messages.
> 
> The topic audit has 1 partition and replication factor of 3.
> I've added logs from kafka-0 that contain any reference to audit topic.
> 
> Any suggestions, why the topic's offset has been truncated to 1?
> 
> Thanks in advance,
> Jan
> 

Failed to attach logs.
Fixing it here:

[2019-06-27 08:47:06,967] WARN [Log partition=audit-0, 
dir=/var/lib/kafka/data/topics] Found a corrupted index file corresponding to 
log file /var/lib/kafka/data/topics/audit-0/00012225.log due to 
Corrupt time index found, time index file 
(/var/lib/kafka/data/topics/audit-0/00012225.timeindex) has 
non-zero size but the last timestamp is 0 which is less than the first 
timestamp 1561534356375}, recovering segment and rebuilding index files... 
(kafka.log.Log)
[2019-06-27 08:47:06,967] INFO [Log partition=audit-0, 
dir=/var/lib/kafka/data/topics] Loading producer state till offset 12225 with 
message format version 2 (kafka.log.Log)
[2019-06-27 08:47:06,967] INFO [ProducerStateManager partition=audit-0] Loading 
producer state from snapshot file 
'/var/lib/kafka/data/topics/audit-0/00012225.snapshot' 
(kafka.log.ProducerStateManager)
[2019-06-27 08:47:06,997] INFO [ProducerStateManager partition=audit-0] Writing 
producer snapshot at offset 13358 (kafka.log.ProducerStateManager)
[2019-06-27 08:47:06,997] INFO [Log partition=audit-0, 
dir=/var/lib/kafka/data/topics] Recovering unflushed segment 12225 
(kafka.log.Log)
[2019-06-27 08:47:06,997] INFO [Log partition=audit-0, 
dir=/var/lib/kafka/data/topics] Loading producer state till offset 12225 with 
message format version 2 (kafka.log.Log)
[2019-06-27 08:47:07,001] INFO [ProducerStateManager partition=audit-0] Loading 
producer state from snapshot file 
'/var/lib/kafka/data/topics/audit-0/00012225.snapshot' 
(kafka.log.ProducerStateManager)
[2019-06-27 08:47:07,016] INFO [ProducerStateManager partition=audit-0] Writing 
producer snapshot at offset 13358 (kafka.log.ProducerStateManager)
[2019-06-27 08:47:07,021] INFO [Log partition=audit-0, 
dir=/var/lib/kafka/data/topics] Loading producer state till offset 13358 with 
message format version 2 (kafka.log.Log)
[2019-06-27 08:47:07,022] INFO [ProducerStateManager partition=audit-0] Loading 
producer state from snapshot file 
'/var/lib/kafka/data/topics/audit-0/00013358.snapshot' 
(kafka.log.ProducerStateManager)
[2019-06-27 08:47:07,022] INFO [Log partition=audit-0, 
dir=/var/lib/kafka/data/topics] Completed load of log with 2 segments, log 
start offset 0 and log end offset 13358 in 61 ms (kafka.log.Log)
[2019-06-27 08:49:19,360] INFO [Controller id=0] Current list of topics in the 
cluster: Set(audit) (kafka.controller.KafkaController)
[2019-06-27 08:49:19,382] INFO [Controller id=0] List of topics ineligible for 
deletion: audit(kafka.controller.KafkaController)
[2019-06-27 08:49:20,209] INFO [ReplicaStateMachine controllerId=0] Started 
replica state machine with initial state -> 
Map([Topic=audit,Partition=0,Replica=0] -> OnlineReplica, 
[Topic=audit,Partition=0,Replica=2] -> OnlineReplica, 
[Topic=audit,Partition=0,Replica=1] -> ReplicaDeletionIneligible) 
(kafka.controller.ReplicaStateMachine)
[2019-06-27 08:49:22,943] INFO [PartitionStateMachine controllerId=0] Started 
partition state machine with initial state -> Map(audit-0 -> OnlinePartition) 
(kafka.controller.PartitionStateMachine)
[2019-06-27 08:49:26,337] INFO Replica loaded for partition audit-0 with 
initial high watermark 0 (kafka.cluster.Replica)
[2019-06-27 08:49:26,337] INFO Replica loaded for partition audit-0 with 
initial high watermark 0 (kafka.cluster.Replica)
[2019-06-27 08:49:26,349] INFO Replica loaded for partition audit-0 with 
initial high watermark 13358 (kafka.cluster.Replica)
[2019-06-27 08:49:27,504] INFO [ReplicaFetcherManager on broker 0] Removed 
fetcher for partitions Set(audit

Topic gets truncated

2019-06-27 Thread Jan Kosecki
Hi,

I have a hyperledger fabric cluster that uses a cluster of 3 kafka nodes +
3 zookeepers.
Fabric doesn't support pruning yet so any change to recorded offsets is
detected by it and it fails to reconnect to kafka cluster.
This morning all kafka nodes went offline and then slowly restarted.
After that fabric doesn't work anymore. After checking the logs I've learnt
that fabric has recorded a last offset of 13356:

2019-06-27 10:34:51.325 UTC [orderer/consensus/kafka] newChain -> INFO 144
[channel: audit] Starting chain with last persisted offset 13356 and last
recorded block 2881

However, when I compared it with the kafka logs of the first node that started
(kafka-0), I found that, although the high watermark was initially
around this value, for some reason the node truncated the
topic during the reboot and I've lost all messages.

The topic audit has 1 partition and replication factor of 3.
I've added logs from kafka-0 that contain any reference to audit topic.

Any suggestions as to why the topic's offset has been truncated to 1?

Thanks in advance,
Jan


Re: kstream transform forward to different topics

2019-02-13 Thread Jan Filipiak
For now, just use the name it gets automatically, or crack the
AbstractStream open with reflection ;)

KIP-307 is doing it the wrong way again: just make the names accessible
instead of making the users supply them :face_with_rolling_eyes:

On 08.02.2019 02:36, Guozhang Wang wrote:
> Hi Nan,
> 
> Glad it helps with your case. Just another note that in the next release
> when KIP-307 is in place [1], you can actually combine the DSL with PAPI by
> naming the last operator that creates your transformed KStream, and then
> manually add the sink nodes like:
> 
> stream2 = stream1.transform(Named.as("myName"));
> 
> topology = builder.build();
> 
> // continue adding to the built topology
> topology.addSink(... "myName");
> 
> -


Re: [ANNOUNCE] New Committer: Vahid Hashemian

2019-01-15 Thread Jan Filipiak
Congratz!

On 15.01.2019 23:44, Jason Gustafson wrote:
> Hi All,
>
> The PMC for Apache Kafka has invited Vahid Hashemian as a project committer 
> and
> we are
> pleased to announce that he has accepted!
>
> Vahid has made numerous contributions to the Kafka community over the past
> few years. He has authored 13 KIPs with core improvements to the consumer
> and the tooling around it. He has also contributed nearly 100 patches
> affecting all parts of the codebase. Additionally, Vahid puts a lot of
> effort into community engagement, helping others on the mail lists and
> sharing his experience at conferences and meetups.
>
> We appreciate the contributions and we are looking forward to more.
> Congrats Vahid!
>
> Jason, on behalf of the Apache Kafka PMC
>


Re: Doubts in Kafka

2019-01-08 Thread Jan Filipiak

On 08.01.2019 17:11, aruna ramachandran wrote:
> I need to process single sensor messages in serial (order of messages
> should not be changed)at the same time I have to process 1 sensors
> messages in parallel please help me to configure the topics and partitions.
>


If you want to process events in order, you only need to make sure that
the messages of every sensor go into the same partition. Not every
sensor needs a partition of its own. Just use sensor_id or a UUID or
something similar as the key while producing. Then you can freely pick the
number of partitions.
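
A small simulation may help to see why keying by sensor is enough. It is only a sketch: the partitions are plain in-memory lists, the events are made-up `sensorId:sequence` strings, and `partitionFor` uses `String.hashCode()` as a stand-in for the producer's real key hashing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SensorOrderingSketch {

    // Stand-in for keyed partitioning: the same sensor id always maps to the
    // same partition, whatever the number of partitions is.
    static int partitionFor(String sensorId, int numPartitions) {
        return Math.floorMod(sensorId.hashCode(), numPartitions);
    }

    // Appends each "sensorId:sequence" event to the partition chosen by its key.
    static List<List<String>> produce(List<String> events, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String event : events) {
            String sensorId = event.substring(0, event.indexOf(':'));
            partitions.get(partitionFor(sensorId, numPartitions)).add(event);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Interleaved events from three sensors sharing two partitions.
        List<String> events = Arrays.asList(
                "s1:1", "s2:1", "s1:2", "s3:1", "s2:2", "s1:3");
        List<List<String>> partitions = produce(events, 2);
        for (int p = 0; p < partitions.size(); p++) {
            System.out.println("partition " + p + ": " + partitions.get(p));
        }
    }
}
```

Sensors share partitions, yet the events of any single sensor stay in their original order inside the one partition they map to, while different partitions can be consumed in parallel.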

Then take the retention time and throughput and try to come out at ~50GB
at rest. If you need more throughput, you can still use more partitions.
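
As a back-of-the-envelope version of that rule, the sketch below reads the ~50GB figure as a target per partition, which is an interpretation rather than something the message states. The method name and the example numbers are made up, and the constant uses 50 GiB for simplicity.

```java
final class PartitionSizingSketch {

    // Assumed target: roughly 50 GiB of retained data per partition.
    static final long TARGET_BYTES_PER_PARTITION = 50L * 1024 * 1024 * 1024;

    // Partition count so that throughput * retention, spread evenly over the
    // partitions, stays at or below the target. Always at least 1.
    static int partitionsFor(long bytesPerSecond, long retentionSeconds) {
        long bytesAtRest = Math.multiplyExact(bytesPerSecond, retentionSeconds);
        long partitions =
                (bytesAtRest + TARGET_BYTES_PER_PARTITION - 1) / TARGET_BYTES_PER_PARTITION;
        return Math.toIntExact(Math.max(1L, partitions));
    }

    public static void main(String[] args) {
        long oneMiBPerSecond = 1024L * 1024;
        long sevenDays = 7L * 24 * 60 * 60;
        // About 590 GiB at rest over a week of retention.
        System.out.println(partitionsFor(oneMiBPerSecond, sevenDays));
    }
}
```

The result is only a starting point; as the message says, more partitions can be added later if throughput demands it.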

best Jan



Re: Please explain Rest API

2018-11-30 Thread jan
I may have missed this (I'm missing the first few messages), so sorry
in advance if I have, but what OS are you using?
Kafka does not work well on windows; I had problems using it that
sounded a little like this (just a little, though) when on windows.

jan

On 30/11/2018, Satendra Pratap Singh  wrote:
> Hi Sönke,
>
> when topic got created so i tried to read the topic data using console
> consumer but it didn't worked out consumer didn't consumer a single
> message. i read the log and got this every time. i don't understand where m
> i making mistake.
>
>
> [\!�qg_z��g_z�(�"{"error":{"name":"Error","status":404,"message":"Shared
> class  \"Dia\" has no method handling GET
> /getDia","statusCode":404,"stack":"Error: Shared class  \"Dia\" has no
> method handling GET /getDia\nat restRemoteMethodNotFound
> (/Users/satendra/Api/node_modules/loopback/node_modules/strong-remoting/lib/rest-adapter.js:371:17)\n
>   at Layer.handle [as handle_request]
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/layer.js:95:5)\n
>   at trim_prefix
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:317:13)\n
>   at
> /Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:284:7\n
>   at Function.process_params
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:335:12)\n
>   at next
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:275:10)\n
>   at Function.handle
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:174:3)\n
>   at router
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:47:12)\n
>   at Layer.handle [as handle_request]
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/layer.js:95:5)\n
>   at trim_prefix
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:317:13)\n
>   at
> /Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:284:7\n
>   at Function.process_params
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:335:12)\n
>   at next
> (/Users/satendra/Api/node_modules/loopback/node_modules/express/lib/router/index.js:275:10)\n
>   at jsonParser
> (/Users/satendra/analyticsApi/node_modules/body-parser/lib/types/json.js:109:7)\n
>   at Layer.handle [as handle_request]
> (/Users/satendra/analyticsApi/node_modules/loopback/node_modules/express/lib/router/layer.js:95:5)\n
>   at trim_prefix
> (/Users/satendra/analyticsApi/node_modules/loopback/node_modules/express/lib/router/index.js:317:13)"}}@access-control-allow-credentials["true"]connection["keep-alive"]content-typeF["application/json;
> charset=utf-8"dateB["Thu, 29 Nov 2018 12:38:59
> GMT"etagR["W/\"89a-6BIHHr4YrwjSfEHOpFjIThnXbCY\""]"transfer-encoding["chunked"vary6["Origin,
> Accept-Encoding"],x-content-type-options["nosniff"]$x-download-options["noopen"]x-frame-options["DENY"]
> x-xss-protection"["1; mode=block"]
>
> On Fri, Nov 30, 2018 at 2:30 AM Sönke Liebau
>  wrote:
>
>> Hi Satendra,
>>
>> if  I understand correctly you are using the RestSourceConnector to pull
>> data into a Kafka topic and that seems to work - at least as far as the
>> topic getting created.
>>
>> But are you saying that you tried to read data with 'cat filename.log' on
>> a
>> file within your kafka data directory? I assume I misunderstood this
>> part,
>> can you please add some detail around what you did there?
>>
>> Best regards,
>> Sönke
>>
>> On Thu, Nov 29, 2018 at 7:27 PM Satendra Pratap Singh <
>> sameerp...@gmail.com>
>> wrote:
>>
>> > Hi Sönke,
>> >
>> > I have single node kafka cluster, i have setup web server locally, many
>> > apps are sending logs to this server. I have Rest API which
>> > send/receive
>> > client request to/from server.
>> >
>> > here is my connector configuration : curl -i -X POST -H
>> > "Accept:application/json"  -H  "Content-Type: application/json"
>> > http://$CONNECT_HOST_SOURCE:8083/connectors/
>> > -d '{ "name": "source_rest_telemetry_data”, "config": {
>> > "key.converter":"org.apache.kafka.connect.storage.StringConverter",
>> > "value.converter"

Re: Kafka on Windows

2018-09-05 Thread jan
@M.Manna - thanks for your reply.
I did read that document but clearly didn't digest that bit properly.


@Liam Clarke
I don't expect code's users (either maintainers or end-users) to need
to deduce *anything*. If it needs knowing, I documented it.
I'll send you an actual example so you know it's not all talk.

>your 3 days you expended on learning the hard way that Windows is the minority 
>market share in servers

I had been accepted for a job which was to be a major career change.
For this I had to learn some basic linux (no, I've still never written
a bash script), scala (in depth) and kafka, and spark as well if I
could fit it in. IIRC I had less than 4 weeks to do it. To lose those
3 days mattered.

> you're resenting a free open source project?
Hardly, just the pointless loss of time.
I also try to add back to the community rather than just take. I had
been working on ANTLR stuff (which I'll return to when bits of me stop
hurting) and currently am trying to suss if permutation encoding can
be done from L1 cache  for large permutations in less than a single
DRAM access time. Look up cuckoo filters to see why.

jan

On 05/09/2018, Liam Clarke  wrote:
> Hi Jan,
>
> .sh is not .bat and easily googleable.
>
> Nothing is terrifying about updating the documentation, I just took umbrage
> at you "resenting" your 3 days you expended on learning the hard way that
> Windows is the minority market share in servers - you're resenting a free
> open source project? Because you don't know what a .sh file signifies?
>
> I'm sorry mate, but that's on you. I code for *nix but I know what .bat or
> .ps1 means.
>
> And given that 99.99% of FOSS is not built for Windows, you should always
> assume the worst unless you downloaded it from Codeplex or via NuGet.
>
> Consider those three days valuable learning instead.
>
> Kind regards,
>
> Liam Clarke
>
> On Wed, 5 Sep. 2018, 9:22 pm jan,  wrote:
>
>> Hi Liam,
>> as a DB guy  that does MSSQL (on windows, obviously) I literally have
>> no idea what a .sh file is,or what that would imply. I guess it's bash
>> so yeah.
>> But what's so terrifying about actually stating in the FAQ that you
>> should not run it as production on windows?
>> Why should I conclude that kafka on windows, with java (which is
>> supported on windows), and a filesystem (which windows has), and
>> reliable networking (ditto), doesn't work?
>> When is "just read the code" ever an acceptable answer?
>> Why should I lose 3 days just because someone couldn't put "don't do
>> it" in the FAQ (which I did read), and also have other people need to
>> ask here whether windows is supported?
>> Why?
>> This is just nuts.
>>
>> cheers
>>
>> jan
>>
>>
>> On 05/09/2018, Liam Clarke  wrote:
>> > Hi Jan,
>> >
>> > I'd presume that downloading an archive and seeing a bunch of .sh files
>> > would imply that Kafka wasn't built to run on Windows.
>> >
>> > Given that it's a cluster based technology, I'd assume that it wouldn't
>> be
>> > supported for Windows given that most server based stuff isn't unless
>> > Microsoft built it.
>> >
>> > Kind regards,
>> >
>> > Liam Clarke
>> >
>> > On Wed, 8 Aug. 2018, 2:42 am jan, 
>> wrote:
>> >
>> >> This is an excellent suggestion and I intend to do so henceforth
>> >> (thanks!), but it would be an adjunct to my request rather than the
>> >> answer; it still needs to be made clear in the docs/faq that you
>> >> *can't* use windows directly.
>> >>
>> >> jan
>> >>
>> >> On 07/08/2018, Rahul Singh  wrote:
>> >> > I would recommend using Docker — it would end up being run on a
>> >> > Linux
>> >> kernel
>> >> > VM on windows and is easier to get started on with a bit of learning
>> >> curve
>> >> > for Docker. Less time wasted overall and at least at that point you
>> >> > would
>> >> > know Docker.
>> >> >
>> >> > Rahul
>> >> > On Aug 7, 2018, 4:50 AM -0400, jan ,
>> >> wrote:
>> >> >> I tried using it just for learning a while back and wasted 3 days
>> >> >> because it's not supported on windows. Even basic stuff didn't
>> >> >> work.
>> I
>> >> >> did read the docs first!
>> >> >>
>> >> >> I think I

Re: Kafka on Windows

2018-09-05 Thread jan
Hi Liam,
as a DB guy who does MSSQL (on Windows, obviously) I literally have
no idea what a .sh file is, or what that would imply. I guess it's bash,
so yeah.
But what's so terrifying about actually stating in the FAQ that you
should not run it as production on windows?
Why should I conclude that kafka on windows, with java (which is
supported on windows), and a filesystem (which windows has), and
reliable networking (ditto), doesn't work?
When is "just read the code" ever an acceptable answer?
Why should I lose 3 days just because someone couldn't put "don't do
it" in the FAQ (which I did read), and also have other people need to
ask here whether windows is supported?
Why?
This is just nuts.

cheers

jan


On 05/09/2018, Liam Clarke  wrote:
> Hi Jan,
>
> I'd presume that downloading an archive and seeing a bunch of .sh files
> would imply that Kafka wasn't built to run on Windows.
>
> Given that it's a cluster based technology, I'd assume that it wouldn't be
> supported for Windows given that most server based stuff isn't unless
> Microsoft built it.
>
> Kind regards,
>
> Liam Clarke
>
> On Wed, 8 Aug. 2018, 2:42 am jan,  wrote:
>
>> This is an excellent suggestion and I intend to do so henceforth
>> (thanks!), but it would be an adjunct to my request rather than the
>> answer; it still needs to be made clear in the docs/faq that you
>> *can't* use windows directly.
>>
>> jan
>>
>> On 07/08/2018, Rahul Singh  wrote:
>> > I would recommend using Docker — it would end up being run on a Linux
>> kernel
>> > VM on windows and is easier to get started on with a bit of learning
>> curve
>> > for Docker. Less time wasted overall and at least at that point you
>> > would
>> > know Docker.
>> >
>> > Rahul
>> > On Aug 7, 2018, 4:50 AM -0400, jan ,
>> wrote:
>> >> I tried using it just for learning a while back and wasted 3 days
>> >> because it's not supported on windows. Even basic stuff didn't work. I
>> >> did read the docs first!
>> >>
>> >> I think I've seen other people on this list have questions
>> >> about/problems for exactly the same reason, and that could be a lot of
>> >> time saved if it was in the docs - it needs to be. So how do I ask the
>> >> maintainers to put 'No, Not Windows" in there?
>> >> Serious question.
>> >>
>> >> I resent losing 3 days of work because of essential missing info. It
>> >> sounds like (compared to @M. Manna) that I got off lightly.
>> >>
>> >> So can we put a clear caveat in the documentation, please, right at
>> >> the
>> >> top?
>> >>
>> >> jan
>> >>
>> >> On 07/08/2018, M. Manna  wrote:
>> >> > The answer is - Absolutely not. If you don’t have Linux rack, or
>> >> > Kubernetes
>> >> > deployment -it will not work on Windows as guaranteed.
>> >> >
>> >> > I know this because I have tried to make it work for the past 1
>> >> > year.
>> >> > File
>> >> > handling always fails and crashes the cluster on Windows.
>> >> >
>> >> > Thanks,
>> >> >
>> >> >
>> >> >
>> >> > On Tue, 7 Aug 2018 at 01:54, Alew  wrote:
>> >> >
>> >> > > Hi!
>> >> > >
>> >> > > Is it recommended to use production Kafka cluster on Windows?
>> >> > >
>> >> > > Can't get it from the documentation. It is possible to start Kafka
>> on
>> >> > > Windows, but maybe it's for development purposes only.
>> >> > >
>> >> > > Thanks.
>> >> > >
>> >> > >
>> >> > >
>> >> > >
>> >> >
>> >
>>
>


Re: Kafka on Windows

2018-08-07 Thread jan
@Jacob Sheck: It was 18 months ago so I don't recall but I was told
clearly windows ain't supported when I reported problems.

I don't know if my problems were down to my inexperience or my use of
windows, but I was basically assuming I was controlling for one variable
(n00bish inexperience) and was then informed I had two (n00bness, and
an unsupported platform).

tl;dr if it doesn't work on X, we need to say so clearly. It's just...
good manners, surely?

cheers

jan


On 07/08/2018, M. Manna  wrote:
> By fully broken, i mean not designed and tested to work on Windows.
>
> On Tue, 7 Aug 2018, 16:34 M. Manna,  wrote:
>
>> Log Cleaner functionality is fully broken... If you haven't tried that
>> already.
>>
>>
>>
>> On 7 Aug 2018 4:24 pm, "Jacob Sheck"  wrote:
>>
>> While I agree that it is less frustrating to run Kafka on Linux, I am
>> interested to hear what specific issues you are running into on windows?
>>
>>
>> On Tue, Aug 7, 2018 at 9:42 AM jan 
>> wrote:
>>
>> > This is an excellent suggestion and I intend to do so henceforth
>> > (thanks!), but it would be an adjunct to my request rather than the
>> > answer; it still needs to be made clear in the docs/faq that you
>> > *can't* use windows directly.
>> >
>> > jan
>> >
>> > On 07/08/2018, Rahul Singh  wrote:
>> > > I would recommend using Docker — it would end up being run on a Linux
>> > kernel
>> > > VM on windows and is easier to get started on with a bit of learning
>> > curve
>> > > for Docker. Less time wasted overall and at least at that point you
>> would
>> > > know Docker.
>> > >
>> > > Rahul
>> > > On Aug 7, 2018, 4:50 AM -0400, jan ,
>> > wrote:
>> > >> I tried using it just for learning a while back and wasted 3 days
>> > >> because it's not supported on windows. Even basic stuff didn't work.
>> > >> I
>> > >> did read the docs first!
>> > >>
>> > >> I think I've seen other people on this list have questions
>> > >> about/problems for exactly the same reason, and that could be a lot
>> > >> of
>> > >> time saved if it was in the docs - it needs to be. So how do I ask
>> > >> the
>> > >> maintainers to put 'No, Not Windows" in there?
>> > >> Serious question.
>> > >>
>> > >> I resent losing 3 days of work because of essential missing info. It
>> > >> sounds like (compared to @M. Manna) that I got off lightly.
>> > >>
>> > >> So can we put a clear caveat in the documentation, please, right at
>> the
>> > >> top?
>> > >>
>> > >> jan
>> > >>
>> > >> On 07/08/2018, M. Manna  wrote:
>> > >> > The answer is - Absolutely not. If you don’t have Linux rack, or
>> > >> > Kubernetes
>> > >> > deployment -it will not work on Windows as guaranteed.
>> > >> >
>> > >> > I know this because I have tried to make it work for the past 1
>> year.
>> > >> > File
>> > >> > handling always fails and crashes the cluster on Windows.
>> > >> >
>> > >> > Thanks,
>> > >> >
>> > >> >
>> > >> >
>> > >> > On Tue, 7 Aug 2018 at 01:54, Alew  wrote:
>> > >> >
>> > >> > > Hi!
>> > >> > >
>> > >> > > Is it recommended to use production Kafka cluster on Windows?
>> > >> > >
>> > >> > > Can't get it from the documentation. It is possible to start
>> > >> > > Kafka
>> > on
>> > >> > > Windows, but maybe it's for development purposes only.
>> > >> > >
>> > >> > > Thanks.
>> > >> > >
>> > >> > >
>> > >> > >
>> > >> > >
>> > >> >
>> > >
>> >
>>
>>
>>
>


Re: Kafka on Windows

2018-08-07 Thread jan
This is an excellent suggestion and I intend to do so henceforth
(thanks!), but it would be an adjunct to my request rather than the
answer; it still needs to be made clear in the docs/faq that you
*can't* use windows directly.

jan

On 07/08/2018, Rahul Singh  wrote:
> I would recommend using Docker — it would end up being run on a Linux kernel
> VM on windows and is easier to get started on with a bit of learning curve
> for Docker. Less time wasted overall and at least at that point you would
> know Docker.
>
> Rahul
> On Aug 7, 2018, 4:50 AM -0400, jan , wrote:
>> I tried using it just for learning a while back and wasted 3 days
>> because it's not supported on windows. Even basic stuff didn't work. I
>> did read the docs first!
>>
>> I think I've seen other people on this list have questions
>> about/problems for exactly the same reason, and that could be a lot of
>> time saved if it was in the docs - it needs to be. So how do I ask the
>> maintainers to put 'No, Not Windows" in there?
>> Serious question.
>>
>> I resent losing 3 days of work because of essential missing info. It
>> sounds like (compared to @M. Manna) that I got off lightly.
>>
>> So can we put a clear caveat in the documentation, please, right at the
>> top?
>>
>> jan
>>
>> On 07/08/2018, M. Manna  wrote:
>> > The answer is - Absolutely not. If you don’t have Linux rack, or
>> > Kubernetes
>> > deployment -it will not work on Windows as guaranteed.
>> >
>> > I know this because I have tried to make it work for the past 1 year.
>> > File
>> > handling always fails and crashes the cluster on Windows.
>> >
>> > Thanks,
>> >
>> >
>> >
>> > On Tue, 7 Aug 2018 at 01:54, Alew  wrote:
>> >
>> > > Hi!
>> > >
>> > > Is it recommended to use production Kafka cluster on Windows?
>> > >
>> > > Can't get it from the documentation. It is possible to start Kafka on
>> > > Windows, but maybe it's for development purposes only.
>> > >
>> > > Thanks.
>> > >
>> > >
>> > >
>> > >
>> >
>


Re: Kafka on Windows

2018-08-07 Thread jan
I tried using it just for learning a while back and wasted 3 days
because it's not supported on windows. Even basic stuff didn't work. I
did read the docs first!

I think I've seen other people on this list have questions
about/problems for exactly the same reason, and that could be a lot of
time saved if it was in the docs - it needs to be. So how do I ask the
maintainers to put "No, Not Windows" in there?
Serious question.

I resent losing 3 days of work because of essential missing info. It
sounds like (compared to @M. Manna) that I got off lightly.

So can we put a clear caveat in the documentation, please, right at the top?

jan

On 07/08/2018, M. Manna  wrote:
> The answer is - Absolutely not. If you don’t have Linux rack, or Kubernetes
> deployment  -it will not work on Windows as guaranteed.
>
> I know this because I have tried to make it work for the past 1 year. File
> handling always fails and crashes the cluster on Windows.
>
> Thanks,
>
>
>
> On Tue, 7 Aug 2018 at 01:54, Alew  wrote:
>
>> Hi!
>>
>> Is it recommended to use production Kafka cluster on Windows?
>>
>> Can't get it from the documentation. It is possible to start Kafka on
>> Windows, but maybe it's for development purposes only.
>>
>> Thanks.
>>
>>
>>
>>
>


Re: Documentation/Article/Case Study on Scala as the Kafka Backbone Language

2018-07-28 Thread jan
I'm not a scala expert and haven't touched it for 18 months, but with
respect to Mr. Singh, I'd like to clarify or question a few of his
points.

1. Statelessness is a tool; not an end in itself but a means to an
end. As someone on HackerNews says, "control your state space or die",
but the same guy is *not* saying remove it all. I've seen immutability
overused. Sometimes a bit of state makes things both more
comprehensible and faster.

2. I don't know, and 3. true,

4. @Rahul, I don't understand, can you clarify?

5. Largely true and a huge bonus when used appropriately, but it can
be overused. Sometimes the emphasis seems to be on "helpful" syntactic
formatting without asking whether it actually helps the programmer.

6. Sounds like you've had more experience with them than me! Perhaps I
don't know how to use them appropriately. I may be missing a trick.

7. I wouldn't argue but I'd warn that some abstractions can be
expensive and I suspect shapeless may be one. Also, for parsers may I
suggest looking at ANTLR?

Idiomatic scala code can be expensive *as currently implemented*. Just
understand that cost by profiling, and de-idiomise in hot code as
needed.

It's a fab language.

jan

On 23/07/2018, Rahul Singh  wrote:
> Not necessarily for Kafka, but you can definitely google “Java vs. Scala”
> and find a variety of reasons . I did a study for a client and ultimately
> here are the major reasons I found :
>
> 1. Functional programming language which leads itself to stateless systems
> 2. Better / easier to use stream processing syntax (then at that time in
> Java 8)
> 3. REPL available to quickly test functionality interactively.
> 4. Case classes which can be inferred with or without strongly typed cases.
> 5. Abilty to quickly create DSLs that seem natural to developers
> 6. Awesome partial function syntax
> 7. My personal favorite — as I was using parboiled2 to build a parser —
> libraries like shapeless
>
> Best
>
> --
> Rahul Singh
> rahul.si...@anant.us
>
> Anant Corporation
> On Jul 23, 2018, 8:40 AM -0400, M. Manna , wrote:
>> Hello,
>>
>> Is anyone aware of any links or website where I can find information/case
>> study etc. to why Scala was the best choice for kafka design? I hope this
>> is not too much of a "Naive" question since I have had a very humble
>> introduction to Scala.
>>
>> I understand that Scala is considered where distributed/scalable systems
>> need to be designed. Also, in some cases it reduces multiple complex
>> statements to be formed using a single complex statements i.e. reduce
>> incremental verbosity.
>>
>> So, as a person who has background in Java, but relatively novice in
>> Scala,
>> I wanted to understand whether a study document exists to document the
>> choice?
>>
>> Regards,
>


Re: Graceful Shutdown always fails on multi-broker setup (Windows)

2018-05-09 Thread Jan Filipiak

Hi,

yes, your case is the exception. In usual deployments Kafka has to be
available 100% of the time.
So, as the name "rolling restart" suggests, you usually upgrade or do
maintenance on boxes a few at a time, depending on how your topics are
laid out across brokers.
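
The rolling-restart idea can be illustrated with a toy model. This is only a sketch under stated assumptions, not Kafka's actual controller logic: the class ControlledShutdownModel and its methods are hypothetical names, and "leadership" is simply handed to the next live replica in the list. It shows why brokers can leave one at a time but the last one has nobody to hand its partitions to.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model (hypothetical, not Kafka code): a broker may leave gracefully
// only if every partition it leads can be handed to another live replica.
public class ControlledShutdownModel {
    private final Map<String, List<Integer>> replicas = new HashMap<>();
    private final Map<String, Integer> leader = new HashMap<>();
    private final Set<Integer> live = new TreeSet<>();

    public void addBroker(int id) {
        live.add(id);
    }

    // The first replica in the list starts out as leader.
    public void addPartition(String name, List<Integer> replicaIds) {
        replicas.put(name, replicaIds);
        leader.put(name, replicaIds.get(0));
    }

    // Returns true if the broker handed over all its partitions and left.
    public boolean controlledShutdown(int brokerId) {
        Map<String, Integer> newLeaders = new HashMap<>();
        for (Map.Entry<String, Integer> e : leader.entrySet()) {
            if (e.getValue() != brokerId) {
                continue; // this partition is led by someone else
            }
            Integer successor = null;
            for (int candidate : replicas.get(e.getKey())) {
                if (candidate != brokerId && live.contains(candidate)) {
                    successor = candidate;
                    break;
                }
            }
            if (successor == null) {
                return false; // nobody left to take over: not graceful
            }
            newLeaders.put(e.getKey(), successor);
        }
        leader.putAll(newLeaders);
        live.remove(brokerId);
        return true;
    }

    public static void main(String[] args) {
        ControlledShutdownModel cluster = new ControlledShutdownModel();
        cluster.addBroker(1);
        cluster.addBroker(2);
        cluster.addBroker(3);
        cluster.addPartition("p0", Arrays.asList(1, 2, 3));
        cluster.addPartition("p1", Arrays.asList(2, 3, 1));
        for (int broker = 1; broker <= 3; broker++) {
            System.out.println("broker " + broker + " graceful: "
                    + cluster.controlledShutdown(broker));
        }
    }
}
```

In this model a maintenance window would restart a broker before taking the next one down, so that a live replica is always available to take over.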





On 09.05.2018 12:13, M. Manna wrote:

Thanks Jan. We have 9 broker-zookeeper setup in production and during
monthly maintenance we need to shut it down gracefully (or reasonably) to
do our work.
Are you saying that it's okay not to shut down the entire cluster?

Also, will this hold true even when we are trying to do rolling upgrade to
1.0x as prescribed here - https://kafka.apache.org/documentation/#upgrade ?

Regards,

On 9 May 2018 at 11:06, Jan Filipiak  wrote:


Hi,

  this is expected.  A graceful shutdown means the broker only shuts
down once it is no longer the leader of any partition.
Therefore you should not be able to gracefully shut down your entire
cluster.

Hope that helps

Best Jan



On 09.05.2018 12:02, M. Manna wrote:


Hello,

I have followed the graceful shutdown process by using the following (in
addition to the default controlled.shutdown.enable)

controlled.shutdown.max.retries=10
controlled.shutdown.retry.backoff.ms=3000

I am always having issues where not all the brokers are shutting
gracefully. And it's always Kafka, not zookeeper.

Has anyone experienced this discrepancy ? If so, could you please let me
know how to get around this issue?

Regards,






Re: Graceful Shutdown always fails on multi-broker setup (Windows)

2018-05-09 Thread Jan Filipiak

Hi,

 this is expected.  A graceful shutdown means the broker only shuts
down once it is no longer the leader of any partition.
Therefore you should not be able to gracefully shut down your entire
cluster.


Hope that helps

Best Jan


On 09.05.2018 12:02, M. Manna wrote:

Hello,

I have followed the graceful shutdown process by using the following (in
addition to the default controlled.shutdown.enable)

controlled.shutdown.max.retries=10
controlled.shutdown.retry.backoff.ms=3000

I am always having issues where not all the brokers are shutting
gracefully. And it's always Kafka, not zookeeper.

Has anyone experienced this discrepancy ? If so, could you please let me
know how to get around this issue?

Regards,





Re: why kafka index file use memory mapped files ,however log file doesn't

2018-02-13 Thread jan
1. Perhaps a human-readable log, being write-only, and which may
buffer on the user side or in the kernel, may be more efficient
because small writes are accumulated in a buffer (cheap) before
actually being pushed to disk (less cheap)? If you mmap'd this
instead, how do you feel it would behave?

2. Did you read the link to the post about mmapping? The guy knows
more about it than I'll probably ever know and he says it's not that
simple. He's saying mmap is not a magic answer to anything.
This bit may be relevant: "APPLICATION BUFFERS WHICH EASILY FIT IN THE
L2 CACHE COST VIRTUALLY NOTHING ON A MODERN CPU!" (NB. the post is from
2004, so with cache+cpu VS ram/disk discrepancies growing larger, it
may be even more true now).
Kafka messages can be largish so perhaps that suggests why they use it
for data files.
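
To make the two access patterns concrete, here is a minimal sketch using only the JDK. It is not how Kafka implements its files: the class MmapVsBuffered, the 64 KiB buffer size and the 8-byte entry layout are assumptions chosen for illustration. One method appends small text lines through a user-space buffer, the other treats a file of fixed-size entries as a memory-mapped array.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapVsBuffered {

    // Append-only text log: many small writes are collected in a user-space
    // buffer and handed to the OS in larger chunks. Returns the file size.
    static long appendLines(Path log, int lines) throws IOException {
        try (OutputStream out = new BufferedOutputStream(
                Files.newOutputStream(log, StandardOpenOption.CREATE,
                        StandardOpenOption.APPEND), 64 * 1024)) {
            for (int i = 0; i < lines; i++) {
                out.write(("line " + i + "\n").getBytes(StandardCharsets.UTF_8));
            }
        }
        return Files.size(log);
    }

    // Index-like file: fixed-size 8-byte entries written through a mapping.
    static void writeIndex(Path index, long[] positions) throws IOException {
        try (FileChannel ch = FileChannel.open(index, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer buf =
                    ch.map(FileChannel.MapMode.READ_WRITE, 0, positions.length * 8L);
            for (long p : positions) {
                buf.putLong(p);
            }
            buf.force(); // ask the OS to write the mapped pages back
        }
    }

    // Random access into the mapping: slot n lives at byte offset n * 8.
    static long readEntry(Path index, int slot) throws IOException {
        try (FileChannel ch = FileChannel.open(index, StandardOpenOption.READ)) {
            MappedByteBuffer buf =
                    ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
            return buf.getLong(slot * 8);
        }
    }

    public static void main(String[] args) throws IOException {
        Path log = Files.createTempFile("app", ".log");
        Path index = Files.createTempFile("segment", ".index");
        System.out.println("log bytes: " + appendLines(log, 1000));
        writeIndex(index, new long[] {0L, 4096L, 8192L});
        System.out.println("entry 1: " + readEntry(index, 1));
    }
}
```

The sketch only shows the shape of each approach; which one is cheaper for a given workload is exactly the thing the linked post says has to be measured rather than assumed.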

If this comes across as a bit rude, that wasn't intended. I can't really
answer your question, just suggest a bit of reading and some
guesswork.

cheers

jan

On 13/02/2018, YuFeng Shen  wrote:
>  If that is like what you said , why index file use the memory mapped file?
>
> 
> From: jan 
> Sent: Monday, February 12, 2018 9:26 PM
> To: users@kafka.apache.org
> Subject: Re: why kafka index file use memory mapped files ,however log file
> doesn't
>
> A human-readable log file is likely to have much less activity in it
> (it was a year ago I was using kafka and we could eat up gigs for the
> data files but the log files were a few meg). So there's perhaps
> little to gain.
>
> Also if the power isn't pulled and the OS doesn't crash, log messages
> will be, I guess, buffered by the OS then written out as a full
> buffer, or perhaps every nth tick if the buffer fills up very slowly.
> So it's still reasonably efficient.
>
> Adding a few hundred context switches a second for the human log
> probably isn't a big deal. I remember seeing several tens of
> thousands/sec  when using kafka (although it was other processes
> running on those multicore machines to be fair). I guess logging
> overhead is down in the noise, though that's just a guess.
>
> Also I remember reading a rather surprising post about mmaping. Just
> found it
> <https://lists.freebsd.org/pipermail/freebsd-questions/2004-June/050371.html>.
> Sniplets:
> "There are major hardware related overheads to the use of mmap(), on
> *ANY* operating system, that cannot be circumvented"
> -and-
> "you are assuming that copying is always bad (it isn't), that copying
> is always horrendously expensive (it isn't), that memory mapping is
> always cheap (it isn't cheap),"
>
> A bit vague on my part, but HTH anyway
>
> jan
>
>
> On 12/02/2018, YuFeng Shen  wrote:
>> Hi jan ,
>>
>> I think the reason is the same as why index file using  memory mapped
>> file.
>>
>> As the memory mapped file can avoid the data copy between user and kernel
>> buffer space, so it can improve the performance for the index file IO
>> operation ,right? If it is ,why Log file cannot achieve the same
>> performance
>> improvement as memory mapped index file?
>>
>>
>> Jacky
>>
>>
>> 
>> From: jan 
>> Sent: Saturday, February 10, 2018 8:33 PM
>> To: users@kafka.apache.org
>> Subject: Re: why kafka index file use memory mapped files ,however log
>> file
>> doesn't
>>
>> I'm not sure I can answer your question, but may I pose another in
>> return: why do you feel having a memory mapped log file would be a
>> good thing?
>>
>>
>> On 09/02/2018, YuFeng Shen  wrote:
>>> Hi Experts,
>>>
>>> We know that kafka use memory mapped files for it's index files ,however
>>> it's log files don't use the memory mapped files technology.
>>>
>>> May I know why index files use memory mapped files, however log files
>>> don't
>>> use the same technology?
>>>
>>>
>>> Jacky
>>>
>>
>


Re: Kafka Consumer Offsets unavailable during rebalancing

2018-02-13 Thread Jan Filipiak

I would encourage you to do so.
I also think it's not reasonable behavior.

On 13.02.2018 11:28, Wouter Bancken wrote:

We have upgraded our Kafka version as an attempt to solve this issue.
However, the issue is still present in Kafka 1.0.0.

Can I log a bug for this in JIRA?

Wouter

On 5 February 2018 at 09:22, Wouter Bancken 
wrote:


The consumers in consumer group 'X' do not have a regex subscription
matching the newly created topic 'C'. They simply subscribe with
the subscribe(java.util.Collection topics) method on
topics 'A' and 'B'.

Shouldn't the consumer group have a different state from "Stable" during a
rebalancing regardless of the cause? How else can we determine the consumer
lag of the group during the rebalancing?

Best regards,
Wouter


On 5 February 2018 at 00:13, Hans Jespersen  wrote:


Do the consumers in consumer group ‘X’ have a regex subscription that
matches the newly created topic ‘C’?

If they do then they will only discover this new topic once their
‘metadata.max.age.ms’ metadata refresh interval has passed, which
defaults to 5 minutes.

metadata.max.age.ms: The period of time in milliseconds after which
we force a refresh of metadata even if we haven't seen any partition
leadership changes to proactively discover any new brokers or partitions.

-hans



On Feb 4, 2018, at 2:16 PM, Wouter Bancken wrote:

Hi Hans,

Thanks for the response!

However, I get this result for all topics, not just for the newly created
topic.

Situation sketch:
1. I have a consumer group 'X' subscribed to topics 'A' and 'B' with
partition assignments and lag information. Consumer group 'X' is "Stable".
2a. Topic 'C' is (being) created.
2b. During this creation, I do not have a partition assignment for consumer
group 'X' for topics 'A' and 'B' but the consumer group is still "Stable".
3. A second later: I have a partition assignment for consumer group 'X' for
topics 'A' and 'B' again and the consumer group is still "Stable".

I expected the state of consumer group 'X' during step 2b to be
"PreparingRebalance" or "AwaitingSync".

Best regards,
Wouter


On 4 February 2018 at 21:25, Hans Jespersen  wrote:

I believe this is expected behavior.

If there are no subscriptions to a new topic, and therefore no partition
assignments, and definitely no committed offsets, then lag is an undefined
concept. When the consumers subscribe to this new topic they may choose to
start at the beginning or end of the commit log so the lag cannot be
predicted in advance.

-hans
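
A minimal sketch of why lag is undefined without a committed offset, assuming the usual definition of lag as log end offset minus committed offset. The class ConsumerLag and its method are hypothetical names, not part of any Kafka client API.

```java
import java.util.OptionalLong;

public class ConsumerLag {

    // Lag for one partition: log end offset minus committed offset.
    // With no committed offset there is nothing to subtract from, so the
    // result is "empty" rather than zero.
    static OptionalLong lag(long logEndOffset, OptionalLong committedOffset) {
        if (!committedOffset.isPresent()) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(Math.max(0L, logEndOffset - committedOffset.getAsLong()));
    }

    public static void main(String[] args) {
        System.out.println(lag(1500L, OptionalLong.of(1200L)));
        System.out.println(lag(1500L, OptionalLong.empty()));
    }
}
```

A monitoring tool built on this idea would report "unknown" for a partition without a committed offset instead of treating it as zero lag.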


On Feb 4, 2018, at 11:51 AM, Wouter Bancken 
wrote:

Can anyone clarify if this is a bug in Kafka or the expected behavior?

Best regards,
Wouter


On 30 January 2018 at 21:04, Wouter Bancken 
Hi,

I'm trying to write an external tool to monitor consumer lag on Apache
Kafka.

For this purpose, I'm using the kafka-consumer-groups tool to fetch the
consumer offsets.

When using this tool, partition assignments seem to be unavailable
temporarily during the creation of a new topic even if the consumer group
has no subscription on this new topic. This seems to match the
documentation saying *"Topic metadata changes which have no impact on
subscriptions cause resync"*.

However, when this occurs I'd expect the state of the consumer to be
"PreparingRebalance" or "AwaitingSync" but it is simply "Stable".

Is this a bug in the tooling or is there a different way to obtain the
correct offsets for a consumer group during a rebalance?

I'm using Kafka 10.2.1 but I haven't found any related issues in recent
changelogs.

Best regards,
Wouter







Re: why kafka index file use memory mapped files ,however log file doesn't

2018-02-12 Thread jan
A human-readable log file is likely to have much less activity in it
(it was a year ago I was using kafka and we could eat up gigs for the
data files but the log files were a few meg). So there's perhaps
little to gain.

Also if the power isn't pulled and the OS doesn't crash, log messages
will be, I guess, buffered by the OS then written out as a full
buffer, or perhaps every nth tick if the buffer fills up very slowly.
So it's still reasonably efficient.

Adding a few hundred context switches a second for the human log
probably isn't a big deal. I remember seeing several tens of
thousands/sec  when using kafka (although it was other processes
running on those multicore machines to be fair). I guess logging
overhead is down in the noise, though that's just a guess.

Also I remember reading a rather surprising post about mmapping. Just
found it:
<https://lists.freebsd.org/pipermail/freebsd-questions/2004-June/050371.html>.
Snippets:
"There are major hardware related overheads to the use of mmap(), on
*ANY* operating system, that cannot be circumvented"
-and-
"you are assuming that copying is always bad (it isn't), that copying
is always horrendously expensive (it isn't), that memory mapping is
always cheap (it isn't cheap),"

A bit vague on my part, but HTH anyway

jan


On 12/02/2018, YuFeng Shen  wrote:
> Hi jan ,
>
> I think the reason is the same as why index file using  memory mapped file.
>
> As the memory mapped file can avoid the data copy between user and kernel
> buffer space, so it can improve the performance for the index file IO
> operation ,right? If it is ,why Log file cannot achieve the same performance
> improvement as memory mapped index file?
>
>
> Jacky
>
>
> 
> From: jan 
> Sent: Saturday, February 10, 2018 8:33 PM
> To: users@kafka.apache.org
> Subject: Re: why kafka index file use memory mapped files ,however log file
> doesn't
>
> I'm not sure I can answer your question, but may I pose another in
> return: why do you feel having a memory mapped log file would be a
> good thing?
>
>
> On 09/02/2018, YuFeng Shen  wrote:
>> Hi Experts,
>>
>> We know that kafka use memory mapped files for it's index files ,however
>> it's log files don't use the memory mapped files technology.
>>
>> May I know why index files use memory mapped files, however log files
>> don't
>> use the same technology?
>>
>>
>> Jacky
>>
>


Re: why kafka index file use memory mapped files ,however log file doesn't

2018-02-10 Thread jan
I'm not sure I can answer your question, but may I pose another in
return: why do you feel having a memory mapped log file would be a
good thing?


On 09/02/2018, YuFeng Shen  wrote:
> Hi Experts,
>
> We know that kafka use memory mapped files for it's index files ,however
> it's log files don't use the memory mapped files technology.
>
> May I know why index files use memory mapped files, however log files don't
> use the same technology?
>
>
> Jacky
>


Re: Broker won't exit...

2018-01-10 Thread Jan Filipiak

Hi,

the brokers still try to do a graceful shutdown, I suppose?
A broker would only shut down once it is no longer the leader of any
partition.

Can you verify that other brokers are alive and took over leadership,
and that the broker in question stepped down as leader for all partitions?

Best Jan



On 10.01.2018 12:57, Ted Yu wrote:

Skip: Can you pastebin the stack trace of the stuck broker?
Thanks

Original message
From: Skip Montanaro
Date: 1/10/18 3:52 AM (GMT-08:00)
To: users@kafka.apache.org
Subject: Re: Broker won't exit...

Did you stop the broker before stopping zookeeper?


Yes. My stop script executes the server stop scripts in reverse order from
my start script. Should I have stuck in a couple second sleep between
stopping the brokers and stopping zookeeper?

I was actually running two brokers. The one my stop script stopped first
exited properly.

Skip




Re: Consuming a state store (KTable) basics - 1.0.0

2017-12-07 Thread Jan Filipiak

Hi Peter,

glad it helped,

these are the preferred ways indeed.




On 07.12.2017 15:58, Peter Figliozzi wrote:

Thanks Jan, super helpful!  To summarize (I hope I've got it right), there
are only two ways for external applications to access data derived from a
KTable:

1.  Inside the streams application that builds the KTable, create a
KafkaStreams.store and expose to the outside via a service.

2.  Convert the KTable to a stream and write to a new Kafka topic.  Then
external apps can just consume this feed.  If we only care about the latest
updates, make the topic log-compacted.

latest value per key or last updated might be a different story here,
in the end there is a lot of flexibility here that everyone is free to 
explore
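
As a rough illustration of "latest value per key", here is a toy sketch of what a compacted view of a topic retains. It is only a model, not the broker's log cleaner: the class CompactionSketch is a hypothetical name, records are plain key/value string pairs, and a null value is treated as a delete marker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {

    // Keep only the most recent value for each key; a null value deletes
    // the key. Iteration order follows the time of each key's last update.
    static Map<String, String> compact(List<String[]> records) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] record : records) {
            String key = record[0];
            String value = record[1];
            latest.remove(key); // drop the older entry so the key moves to the end
            if (value != null) {
                latest.put(key, value);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String[]> records = new ArrayList<>();
        records.add(new String[] {"a", "1"});
        records.add(new String[] {"b", "2"});
        records.add(new String[] {"a", "3"});
        records.add(new String[] {"b", null});
        System.out.println(compact(records));
    }
}
```

An external consumer reading such a topic from the beginning ends up with one current value per key, which is the KTable view described in option 2.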


Best Jan



Thanks,

Pete

On Thu, Dec 7, 2017 at 1:42 AM, Jan Filipiak 
wrote:


Hi,

you should be able to retrieve your store with

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1021

This would give you access to the store from inside your current
application. In your Streams application you could then
expose this store via, say, a REST or any other RPC interface, to let
applications from outside your JVM query it.

So i would say the blogpost still applies quite well.

Hope this helps

Best Jan


On 07.12.2017 04:59, Peter Figliozzi wrote:


I've written a Streams application which creates a KTable like this:

val myTable: KTable[String, GenericRecord] = myStream
  .groupByKey()
  .aggregate(myInitializer, myAdder, myStore)

where myStore was configured like this:

val myStore
  : Materialized[String, GenericRecord, KeyValueStore[Bytes,
Array[Byte]]] =
  Materialized
.as("my-store")
.withKeySerde(Serdes.String())
.withValueSerde(genericValueSerde)

What I'd like to do now is query (read) this store from a separate
application.  How do I query it in 1.0.0?  With a KTable constructor,
using
the store string as the topic, i.e.:

public  KTable table(
java.lang.String topic,
Materialized>
materialized)

Or some other way?

I saw this blog post
<https://blog.codecentric.de/en/2017/03/interactive-queries-
in-apache-kafka-streams/>
but it appears to be only applicable to the older version of Kafka (please
correct me if I'm wrong).

Thanks,

Pete






Re: Consuming a state store (KTable) basics - 1.0.0

2017-12-06 Thread Jan Filipiak

Hi,

you should be able to retrieve your store with

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1021

This would give you access to the store from inside your current
application. In your Streams application you could then
expose this store via, say, a REST or any other RPC interface, to let
applications from outside your JVM query it.
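
A minimal sketch of the "expose it over REST" idea, using only the JDK's built-in HTTP server. Everything here is an assumption made for illustration: the class StoreQueryEndpoint, the /store/<key> path, and the in-memory map, which stands in for the read-only store you would obtain from your KafkaStreams instance via the method linked above.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StoreQueryEndpoint {

    // Stand-in for the state store; a real application would look keys up
    // in the store retrieved from its KafkaStreams instance instead.
    static final Map<String, String> STORE = new ConcurrentHashMap<>();

    // Starts an HTTP server answering GET /store/<key>; port 0 picks a free port.
    static HttpServer start(int port) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", port), 0);
        server.createContext("/store/", exchange -> {
            String key = exchange.getRequestURI().getPath().substring("/store/".length());
            String value = STORE.get(key);
            byte[] body = (value == null ? "not found" : value)
                    .getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(value == null ? 404 : 200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws IOException {
        STORE.put("some-key", "some-value");
        HttpServer server = start(0);
        System.out.println("listening on port " + server.getAddress().getPort());
    }
}
```

With several application instances, each one only holds part of the store, so a real service would also need to route a request to the instance that owns the key.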


So I would say the blog post still applies quite well.

Hope this helps

Best Jan


On 07.12.2017 04:59, Peter Figliozzi wrote:

I've written a Streams application which creates a KTable like this:

val myTable: KTable[String, GenericRecord] = myStream
 .groupByKey()
 .aggregate(myInitializer, myAdder, myStore)

where myStore was configured like this:

val myStore: Materialized[String, GenericRecord, KeyValueStore[Bytes, Array[Byte]]] =
  Materialized
    .as("my-store")
    .withKeySerde(Serdes.String())
    .withValueSerde(genericValueSerde)

What I'd like to do now is query (read) this store from a separate
application.  How do I query it in 1.0.0?  With a KTable constructor, using
the store string as the topic, i.e.:

public <K,V> KTable<K,V> table(
java.lang.String topic,
Materialized<K,V,KeyValueStore<Bytes,byte[]>>
materialized)

Or some other way?

I saw this blog post
<https://blog.codecentric.de/en/2017/03/interactive-queries-in-apache-kafka-streams/>
but it appears to be only applicable to the older version of Kafka (please
correct me if I'm wrong).

Thanks,

Pete





Re: Mirrormaker consumption slowness

2017-12-06 Thread Jan Filipiak

Hi,

Two questions: Is your MirrorMaker collocated with the source or the target?
And what are the send and receive buffer sizes on the connections that span
the WAN?
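
Why the buffer sizes matter: a TCP connection can have at most one window of unacknowledged data in flight per round trip, so the throughput of a single connection is bounded by window / RTT. A rough sketch of that arithmetic, assuming a 100 ms round-trip time (a made-up figure, not from this thread) and taking 1048576 bytes from the receive.buffer.bytes setting quoted further down. The class and method names are hypothetical.

```java
final class TcpWindowEstimate {
    // Upper bound on single-connection throughput in bytes per second:
    // at most one full window can be in flight per round trip.
    static double maxBytesPerSecond(long windowBytes, double rttMillis) {
        return windowBytes / (rttMillis / 1000.0);
    }

    // Window (buffer) size needed to sustain the given rate over the given
    // round-trip time, i.e. the bandwidth-delay product.
    static long requiredWindowBytes(double bytesPerSecond, double rttMillis) {
        return (long) Math.ceil(bytesPerSecond * rttMillis / 1000.0);
    }

    public static void main(String[] args) {
        long window = 1_048_576L;   // receive.buffer.bytes from the quoted consumer config
        double rttMillis = 100.0;   // assumed WAN round-trip time, not a value from the thread
        System.out.printf("upper bound: %.1f MiB/s%n",
                maxBytesPerSecond(window, rttMillis) / (1024.0 * 1024.0));
        System.out.println("window needed for 3 MB/s: "
                + requiredWindowBytes(3_000_000.0, rttMillis) + " bytes");
    }
}
```

This is only an upper bound. The effective window also depends on operating system limits and TCP window scaling, and on the send buffer of the broker on the other side, so the configured value is not necessarily the one in effect.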

Hope we can get you some help.

Best jan



On 06.12.2017 14:36, Xu, Zhaohui wrote:

Any update on this issue?

We also ran into a similar situation recently. MirrorMaker is used to 
replicate messages between clusters in different DCs. But sometimes a portion of 
the partitions have high consumer lag, and tcpdump shows a similar packet 
delivery pattern. The behavior is rather odd and not self-explanatory. 
I wonder whether it has anything to do with the number of consumers 
being too large? In our case, we have around 100 consumer connections per 
broker.

Regards,
Jeff

On 12/5/17, 10:14 AM, "tao xiao"  wrote:

 Hi,
 
 any pointer will be highly appreciated
 
 On Thu, 30 Nov 2017 at 14:56 tao xiao  wrote:
 
 > Hi There,

 >
 >
 >
 > We are running into a weird situation when using MirrorMaker to replicate
 > messages between Kafka clusters across datacenters, and are reaching out
 > for help in case you have encountered this kind of problem before or have
 > some insight into this kind of issue.
 >
 >
 >
 > Here is the scenario. We have set up a deployment where we run 30
 > MirrorMaker instances on 30 different nodes. Each MirrorMaker instance is
 > configured with num.streams=1, so only one consumer runs. The topics to
 > replicate are configured with 100 partitions, and data is almost evenly
 > distributed across all partitions. After running for a period of time, some
 > of the MirrorMaker instances seem to slow down and consume at a relatively
 > slow speed from the source Kafka cluster. The output of tcptrack shows that
 > the consume rate of the problematic instances dropped to ~1MB/s, while the
 > healthy instances consume at a rate of ~3MB/s. As a result, the consumer
 > lag for the corresponding partitions keeps growing.
 >
 >
 >
 >
 > After triggering a tcpdump, we noticed that the traffic pattern in the TCP
 > connections of problematic MirrorMaker instances is very different from
 > the others. Packets flowing in problematic TCP connections are relatively
 > small, and seq and ack packets basically come in one after another. In
 > healthy TCP connections, on the other hand, several seq packets come with
 > one ack packet. The screenshots below show the situation; both captures
 > were taken on the same MirrorMaker node.
 >
 >
 >
 > problematic connection.  ps. 10.kfk.kfk.kfk is kafka broker, 10.mm.mm.mm
 > is Mirrormaker node
 >
 > https://imgur.com/Z3odjjT
 >
 >
 > healthy connection
 >
 > https://imgur.com/w0A6qHT
 >
 >
 > If we stop the problematic MirrorMaker instance, the other instances that
 > take over the lagged partitions consume messages quickly and catch up on
 > the lag soon. So the brokers in the source Kafka cluster seem to be fine.
 > But if MirrorMaker itself causes the issue, how can one TCP connection be
 > good while others are problematic, given that all connections are
 > established in the same manner by the Kafka library?
 >
 >
 >
 > Consumer configuration for Mirrormaker instance as below.
 >
 > auto.offset.reset=earliest
 >
 >
 > partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
 >
 > heartbeat.interval.ms=1
 >
 > session.timeout.ms=12
 >
 > request.timeout.ms=15
 >
 > receive.buffer.bytes=1048576
 >
 > max.partition.fetch.bytes=2097152
 >
 > fetch.min.bytes=1048576
 >
 >
 >
 > Kafka version is 0.10.0.0 and we have Kafka and Mirrormaker run on Ubuntu
 > 14.04
 >
 >
 >
 > Any response is appreciated.
 >
 > Regards,
 >
 > Tao
 >
 





Re: Configuration: Retention and compaction

2017-12-03 Thread Jan Filipiak

Hi

The only retention time that applies to compacted topics is 
delete.retention.ms: the duration for which tombstones (delete markers) 
are kept in the topic during compaction.


A very detailed explanation of what is going on can be found here:

https://kafka.apache.org/documentation/#compaction
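
To make the role of delete.retention.ms concrete, here is a deliberately simplified model of compaction, not the broker's implementation: it keeps only the latest entry per key and drops a tombstone (null value) once it is older than the retention. It ignores segments entirely, whereas the real log cleaner never touches the active segment and measures tombstone age differently. The names CompactionModel, Entry and compact are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CompactionModel {
    // One log entry; a null value marks a tombstone (delete marker).
    static final class Entry {
        final String key;
        final String value;
        final long timestampMs;

        Entry(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Keeps the latest entry per key and drops tombstones older than
    // deleteRetentionMs (simplified model, see the caveats above).
    static List<Entry> compact(List<Entry> log, long nowMs, long deleteRetentionMs) {
        // Remember the position of the last entry written for each key.
        Map<String, Integer> lastIndex = new HashMap<>();
        for (int i = 0; i < log.size(); i++) {
            lastIndex.put(log.get(i).key, i);
        }
        List<Entry> compacted = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {
            Entry e = log.get(i);
            if (lastIndex.get(e.key) != i) {
                continue; // superseded by a later entry with the same key
            }
            boolean expiredTombstone =
                    e.value == null && nowMs - e.timestampMs > deleteRetentionMs;
            if (!expiredTombstone) {
                compacted.add(e);
            }
        }
        return compacted;
    }

    public static void main(String[] args) {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry("a", "1", 0L));
        log.add(new Entry("b", "2", 10L));
        log.add(new Entry("a", "3", 20L));
        log.add(new Entry("b", null, 30L)); // tombstone for key "b"
        System.out.println(compact(log, 40L, 100L).size());   // tombstone still retained
        System.out.println(compact(log, 1000L, 100L).size()); // tombstone expired
    }
}
```

In this model retention.ms plays no role at all, which matches the point above: with cleanup.policy=compact, old values disappear because a newer value for the same key exists, not because they have reached a certain age.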

Hope this helps

Best Jan


On 03.12.2017 20:27, Dmitry Minkovsky wrote:

This is a pretty stupid question. Most likely I should verify these by
observation, but really I want to verify that my understanding of the
documentation is correct:

Suppose I have topic configurations like:

retention.ms=$time
cleanup.policy=compact


My questions are:

1. After $time, any offsets older than $time will be eligible for
compaction?
2. Regardless of $time, any offsets in the current segment will not be
compacted?


Thank you,
Dmitry





Re: Joins in Kafka Streams and partitioning of the topics

2017-11-30 Thread Jan Filipiak
There are some oddities in your topology that make me wonder if 
they are the true drivers of your question.


https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L300
This feels like it should be a KTable to begin with, for example; otherwise it 
is not clear how big it is supposed to grow.

https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java#L325
Same thing for policies. A GlobalKTable might be brought in later as some sort 
of performance optimisation if you bloat up from too many repartitions, but my 
opinion of it is not too high.


Hope that helps. Just keep the questions coming, and also check if you might 
want to join confluentcommunity on Slack.
I could never imagine that something like an insurance can really be 
modelled as 4 streams ;)


Best Jan





On 30.11.2017 21:07, Artur Mrozowski wrote:

what if I start two instances of that application?  Does the state migrate
between the applications? Is it then I have to use a global table?

BR
Artur

On Thu, Nov 30, 2017 at 7:40 PM, Jan Filipiak 
wrote:


Hi,

Haven't checked your code. But from what you describe you should be fine.
Upgrading the version might help here and there, but it should still work with
0.10, I guess.

Best Jan



On 30.11.2017 19:16, Artur Mrozowski wrote:


Thank you Damian, it was very helpful.
I have implemented my solution in version 0.11.0.2 but there is one thing I
still wonder about.
What I am trying to do is what is described in KIP-150. Since it didn't make
it into the 1.0 release, I do it the old-fashioned way.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
First
KTable table1 =
builder.stream("topic1").groupByKey().aggregate(initializer1,
aggregator1, aggValueSerde1, storeName1);


for all four topics, and then I join the results.
And here is the thing: the topics are partitioned and I don't use global
tables or keyed messages, and it seems to work fine.

From Confluent's documentation one could get the impression that when reading
from partitioned topics you need to use global tables. But is it really
necessary in this case?
And if not, then why?

Thanks again
Artur

Here is the link to my implementation

https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java

On Wed, Nov 22, 2017 at 12:10 PM, Damian Guy 
wrote:

Hi Artur,

KafkaStreams 0.10.0.0 is quite old and a lot has changed and been fixed
since then. If possible i'd recommend upgrading to at least 0.11.0.2 or
1.0.
For joins you need to ensure that the topics have the same number of
partitions (which they do) and that they are keyed the same.
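
The reason behind both conditions: a KTable-KTable join is computed per partition, so a record with a given key must land on the same partition number in every input topic. A small sketch of that check, assuming a stand-in partitioner (array hash modulo partition count). Kafka's default partitioner uses a murmur2 hash instead, so the concrete partition numbers here are illustrative only, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class CoPartitioningCheck {
    // Stand-in partitioner (assumption): non-negative hash of the serialized
    // key modulo the partition count. Only the principle matches Kafka.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }

    // True if the key lands on the same partition number in both topics,
    // which is what a partition-local join needs.
    static boolean sameTask(String key, int partitionsA, int partitionsB) {
        return partitionFor(key, partitionsA) == partitionFor(key, partitionsB);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            String key = "customer-" + i;
            System.out.println(key
                    + " 4 vs 4 partitions: " + sameTask(key, 4, 4)
                    + ", 4 vs 5 partitions: " + sameTask(key, 4, 5));
        }
    }
}
```

With equal partition counts and the same key and partitioner, every key matches by construction. With different counts some keys end up on different partition numbers, and the instance processing partition N of one topic would never see the matching record from the other topic.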

Thanks,
Damian

On Wed, 22 Nov 2017 at 11:02 Artur Mrozowski  wrote:

Hi,

I am joining 4 different topics with 4 partitions each using the 0.10.0.0
version of Kafka Streams.  The joins are KTable to KTable. Is there
anything I should be aware of regarding partitions or the version of Kafka
Streams? In other words, should I expect consistent results or do I
need to, for example, use global tables?

I'd like to run that application on Kubernetes later on. Should I think of
anything, or do different instances of the same Kafka Streams application
take care of managing the state?

Grateful for any thoughts or a piece of advice

Best Regards
/Artur






Re: Joins in Kafka Streams and partitioning of the topics

2017-11-30 Thread Jan Filipiak

Hi,

Haven't checked your code. But from what you describe you should be fine.
Upgrading the version might help here and there, but it should still work 
with 0.10, I guess.

Best Jan


On 30.11.2017 19:16, Artur Mrozowski wrote:

Thank you Damian, it was very helpful.
I have implemented my solution in version 0.11.0.2 but there is one thing I
still wonder about.
What I am trying to do is what is described in KIP-150. Since it didn't make
it into the 1.0 release, I do it the old-fashioned way.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup
First
KTable table1 =
builder.stream("topic1").groupByKey().aggregate(initializer1,
aggregator1, aggValueSerde1, storeName1);


for all four topics, and then I join the results.
And here is the thing: the topics are partitioned and I don't use global
tables or keyed messages, and it seems to work fine.

From Confluent's documentation one could get the impression that when reading
from partitioned topics you need to use global tables. But is it really
necessary in this case?
And if not, then why?

Thanks again
Artur

Here is the link to my implementation

https://github.com/afuyo/KStreamsDemo/blob/master/src/main/java/kstream.demo/CustomerStreamPipelineHDI.java

On Wed, Nov 22, 2017 at 12:10 PM, Damian Guy  wrote:


Hi Artur,

KafkaStreams 0.10.0.0 is quite old and a lot has changed and been fixed
since then. If possible i'd recommend upgrading to at least 0.11.0.2 or
1.0.
For joins you need to ensure that the topics have the same number of
partitions (which they do) and that they are keyed the same.

Thanks,
Damian

On Wed, 22 Nov 2017 at 11:02 Artur Mrozowski  wrote:


Hi,
I am joining 4 different topics with 4 partitions each using the 0.10.0.0
version of Kafka Streams.  The joins are KTable to KTable. Is there
anything I should be aware of regarding partitions or the version of Kafka
Streams? In other words, should I expect consistent results or do I
need to, for example, use global tables?

I'd like to run that application on Kubernetes later on. Should I think of
anything, or do different instances of the same Kafka Streams application
take care of managing the state?

Grateful for any thoughts or a piece of advice

Best Regards
/Artur





Re: Plans to extend streams?

2017-11-30 Thread Jan Filipiak

hi,

I understand your point better now.

I think plenty of systems of that kind have been built, and I never liked 
their trade-offs.

Samza and Kafka Streams form a great alternative to what is out there in 
great numbers.

I am a big fan of how this is designed and think it's really great. Maybe 
you should give it a shot?

Just to get you interested:
With extremely detailed partition assignment and by deploying stream jobs on 
broker instances, you can align all the topics so that you get basically the 
same kind of shuffle other systems use.
Attaching a little bit of "cruise-control" 
https://engineering.linkedin.com/blog/2017/08/open-sourcing-kafka-cruise-control
could also handle node failures. But usually this is not necessary. The 
hop across the broker is usually just too efficient to warrant this kind 
of fuss.

Hope this can convince you to try it out.


Best Jan


On 29.11.2017 20:15, Guozhang Wang wrote:

Hello Adrienne,

I think the feature you suggest is to use not only Kafka for inter-process
communication, but to make it configurable to use TCP directly as well, right?

A few people have asked about this before, especially about not using
Kafka for repartitioning (think: shuffling in the batch world) but letting
the data go through TCP between processes. Though this is doable, I'd point
out that it may have many side effects, such as:

1) back pressure: the Streams library does not need to worry about back
pressure at all since all communication channels are persistent (Kafka
topics); using TCP, you need to face the back pressure issue again.
2) exactly-once semantics: transactional messaging is leveraged by
Streams to achieve EOS, and extending to TCP means that we need to add more
gears to handle TCP data loss / duplicates (e.g. other frameworks have been
using buffers with epoch boundaries to do that).
3) state snapshots: imagine you are shutting down your app; we then need
to make sure all in-flight messages over TCP are drained, because otherwise
we are not certain whether the committed offsets are valid or not.



Guozhang


On Wed, Nov 29, 2017 at 8:26 AM, Adrienne Kole 
wrote:


Hi,

Perhaps you misunderstood the focus of the post, or I did not explain it
properly. I am not claiming that Streams is limited to a single node.
Although a whole topology instance can be limited to a single node (each
node runs the whole topology), this is something else.
Also, I think the "moving 100s of GB data per day" claim is orthogonal,
as this is not big/fast enough to reason from.

The thing is that, for some use-cases streams-kafka-streams connection can
be a bottleneck.  Yes, if I have 40GB/s or infiniband network bandwidth
this might not be an issue.

Consider a simple topology with operators A->B->C. (B forces a
re-partition)
  Streams nodes are s1(A), s2 (B,C) and kafka resides on cluster k, which
might be in different network switch.
So, rather than transferring data k->s1->s2, we make a round trip
k->s1->k->s2. If we know that s1 and s2 are in the same network and data
transfer is fast between two, we should not go through another intermediate
layer.


Thanks.



On Wed, Nov 29, 2017 at 4:52 PM, Jan Filipiak 
wrote:


Hey,

You are making some wrong assumptions here.
Kafka Streams is in no way single threaded or
limited to one physical instance.
Having connectivity issues to your brokers is IMO
a problem with the deployment and not at all
with how kafka streams is designed and works.

Kafka Streams moves hundreds of GB per day for us.

Hope this helps.

Best Jan



On 29.11.2017 15:10, Adrienne Kole wrote:


Hi,

The purpose of this email is to get overall intuition for the future plans
of streams library.

The main question is that, will it be a single threaded application in the
long run and serve microservices use-cases, or are there any plans to
extend it to multi-node execution framework with less kafka dependency.

Currently, each streams node 'talks' with kafka cluster and they can
indirectly talk with each other again through kafka. However, especially if
kafka is not in the same network with streams nodes (actually this can
happen if they are in the same network as well) this will cause high
network overhead and inefficiency.

One solution for this (bypassing network overhead) is to deploy streams
node on kafka cluster to ensure the data locality. However, this is not
recommended as the library and kafka can affect each other's performance
and  streams does not necessarily have to know the internal data
partitioning of kafka.

Another solution would be extending streams library to have a common
runtime. IMO, preserving the current selling points of streams (like
dynamic scale in/out) with this kind of extensions can be very good
improvement.

So my question is that, will streams in the long/short run, will extend its
use-cases to massive and efficient stream processing (and compete with
spark) or stay and strengthen its current position?

Cheers,
Adrienne









Re: Plans to extend streams?

2017-11-29 Thread Jan Filipiak

Hey,

You are making some wrong assumptions here.
Kafka Streams is in no way single threaded or
limited to one physical instance.
Having connectivity issues to your brokers is IMO
a problem with the deployment and not at all
with how kafka streams is designed and works.

Kafka Streams moves hundreds of GB per day for us.

Hope this helps.

Best Jan


On 29.11.2017 15:10, Adrienne Kole wrote:

Hi,

The purpose of this email is to get overall intuition for the future  plans
of streams library.

The main question is that, will it be a single threaded application in the
long run and serve microservices use-cases, or are there any plans to
extend it to multi-node execution framework with less kafka dependency.

Currently, each streams node 'talks' with kafka cluster and they can
indirectly talk with each other again through kafka. However, especially if
kafka is not in the same network with streams nodes (actually this can
happen if they are in the same network as well) this will cause high
network overhead and inefficiency.

One solution for this (bypassing network overhead) is to deploy streams
node on kafka cluster to ensure the data locality. However, this is not
recommended as the library and kafka can affect each other's performance
and  streams does not necessarily have to know the internal data
partitioning of kafka.

Another solution would be extending streams library to have a common
runtime. IMO, preserving the current selling points of streams (like
dynamic scale in/out) with this kind of extensions can be very good
improvement.

So my question is that, will streams in the long/short run, will extend its
use-cases to massive and efficient stream processing (and compete with
spark) or stay and strengthen its current position?

Cheers,
Adrienne





Re: No. of Kafk Instances in Single machine

2017-11-06 Thread Jan Filipiak

Hi,

I probably would recommend you to go for 1 instance. You can bump a few 
thread configs to match your hardware better.


Best Jan

On 06.11.2017 12:23, chidigam . wrote:

Hi All,
Let's say I have a big machine with 120GB RAM, a lot of cores,
and very high disk capacity.

How many Kafka instances are recommended? Is there any general
principle I can apply to calculate the optimal number?

Any help in this regards is highly appreciated.

Regards
Bhanu





Re: [DISCUSS] KIP-213 Support non-key joining in KTable

2017-10-26 Thread Jan Filipiak

Thanks for the remarks. hope I didn't miss any.
Not even sure if it makes sense to introduce A and B or just stick with 
"this ktable", "other ktable"


Thank you
Jan


On 27.10.2017 06:58, Ted Yu wrote:

Do you mind addressing my previous comments ?

http://search-hadoop.com/m/Kafka/uyzND1hzF8SRzUqb?subj=Re+DISCUSS+KIP+213+Support+non+key+joining+in+KTable

On Thu, Oct 26, 2017 at 9:38 PM, Jan Filipiak 
wrote:


Hello everyone,

this is the new discussion thread after the ID-clash.

Best
Jan

__


Hello Kafka-users,

I want to continue with the development of KAFKA-3705, which allows the
Streams DSL to perform KTableKTable-Joins when the KTables have a
one-to-many relationship.
To make sure we cover the requirements of as many users as possible and
have a good solution afterwards I invite everyone to read through the KIP I
put together and discuss it here in this Thread.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
https://issues.apache.org/jira/browse/KAFKA-3705
https://github.com/apache/kafka/pull/3720

I think a public discussion and vote on a solution is exactly what is
needed to bring this feature into kafka-streams. I am looking forward to
everyone's opinion!

Please keep the discussion on the mailing list rather than commenting on
the wiki (wiki discussions get unwieldy fast).

Best
Jan







[DISCUSS] KIP-213 Support non-key joining in KTable

2017-10-25 Thread Jan Filipiak

Hello Kafka-users,

I want to continue with the development of KAFKA-3705, which allows the 
Streams DSL to perform KTableKTable-Joins when the KTables have a 
one-to-many relationship.
To make sure we cover the requirements of as many users as possible and 
have a good solution afterwards I invite everyone to read through the 
KIP I put together and discuss it here in this Thread.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
https://issues.apache.org/jira/browse/KAFKA-3705
https://github.com/apache/kafka/pull/3720

I think a public discussion and vote on a solution is exactly what is 
needed to bring this feature into kafka-streams. I am looking forward 
to everyone's opinion!


Please keep the discussion on the mailing list rather than commenting on 
the wiki (wiki discussions get unwieldy fast).


Best
Jan








Re: Log Compaction Not Picking up Topic

2017-10-25 Thread Jan Filipiak

Hi,

Unfortunately there is nothing trivial you could do here.
Without upgrading your Kafka brokers you can only bounce the partition back and 
forth between brokers so that it compacts while it is still small.

With upgrading you could also just cherry-pick this very commit or add a 
log statement to verify.


Given the log sizes you're dealing with, I am very confident that this is 
your issue.


Best Jan


On 25.10.2017 12:21, Elmar Weber wrote:

Hi,

On 10/25/2017 12:15 PM, Xin Li wrote:
> I think that is a bug, and it should be fixed in this task:
> https://issues.apache.org/jira/browse/KAFKA-6030.
> We experienced that in our Kafka cluster; we just checked out the
> 11.0.2 version and built it ourselves.


thanks for the hint, as it looks like a calculation issue, would it be 
possible to verify this by manually changing the clean ratio or some 
other settings?


Best,
Elmar





Re: Kafka Connect Sink Connector for multiple JDBC sinks

2017-09-16 Thread Jan Filipiak

Hi,

It entirely depends on how you want to serialize. You should be able to get 
everything running on Windows anyhow. Nothing except the broker 
really makes extensive use of OS support for operating.


To answer your initial question: You would simply start multiple sinks 
and give each sink a different connect string. That should do what you 
want instantly.
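
As a rough illustration of "multiple sinks with different connect strings", assuming the Confluent JDBC sink connector mentioned elsewhere in this thread: two connector configurations that read the same topic and differ only in name and connection.url. The file names, topic name, host names and database name are placeholders.

```properties
# sink-db1.properties (hypothetical file name)
name=jdbc-sink-db1
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=my-topic
connection.url=jdbc:postgresql://db1.example.com:5432/mydb

# sink-db2.properties (hypothetical file name)
name=jdbc-sink-db2
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=my-topic
connection.url=jdbc:postgresql://db2.example.com:5432/mydb
```

Each sink connector consumes with its own consumer group, so both read the full topic independently and one database being slow or down does not hold back the other.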


Best Jan

On 16.09.2017 22:51, M. Manna wrote:

Yes I have, I do need to build and run Schema Registry as a pre-requisite
isn't that correct? because the QuickStart seems to start AVRO - without
AVRO you need your own implementation of transformer/serdes etc.

I am only asking since my deployment platform is Windows Server 2012 - and
Confluent pkg is meant to be run on Linux. I guess there is a lot of manual
conversion I need to do here?

On 16 September 2017 at 21:43, Ted Yu  wrote:


Have you looked at https://github.com/confluentinc/kafka-connect-jdbc ?

On Sat, Sep 16, 2017 at 1:39 PM, M. Manna  wrote:


Sure. But none of these are available via Kafka open source (they require
manual coding), correct? Only Confluent seems to provide some
off-the-shelf connectors, but Confluent isn't compatible with Windows (yet),
also correct?



On 13 September 2017 at 18:11, Sreejith S  wrote:


This is possible. Once you have the records in your put method, it's up to
your logic how you redirect them to multiple JDBC connections for
insertion.

In my use case i have implemented many to many sources and sinks.

Regards,
Srijith

On 13-Sep-2017 10:14 pm, "M. Manna"  wrote:

Hi,

I need a little help/suggestion if possible. Does anyone know if it's
possible in Kafka to develop a connector that can sink to multiple JDBC
URLs for the same topic (i.e. table)?

The examples I can see on Confluent talk about one JDBC URL (one-to-one
sink). Would it be possible to achieve a one-to-many?

What I am trying to do is the following:

1) Write to a topic
2) Sink it to multiple DBs (they all will have the same table).

Is this doable/correct way for Connect API?

Kindest Regards,





Re: How to clear a particular partition?

2017-08-13 Thread jan
I can't help you here but maybe can focus the question - why would you want to?

jan

On 10/08/2017, Sven Ludwig  wrote:
> Hello,
>
> assume that all producers and consumers regarding a topic-partition have
> been shutdown.
>
> Is it possible in this situation to empty that topic-partition, while the
> other topic-partitions keep working?
>
> Like for example, is it possible to trigger a log truncation to 0 on the
> leader for that partition using some admin tool?
>
> Kind Regards,
> Sven
>
>


Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-11 Thread Jan Filipiak

Inline

Rather sparse for lack of time.

Sadly I can't agree with any of your arguments and I _hate_ how it's gonna 
look, but we can't have this discussion forever.

I think I explained everything in enough detail so my points can make sense.
If someone is interested and has specific questions, they can always approach me.

Otherwise I am just going to drink the kool-aid now. :(

Best Jan

On 08.08.2017 20:37, Guozhang Wang wrote:

Hello Jan,

Thanks for your feedback. Trying to explain them a bit more here since I
think there are still a bit mis-communication here:

Here are a few things I need to clarify for KIP-182 first:

1. KIP-182 is mainly about refactoring the public APIs, NOT for making any
optimizations on the internal implementations. So we only care that these
public APIs changes do not forbid us to make the internal implementations
in the near future.

To give you a concrete example, as you mentioned that KTableValueGetterSupplier
is NOT used in IQ, and that a materialized physical store is always used
today. Yes that is true, and we do have plans to optimize this case soon;
for example, it is still doable with the proposed KIP-182 that we can
remove the physical materialized store but use KTableValueGetterSupplier to
read from an upstream's physical store and apply the optimizations. Another
example you mentioned is stream-stream join, where each stream is
physically materialized into a store, we can definitely optimize this in
the future to remove the physical materialized store but use something
else, e.g. a in-memory buffer. Such optimizations are NOT blocked by the
updated public APIs of KIP-182.

One of the big goals of the refactoring, at least, was to get rid of the 
overloads to make implementing new features easier, as one does not have to 
take care of all the overloads. Folding 2 overloads into 1 with a builder 
that has 2 ways of being built won't help much here.

Having the DSL express very closely what happens will only help people 
avoid getting confused.
Having the store overload on every operation is just plain confusing 
right now.





2. One concern you raise that KIP-182 may actually block such
optimizations, is that if users do specify a StateStoreSupplier then we
cannot optimize that away. That is true, but that is by design: if user do
specify a state store supplier in Materialized API, that is equal to say
"forget about doing any smart things library, just use what I give to you".
In other words, the above mentioned optimizations can be applied if users
do not enforce any specific StateStoreSupplier, for example in

public static <K, V, S extends StateStore> Materialized<K, V, S>
as(final String storeName)

i.e. user only provide a store name, which is similar like handler token
for IQ; then the library still have the freedom to do smart things
internally which is totally hidden from the users. It is similar to, like
in RDBMS or some NoSQL stores like in HIVE / Cassandra: the store engine do
not have the freedom to do those query plan optimizations if users already
enforce the specs like join ordering, query plan generation rules, etc.

You call the same method with the builder built differently and it is 
going to do different things. That is my definition of unintuitive. On top 
of that, the internal code has to become dead ugly, as it needs to apply 
these optimisations basically in the same method call or at the place the 
builder is evaluated. This just cries out for ugly internal code. There is 
no way this can become pretty.




3. About whether it is worthwhile to "not break the fluent interface", a
key point that we cannot just try to optimize one or two use cases, but
consider the whole user space, and ask what are the percentage of users
that may get affected. Note that in the DSL we have overloaded functions
where Materialized / Joined / other options are NOT needed so for most
normal users they do not need to worry about the specs at all.

So suppose there are only X% "advanced" users who would want to enforce
some state store suppliers, and Y% who like to use IQ, 100-X-Y percent of
normal users see no difference in terms of programming for either of these
two approaches: whether to separate the specs into a different set of APIs.
And for the Y percent of users they are most likely to just use the
simplest APIs which is `operator(..., Materialized.as(storeName))` which
does not sound too bad as to `table = operator(...);
table.materialize(storeName)`. In other words we use the first approach
then only X percent of users may have an awkward programming with complex
option specs along with the operator; if we use the second approach the X+Y
users need to break its programing fluency to call `table.materialize`
separately. And my personal guess is that

0 < X << Y < 1, and that X is very minor compared to Y. That is why I feel
this is a good trade-off.

The key point here is that it doesn't matter. Any sufficiently useful 
topology
will get broken up by the u

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-04 Thread Jan Filipiak
ving them separately will increase the readability of topologies by 
a lot IMO.

For these quick example topologies that we have floating around in all places:
I am pretty sure one can go unbroken on them, and usually the last table will
be the one that is needed for IQ then.


Thanks again. The second point really got me thinking, as your perspective on
the importance of "not break the fluent interface" was not clear to me. I hope
I managed to lay out why I think it shouldn't have such a big weight in the
discussion.

PS.: check out Hive CTEs, everyone loves them and our analytics team is crazy
for them because you can name them, and that brings clarity. You also get rid
of the nesting and can split everything into logical chunks of SQL. KTable
variables are the CTEs of Kafka Streams.
One can probably sell this to people :)

Best Jan
Enjoyed your feedback! hope mine makes sense




On 03.08.2017 00:10, Guozhang Wang wrote:

Hello Jan,

Thanks for your proposal. As Bill mentioned the main difference is that we
extract the user-customizable materialization logic out of the topology
building DSL workflow. And the main motivations are in two folds:

1) efficiency wise, it allows some KTables to not be materialized if
unnecessary, saving one state store instance and changelog topic.

2) programming wise, it looks nicer to separate the topology construction
code from the KTable materialization for IQ uses code.


Here are my thoughts regarding these two points:

Regarding 1), I think with whichever the public APIs (either Damian's
proposal or yours), we can always apply the internal optimization to not
physically materialize the KTable. You can take a look at the internal
interface of "KTableValueGetterSupplier", which is used exactly for this
purposes such that a get call on a "logically" materialized KTable can be
traced back to its parent KTables that are physically materialized in a
state store. So following proposed APIs, for example:


stream.groupByKey(..).aggregate(.., Materialized.as("store1"))
// this resulting KTable is materialized in order to complete the aggregation
// operation
 .filter(Materialized.as("store2"))
// this resulting KTable is not materialized but its
// GetterSupplier is implemented to get values from "store1"


Or

table1 = stream.groupByKey(..).aggregate(..);
table2 = table1.filter();

table1.queryHandle("store1");   // the resulting KTable is materialized in
                                // order to complete the aggregation operation
table2.queryHandle("store2");   // the resulting KTable is not materialized,
                                // but its GetterSupplier is implemented to
                                // get values from "store1"



When a user queries a value for "store2", which is not actually materialized
into a state store, the GetterSupplier will be triggered to in turn query
the store for "store1", and then apply the filter operator on-the-fly to
return the value. So the bottom line is, we can achieve the same efficiency
optimization with either of the public APIs.
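
To make the idea above concrete, here is a minimal sketch of a "logically
materialized" filtered table that reads through to its parent's store. All
names (ValueGetter, StoreBackedGetter, FilteredGetter) are hypothetical and
only loosely modelled on the role of KTableValueGetterSupplier; this is not
the Kafka Streams internal API, and a plain HashMap stands in for a state
store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical names throughout; this is not the Kafka Streams internal API.
final class ValueGetterSketch {

    /** Read-only lookup, standing in for the role of a value getter. */
    interface ValueGetter<K, V> {
        V get(K key);
    }

    /** "store1": physically materialized, backed here by a plain map. */
    static final class StoreBackedGetter<K, V> implements ValueGetter<K, V> {
        private final Map<K, V> store = new HashMap<>();

        void put(K key, V value) {
            store.put(key, value);
        }

        @Override
        public V get(K key) {
            return store.get(key);
        }
    }

    /** "store2": no storage of its own; applies the filter on every read. */
    static final class FilteredGetter<K, V> implements ValueGetter<K, V> {
        private final ValueGetter<K, V> parent;
        private final BiPredicate<K, V> predicate;

        FilteredGetter(ValueGetter<K, V> parent, BiPredicate<K, V> predicate) {
            this.parent = parent;
            this.predicate = predicate;
        }

        @Override
        public V get(K key) {
            V value = parent.get(key);
            // A record that fails the filter looks absent in the derived table.
            return (value != null && predicate.test(key, value)) ? value : null;
        }
    }

    public static void main(String[] args) {
        StoreBackedGetter<String, Long> store1 = new StoreBackedGetter<>();
        store1.put("a", 5L);
        store1.put("b", 50L);

        ValueGetter<String, Long> store2 =
                new FilteredGetter<>(store1, (k, v) -> v > 10L);

        System.out.println(store2.get("a")); // fails the filter
        System.out.println(store2.get("b")); // passes the filter
    }
}
```

The derived getter costs one extra predicate call per lookup but needs no
second store or changelog topic, which is the efficiency argument made above.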


Regarding 2), I actually have proposed a similar API to yours earlier in
this discussion thread:

---

// specifying the topology, should be concise and conveniently
concatenated, no specs of materialization at all

KStream stream1 = builder.stream();
KTable table1 = stream1.groupby(...).aggregate(initializer, aggregator,
sessionMerger, sessionWindows);  // do not allow to pass-in a state store
supplier here any more

// additional code to the topology above, could be more prescriptive
than descriptive
// only advanced users would want to code in both parts above; while other
users would only code the topology as above.

table1.materialize("queryableStoreName"); // or..
table1.materialize("queryableStoreName").enableCaching().enableLogging();
// or..
table1.materialize(stateStoreSupplier); // we check type (key-value types,
windowed or not etc) at starting time and add the metrics / logging /
caching / windowing wrapper on top of the store, or..
table1.materialize(stateStoreSupplier).enableCaching().enableLogging(); //
etc..

---

But one caveat of that, as illustrated above, is that you need to have a
separate object for the KTable in order to call either "queryHandle" or
"materialize" (whatever the function name is) to specify the
materialization options. This can break the concatenation of the topology
construction part of the code, in that you cannot simply add one operator
directly after another. So I think this is a trade-off we have to make, and
the current approach looks better in this regard.



Guozhang




On Wed, Aug 2, 2017 at 2:07 PM, Jan Filipiak
wrote:


Hi Bill,

totally! So in the original discussion it was mentioned that the overloads
are nasty when implementing new features. So we wanted to get r

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-02 Thread Jan Filipiak

Hi Bill,

totally! So in the original discussion it was mentioned that the
overloads are nasty when implementing new features. So we wanted to get
rid of them. But what I felt was that the copy & pasted code in the
KTableProcessors for maintaining IQ stores was as big a hurdle as the
overloads.


With this proposal I try to shift things in the direction of getting
IQ for free if KTableValueGetterSupplier is properly implemented (like
getting join for free then), instead of having the code for maintaining
IQ stores all over the place. I realized I can do that while getting rid
of the overloads, which makes me feel my proposal is superior.


Further, I try to optimize by using as few stores as possible to give the
user what he needs. That should save all sorts of resources while
allowing faster rebalances.


The ultimate target is to have only KTableSource and the Aggregators
maintain a store and provide a ValueGetterSupplier.


Does this make sense to you?

Best Jan

On 02.08.2017 18:09, Bill Bejeck wrote:

Hi Jan,

Thanks for the effort in putting your thoughts down on paper.

Comparing what I see from your proposal and what is presented in
KIP-182, one of the main differences is the exclusion of
a `Materialized` instance in the `KTable` methods.


Can you go into more detail on why this is so and the specific problems
it avoids and/or solves with this approach?


Thanks!
Bill

On Wed, Aug 2, 2017 at 4:19 AM, Damian Guy <damian@gmail.com> wrote:


Hi Jan,

Thanks for taking the time to put this together, appreciated. For the
benefit of others would you mind explaining a bit about your
motivation?

Cheers,
Damian

On Wed, 2 Aug 2017 at 01:40 Jan Filipiak <jan.filip...@trivago.com> wrote:

> Hi all,
>
> after some further discussions, the best thing to show my idea of how it
> should evolve would be a bigger mock/interface description.
> The goal is to reduce the store-maintaining processors to only the
> Aggregators and KTableSource, while having KTableSource optionally
> materialized.
>
> Introducing KTable::copy() will allow users to maintain state twice if
> they really want to. KStream::join*() wasn't touched. I never personally
> used that, so I didn't feel comfortable enough touching it. Currently
> still making up my mind. None of the suggestions made it queryable so
> far. Guozhang's 'Buffered' idea seems ideal here.
>
> Please have a look. Looking forward to your opinions.
>
> Best Jan
>
>
>
> On 21.06.2017 17:24, Eno Thereska wrote:
> > (cc’ing user-list too)
> >
> > Given that we already have StateStoreSuppliers that are configurable
> > using the fluent-like API, probably it’s worth discussing the other
> > examples with joins and serdes first since those have many overloads
> > and are in need of some TLC.
> >
> > So following your example, I guess you’d have something like:
> > .join()
> > .withKeySerdes(…)
> > .withValueSerdes(…)
> > .withJoinType(“outer”)
> >
> > etc?
> >
> > I like the approach since it still remains declarative and it’d reduce
> > the number of overloads by quite a bit.
> >
> > Eno
> >
> >> On Jun 21, 2017, at 3:37 PM, Damian Guy <damian@gmail.com> wrote:
> >>
> >> Hi,
> >>
> >> I'd like to get a discussion going around some of the API choices
> >> we've made in the DSL. In particular those that relate to stateful
> >> operations (though this could expand).
> >> As it stands we lean heavily on overloaded methods in the API, e.g.,
> >> there are 9 overloads for KGroupedStream.count(..)! It is becoming
> >> noisy and I feel it is only going to get worse as we add more optional
> >> params. In particular we've had some requests to be able to turn
> >> caching off, or change log configs, on a per-operator basis (note this
> >> can be done now if you pass in a StateStoreSupplier, but this can be a
> >> bit cumbersome).
> >>
> >> So this is a bit of an open question. How can we change the DSL
> >> overloads so that it flows, is simple to use and understand, and is
> >> easily extended in the future?
> >>
> >> One option would be to use a fluent API approach for providing the
> >> optional params, so something like this

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-01 Thread Jan Filipiak

Hi all,

after some further discussions, the best thing to show my idea of how it
should evolve would be a bigger mock/interface description.
The goal is to reduce the store-maintaining processors to only the
Aggregators and KTableSource, while having KTableSource optionally
materialized.


Introducing KTable::copy() will allow users to maintain state twice if
they really want to. KStream::join*() wasn't touched. I never personally
used that, so I didn't feel comfortable enough touching it. Currently
still making up my mind. None of the suggestions made it queryable so
far. Guozhang's 'Buffered' idea seems ideal here.


Please have a look. Looking forward to your opinions.

Best Jan



On 21.06.2017 17:24, Eno Thereska wrote:

(cc’ing user-list too)

Given that we already have StateStoreSuppliers that are configurable using the 
fluent-like API, probably it’s worth discussing the other examples with joins 
and serdes first since those have many overloads and are in need of some TLC.

So following your example, I guess you’d have something like:
.join()
.withKeySerdes(…)
.withValueSerdes(…)
.withJoinType(“outer”)

etc?

I like the approach since it still remains declarative and it’d reduce the 
number of overloads by quite a bit.

Eno


On Jun 21, 2017, at 3:37 PM, Damian Guy  wrote:

Hi,

I'd like to get a discussion going around some of the API choices we've
made in the DSL. In particular those that relate to stateful operations
(though this could expand).
As it stands we lean heavily on overloaded methods in the API, e.g., there
are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and I
feel it is only going to get worse as we add more optional params. In
particular we've had some requests to be able to turn caching off, or
change log configs, on a per-operator basis (note this can be done now if
you pass in a StateStoreSupplier, but this can be a bit cumbersome).

So this is a bit of an open question. How can we change the DSL overloads
so that it flows, is simple to use and understand, and is easily extended
in the future?

One option would be to use a fluent API approach for providing the optional
params, so something like this:

groupedStream.count()
   .withStoreName("name")
   .withCachingEnabled(false)
   .withLoggingEnabled(config)
   .table()



Another option would be to provide a Builder to the count method, so it
would look something like this:
groupedStream.count(new
CountBuilder("storeName").withCachingEnabled(false).build())
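
As a rough illustration of how either option replaces a family of overloads
with a single method, here is a minimal sketch of an immutable options object
with fluent "with" methods. CountOptions, its defaults (caching and logging
on, no store name) and the count(..) stand-in are all hypothetical names
chosen for this example; none of them is part of the Kafka Streams API.

```java
// Hypothetical sketch; CountOptions is not a Kafka Streams class.
final class FluentOptionsSketch {

    /** Immutable options object; each with-method returns a modified copy. */
    static final class CountOptions {
        final String storeName;       // null means "let the library pick a name"
        final boolean cachingEnabled;
        final boolean loggingEnabled;

        private CountOptions(String storeName, boolean cachingEnabled, boolean loggingEnabled) {
            this.storeName = storeName;
            this.cachingEnabled = cachingEnabled;
            this.loggingEnabled = loggingEnabled;
        }

        /** Assumed defaults for this sketch: caching and logging both enabled. */
        static CountOptions defaults() {
            return new CountOptions(null, true, true);
        }

        CountOptions withStoreName(String name) {
            return new CountOptions(name, cachingEnabled, loggingEnabled);
        }

        CountOptions withCachingEnabled(boolean enabled) {
            return new CountOptions(storeName, enabled, loggingEnabled);
        }

        CountOptions withLoggingEnabled(boolean enabled) {
            return new CountOptions(storeName, cachingEnabled, enabled);
        }

        @Override
        public String toString() {
            return "store=" + storeName + ", caching=" + cachingEnabled
                    + ", logging=" + loggingEnabled;
        }
    }

    /** One entry point instead of one overload per optional parameter. */
    static String count(CountOptions options) {
        return "count(" + options + ")";
    }

    public static void main(String[] args) {
        System.out.println(count(CountOptions.defaults()));
        System.out.println(count(CountOptions.defaults()
                .withStoreName("name")
                .withCachingEnabled(false)));
    }
}
```

Adding a further optional parameter then means one new field and one new
with-method, rather than doubling the number of overloads.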

Another option is to say: Hey we don't need this, what are you on about!

The above has focussed on state store related overloads, but the same ideas
could  be applied to joins etc, where we presently have many join methods
and many overloads.

Anyway, i look forward to hearing your opinions.

Thanks,
Damian



@InterfaceStability.Evolving
public interface KTable<K, V> {

    KTable<K, V> filter(final Predicate<? super K, ? super V> predicate);
    KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
    <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);

    KStream<K, V> toStream();

    KTable<K, V> copy(); // inserts a new KTableSource
    KTable<K, V> copy(Materialized m); // inserts a new KTableSource using
                                       // toStream() as parent


    // I see why; I'd rather have users using to + table
    KTable<K, V> through(final String topic);
    KTable<K, V> through(Produced p,
                         final String topic);

    void to(final String topic);
    void to(final Produced p,
            final String topic);

    <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
    <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, Serialized s);

    <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

    <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                    final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

    <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                     final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

    UninitializedQueryHandle<K, V> QueryHandle(); // causes enable sending old
                                                  // value / materialize



    // Currently marked deprecated, easily reproduced by map or similar
    void writeAsText(final String filePath);
    void writeAsText(final String filePath,
                     final String streamName);
    void writeAsText(final String filePath,
                     final Serde<K> keySerde,
                     final Serde<V> valSerde);
    void writeAsText(final String filePath,
                     final String streamName,
                     final Serde<K> keySerde,
                     final Serde<V> valSerde);
    void foreach(final ForeachAction<? super K, ? super V> action);
}


public interface UninitializedQueryHandle<K, V> {

    QueryHandle<K, V> initialize(KafkaStreams ks);
}

public interface QueryHandle<K, V> {

    V get(K k);

}

public interface Produced{

Produced static with();

Produced serializer(Serialized s);

Produced partitioner(StreamPartitioner sp);

//sneaky new feature. skip

Re: Need clarification on Kafka Usage within our product..

2017-08-01 Thread jan
Don't know if it helps but <https://kafka.apache.org/> says at the bottom

" The contents of this website are © 2016 Apache Software Foundation
under the terms of the Apache License v2. Apache Kafka, Kafka, and the
Kafka logo are either registered trademarks or trademarks of The
Apache Software Foundation in the United States and other countries. "

Also this which is the original proposition and voting for a kafka
logo. Perhaps the proposer can be contacted; he seems to be the one
that designed the logo
<https://issues.apache.org/jira/browse/KAFKA-982>

You could checkout <https://svn.apache.org/repos/asf/kafka/> and see
if it has any statements on logo use.

Also top 3 hits of <https://www.google.co.uk/search?q=use+logo+apache>
sound promising but I've not looked at them.

Best I can suggest ATM

jan

On 01/08/2017, Sunil, Rinu  wrote:
> Including another mail id which I found online.   Kindly help in addressing
> the below query.
>
>
>
> Thanks,
>
> Rinu
>
>
>
> From: Sunil, Rinu
> Sent: Monday, July 31, 2017 7:19 PM
> To: 'users@kafka.apache.org' 
> Subject: Need clarification on Kafka Usage within our product..
> Importance: High
>
>
>
> Hi,
>
>
>
> I have a question regarding the usage of the Apache Kafka logo within our
> product, the Unisys Data Exchange WorkBench Application. The team is working
> on enhancing the product to support Kafka as a Data Manage Type with XSD
> message format, along with other database types like SQL Server, DMSII
> etc. To help users easily distinguish the Kafka XSD Database in the tree
> view we have used the Kafka logo with a blue overlapping strip with an "x"
> character to indicate XSD message format. Could you please verify the
> below image highlighted with a yellow border and confirm if it's OK to use?
> I could not find Kafka logo compliance guidance online.
>
>
> Thanks,
>
> Rinu
>
>


Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-24 Thread Jan Filipiak

Hi Damian,

thanks for taking the time. I think you read my points individually but 
you seem to not understand the bigger picture I am trying to paint.


Of the three problems I mentioned - and that you agreed are problems -
you are only trying to address the first.


What I am trying to tell you is that if you focus on the latter two, the
first one comes for free. On the other hand, if you focus on the first
(and please allow me to call it the easy part), all you are going to
achieve is to break user land and sugarcoat the real problems.


This takes away overloads, but still leaves it a mess to implement new
features. I am currently trying to prep a patch for KAFKA-3705 and
I do not understand why I should deal with Interactive Queries
whatsoever. My output table has a proper ValueGetterSupplier.

That should be it!

I hope I made clear that to improve here quite some hard work has to be
done, that it would be rewarding, and that just sugarcoating everything
is one of the worst steps we could take from where we are at the moment.

Looking at KAFKA-5581 that you mentioned: none of the points are really
related to what I am saying. Each of these points is tricky and
requires some careful thinking but might work out.

Further, looking at your comment that refers to KIP vs. DISCUSS: I don't
know what I should understand from that.


Regarding your comment mentioning that getQueryHandle() wouldn't work:
it's the same thing as giving the user a queryable string.
It works the same way, with the only difference that we have a wrapper
object that gives the user what he wants instantly! Instead of giving
him a String to get a store, we just give him a store, plus we don't
hand out some inflexible native types that we later on don't have
control over.

The whole logic about partitioners and what else does not change.
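
A minimal sketch of the difference described above, under the assumption
that the handle is created while the topology is built and only bound once
the application runs. RunningApp is a hypothetical stand-in for a started
KafkaStreams instance, and the lookup-by-processor-name registry is invented
for this example; only the names UninitializedQueryHandle and QueryHandle
come from the mock interfaces posted in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch; RunningApp stands in for a started KafkaStreams instance.
final class QueryHandleSketch {

    interface QueryHandle<K, V> {
        V get(K key);
    }

    /** Maps a processor name to a lookup function (invented for this sketch). */
    static final class RunningApp {
        private final Map<String, Function<Object, Object>> getters = new HashMap<>();

        void register(String processorName, Function<Object, Object> getter) {
            getters.put(processorName, getter);
        }

        Function<Object, Object> getterFor(String processorName) {
            Function<Object, Object> getter = getters.get(processorName);
            if (getter == null) {
                throw new IllegalStateException("unknown processor: " + processorName);
            }
            return getter;
        }
    }

    /** Handed out at build time; remembers only the table's processor name. */
    static final class UninitializedQueryHandle<K, V> {
        private final String processorName;

        UninitializedQueryHandle(String processorName) {
            this.processorName = processorName;
        }

        /** Binds the handle to a running application and returns a typed view. */
        @SuppressWarnings("unchecked")
        QueryHandle<K, V> initialize(RunningApp app) {
            Function<Object, Object> getter = app.getterFor(processorName);
            return key -> (V) getter.apply(key);
        }
    }

    public static void main(String[] args) {
        // Build time: the user keeps a typed handle, not a store-name String.
        UninitializedQueryHandle<String, Long> pending =
                new UninitializedQueryHandle<>("counts-processor");

        // Run time: the application exposes the lookup for that processor.
        RunningApp app = new RunningApp();
        app.register("counts-processor", key -> "a".equals(key) ? 3L : null);

        QueryHandle<String, Long> counts = pending.initialize(app);
        System.out.println(counts.get("a"));
    }
}
```

The user never sees the name or the store type; both stay under the
library's control, which is the flexibility argued for above.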

Hope this makes my points more clear.

Best Jan


On 19.07.2017 12:03, Damian Guy wrote:

Hi Jan,

Thanks for your input. Comments inline

On Tue, 18 Jul 2017 at 15:21 Jan Filipiak  wrote:


Hi,


1. Too many overloads:
Currently, whenever a KTable is the result of an operation it gets an
overload with stateStoreName, and StateStoreSupplier in case people want
to query that.
This is what produces two thirds of the overloaded methods right now (not
counting methods returning KStream).



As you state further down we are trying to address this.



2. Code copy and pasting.
Almost all KTableProcessorSuppliers have the same block of (if(name !=
null) store.put(k,v))



Yes, i agree. That is related to the KTable queryable store etc, and can
easily be addressed, but isn't necessarily part of this as it doesn't need
to be a public interface change, i.e., we can clean that up in the
background.



3. Runtime inefficiencies.
Each queryable table almost instantly causes another store to be
required, storing equivalent data of upstream KTables.


Agreed. Again, this is not a public interface change. We don't need another
store, i.e., we can just use a "View" on the existing store, which is
basically just using the KTableValueGetter to apply the map or filter
operation to the original store. We also have this jira
https://issues.apache.org/jira/browse/KAFKA-5581 to look into optimizing
when we do and don't need to add additional changelogs.



So I really see us tackling only the first part currently, which in my
opinion is too short-sighted to settle on a public API.


We are not settling on the public API. We do, however need to do KIPs for
public API discussions. For internal changes we don't necessarily need to
have a public discussion about it.



This is why I want to tackle our approach to IQ-first, as it seems to me
to be the most disruptive thing. And the cause of most problems.

The Plan:

Table from topic, kstream (don't even like this one, but probably needed
for some kind of enhanced flexibility) or aggregations would be the only
KTables that would get associated with a statestore (their processors).
For these operations one can have the "statestoresupplier" overload but
not the "queryablestatestore" overload. From this point on, the KTable
abstraction would be considered restored.
All the overloads of join and through with respect to IQ would go away.
"through" would perhaps go completely, depending on what its added
benefit is. The method I would add is for a table to get a QueryHandle.
This query handle will underneath remember its table's processor name. To
access the data from IQ we would not rely on the "per processor
statestore" but go the usual path through ValueGetterSupplier.
*Note:* We do not necessarily have a Serde for V, especially after
mapValues, nor for any intermediate data types. It would be each
KTableProcessor's job to provide a serialized version of upstream data types.
KTableKTableJoin would need to bring a JoinInputSerializer that
would serialize both upstream values for trans

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-18 Thread Jan Filipiak

Hi,

Sorry for the delay, I couldn't get to answering earlier. I do understand
your point perfectly.
I just have a different perspective on what is going on. The most
crucial piece of abstraction, the KTable, is falling apart,
and that materializes (no pun intended) itself into many problems.

1. Too many overloads:
Currently, whenever a KTable is the result of an operation it gets an
overload with stateStoreName, and StateStoreSupplier in case people want
to query that.
This is what produces two thirds of the overloaded methods right now (not
counting methods returning KStream).


2. Code copy and pasting.
Almost all KTableProcessorSuppliers have the same block of (if(name !=
null) store.put(k,v))


3. Runtime inefficiencies.
Each queryable table almost instantly causes another store to be
required, storing equivalent data of upstream KTables.


So I really see us tackling only the first part currently, which in my
opinion is too short-sighted to settle on a public API.
This is why I want to tackle our approach to IQ first, as it seems to me
to be the most disruptive thing, and the cause of most problems.


The Plan:

Table from topic, kstream (don't even like this one, but probably needed
for some kind of enhanced flexibility) or aggregations would be the only
KTables that would get associated with a statestore (their processors).
For these operations one can have the "statestoresupplier" overload but
not the "queryablestatestore" overload. From this point on, the KTable
abstraction would be considered restored.
All the overloads of join and through with respect to IQ would go away.
"through" would perhaps go completely, depending on what its added
benefit is. The method I would add is for a table to get a QueryHandle.
This query handle will underneath remember its table's processor name. To
access the data from IQ we would not rely on the "per processor
statestore" but go the usual path through ValueGetterSupplier.
*Note:* We do not necessarily have a Serde for V, especially after
mapValues, nor for any intermediate data types. It would be each
KTableProcessor's job to provide a serialized version of upstream data types.
KTableKTableJoin would need to bring a JoinInputSerializer that
would serialize both upstream values for transport across boxes.


This first step would kill all the "Storename" based overloads plus many
Statestore overloads. It would also avoid the bloated copy pasting in
each KTableProcessor for maintaining the store.
It would also make the runtime more efficient, in that it does not
store the same data twice just for access from IQ. That tackles problem
1 but also the other problems mentioned above.


From here ~3 or 4 (from kstream, topic or aggregate) methods would still
be stuck with a StateStoreSupplier overload. For me, this is quite an
improvement already. To reduce further overloads I am thinking of
adding a nullable properties argument to these operations. If people want
to use all defaults they could throw in null and it wouldn't be too
painful. That doesn't necessarily require them to have config files lying
around. They could, if they wanted, use property files to create such
properties, and we would offer to look for configs in the streams
properties.
So the complexity of distributing property files is optional and the
user might choose to fill the configs by code or files.


I think these steps can rescue the proper abstraction of a KTable. I
believe that with the current proposals we are only sugarcoating problem
1 and end up with a broken idea of what a KTable is.
I think it will be even harder to develop further from there. Interface
wise, my proposal is like developing backwards, as I am very certain we
took a wrong turn with IQ that we shouldn't try to carry through.


I hope I could explain how this refactoring can tackle the 3 above
problems, and especially why I don't think we can win by tackling only
point 1 in the long run.
If anything needs an implementation draft, please feel free to ask
me to provide one. Initially the proposal hopefully would get the job
done of just removing clutter.


Looking forward to your comments.

Best Jan



On 12.07.2017 21:27, Guozhang Wang wrote:

Hello Jan,

Thanks for your feedbacks. Let me try to clarify a few things with the 
problems that we are trying to resolve and the motivations with the 
current proposals.


As Matthias mentioned, one issue that we are trying to tackle is to 
reduce the number of overloaded functions in the DSL due to serde 
overridden / state store supplier overridden that are needed for 
repartition, or for state store materializations. Another related 
issue is that the current overridden state store supplier is not very 
natural to use, for example:


1) If a user just want to disable caching / logging etc but do not 
want to change the underlying store engine at all, she needs to learn 
to know that, for example, if a wi

Re: Where to run kafka-consumer-groups.sh from?

2017-07-11 Thread Jan Filipiak

Hi,

very likely due to timing. What problem is it causing you exactly that
you want to work around?

These differences shouldn't concern you too much, I guess.

We use the tool across continents and don't worry about it too much.
The offset commit interval makes everything blurry anyway. If you can
specify your pain more precisely maybe we can work around it.
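
To illustrate the timing effect, here is a small arithmetic sketch. It
assumes that lag is the log end offset minus the committed offset and that
the two values are read at slightly different moments; the offsets and the
number of records produced in between are made up for the example, and no
Kafka client API is used.

```java
// Illustrative arithmetic only; the offsets below are invented numbers.
final class LagSamplingSketch {

    /** Lag as log end offset minus committed offset, floored at zero. */
    static long lag(long committedOffset, long logEndOffset) {
        return Math.max(0L, logEndOffset - committedOffset);
    }

    public static void main(String[] args) {
        long committedAtT0 = 4300L;      // read first
        long logEndAtT0 = 4300L;         // the consumer is fully caught up at t0
        long producedBetweenReads = 35L; // assumed: appended before the second read

        long logEndAtT1 = logEndAtT0 + producedBetweenReads;

        // Both values read at (nearly) the same instant, e.g. on the broker host.
        System.out.println(lag(committedAtT0, logEndAtT0));
        // Log end offset read later, e.g. after extra network round trips.
        System.out.println(lag(committedAtT0, logEndAtT1));
    }
}
```

Under these assumptions, a consumer that keeps up can still show a non-zero
lag whenever the topic receives writes between the two reads.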


Best Jan

On 10.07.2017 10:31, Dmitriy Vsekhvalnov wrote:

Guys, let me up this one again. Still looking for comments about
kafka-consumer-groups.sh
tool.

Thank you.

On Fri, Jul 7, 2017 at 3:14 PM, Dmitriy Vsekhvalnov 
wrote:


I've tried 3 brokers on command line, like that:

/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server
broker:9092,broker_2:9092,broker_3:9092 --new-consumer --group
logging-svc --describe

it doesn't make any difference, still a 10x difference in figures when
running on a broker host vs. remote.

Here is a snippet from the console output (are you looking for something
specific in it? it looks normal as far as I can tell):


TOPICPARTITION  CURRENT-OFFSET  LOG-END-OFFSETLAG
 CONSUMER-ID

test.topic  54 4304430935
  consumer-26-21f5050c-a43c-4254-bfcf-42e17dbdb651

test.topic  40 4426443610
 consumer-21-24f3ebca-004f-4aac-a348-638c9c6a02f0

test.topic  59 4414442063
  consumer-27-ed34f1b3-1be9-422b-bb07-e3c9913195c7

test.topic  42 4389440376
 consumer-22-75c2fc0a-5d5c-472d-b27e-e873030f82b6

test.topic  27 4416442224
  consumer-18-3be20568-8dd3-4679-a008-0ca64d31083c




On Fri, Jul 7, 2017 at 2:52 PM, M. Manna  wrote:


kafka-consumer-groups.sh --bootstrap-server broker:9092 --new-consumer
--group service-group --describe

How many brokers do you have in the cluster? If you have more than one,
list them all as a comma-separated list with --bootstrap-server.

Also, could you paste some results from the console printout?

On 7 July 2017 at 12:47, Dmitriy Vsekhvalnov 
wrote:


Hi all,

question about lag checking. We've tried to periodically sample consumer
lag with:

kafka-consumer-groups.sh --bootstrap-server broker:9092 --new-consumer
--group service-group --describe

it's all fine, but depending on the host we run it from it gives different
results.

E.g.:

   - when running from one of the broker hosts itself we get figures
     close to 0.

   - when running from a remote host, we get 30-60 on average (I suspect
     there are multiple remote calls to the broker involved, so the
     difference is due to timing).


My question is: what is the correct way to use it? From the broker host
itself?






Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-09 Thread Jan Filipiak
 is inline with my comment above.



Also, providing Serdes by config is neat. One wouldn't even need to go into
the code then, which would also save a ton. (We have the default ones in
conf, why not override the specific ones?)

I am not sure, if Serdes are really a config? I mean, the data types are
hard coded into the code, so it does make sense to specify the Serdes
accordingly. I am also not sure how we would map Serdes from the config
to the corresponding operator?
True! Maybe not an ideal case where configs help with overloading. I
guess people are either using the global untyped one or a typed one for
all steps.
So the state store is probably a better case. It's always going to be
referenced by a name anyway, so one could use this name to provide
additional configs to the state store.

Probably also defining a factory used to build it.

Similarly a join has some sort of name; currently it's 3 names, which
would need unifying to some degree, but then the joins could also be
addressed with configs.
But joins don't seem to have too heavy an overloading problem (only
store related :D). But to be honest I can't judge the usefulness of
outer and left. Not a pattern that I have come across yet; for us it's
always inner. Maybe materialized but not sending old values, is that
what it does? Sorry, can't wrap my head round that just now,

heading towards 3am.

The example I provided was

streams.$applicationid.stores.$storename.inmemory = false
streams.$applicationid.stores.$storename.cachesize = 40k

for the configs. The Query Handle thing make sense hopefully.
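
A minimal sketch of how such per-store keys could be resolved, assuming the
naming scheme from the two example lines above and a plain
java.util.Properties object. The helper names and the fallback-to-default
behaviour are assumptions made for this example; nothing like this exists in
Kafka Streams under these names.

```java
import java.util.Properties;

// Hypothetical helper; the key layout follows the example lines above.
final class StoreConfigSketch {

    /** Builds "streams.<applicationId>.stores.<storeName>.<setting>". */
    static String key(String applicationId, String storeName, String setting) {
        return "streams." + applicationId + ".stores." + storeName + "." + setting;
    }

    /** Returns the per-store "inmemory" flag, or the default if it is not set. */
    static boolean inMemory(Properties props, String applicationId, String storeName,
                            boolean defaultValue) {
        String raw = props.getProperty(key(applicationId, storeName, "inmemory"));
        return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
    }

    /** Returns the raw per-store "cachesize" string, or the default if it is not set. */
    static String cacheSize(Properties props, String applicationId, String storeName,
                            String defaultValue) {
        return props.getProperty(key(applicationId, storeName, "cachesize"), defaultValue);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("streams.my-app.stores.counts.inmemory", "false");
        props.setProperty("streams.my-app.stores.counts.cachesize", "40k");

        System.out.println(inMemory(props, "my-app", "counts", true));
        System.out.println(cacheSize(props, "my-app", "counts", "10k"));
        // A store without explicit entries falls back to the defaults.
        System.out.println(inMemory(props, "my-app", "other-store", true));
    }
}
```

Because the store is always referenced by name, the name alone is enough to
find its settings, and no extra overload is needed on the DSL method.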

Best Jan



-Matthias


On 7/8/17 2:23 AM, Jan Filipiak wrote:

Hi Matthias thanks,

Exactly what I was guessing.

I don't understand: why custom stores in the DSL? And I don't understand
why we are not considering a more generic config-based approach?

StateStores in DSL => what I really think we are looking for is PAPI => DSL
=> PAPI back and forth switcharoo capabilities.

Looking at the most overloaded method that I can currently find,
"through()": 2 of them come from the broken idea of "the user provides a
name for the statestore for IQ" and custom statestores.
From the beginning I said that's madness. That is the real disease we
need to fix IMHO. To be honest I also don't understand why through with
a statestore is particularly useful; second unique key maybe?

Also, providing Serdes by config is neat. One wouldn't even need to go into
the code then, which would also save a ton. (We have the default ones in
conf, why not override the specific ones?)

Does this make sense to people? What pieces should I outline with code?
(Time is currently scarce :( but I can pull off some smaller examples, I
guess.)

Best Jan





On 08.07.2017 01:23, Matthias J. Sax wrote:

It's two issues we want to tackle:

   - too many overloads (for some methods we already have more than 10)
   - improve custom store API

-Matthias


On 7/7/17 3:42 PM, Jan Filipiak wrote:

It makes me want to cry.

Why on earth is the DSL going to expose all its implementation
details now? Especially being materialized or not.

If we want to take useful steps in that direction, maybe we are looking
for a way to let the user switch back and forth between PAPI and DSL?

A change like the one proposed would not eliminate any of my pain points
while still being a heck of a lot of work to migrate to.

Since I am only following this from the point where Eno CC'ed it into
the users list:

Can someone please rephrase for me what problem this is trying to solve?
I don't mean to be rude, but it uses a problematic feature,
"StateStoreSuppliers in DSL", to justify making it even worse. This gets
us nowhere in making the configs more flexible; it's just syntactic
sugar.

A low-effort shot, like: let's add a properties object to operations that
would otherwise become too heavily overloaded? Or pull the configs by some
naming scheme from the overall properties. In addition to that, we get rid
of StateStoreSuppliers in the DSL and have them also configured by said
properties.

=> way easier to migrate to, way less risk, way more flexible in the
future (different implementations of the same operation don't require
a code change to configure)

Line 184 makes especially no sense to me. what is a KTableKTable non
materialized join anyways?

Hope we can discuss more on this.



On 07.07.2017 17:23, Guozhang Wang wrote:

I messed the indentation on github code repos; this would be easier to
read:

https://codeshare.io/GLWW8K


Guozhang


On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang 
wrote:


Hi Damian / Kyle,

I think I agree with you guys about the pros / cons of using the builder
pattern vs. using some "secondary classes". And I'm wondering if we can
take a "middle" way between these two. I spent some time on a slightly
different approach from Damian's current proposal:

https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-08 Thread Jan Filipiak

Hi Matthias thanks,

Exactly what I was guessing.

I don't understand: why custom stores in the DSL? And I don't understand
why we are not considering a more generic config-based approach?


StateStores in DSL => what I really think we are looking for is PAPI => DSL
=> PAPI back and forth switcharoo capabilities.


Looking at the most overloaded method that I can currently find,
"through()": 2 of them come from the broken idea of "the user provides a
name for the statestore for IQ" and custom statestores.
From the beginning I said that's madness. That is the real disease we
need to fix IMHO. To be honest I also don't understand why through with
a statestore is particularly useful; second unique key maybe?


Also, providing Serdes by config is neat. One wouldn't even need to go into
the code then, which would also save a ton. (We have the default ones in
conf, why not override the specific ones?)


Does this make sense to people? What pieces should I outline with code?
(Time is currently scarce :( but I can pull off some smaller examples, I
guess.)


Best Jan





On 08.07.2017 01:23, Matthias J. Sax wrote:

It's two issues we want to tackle:

  - too many overloads (for some methods we already have more than 10)
  - improve custom store API

-Matthias


On 7/7/17 3:42 PM, Jan Filipiak wrote:

It makes me want to cry.

Why on earth is the DSL going to expose all its implementation details now?
Especially whether something is materialized or not.

If we want to take useful steps in that direction, maybe we are looking
for a way to let the user switch back and forth between PAPI and DSL?

A change like the one proposed would not eliminate any of my pain points while
still being a heck of a lot of work to migrate to.

Since I am only following this from the point where Eno CC'ed it into
the users list:

Can someone please rephrase for me what problem this is trying to solve?
I don't mean to be rude, but it uses a problematic feature,
"StateStoreSuppliers in DSL", to justify making it even worse. This gets
us nowhere in making the configs more flexible; it's just syntactic sugar.

A low-effort shot like: let's add a properties argument to operations that would
otherwise become too heavily overloaded? Or pull the configs by some naming
schema from the overall properties. In addition to that, we get rid of
StateStoreSuppliers in the DSL and have them also configured by said
properties.

=> way easier to migrate to, way less risk, way more flexible in the
future (different implementations of the same operation don't require
code changes to configure)

Line 184 especially makes no sense to me. What is a non-materialized
KTableKTable join anyway?

Hope we can discuss more on this.



On 07.07.2017 17:23, Guozhang Wang wrote:

I messed the indentation on github code repos; this would be easier to
read:

https://codeshare.io/GLWW8K


Guozhang


On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang  wrote:


Hi Damian / Kyle,

I think I agree with you guys about the pros / cons of using the builder
pattern vs. using some "secondary classes". And I'm wondering if we can
take a middle path between these two. I spent some time on a slightly
different approach from Damian's current proposal:

https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/java/org/apache/kafka/streams/RefactoredAPIs.java

The key idea is to tolerate the final "table()" or "stream()" function to
"upgrade" from the secondary classes to the first-class citizen classes, while
having all the specs inside this function. Also this proposal includes some
other refactoring that people have been discussing for the builder to
reduce the overloaded functions as well. WDYT?


Guozhang


On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy  wrote:


Hi Jan,

Thanks very much for the input.

On Tue, 4 Jul 2017 at 08:54 Jan Filipiak wrote:


Hi Damian,

I do see your point that something needs to change. But I fully agree with
Guozhang when he says:
---

But since this is an incompatible change, and we are going to remove the
compatibility annotations soon, it means we only have one chance and we
really have to make it right.




I think we all agree on this one! Hence the discussion.



I fear all suggestions do not go far enough to become something that will
carry on for very much longer.
I am currently working on KAFKA-3705 and am trying to find the easiest way for
the user to give me all the required functionality. The easiest interface I
could come up with so far can be looked at here.


https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622

And it's already horribly complicated. I am currently unable to find the
right abstraction level to have everything falling into place naturally. To
be honest I already think introducing



To be fair that is not a p

Re: Hello, Help!

2017-07-07 Thread jan
Hi,
Is this the right place to ask about CockroachDB?

(well he started it, officer...)

jan

On 07/07/2017, David Garcia  wrote:
> “…events so timely that the bearing upon of which is not immediately
> apparent and are hidden from cognitive regard; the same so tardy, they
> herald apropos”
>
> On 7/7/17, 12:06 PM, "Marcelo Vinicius"  wrote:
>
> Hello, my name is Marcelo, and I am from Brazil. I'm doing a search on
> Kafka. I would like to know if this phrase: "Somehow I struggled
> against
> sensations that contained pure abstraction and no gesture directed at
> the
> present world", is it really kafka? If so, where do I find his phrase?
> In
> what text from kafka?
> Thank you!
>
> --
> *Marcelo Vinicius*
> Universidade Estadual de Feira de Santana - UEFS
> Facebook: www.facebook.com/marcelovinicius02
>
> "Não há poema em si, mas em mim ou em ti"
>
>
>


Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-07 Thread Jan Filipiak

It makes me want to cry.

Why on earth is the DSL going to expose all its implementation details now?
Especially whether something is materialized or not.

If we want to take useful steps in that direction, maybe we are looking 
for a way to let the user switch back and forth between PAPI and DSL?


A change like the one proposed would not eliminate any of my pain points while 
still being a heck of a lot of work to migrate to.


Since I am only following this from the point where Eno CC'ed it into 
the users list:


Can someone please rephrase for me what problem this is trying to solve? 
I don't mean to be rude, but it uses a problematic feature,
"StateStoreSuppliers in DSL", to justify making it even worse. This gets 
us nowhere in making the configs more flexible; it's just syntactic sugar.


A low-effort shot like: let's add a properties argument to operations that would 
otherwise become too heavily overloaded? Or pull the configs by some naming 
schema from the overall properties. In addition to that, we get rid of 
StateStoreSuppliers in the DSL and have them also configured by said 
properties.


=> way easier to migrate to, way less risk, way more flexible in the 
future (different implementations of the same operation don't require 
code changes to configure)


Line 184 especially makes no sense to me. What is a non-materialized 
KTableKTable join anyway?


Hope we can discuss more on this.



On 07.07.2017 17:23, Guozhang Wang wrote:

I messed the indentation on github code repos; this would be easier to read:

https://codeshare.io/GLWW8K


Guozhang


On Fri, Jul 7, 2017 at 1:30 AM, Guozhang Wang  wrote:


Hi Damian / Kyle,

I think I agree with you guys about the pros / cons of using the builder
pattern vs. using some "secondary classes". And I'm wondering if we can
take a middle path between these two. I spent some time on a slightly
different approach from Damian's current proposal:

https://github.com/guozhangwang/kafka/blob/dsl-refactor/streams/src/main/java/org/apache/kafka/streams/RefactoredAPIs.java

The key idea is to tolerate the final "table()" or "stream()" function to
"upgrade" from the secondary classes to the first-class citizen classes, while
having all the specs inside this function. Also this proposal includes some
other refactoring that people have been discussing for the builder to
reduce the overloaded functions as well. WDYT?


Guozhang


On Tue, Jul 4, 2017 at 1:40 AM, Damian Guy  wrote:


Hi Jan,

Thanks very much for the input.

On Tue, 4 Jul 2017 at 08:54 Jan Filipiak wrote:


Hi Damian,

I do see your point that something needs to change. But I fully agree with
Guozhang when he says:
---

But since this is an incompatible change, and we are going to remove the
compatibility annotations soon, it means we only have one chance and we
really have to make it right.




I think we all agree on this one! Hence the discussion.



I fear all suggestions do not go far enough to become something that will
carry on for very much longer.
I am currently working on KAFKA-3705 and am trying to find the easiest way for
the user to give me all the required functionality. The easiest interface I
could come up with so far can be looked at here.


https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622



And it's already horribly complicated. I am currently unable to find the
right abstraction level to have everything falling into place naturally. To
be honest I already think introducing



To be fair that is not a particularly easy problem to solve!



https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493

was unideal and makes everything a mess.


I'm not sure i agree that it makes everything a mess, but It could have
been done differently.

The JoinType:Whatever is also not really flexible. 2 things come to my
mind:

1. I don't think we should rule out config based decisions say configs like

 streams.$applicationID.joins.$joinname.conf = value


Is this just for config? Or are you suggesting that we could somehow "code"
the join in a config file?



This can allow for tremendous changes without single API change and IMO it
was not considered enough yet.

2. Push logic from the DSL to the Callback classes. A ValueJoiner for
example can be used to implement different join types as the user wishes.

Do you have an example of how this might look?
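
A rough illustration of point 2, pushing the join-type decision into the
callback. This is only a sketch under assumptions: `Joiner` is a local
stand-in with the same shape as Kafka Streams' `ValueJoiner` (a single
`apply(left, right)` method) so that the snippet compiles without Kafka on
the classpath, and `JoinPolicies`, `innerOnly` and `leftWithDefault` are
hypothetical names. It also assumes the operator calls the joiner with
`null` for a missing side and treats a `null` result as "emit nothing",
which is not necessarily how every existing join operator behaves.

```java
// Local stand-in with the same shape as Kafka Streams' ValueJoiner
// (assumption: defined here only so the sketch compiles without Kafka).
interface Joiner<L, R, O> {
    O apply(L left, R right);
}

// Hypothetical helpers: the join type is chosen by wrapping the user's joiner.
final class JoinPolicies {
    private JoinPolicies() {}

    // Inner-join behaviour: produce a result only when both sides are present.
    static <L, R, O> Joiner<L, R, O> innerOnly(Joiner<L, R, O> delegate) {
        return (left, right) ->
            (left == null || right == null) ? null : delegate.apply(left, right);
    }

    // Left-join behaviour: a missing right side is replaced by a default value.
    static <L, R, O> Joiner<L, R, O> leftWithDefault(Joiner<L, R, O> delegate, R defaultRight) {
        return (left, right) ->
            left == null ? null : delegate.apply(left, right == null ? defaultRight : right);
    }
}
```

With something like this, the DSL would need only one join method, and the
user would pick the semantics by choosing the wrapper around the joiner.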



As Guozhang said: to stop breaking users is very important.


Of course. We want to make it as easy as possible for people to use
streams.


especially with these changes + all the plans I sadly only have in my head,
but hopefully the first link can give a glimpse.

Thanks for preparing the examples made it way clearer to me 

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-07-04 Thread Jan Filipiak

Hi Damian,

I do see your point that something needs to change. But I fully agree with 
Guozhang when he says:

---

But since this is an incompatible change, and we are going to remove the
compatibility annotations soon, it means we only have one chance and we
really have to make it right.


I fear all suggestions do not go far enough to become something that will carry 
on for very much longer.
I am currently working on KAFKA-3705 and am trying to find the easiest way for the 
user to give me all the required functionality. The easiest interface I could 
come up with so far can be looked at here.

https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622

And it's already horribly complicated. I am currently unable to find the right 
abstraction level to have everything falling into place naturally. To be honest 
I already think introducing

https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493

was not ideal and makes everything a mess. The JoinType:Whatever is also not 
really flexible. 2 things come to my mind:

1. I don't think we should rule out config-based decisions, say configs like
streams.$applicationID.joins.$joinname.conf = value
This can allow for tremendous changes without a single API change, and IMO it was 
not considered enough yet.

2. Push logic from the DSL to the Callback classes. A ValueJoiner for example 
can be used to implement different join types as the user wishes.
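
To make point 1 a bit more concrete, here is a minimal sketch of how
per-join settings could be pulled out of the overall properties by naming
schema. It assumes the key layout
streams.$applicationID.joins.$joinname.<setting> from above; the class
`OperationConfigs`, the method `forJoin` and the setting names in the usage
note are hypothetical, not an existing Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: extracts the settings of one named join from the
// application-wide properties, based purely on the key prefix.
final class OperationConfigs {
    private OperationConfigs() {}

    // Returns every entry under "streams.<applicationId>.joins.<joinName>."
    // with that prefix stripped from the key.
    static Map<String, String> forJoin(Properties all, String applicationId, String joinName) {
        String prefix = "streams." + applicationId + ".joins." + joinName + ".";
        Map<String, String> result = new HashMap<>();
        for (String name : all.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                result.put(name.substring(prefix.length()), all.getProperty(name));
            }
        }
        return result;
    }
}
```

A join named "orders-customers" in application "orders-app" would then read
its settings from keys such as
streams.orders-app.joins.orders-customers.type, and a different
implementation of the same operation could add new keys without any API
change.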

As Guozhang said: to stop breaking users is very important, especially with 
these changes + all the plans I sadly only have in my head, but hopefully the 
first link can give a glimpse.

Thanks for preparing the examples; they made it way clearer to me what exactly we are 
talking about. I would argue to go a bit slower and more carefully on this one. 
At some point we need to get it right. Peeking over to the Hadoop guys with 
their huge userbase: config files really work well for them.

Best Jan





On 30.06.2017 09:31, Damian Guy wrote:

Thanks Matthias

On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax  wrote:


I am just catching up on this thread, so sorry for the long email in
advance... Also, it's to some extent a dump of thoughts and not always a
clear proposal. Still need to think about this in more detail. But maybe
it helps others to get new ideas :)



However, I don't understand your argument about putting aggregate()
after the withXX() -- all the calls to withXX() set optional parameters
for aggregate() and not for groupBy() -- but a groupBy().withXX()
indicates that the withXX() belongs to the groupBy(). IMHO, this might
be quite confusing for developers.



I see what you are saying, but the grouped stream is effectively a no-op
until you call one of the aggregate/count/reduce etc functions. So the
optional params are ones that are applicable to any of the operations you
can perform on this grouped stream. Then the final
count()/reduce()/aggregate() call has any of the params that are
required/specific to that function.


I understand your argument, but I don't share the conclusion. If we
need a "final/terminal" call, the better way might be

.groupBy().count().withXX().build()

(with a better name for build() though)



The point is that all the other calls, i.e., withBlah, windowed, etc. apply
to all the aggregate functions. The terminal call being the actual type of
aggregation you want to do. I personally find this more natural than
groupBy().count().withBlah().build()



groupedStream.count(/** non windowed count**/)
groupedStream.windowed(TimeWindows.of(10L)).count(...)
groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)


I like this. However, I don't see a reason to have windowed() and
sessionWindowed(). We should have one top-level `Windows` interface that
both `TimeWindows` and `SessionWindows` implement and just have a single
windowed() method that accepts all `Windows`. (I did not like the
separation of `SessionWindows` in the first place, and this seems to be
an opportunity to clean this up. It was hard to change when we
introduced session windows)


Yes - true we should look into that.



Btw: we do use the imperative groupBy() and groupByKey(), and thus we
might also want to use windowBy() (instead of windowed()). Not sure how
important this is, but it seems to be inconsistent otherwise.
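
A small sketch of what the single-method idea could look like, assuming a
common top-level window interface and an imperative windowBy(). All names
here (`WindowSpec`, `TimeWindowSpec`, `SessionWindowSpec`,
`GroupedStreamSketch`) are made up for illustration and are not the real
Kafka Streams classes; count() just returns a description string so the
effect of the chosen window is visible.

```java
// Hypothetical common interface implemented by every kind of window.
interface WindowSpec {
    String describe();
}

final class TimeWindowSpec implements WindowSpec {
    private final long sizeMs;
    TimeWindowSpec(long sizeMs) { this.sizeMs = sizeMs; }
    @Override public String describe() { return "time(" + sizeMs + "ms)"; }
}

final class SessionWindowSpec implements WindowSpec {
    private final long inactivityGapMs;
    SessionWindowSpec(long inactivityGapMs) { this.inactivityGapMs = inactivityGapMs; }
    @Override public String describe() { return "session(gap=" + inactivityGapMs + "ms)"; }
}

// Stand-in for a grouped stream: one windowBy() accepts any WindowSpec,
// so no separate windowed() / sessionWindowed() methods are needed.
final class GroupedStreamSketch {
    private final WindowSpec windows; // null means "not windowed"

    GroupedStreamSketch() { this(null); }
    private GroupedStreamSketch(WindowSpec windows) { this.windows = windows; }

    GroupedStreamSketch windowBy(WindowSpec spec) {
        return new GroupedStreamSketch(spec);
    }

    // Terminal call; returns a description instead of a real table.
    String count() {
        return windows == null ? "count" : "count over " + windows.describe();
    }
}
```

Usage would read groupedStream.windowBy(new TimeWindowSpec(10L)).count() or
groupedStream.windowBy(new SessionWindowSpec(10L)).count(), matching the
groupBy() / groupByKey() naming.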



Makes sense



About joins:  I don't like .withJoinType(JoinType.LEFT) at all. I think,
defining an inner/left/outer join is not an optional argument but a
first class concept and should have a proper representation in the API
(like the current methods join(), leftJoin, outerJoin()).



Yep, i did originally have it as a required param and maybe that is what we
go with. It could have a default, but maybe that is confusing.




About the tw

Re: [ERICSSON] - Trade Compliance: ECCN code for Apache Items

2017-06-28 Thread jan
@Martin Gainty - see my link:

"The Apache Software Foundation (ASF) is a 501(c)3 nonprofit charity
based in the United States of America. All of our products are
developed via online collaboration in public forums and distributed
from a central server within the U.S. Therefore, U.S. export laws and
regulations apply to our distributions and remain in force as products
and technology are re-exported to different parties and places around
the world. Information on export control classifications and
associated restrictions may be required for exporting, re-exporting,
record keeping, bundling/embedding of ASF products, encryption
reporting, and shipping documentation."

I agree with you, it seems bizarre, and wrong.

jan

On 28/06/2017, Martin Gainty  wrote:
>
> MG>am requesting clarification below
> 
> From: Axelle Margot 
> Sent: Tuesday, June 27, 2017 8:48 AM
> To: users@kafka.apache.org
> Cc: Celine Heerdt; Wassila BenJelloul.ext; Florent Poulain
> Subject: [ERICSSON] - Trade Compliance: ECCN code for Apache Items
>
>
> Hello,
>
> You were contacted as part of a new project in France.
>
> For all products you offer, HW and / or SW, we need, as usual, you provide
> some information about your products in order to better prepare the orders.
> So can you send us now the following information about your products:
>
>   *   EU ECCN Code: who define if the product is dual use or not.
>
>  This code is in the format:  Digit - Letter- Digit - Digit - Digit + an
> extension of Letter - Digit – Letter
>
> Example: 5D001.d.2.a to the SW or HW for 5A002.d.2.a
>
> Nota: Ericsson France needs the European ECCN Code, not the US ECCN Code.
>
>
> mg>kafka is licensed as OpenSource by ASF and not a commercial (for sale for
> profit) product  ..how is European/American ECCN applicable?
>
>
>   *   HST code or TARIC code: corresponding to the complete description of
> the property and to define the customs taxes
>   *
>
> mg>kafka is licensed as OpenSource by ASF and not a commercial (for sale for
> profit) product  ..how is HST applicable?
>
>
>
> If you can’t find the ECCN Product code:
> - If you are a reseller, you must contact your supplier as soon as possible
> to send us the information quickly.
> - If it’s your equipment, the responsibility of the classification is yours.
> You can refer to Regulation (EC) No 428/2009 of 5 May 2009, or for France
> office, you can also have a look on SBDU website (Service Of Dual-use)
> http://www.entreprises.gouv.fr/biens-double -usage / Home
>
>
>
> We need the EU ECCN Code and HST code for the following family product:
>
>
>
> Apache
>
>
> Kafka 0.10.2.1
>
>
> Zookeper 3.4.9
>
>
> Puppet client
>
>
>
>
> Regarding the ECCN Code, is this is a mass market product, thanks to precise
> us.
>
>
>
> Please find attached some file who can helps you.
>
> I remind you that in our internal data system we can’t record your items
> without the EU ECCN Code. This one is mandatory to valid the order.
>
>
>
> We need these information for the end of next week, the 7th of July.
>
> For Further information, please contact us.
> Best regards,
>
>
>
> Axelle MARGOT
>
> Trade Compliance Adviser / Controle Export
>
> ERICSSON FRANCE & ERICSSON MAGHREB
>
> 25, Avenue Carnot
> 91348 MASSY Cedex, France
>
> Phone : +33 1 81 87 44 11
>
> Mobile : +33 6 60 14 34 28
>
> Email : axelle.mar...@ericsson.com<mailto:axelle.mar...@ericsson.com>
>
> www.ericsson.com<http://www.ericsson.com/>
>
>
>
> [Description: Ericsson]
>
>
>
>
>
> Remember: a dual-use purpose:
> According to the usual definition, fall into this category "property,
> equipment - including technology, software, know-how immaterial or
> intangible - that could have both civilian and military uses or may - wholly
> or partly - contribute to the development, production, handling, operation,
> maintenance, storage, detection, identification, dissemination of weapons of
> mass destruction '' (WMD - nuclear, biological, chemical , etc.).
>
>
>
>
>
>
>
>
>
>
>
> From: Celine Heerdt
> Sent: lundi 26 juin 2017 15:36
> To: users@kafka.apache.org
> Cc: Axelle Margot 
> Subject: ECCN code for Kafka 0.10.2.1
>
>
>
> Hi,
>
> We in Ericsson are interested in the SW Kafka 0.10.2.1 from Apache. In order
> to begin the trade compliance process, could you please send us the ECCN
> codes for that SW.
>
> Best regards
>
> Céline Heerdt
>
>
>
>
>
> [Ericsson]<http://www.ericsson.com/>
>
> Céline Heerdt
>
> Project Manager
>
> Ericsson
> 25 Avenue Carnot
> 91348 Massy, France
> Mobile +33 6 71 53 92 02
> celine.hee...@ericsson.com<mailto:celine.hee...@ericsson.com>
> www.ericsson.com<http://www.ericsson.com/>
>
>
>


Re: [ERICSSON] - Trade Compliance: ECCN code for Apache Items

2017-06-27 Thread jan
I appreciate there may be a loss of subtlety traversing languages, but
this doesn't come over too politely.

I can't help you, the best I can find is
<http://www.apache.org/licenses/exports/>. This *may* be more helpful
than posting here although it covers none of the software you mention,
sorry, but maybe it's worth a look through that page.

I have to admit I'd never heard of ECCN classifications and am
surprised it even exists.

cheers

jan


On 27/06/2017, Axelle Margot  wrote:
> Hello,
>
> You were contacted as part of a new project in France.
>
> For all products you offer, HW and / or SW, we need, as usual, you provide
> some information about your products in order to better prepare the orders.
> So can you send us now the following information about your products:
>
>   *   EU ECCN Code: who define if the product is dual use or not.
>
>  This code is in the format:  Digit - Letter- Digit - Digit - Digit + an
> extension of Letter - Digit - Letter
>
> Example: 5D001.d.2.a to the SW or HW for 5A002.d.2.a
>
> Nota: Ericsson France needs the European ECCN Code, not the US ECCN Code.
>
>
>
>   *   HST code or TARIC code: corresponding to the complete description of
> the property and to define the customs taxes
>
>
>
> If you can't find the ECCN Product code:
> - If you are a reseller, you must contact your supplier as soon as possible
> to send us the information quickly.
> - If it's your equipment, the responsibility of the classification is yours.
> You can refer to Regulation (EC) No 428/2009 of 5 May 2009, or for France
> office, you can also have a look on SBDU website (Service Of Dual-use)
> http://www.entreprises.gouv.fr/biens-double -usage / Home
>
>
>
> We need the EU ECCN Code and HST code for the following family product:
>
>
> Apache
>
> Kafka 0.10.2.1
>
> Zookeper 3.4.9
>
> Puppet client
>
>
>
>
> Regarding the ECCN Code, is this is a mass market product, thanks to precise
> us.
>
>
>
> Please find attached some file who can helps you.
>
> I remind you that in our internal data system we can't record your items
> without the EU ECCN Code. This one is mandatory to valid the order.
>
>
>
> We need these information for the end of next week, the 7th of July.
>
> For Further information, please contact us.
> Best regards,
>
> Axelle MARGOT
> Trade Compliance Adviser / Controle Export
> ERICSSON FRANCE & ERICSSON MAGHREB
> 25, Avenue Carnot
> 91348 MASSY Cedex, France
> Phone : +33 1 81 87 44 11
> Mobile : +33 6 60 14 34 28
> Email : axelle.mar...@ericsson.com<mailto:axelle.mar...@ericsson.com>
> www.ericsson.com<http://www.ericsson.com/>
>
> [Description: Ericsson]
>
>
>
> Remember: a dual-use purpose:
> According to the usual definition, fall into this category "property,
> equipment - including technology, software, know-how immaterial or
> intangible - that could have both civilian and military uses or may - wholly
> or partly - contribute to the development, production, handling, operation,
> maintenance, storage, detection, identification, dissemination of weapons of
> mass destruction '' (WMD - nuclear, biological, chemical , etc.).
>
>
>
>
>
> From: Celine Heerdt
> Sent: lundi 26 juin 2017 15:36
> To: users@kafka.apache.org
> Cc: Axelle Margot 
> Subject: ECCN code for Kafka 0.10.2.1
>
> Hi,
> We in Ericsson are interested in the SW Kafka 0.10.2.1 from Apache. In order
> to begin the trade compliance process, could you please send us the ECCN
> codes for that SW.
> Best regards
> Céline Heerdt
>
>
> [Ericsson]<http://www.ericsson.com/>
> Céline Heerdt
> Project Manager
>
> Ericsson
> 25 Avenue Carnot
> 91348 Massy, France
> Mobile +33 6 71 53 92 02
> celine.hee...@ericsson.com<mailto:celine.hee...@ericsson.com>
> www.ericsson.com<http://www.ericsson.com/>
>
>


Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-24 Thread Jan Filipiak

I am with Guozhang here.

I think all the suggestions are far too short-sighted. Especially this 
weird materialize(String) call is totally broken, and people go nuts 
about how this will look. Plus implementing more and better joins, not this 
weird one we currently have. When implementing a one-to-many join I couldn't 
get away without 3 highly complex value mappers


   final ValueMapper keyExtractor,
   final ValueMapper joinPrefixFaker,
   final ValueMapper leftKeyExtractor,


in addition to the one joiner of course

   final ValueJoiner joiner,

How to specify whether it's outer or inner is for sure the smallest problem we 
are going to face with proper join semantics. What the resulting key 
will be is also highly debatable. What happens to the key is very 
complex and the API has to tell the user.


To bring this discussion in a good direction, we would need sample 
interfaces we could mock against (as Guozhang suggested). We also need to 
know how the implementation (of joins especially) will look later, as I 
strongly recommend stopping the usage of ChangeSerde and having "properly" 
repartitioned topics. That is just sane IMO.


Best Jan




On 22.06.2017 11:54, Eno Thereska wrote:

Note that while I agree with the initial proposal (withKeySerdes, withJoinType, 
etc), I don't agree with things like .materialize(), .enableCaching(), 
.enableLogging().

The former maintain the declarative DSL, while the latter break the declarative 
part by mixing system decisions in the DSL.  I think there is a difference 
between the two proposals.

Eno


On 22 Jun 2017, at 03:46, Guozhang Wang  wrote:

I have been thinking about reducing all these overloaded functions for
stateful operations (there are some other places that introduces overloaded
functions but let's focus on these only in this discussion), what I used to
have is to use some "materialize" function on the KTables, like:

---

// specifying the topology

KStream stream1 = builder.stream();
// do not allow passing in a state store supplier here any more
KTable table1 = stream1.groupBy(...).aggregate(initializer, aggregator,
    sessionMerger, sessionWindows);

// additional specs along with the topology above

table1.materialize("queryableStoreName"); // or..
table1.materialize("queryableStoreName").enableCaching().enableLogging(); // or..
// add the metrics / logging / caching / windowing functionalities on top
// of the store, or..
table1.materialize(stateStoreSupplier);
table1.materialize(stateStoreSupplier).enableCaching().enableLogging(); // etc..

---

But thinking about it more, I feel Damian's first proposal is better since
my proposal would likely break the concatenation (e.g. we may not be
able to do sth. like "table1.filter().map().groupBy().aggregate()" if we
want to use different specs for the intermediate filtered KTable).


But since this is an incompatible change, and we are going to remove the
compatibility annotations soon, it means we only have one chance and we
really have to make it right. So I'd call on anyone to try to rewrite
their examples / demo code with the proposed new API and see if it feels
natural. For example, if I want to use a different storage engine than the
default RocksDB engine, how could I easily specify that with the proposed
APIs?

Meanwhile Damian could you provide a formal set of APIs for people to
exercise on them? Also could you briefly describe how custom storage
engines could be swapped in with the above APIs?



Guozhang


On Wed, Jun 21, 2017 at 9:08 AM, Eno Thereska 
wrote:


To make it clear, it’s outlined by Damian, I just copy pasted what he told
me in person :)

Eno


On Jun 21, 2017, at 4:40 PM, Bill Bejeck  wrote:

+1 for the approach outlined above by Eno.

On Wed, Jun 21, 2017 at 11:28 AM, Damian Guy wrote:

Thanks Eno.

Yes i agree. We could apply this same approach to most of the operations
where we have multiple overloads, i.e., we have a single method for each
operation that takes the required parameters and everything else is
specified as you have done above.

On Wed, 21 Jun 2017 at 16:24 Eno Thereska wrote:

(cc’ing user-list too)

Given that we already have StateStoreSuppliers that are configurable using
the fluent-like API, probably it’s worth discussing the other examples with
joins and serdes first since those have many overloads and are in need of
some TLC.

So following your example, I guess you’d have something like:
.join()
  .withKeySerdes(…)
  .withValueSerdes(…)
  .withJoinType("outer")

etc?

I like the approach since it still remains declarative and it’d reduce the
number of overloads by quite a bit.

Eno


On Jun 21, 2017, at 3:37 PM, Damian Guy  wrote:

Hi,

I'd like to get a discus

Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-06-22 Thread Jan Filipiak

Hi Eno,

I am less interested in the user facing interface but more in the actual 
implementation. Any hints where I can follow the discussion on this? As 
I still want to discuss upstreaming of KAFKA-3705 with someone


Best Jan


On 21.06.2017 17:24, Eno Thereska wrote:

(cc’ing user-list too)

Given that we already have StateStoreSuppliers that are configurable using the 
fluent-like API, probably it’s worth discussing the other examples with joins 
and serdes first since those have many overloads and are in need of some TLC.

So following your example, I guess you’d have something like:
.join()
  .withKeySerdes(…)
  .withValueSerdes(…)
  .withJoinType("outer")

etc?

I like the approach since it still remains declarative and it’d reduce the 
number of overloads by quite a bit.

Eno


On Jun 21, 2017, at 3:37 PM, Damian Guy  wrote:

Hi,

I'd like to get a discussion going around some of the API choices we've
made in the DSL. In particular those that relate to stateful operations
(though this could expand).
As it stands we lean heavily on overloaded methods in the API, i.e., there
are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and I
feel it is only going to get worse as we add more optional params. In
particular we've had some requests to be able to turn caching off, or
change log configs, on a per-operator basis (note this can be done now if
you pass in a StateStoreSupplier, but this can be a bit cumbersome).

So this is a bit of an open question. How can we change the DSL overloads
so that it flows, is simple to use and understand, and is easily extended
in the future?

One option would be to use a fluent API approach for providing the optional
params, so something like this:

groupedStream.count()
   .withStoreName("name")
   .withCachingEnabled(false)
   .withLoggingEnabled(config)
   .table()



Another option would be to provide a Builder to the count method, so it
would look something like this:
groupedStream.count(new
CountBuilder("storeName").withCachingEnabled(false).build())
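
For readers who want to try the builder variant, a minimal sketch of what
such a CountBuilder might look like. This is an assumption-laden
illustration, not a proposed implementation: `CountSpec` is a hypothetical
value object, both flags are assumed to default to true, and
withLoggingEnabled takes a plain boolean here rather than a log config.

```java
import java.util.Objects;

// Hypothetical immutable result of the builder; a single count(CountSpec)
// method could replace the many count(...) overloads.
final class CountSpec {
    final String storeName;
    final boolean cachingEnabled;
    final boolean loggingEnabled;

    CountSpec(String storeName, boolean cachingEnabled, boolean loggingEnabled) {
        this.storeName = storeName;
        this.cachingEnabled = cachingEnabled;
        this.loggingEnabled = loggingEnabled;
    }
}

// The required parameter goes into the constructor, optional ones into
// chained with...() calls.
final class CountBuilder {
    private final String storeName;
    private boolean cachingEnabled = true; // assumed default
    private boolean loggingEnabled = true; // assumed default

    CountBuilder(String storeName) {
        this.storeName = Objects.requireNonNull(storeName, "storeName");
    }

    CountBuilder withCachingEnabled(boolean enabled) {
        this.cachingEnabled = enabled;
        return this;
    }

    CountBuilder withLoggingEnabled(boolean enabled) {
        this.loggingEnabled = enabled;
        return this;
    }

    CountSpec build() {
        return new CountSpec(storeName, cachingEnabled, loggingEnabled);
    }
}
```

Adding a new optional parameter later means adding one with...() method to
the builder instead of another set of overloads on count().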

Another option is to say: Hey we don't need this, what are you on about!

The above has focussed on state store related overloads, but the same ideas
could  be applied to joins etc, where we presently have many join methods
and many overloads.

Anyway, i look forward to hearing your opinions.

Thanks,
Damian




Re: Reliably implementing global KeyValueStore#get

2017-06-07 Thread Jan Filipiak

Depends; embedded Postgres puts you into the same spot.

But if you use your state store changelog to materialize into 
Postgres, that might work out decently.
Current JDBC doesn't support delete, which is an issue, but writing a 
custom sink is not too hard.


Best Jan


On 07.06.2017 23:47, Steven Schlansker wrote:

I was actually considering writing my own KeyValueStore backed
by e.g. a Postgres or the like.

Is there some feature Connect gains me that would make it better
than such an approach?

thanks


On Jun 7, 2017, at 2:20 PM, Jan Filipiak  wrote:

Hi,

have you thought about using connect to put data into a store that is more 
reasonable for your kind of query requirements?

Best Jan

On 07.06.2017 00:29, Steven Schlansker wrote:

On Jun 6, 2017, at 2:52 PM, Damian Guy  wrote:

Steven,

In practice, data shouldn't be migrating that often. If it is then you
probably have bigger problems.

Understood and agreed, but when designing distributed systems, it usually
helps to model for the worst case rather than the "well that should never
happen" case, lest you find yourself fixing those bugs at 3am instead :)

I'd like to be able to induce extreme pain at the Kafka layer (change leader
every 3 seconds and migrate all partitions around randomly) and still have
my app behave correctly.


You should be able to use the metadata api
to find the instance the key should be on and then when you check that node
you can also check with the metadata api that the key should still be on
this host. If streams is rebalancing while you query an exception will be
raised and you'll need to retry the request once the rebalance has
completed.
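
The check / query / re-check sequence described above could be sketched
roughly as follows. Everything here is hypothetical: `OwnerLookup` stands in
for the metadata API, `RemoteStore` for the HTTP remoting layer, and
IllegalStateException for whatever exception the store raises while a
rebalance is in progress. The re-check only narrows the window; ownership
could in principle move away and back between the two lookups.

```java
import java.util.Optional;

// Hypothetical stand-in for the metadata lookup: which host owns this key?
interface OwnerLookup<K> {
    String ownerOf(K key);
}

// Hypothetical stand-in for the remoting layer; assumed to throw
// IllegalStateException while the target instance is rebalancing.
interface RemoteStore<K, V> {
    V get(String host, K key);
}

final class CheckedLookup {
    private CheckedLookup() {}

    // Query the owner, then confirm the owner did not change while the
    // request was in flight; otherwise discard the answer and retry.
    static <K, V> Optional<V> get(K key, OwnerLookup<K> owners,
                                  RemoteStore<K, V> store, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String before = owners.ownerOf(key);
            V value;
            try {
                value = store.get(before, key);
            } catch (IllegalStateException rebalancing) {
                continue; // store not queryable right now, try again
            }
            String after = owners.ownerOf(key);
            if (before.equals(after)) {
                return Optional.ofNullable(value); // empty means "key absent"
            }
        }
        throw new IllegalStateException("ownership kept changing for key " + key);
    }
}
```

A real implementation would likely also back off between attempts instead
of retrying immediately.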

Agreed here as well.  But let's assume I have a very fast replication
setup (assume it takes zero time, for the sake of argument) -- I'm fairly
sure there's still a race here as this exception only fires *during a migration*
not *after a migration that may have invalidated your metadata lookup completes*


HTH,
Damian

On Tue, 6 Jun 2017 at 18:11 Steven Schlansker 
wrote:


On Jun 6, 2017, at 6:16 AM, Eno Thereska  wrote:

Hi Steven,

Do you know beforehand if a key exists? If you know that and are getting
null, the code will have to retry by refreshing the metadata and going to
the new instance. If you don’t know beforehand if a key exists or not you
might have to check all instances of a store to make sure.

No, I am not presupposing that the key can exist -- this is a user visible
API and will be prone to "accidents" :)

Thanks for the insight.  I worry that even checking all stores is not
truly sufficient, as querying all workers at different times in the
presence of migrating data can still in theory miss it given pessimal
execution.

I'm sure I've long wandered off into the hypothetical, but I dream of some
day being cool like Jepsen :)


Eno



On Jun 5, 2017, at 10:12 PM, Steven Schlansker <sschlans...@opentable.com> wrote:

Hi everyone, me again :)

I'm still trying to implement my "remoting" layer that allows
my clients to see the partitioned Kafka Streams state
regardless of which instance they hit.  Roughly, my lookup is:

Message get(Key key) {
  RemoteInstance instance = selectPartition(key);
  return instance.get(key); // http remoting
}

RemoteInstance.get(Key key) { // http endpoint
  return readOnlyKeyValueStore.get(key);
}

However, the mapping of partitions to instances may change.
If you call KeyValueStore.get(K) where K is on a partition you
don't own, it returns null.  This is indistinguishable from a
successful get on a key that doesn't exist.

If one instance selects a sibling instance right as the partition is
failing off of that instance, it may get routed there and by the time it gets
the request no longer "owns" the partition -- returns a false 'null'.

You can try re-checking after you get a null value, but that's susceptible
to the same race -- it's unlikely but possible that the data migrates *back*
before you do this re-check.

Is there any way to correctly implement this without races?  I'd imagine
you need a new primitive like KeyValueStore#get that atomically finds
the key or throws an exception if it is not in an owned partition
at the time of lookup so you know to recheck the partition and retry.
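
A minimal sketch of what such a primitive could look like, assuming the
instance can check partition ownership and read the value under one lock.
Everything here (OwnedStoreDemo, Store, NotOwnerException, getOrThrow) is
hypothetical and is not part of the Kafka Streams API; it only illustrates
the "find the key or throw if the partition is not owned" contract.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch; none of these types belong to the Kafka Streams API.
class OwnedStoreDemo {

    /** Signals that the key's partition is not owned by this instance right now. */
    static class NotOwnerException extends RuntimeException {
        NotOwnerException(int partition) {
            super("partition " + partition + " is not owned by this instance");
        }
    }

    static class Store {
        private final int numPartitions;
        private final Set<Integer> owned = new HashSet<>();
        private final Map<String, String> data = new HashMap<>();

        Store(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        int partitionFor(String key) {
            return Math.floorMod(key.hashCode(), numPartitions);
        }

        // Ownership changes and reads share one lock, so the ownership check
        // and the read in getOrThrow cannot be separated by a migration.
        synchronized void assign(int partition) {
            owned.add(partition);
        }

        synchronized void revoke(int partition) {
            owned.remove(partition);
            data.keySet().removeIf(k -> partitionFor(k) == partition);
        }

        synchronized void put(String key, String value) {
            requireOwned(key);
            data.put(key, value);
        }

        /** Empty means "owned and absent"; a non-owner throws instead of returning null. */
        synchronized Optional<String> getOrThrow(String key) {
            requireOwned(key);
            return Optional.ofNullable(data.get(key));
        }

        private void requireOwned(String key) {
            int partition = partitionFor(key);
            if (!owned.contains(partition)) {
                throw new NotOwnerException(partition);
            }
        }
    }

    public static void main(String[] args) {
        Store store = new Store(1); // one partition keeps the demo deterministic
        store.assign(0);
        store.put("k", "v");
        System.out.println(store.getOrThrow("k"));
        System.out.println(store.getOrThrow("absent"));
        store.revoke(0);
        try {
            store.getOrThrow("k");
        } catch (NotOwnerException e) {
            System.out.println("retry elsewhere: " + e.getMessage());
        }
    }
}
```

With this contract a caller can tell "key does not exist" (an empty result)
apart from "wrong instance" (the exception) and only re-resolves the
partition in the second case.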

Thoughts?

Thanks again,
Steven





Re: Reliably implementing global KeyValueStore#get

2017-06-07 Thread Jan Filipiak

Hi,

Have you thought about using Connect to put the data into a store that is
better suited to your kind of query requirements?


Best Jan

On 07.06.2017 00:29, Steven Schlansker wrote:

On Jun 6, 2017, at 2:52 PM, Damian Guy  wrote:

Steven,

In practice, data shouldn't be migrating that often. If it is then you
probably have bigger problems.

Understood and agreed, but when designing distributed systems, it usually
helps to model for the worst case rather than the "well that should never
happen" case, lest you find yourself fixing those bugs at 3am instead :)

I'd like to be able to induce extreme pain at the Kafka layer (change leader
every 3 seconds and migrate all partitions around randomly) and still have
my app behave correctly.


You should be able to use the metadata api
to find the instance the key should be on and then when you check that node
you can also check with the metadata api that the key should still be on
this host. If streams is rebalancing while you query an exception will be
raised and you'll need to retry the request once the rebalance has
completed.
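
The lookup-then-retry flow described above could be sketched roughly as
follows. This is an illustration under assumptions: locateOwner stands in for
the metadata lookup, RemoteGet for the HTTP call, and
StoreUnavailableException for whatever error the queried instance raises
while it cannot serve the key. All three names are hypothetical.

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch; the interfaces below stand in for a metadata lookup
// and an HTTP call, and are not Kafka Streams types.
class RetryingLookup {

    /** Thrown by the remote side when it cannot serve the key right now. */
    static class StoreUnavailableException extends RuntimeException {
    }

    /** Stand-in for the HTTP call to another instance. */
    interface RemoteGet {
        Optional<String> get(String host, String key);
    }

    static Optional<String> get(String key,
                                Function<String, String> locateOwner,
                                RemoteGet remote,
                                int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Look the owner up again on every attempt; a cached answer may be stale.
            String host = locateOwner.apply(key);
            try {
                return remote.get(host, key);
            } catch (StoreUnavailableException e) {
                // Rebalance in progress or ownership moved: retry with fresh metadata.
            }
        }
        throw new IllegalStateException(
                "gave up on key '" + key + "' after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        int[] lookups = {0};
        Optional<String> value = get(
                "k",
                key -> lookups[0]++ == 0 ? "host-a" : "host-b", // owner moves after first lookup
                (host, key) -> {
                    if (host.equals("host-a")) {
                        throw new StoreUnavailableException();
                    }
                    return Optional.of("v");
                },
                3);
        System.out.println(value + " after " + lookups[0] + " lookups");
    }
}
```

Note that a loop like this only helps when the queried instance actually
signals that it cannot serve the key; a plain null from a non-owner would
still pass through as "not found".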

Agreed here as well.  But let's assume I have a very fast replication
setup (assume it takes zero time, for the sake of argument) -- I'm fairly
sure there's still a race here as this exception only fires *during a migration*
not *after a migration that may have invalidated your metadata lookup completes*


HTH,
Damian

On Tue, 6 Jun 2017 at 18:11 Steven Schlansker 
wrote:


On Jun 6, 2017, at 6:16 AM, Eno Thereska  wrote:

Hi Steven,

Do you know beforehand if a key exists? If you know that and are getting
null, the code will have to retry by refreshing the metadata and going to
the new instance. If you don’t know beforehand if a key exists or not you
might have to check all instances of a store to make sure.

No, I am not presupposing that the key can exist -- this is a user visible
API and will be prone to "accidents" :)

Thanks for the insight.  I worry that even checking all stores is not
truly sufficient, as querying all workers at different times in the
presence of migrating data can still in theory miss it given pessimal
execution.

I'm sure I've long wandered off into the hypothetical, but I dream of some
day being cool like Jepsen :)


Eno



On Jun 5, 2017, at 10:12 PM, Steven Schlansker <sschlans...@opentable.com> wrote:

Hi everyone, me again :)

I'm still trying to implement my "remoting" layer that allows
my clients to see the partitioned Kafka Streams state
regardless of which instance they hit.  Roughly, my lookup is:

Message get(Key key) {
  RemoteInstance instance = selectPartition(key);
  return instance.get(key); // http remoting
}

RemoteInstance.get(Key key) { // http endpoint
  return readOnlyKeyValueStore.get(key);
}

However, the mapping of partitions to instances may change.
If you call KeyValueStore.get(K) where K is on a partition you
don't own, it returns null.  This is indistinguishable from a
successful get on a key that doesn't exist.

If one instance selects a sibling instance right as the partition is
failing off of that instance, it may get routed there and by the time it gets
the request no longer "owns" the partition -- returns a false 'null'.

You can try re-checking after you get a null value, but that's susceptible
to the same race -- it's unlikely but possible that the data migrates *back*
before you do this re-check.

Is there any way to correctly implement this without races?  I'd imagine
you need a new primitive like KeyValueStore#get that atomically finds
the key or throws an exception if it is not in an owned partition
at the time of lookup so you know to recheck the partition and retry.

Thoughts?

Thanks again,
Steven







Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-07 Thread Jan Filipiak

Hi Eno,

On 07.06.2017 22:49, Eno Thereska wrote:

Comments inline:


On 5 Jun 2017, at 18:19, Jan Filipiak  wrote:

Hi

just my few thoughts

On 05.06.2017 11:44, Eno Thereska wrote:

Hi there,

Sorry for the late reply, I was out this past week. Looks like good progress 
was made with the discussions either way. Let me recap a couple of points I saw 
into one big reply:

1. Jan mentioned CRC errors. I think this is a good point. As these happen in 
Kafka, before Kafka Streams gets a chance to inspect anything, I'd like to hear 
the opinion of more Kafka folks like Ismael or Jason on this one. Currently the 
documentation is not great with what to do once a CRC check has failed. From 
looking at the code, it looks like the client gets a KafkaException (bubbled up 
from the fetcher) and currently we in streams catch this as part of poll() and 
fail. It might be advantageous to treat CRC handling in a similar way to 
serialisation handling (e.g., have the option to fail/skip). Let's see what the 
other folks say. Worst-case we can do a separate KIP for that if it proved too 
hard to do in one go.

There is no reasonable way to "skip" a CRC error. How can you know the length
you read was anything reasonable? You might be completely lost inside your response.

On the client side, every record received is checked for validity. As it 
happens, if the CRC check fails the exception is wrapped with a KafkaException 
that is thrown all the way to poll(). Assuming we change that and poll() throws 
a CRC exception, I was thinking we could treat it similarly to a deserialize 
exception and pass it to the exception handler to decide what to do. Default 
would be to fail. This might need a Kafka KIP btw and can be done separately 
from this KIP, but Jan, would you find this useful?
I don't think so. IMO you cannot reasonably continue parsing when the
checksum of a message is not correct. If you are not sure you got the
correct length, how can you be sure to find the next record? I would
always fail straight away in all cases. It's too hard for me to understand
why one would try to continue. I mentioned CRCs because those are the only
bad pills I have seen so far. But I am happy that it just stopped and I could
check what was going on. This would also be invasive in the client code.
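
To illustrate why a checksum failure is hard to skip over, here is a toy
length-prefixed framing with a CRC per record. It is a sketch only: the
[length][crc][payload] layout and the names CrcFramingDemo and
CorruptRecordException are assumptions made for this example and are not
Kafka's wire format. The reader stops at the first mismatch because, once a
record is suspect, the length it used to locate the next record is suspect
too.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.CRC32;

// Toy framing for illustration only; this is not Kafka's record format.
class CrcFramingDemo {

    static class CorruptRecordException extends RuntimeException {
        CorruptRecordException(String message) {
            super(message);
        }
    }

    static long crcOf(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    /** Encodes each payload as [length:int][crc:int][payload bytes]. */
    static byte[] encode(List<String> payloads) {
        List<byte[]> raw = new ArrayList<>();
        int size = 0;
        for (String p : payloads) {
            byte[] bytes = p.getBytes(StandardCharsets.UTF_8);
            raw.add(bytes);
            size += 8 + bytes.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (byte[] bytes : raw) {
            buf.putInt(bytes.length).putInt((int) crcOf(bytes)).put(bytes);
        }
        return buf.array();
    }

    /** Decodes all records, failing at the first one that cannot be trusted. */
    static List<String> decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        List<String> out = new ArrayList<>();
        while (buf.hasRemaining()) {
            int start = buf.position();
            if (buf.remaining() < 8) {
                throw new CorruptRecordException("truncated header at byte " + start);
            }
            int length = buf.getInt();
            long expectedCrc = buf.getInt() & 0xFFFFFFFFL;
            if (length < 0 || length > buf.remaining()) {
                throw new CorruptRecordException("implausible length at byte " + start);
            }
            byte[] payload = new byte[length];
            buf.get(payload);
            if (crcOf(payload) != expectedCrc) {
                // The length field may be the corrupted part, so the position of
                // the next record is unknown: stop instead of guessing.
                throw new CorruptRecordException("CRC mismatch at byte " + start);
            }
            out.add(new String(payload, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] bytes = encode(Arrays.asList("a", "bb"));
        System.out.println(decode(bytes));
        bytes[8] ^= 1; // flip one bit in the first payload
        try {
            decode(bytes);
        } catch (CorruptRecordException e) {
            System.out.println("stopped: " + e.getMessage());
        }
    }
}
```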


If you ask me, I am always going to vote for "grind to a halt": let the
developers see what happened and let them fix it. It helps build good
Kafka experiences and better software and architectures. For me this is
"force the user to do the right thing"
(https://youtu.be/aAb7hSCtvGw?t=374), e.g. not letting unexpected input slip
by.  Letting unexpected input slip by is what brought us 15+ years of war
against all sorts of ingestion attacks. I don't even dare to estimate how
many missing-records search teams are going to be formed, maybe some
HackerOne for stream apps :D


Best Jan




At a minimum, handling this type of exception will need to involve the 
exactly-once (EoS) logic. We'd still allow the option of failing or skipping, 
but EoS would need to clean up by rolling back all the side effects from the 
processing so far. Matthias, how does this sound?

EoS will not help; the record might be 5 or 6 repartitions down in the topology.
I haven't followed, but I pray you made EoS optional! We don't need this and we
don't want this and we will turn it off if it comes. So I wouldn't recommend
relying on it. The option to turn it off is better than forcing it and still
being unable to roll back bad pills (as explained before).

Yeah as Matthias mentioned EoS is optional.

Thanks,
Eno



6. Will add an end-to-end example as Michael suggested.

Thanks
Eno




On 4 Jun 2017, at 02:35, Matthias J. Sax  wrote:

What I don't understand is this:


 From there on it's the easiest way forward: fix, redeploy, start => done

If you have many producers that work fine and a new "bad" producer
starts up and writes bad data into your input topic, your Streams app
dies but all your producers, including the bad one, keep writing.

Thus, how would you fix this, as you cannot "remove" the corrupted data
from the topic? It might take some time to identify the root cause and
stop the bad producer. Up to this point you get good and bad data into
your Streams input topic. If the Streams app is not able to skip over those
bad records, how would you get all the good data from the topic? Not
saying it's not possible, but it's extra work copying the data with a
new non-Streams consumer-producer-app into a new topic and then feeding
your Streams app from this new topic -- you also need to update all your
upstream producers to write to the new topic.

Thus, if you want to fail fast, you can still do this. And after you
detected and fixed the bad producer you might just reconfigure your app
to skip bad records until it reaches the good part of the data.
Afterwards, you 

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-05 Thread Jan Filipiak

Hi

just my few thoughts

On 05.06.2017 11:44, Eno Thereska wrote:

Hi there,

Sorry for the late reply, I was out this past week. Looks like good progress 
was made with the discussions either way. Let me recap a couple of points I saw 
into one big reply:

1. Jan mentioned CRC errors. I think this is a good point. As these happen in 
Kafka, before Kafka Streams gets a chance to inspect anything, I'd like to hear 
the opinion of more Kafka folks like Ismael or Jason on this one. Currently the 
documentation is not great with what to do once a CRC check has failed. From 
looking at the code, it looks like the client gets a KafkaException (bubbled up 
from the fetcher) and currently we in streams catch this as part of poll() and 
fail. It might be advantageous to treat CRC handling in a similar way to 
serialisation handling (e.g., have the option to fail/skip). Let's see what the 
other folks say. Worst-case we can do a separate KIP for that if it proved too 
hard to do in one go.
There is no reasonable way to "skip" a CRC error. How can you know the
length you read was anything reasonable? You might be completely lost
inside your response.

2. Damian has convinced me that the KIP should just be for deserialisation from 
the network, not from local state store DBs. For the latter we'll follow the 
current way of failing since the DB is likely corrupt.

3. Dead letter queue option. There was never any intention here to do anything 
super clever like attempt to re-inject the failed records from the dead letter 
queue back into the system. Reasoning about when that'd be useful in light of 
all sorts of semantic breakings would be hard (arguably impossible). The idea 
was to just have a place to have all these dead records to help with subsequent 
debugging. We could also just log a whole bunch of info for a poison pill 
record and not have a dead letter queue at all. Perhaps that's a better, 
simpler, starting point.

+1


4. Agree with Jay on style, a DefaultHandler with some config options. Will add 
options to KIP. Also as part of this let's remove the threshold logger since it 
gets complex and arguably the ROI is low.

5. Jay's JSON example, where serialisation passes but the JSON message doesn't 
have the expected fields, is an interesting one. It's a bit complicated to 
handle this in the middle of processing. For example, some operators in the DAG 
might actually find the needed JSON fields and make progress, but other 
operators, for the same record, might not find their fields and will throw an 
exception.

At a minimum, handling this type of exception will need to involve the 
exactly-once (EoS) logic. We'd still allow the option of failing or skipping, 
but EoS would need to clean up by rolling back all the side effects from the 
processing so far. Matthias, how does this sound?
EoS will not help; the record might be 5 or 6 repartitions down in the
topology. I haven't followed, but I pray you made EoS optional! We don't
need this and we don't want this and we will turn it off if it comes. So
I wouldn't recommend relying on it. The option to turn it off is better
than forcing it and still being unable to roll back bad pills (as
explained before).


6. Will add an end-to-end example as Michael suggested.
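
As a rough illustration of the fail/skip choice from points 1 and 4, a
pluggable handler might look like the sketch below. The names used here
(HandlerDemo, DeserializationErrorHandler, Response, LOG_AND_FAIL,
LOG_AND_CONTINUE) are placeholders chosen for this example, not the interface
proposed in the KIP, and parsing integers stands in for real deserialization.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a pluggable fail/skip handler; names are placeholders.
class HandlerDemo {

    enum Response { CONTINUE, FAIL }

    /** Called when a raw record cannot be deserialized. */
    interface DeserializationErrorHandler {
        Response handle(String topic, int partition, long offset, byte[] raw, Exception error);
    }

    static final DeserializationErrorHandler LOG_AND_FAIL = (topic, partition, offset, raw, error) -> {
        System.err.println("failing on " + topic + "-" + partition + "@" + offset + ": " + error);
        return Response.FAIL;
    };

    static final DeserializationErrorHandler LOG_AND_CONTINUE = (topic, partition, offset, raw, error) -> {
        System.err.println("skipping " + topic + "-" + partition + "@" + offset + ": " + error);
        return Response.CONTINUE;
    };

    /** Parses each record as an integer and lets the handler decide on failures. */
    static List<Integer> process(List<byte[]> records, DeserializationErrorHandler handler) {
        List<Integer> out = new ArrayList<>();
        long offset = 0;
        for (byte[] raw : records) {
            try {
                out.add(Integer.parseInt(new String(raw, StandardCharsets.UTF_8).trim()));
            } catch (NumberFormatException e) {
                if (handler.handle("input", 0, offset, raw, e) == Response.FAIL) {
                    throw new IllegalStateException("stopping at offset " + offset, e);
                }
            }
            offset++;
        }
        return out;
    }

    public static void main(String[] args) {
        List<byte[]> records = Arrays.asList(
                "1".getBytes(StandardCharsets.UTF_8),
                "not-a-number".getBytes(StandardCharsets.UTF_8),
                "3".getBytes(StandardCharsets.UTF_8));
        System.out.println(process(records, LOG_AND_CONTINUE));
        try {
            process(records, LOG_AND_FAIL);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keeping the decision in one small interface means the default can stay
"fail" while a skip policy becomes a configuration choice.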

Thanks
Eno




On 4 Jun 2017, at 02:35, Matthias J. Sax  wrote:

What I don't understand is this:


 From there on it's the easiest way forward: fix, redeploy, start => done

If you have many producers that work fine and a new "bad" producer
starts up and writes bad data into your input topic, your Streams app
dies but all your producers, including the bad one, keep writing.

Thus, how would you fix this, as you cannot "remove" the corrupted data
from the topic? It might take some time to identify the root cause and
stop the bad producer. Up to this point you get good and bad data into
your Streams input topic. If the Streams app is not able to skip over those
bad records, how would you get all the good data from the topic? Not
saying it's not possible, but it's extra work copying the data with a
new non-Streams consumer-producer-app into a new topic and then feeding
your Streams app from this new topic -- you also need to update all your
upstream producers to write to the new topic.

Thus, if you want to fail fast, you can still do this. And after you
detected and fixed the bad producer you might just reconfigure your app
to skip bad records until it reaches the good part of the data.
Afterwards, you could redeploy with fail-fast again.


Thus, for this pattern, I actually don't see any reason why to stop the
Streams app at all. If you have a callback, and use the callback to
raise an alert (and maybe get the bad data into a bad record queue), it
will not take longer to identify and stop the "bad" producer. But for
this case, you have zero downtime for your Streams

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-03 Thread Jan Filipiak

Could not agree more!

But then I think the easiest is still: print the exception and die.
From there on it's the easiest way forward: fix, redeploy, start => done

All the other ways to recover a pipeline that was processing partially
all the time and suddenly went over an "I can't take it anymore" threshold
are not straightforward IMO.


How do you find the offset where it became too bad, when it is not the
latest committed one?

How do you reset there, with some reasonable state in your RocksDB stores?

One could do the following: the continuing handler would count failures
against a threshold and would terminate once that threshold has been
passed (per task). Then one can use the offset commit / flush intervals
to make a reasonable assumption about how much is slipping by, and you get
an easy recovery when it gets too bad.

You could also account for "in processing" records.

Setting this threshold to zero would cover all cases with one
implementation. It is still beneficial to have it pluggable.

Again, CRC errors are the only bad pills we have seen in production so far.

Best Jan


On 02.06.2017 17:37, Jay Kreps wrote:

Jan, I agree with you philosophically. I think one practical challenge has
to do with data formats. Many people use untyped events, so there is simply
no guarantee on the form of the input. E.g. many companies use JSON without
any kind of schema so it becomes very hard to assert anything about the
input which makes these programs very fragile to the "one accidental
message publication" that creates an unsolvable problem.

For that reason I do wonder if limiting to just serialization actually gets
you a useful solution. For JSON it will help with the problem of
non-parseable JSON, but sounds like it won't help in the case where the
JSON is well-formed but does not have any of the fields you expect and
depend on for your processing. I expect the reason for limiting the scope
is it is pretty hard to reason about correctness for anything that stops in
the middle of processing an operator DAG?

-Jay

On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak 
wrote:


IMHO you're doing it wrong then. Also, building too much support into the
Kafka ecosystem is very counterproductive to fostering a happy user base.



On 02.06.2017 13:15, Damian Guy wrote:


Jan, you have a choice to Fail fast if you want. This is about giving
people options and there are times when you don't want to fail fast.


On Fri, 2 Jun 2017 at 11:00 Jan Filipiak 
wrote:

Hi

1.
That greatly complicates monitoring.  Fail fast gives you this: when you
monitor only the lag of all your apps, you are completely covered. With
that sort of new application, monitoring is much more complicated, as
you now need to monitor the fail % of some special apps as well. In my
opinion that is already a huge downside.

2.
When using a schema registry with Avro, it might not even be the record
that is broken; it might be just your app being unable to fetch a schema
it now needs to know. Maybe you got partitioned away from that registry.

3. When you get alerted because of a too-high fail percentage, what are the
steps you are going to take?
Shut it down to buy time. Fix the problem. Spend way too much time
finding a good reprocess offset.
Your time windows are in bad shape anyway, and you have pretty much lost.
This routine is nonsense.

Dead letter queues would be the worst possible addition to the Kafka
toolkit that I can think of. It just doesn't fit an architecture
in which clients falling behind is a valid option.

Furthermore, I already mentioned that the only bad pills I've seen so far
are CRC errors. Any plans for those?

Best Jan






On 02.06.2017 11:34, Damian Guy wrote:


I agree with what Matthias has said w.r.t. failing fast. There are plenty of
times when you don't want to fail-fast and must attempt to make progress.
The dead-letter queue is exactly for these circumstances. Of course if
every record is failing, then you probably do want to give up.

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax wrote:


First a meta comment. KIP discussion should take place on the dev list
-- if user list is cc'ed please make sure to reply to both lists. Thanks.

Thanks for making the scope of the KIP clear. Makes a lot of sense to
focus on deserialization exceptions for now.

With regard to corrupted state stores, would it make sense to fail a
task and wipe out the store to repair it via recreation from the
changelog? That's of course a quite advanced pattern, but I want to bring
it up to design the first step in a way such that we can get there (if
we think it's a reasonable idea).

I also want to comment about fail fast vs making progress. I think that
fail-fast need not always be the best option. The scenario I have in
mind is like this: you got a bunch of producers that feed the Streams
input topic. Most producers work fine, but maybe one producer
misbehaves and the data it writes is corrupted. You might not even be able
to recover this lost data at any point -

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-02 Thread Jan Filipiak
IMHO you're doing it wrong then. Also, building too much support into the
Kafka ecosystem is very counterproductive to fostering a happy user base.



On 02.06.2017 13:15, Damian Guy wrote:

Jan, you have a choice to Fail fast if you want. This is about giving
people options and there are times when you don't want to fail fast.


On Fri, 2 Jun 2017 at 11:00 Jan Filipiak  wrote:


Hi

1.
That greatly complicates monitoring.  Fail fast gives you this: when you
monitor only the lag of all your apps, you are completely covered. With
that sort of new application, monitoring is much more complicated, as
you now need to monitor the fail % of some special apps as well. In my
opinion that is already a huge downside.

2.
When using a schema registry with Avro, it might not even be the record
that is broken; it might be just your app being unable to fetch a schema
it now needs to know. Maybe you got partitioned away from that registry.

3. When you get alerted because of a too-high fail percentage, what are the
steps you are going to take?
Shut it down to buy time. Fix the problem. Spend way too much time
finding a good reprocess offset.
Your time windows are in bad shape anyway, and you have pretty much lost.
This routine is nonsense.

Dead letter queues would be the worst possible addition to the Kafka
toolkit that I can think of. It just doesn't fit an architecture
in which clients falling behind is a valid option.

Furthermore, I already mentioned that the only bad pills I've seen so far
are CRC errors. Any plans for those?

Best Jan






On 02.06.2017 11:34, Damian Guy wrote:

I agree with what Matthias has said w.r.t. failing fast. There are plenty of
times when you don't want to fail-fast and must attempt to make progress.
The dead-letter queue is exactly for these circumstances. Of course if
every record is failing, then you probably do want to give up.

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax wrote:

First a meta comment. KIP discussion should take place on the dev list
-- if user list is cc'ed please make sure to reply to both lists.

Thanks.

Thanks for making the scope of the KIP clear. Makes a lot of sense to
focus on deserialization exceptions for now.

With regard to corrupted state stores, would it make sense to fail a
task and wipe out the store to repair it via recreation from the
changelog? That's of course a quite advanced pattern, but I want to bring
it up to design the first step in a way such that we can get there (if
we think it's a reasonable idea).

I also want to comment about fail fast vs making progress. I think that
fail-fast need not always be the best option. The scenario I have in
mind is like this: you got a bunch of producers that feed the Streams
input topic. Most producers work fine, but maybe one producer
misbehaves and the data it writes is corrupted. You might not even be able
to recover this lost data at any point -- thus, there is no reason to
stop processing but you just skip over those records. Of course, you
need to fix the root cause, and thus you need to alert (either via logs
or the exception handler directly) and you need to start to investigate
to find the bad producer, shut it down and fix it.

Here the dead letter queue comes into play. From my understanding, the
purpose of this feature is solely to enable debugging after the fact. I
don't think those records would be fed back at any point in time (so I
don't see any ordering issue -- a skipped record, in this regard, is just
"fully processed"). Thus, the dead letter queue should actually encode the
original record's metadata (topic, partition, offset, etc.) to enable such
debugging. I guess this might also be possible if you just log the bad
records, but it would be harder to access (you first must find the
Streams instance that wrote the log and extract the information from
there). Reading it from a topic is much simpler.
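
A small sketch of what a dead letter entry carrying the original record's
metadata could look like. The DeadLetter and DeadLetterQueue types are
hypothetical, and the in-memory list merely stands in for a dead letter
topic; nothing here is an existing Kafka class.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; the list stands in for a dead letter topic.
class DeadLetterDemo {

    /** Immutable description of a record that was skipped, for later debugging. */
    static final class DeadLetter {
        final String topic;
        final int partition;
        final long offset;
        final byte[] rawValue;
        final String error;

        DeadLetter(String topic, int partition, long offset, byte[] rawValue, Exception cause) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.rawValue = rawValue.clone(); // keep the original bytes untouched
            this.error = cause.toString();
        }

        @Override
        public String toString() {
            return topic + "-" + partition + "@" + offset + ": " + error;
        }
    }

    static final class DeadLetterQueue {
        private final List<DeadLetter> letters = new ArrayList<>();

        synchronized void send(DeadLetter letter) {
            letters.add(letter);
        }

        /** Returns a copy so callers can inspect entries without locking. */
        synchronized List<DeadLetter> snapshot() {
            return new ArrayList<>(letters);
        }
    }

    public static void main(String[] args) {
        DeadLetterQueue queue = new DeadLetterQueue();
        byte[] raw = "{not json".getBytes(StandardCharsets.UTF_8);
        queue.send(new DeadLetter("orders", 2, 41L, raw, new IllegalArgumentException("bad json")));
        for (DeadLetter letter : queue.snapshot()) {
            System.out.println(letter);
        }
    }
}
```

Because the entry carries topic, partition and offset, the bad record can be
located in the source topic without searching the logs of individual
instances.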

I also want to mention the following. Assume you have such a topic with
some bad records and some good records. If we always fail-fast, it's
going to be super hard to process the good data. You would need to write
an extra app that copies the data into a new topic, filtering out the bad
records (or apply the map() workaround within Streams). So I don't think
the claim that failing fast is most likely the best option in production
is necessarily true.
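
The map() workaround mentioned above amounts to reading raw bytes and
deserializing inside the pipeline, so that a failure becomes an empty result
that can be filtered out. The sketch below shows the idea with
java.util.stream as a stand-in for a Streams topology; SkipBadRecordsDemo,
tryParse and parseGood are hypothetical names, and integer parsing stands in
for the real deserializer.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Illustration with java.util.stream; a Streams topology would apply the
// same try-then-filter idea to raw byte[] values.
class SkipBadRecordsDemo {

    /** Returns the parsed value, or empty when the bytes are not a valid integer. */
    static Optional<Integer> tryParse(byte[] raw) {
        try {
            return Optional.of(Integer.valueOf(new String(raw, StandardCharsets.UTF_8).trim()));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    /** Keeps only the records that deserialize cleanly, preserving their order. */
    static List<Integer> parseGood(List<byte[]> rawRecords) {
        return rawRecords.stream()
                .map(SkipBadRecordsDemo::tryParse)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<byte[]> raw = Arrays.asList(
                "10".getBytes(StandardCharsets.UTF_8),
                "oops".getBytes(StandardCharsets.UTF_8),
                "30".getBytes(StandardCharsets.UTF_8));
        System.out.println(parseGood(raw));
    }
}
```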

Or do you think there are scenarios, for which you can recover the
corrupted records successfully? And even if this is possible, it might
be a case for reprocessing instead of failing the whole application?
Also, if you think you can "repair" a corrupted record, should the
handler be allowed to return a "fixed" record? This would solve the ordering
problem.



-Matthias




On 5/30/17 1:47 AM, Michael Noll wrote:

Thanks for your work on this KIP, Eno -- much appreciated!

- I think it would help to improve the KIP by adding an end-to-end code
example that demonstrates, with the DSL and with the Processor API, how the
user w

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-02 Thread Jan Filipiak

Hi

1.
That greatly complicates monitoring.  Fail fast gives you this: when you
monitor only the lag of all your apps, you are completely covered. With
that sort of new application, monitoring is much more complicated, as
you now need to monitor the fail % of some special apps as well. In my
opinion that is already a huge downside.


2.
When using a schema registry with Avro, it might not even be the record
that is broken; it might be just your app being unable to fetch a schema
it now needs to know. Maybe you got partitioned away from that registry.


3. When you get alerted because of a too-high fail percentage, what are the
steps you are going to take?
Shut it down to buy time. Fix the problem. Spend way too much time
finding a good reprocess offset.

Your time windows are in bad shape anyway, and you have pretty much lost.
This routine is nonsense.

Dead letter queues would be the worst possible addition to the Kafka
toolkit that I can think of. It just doesn't fit an architecture
in which clients falling behind is a valid option.

Furthermore, I already mentioned that the only bad pills I've seen so far
are CRC errors. Any plans for those?


Best Jan






On 02.06.2017 11:34, Damian Guy wrote:

I agree with what Matthias has said w.r.t failing fast. There are plenty of
times when you don't want to fail-fast and must attempt to  make progress.
The dead-letter queue is exactly for these circumstances. Of course if
every record is failing, then you probably do want to give up.

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax  wrote:


First a meta comment. KIP discussion should take place on the dev list
-- if user list is cc'ed please make sure to reply to both lists. Thanks.

Thanks for making the scope of the KIP clear. Makes a lot of sense to
focus on deserialization exceptions for now.

With regard to corrupted state stores, would it make sense to fail a
task and wipe out the store to repair it via recreation from the
changelog? That's of course a quite advanced pattern, but I want to bring
it up to design the first step in a way such that we can get there (if
we think it's a reasonable idea).

I also want to comment about fail fast vs making progress. I think that
fail-fast need not always be the best option. The scenario I have in
mind is like this: you got a bunch of producers that feed the Streams
input topic. Most producers work fine, but maybe one producer
misbehaves and the data it writes is corrupted. You might not even be able
to recover this lost data at any point -- thus, there is no reason to
stop processing but you just skip over those records. Of course, you
need to fix the root cause, and thus you need to alert (either via logs
or the exception handler directly) and you need to start to investigate
to find the bad producer, shut it down and fix it.

Here the dead letter queue comes into play. From my understanding, the
purpose of this feature is solely to enable debugging after the fact. I
don't think those records would be fed back at any point in time (so I
don't see any ordering issue -- a skipped record, in this regard, is just
"fully processed"). Thus, the dead letter queue should actually encode the
original record's metadata (topic, partition, offset, etc.) to enable such
debugging. I guess this might also be possible if you just log the bad
records, but it would be harder to access (you first must find the
Streams instance that wrote the log and extract the information from
there). Reading it from a topic is much simpler.

I also want to mention the following. Assume you have such a topic with
some bad records and some good records. If we always fail-fast, it's
going to be super hard to process the good data. You would need to write
an extra app that copies the data into a new topic, filtering out the bad
records (or apply the map() workaround within Streams). So I don't think
the claim that failing fast is most likely the best option in production
is necessarily true.

Or do you think there are scenarios, for which you can recover the
corrupted records successfully? And even if this is possible, it might
be a case for reprocessing instead of failing the whole application?
Also, if you think you can "repair" a corrupted record, should the
handler be allowed to return a "fixed" record? This would solve the ordering
problem.



-Matthias




On 5/30/17 1:47 AM, Michael Noll wrote:

Thanks for your work on this KIP, Eno -- much appreciated!

- I think it would help to improve the KIP by adding an end-to-end code
example that demonstrates, with the DSL and with the Processor API, how the
user would write a simple application that would then be augmented with the
proposed KIP changes to handle exceptions.  It should also become much
clearer then that e.g. the KIP would lead to different code paths for the
happy case and any failure scenarios.

- Do we have sufficient information available to make informed decisions on
what to do next?  For example, do we kno

Re: Kafka write throughput tuning

2017-05-31 Thread jan
I'm no Kafka expert and I've forgotten what little I learnt, however
there must be a bottleneck somewhere.

In your first instance of 3 partitions on 3 disks:
- Are all partitions growing?
- Are they growing about equally?

- What else might be a limiting factor?
-- what's the CPU activity like? Perhaps it's CPU bound (unlikely but
please check)
-- are the disks directly attached and not sharing any write paths, or
are they virtual disks over a network? (I've actually seen virtuals
over a network - not pretty)
-- any other limiting factors you can see or imagine?

Also please in future give a fuller picture of your setup eg. OS, OS
version, memory, number of cpus, what actual hardware (PCs are very
different from servers), etc

cheers

jan

On 17/05/2017, 陈 建平Chen Jianping  wrote:
> Hi Group,
>
> Recently I am trying to tune Kafka write performance to improve throughput.
> On my Kafka broker, there are 3 disks (7200 RPM).
> For one disk, the Kafka write throughput can reach 150MB/s. In my opinion,
> if I send message to topic test_p3 (which has 3 partitions located on
> different disk in the same server), the whole write throughput can reach 450
> MB/s due to parallel writing disk. However the test result is still 150MB/s.
> Is there any reason that multiple disk doesn’t multiply the write
> throughput? And is there any bottleneck for the Kafka write throughput or I
> need some configuration to update?
>
> I also try to test sending message to two different topic (whose partition
> on different disk of that server), and the total throughput only reach 200
> MB/s instead of 300 MB/s as I expect. Below is my Kafka configuration and
> setting. Thanks for any idea or advice on it:)
>
> ##Kafka producer setting
> ./kafka-run-class org.apache.kafka.tools.ProducerPerformance --topic test_p3
> --num-records 5000 --record-size 100 --throughput -1 --producer-props
> acks=0 bootstrap.servers=localhost:9092 buffer.memory=33554432
> batch.size=16384
>
> ##OS tuning setting
> net.core.rmem_default = 124928
> net.core.rmem_max = 2048000
> net.core.wmem_default = 124928
> net.core.wmem_max = 2048000
> net.ipv4.tcp_rmem = 4096 87380 4194304
> net.ipv4.tcp_wmem = 4096 87380 4194304
> net.ipv4.tcp_max_tw_buckets = 262144
> net.ipv4.tcp_max_syn_backlog = 1024
> vm.oom_kill_allocating_task = 1
> vm.max_map_count = 20
> vm.swappiness = 1
> vm.dirty_writeback_centisecs = 500
> vm.dirty_expire_centisecs = 500
> vm.dirty_ratio = 60
> vm.dirty_background_ratio = 5
>
>
> Thanks,
> Eric
>
>


Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-30 Thread Jan Filipiak

Hi Jay,

Eno mentioned that he will narrow down the scope to only ConsumerRecord 
deserialisation.


I am working with database changelogs only. I would really not like to
see a dead letter queue or something similar. How am I expected to get
these back in order? Just grind to a halt and call me on the weekend.
I'll fix it then in a few minutes rather than spend 2 weeks ordering
dead letters (where reprocessing might even be the faster fix).


Best Jan



On 29.05.2017 20:23, Jay Kreps wrote:

- I think we should hold off on retries unless we have worked out the
full usage pattern, people can always implement their own. I think the idea
is that you send the message to some kind of dead letter queue and then
replay these later. This obviously destroys all semantic guarantees we are
working hard to provide right now, which may be okay.




Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-28 Thread Jan Filipiak

+1

On 26.05.2017 18:36, Damian Guy wrote:

In that case, though, every access to that key is doomed to failure as the
database is corrupted. So i think it should probably die in a steaming heap
at that point!

On Fri, 26 May 2017 at 17:33 Eno Thereska  wrote:


Hi Damian,

I was thinking of cases when there is bit-rot on the storage itself and we
get a malformed record that cannot be de-serialized. There is an
interesting intersection here with CRCs in both Kafka (already there, they
throw on deserialization) and potentially local storage (we don't have CRCs
here on the data files, though RocksDB has them on its write-ahead log
records).

Basically in a nutshell, I'm saying that every deserialization exception
should go through this new path. The user can decide to fail or continue.
We could start with just poison pills from Kafka though and punt the
storage one to later.
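The "user can decide to fail or continue" idea above can be sketched as below. This is a plain-Java illustration under stated assumptions, not the KIP's final API: the `DeserializationHandler` interface, the `process` loop, and the integer payload format are hypothetical; only the CONTINUE/FAIL response names come from the thread.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class DeserializationHandlerSketch {

    enum HandlerResponse { CONTINUE, FAIL }

    // Hypothetical handler shape: sees the raw bytes and the exception, then decides what happens next.
    interface DeserializationHandler {
        HandlerResponse handle(byte[] rawValue, Exception exception);
    }

    // Deserialize each record; on failure, ask the handler whether to skip or stop.
    static List<Integer> process(List<byte[]> records, DeserializationHandler handler) {
        List<Integer> processed = new ArrayList<>();
        for (byte[] raw : records) {
            try {
                processed.add(Integer.parseInt(new String(raw, StandardCharsets.UTF_8)));
            } catch (NumberFormatException e) {
                if (handler.handle(raw, e) == HandlerResponse.FAIL) {
                    throw new IllegalStateException("stopping on poison pill", e);
                }
                // CONTINUE: skip this record and move on to the next one.
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<byte[]> records = List.of(
                "1".getBytes(StandardCharsets.UTF_8),
                "oops".getBytes(StandardCharsets.UTF_8),
                "3".getBytes(StandardCharsets.UTF_8));
        System.out.println(process(records, (raw, e) -> HandlerResponse.CONTINUE)); // prints [1, 3]
    }
}
```

A handler that always returns FAIL reproduces the fail-fast behaviour argued for elsewhere in this thread, so both positions fit the same interface.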

Eno


On 26 May 2017, at 16:59, Damian Guy  wrote:

Eno,

Under what circumstances would you get a deserialization exception from the
state store? I can only think of the case where someone has provided a bad
deserializer to a method that creates a state store. In which case it would
be a user error and probably should just abort?

Thanks,
Damian

On Fri, 26 May 2017 at 16:32 Eno Thereska wrote:

See latest reply to Jan's note. I think I unnecessarily broadened the
scope of this KIP to the point where it sounded like it handles all sorts
of exceptions. The scope should be strictly limited to "poison pill"
records for now. Will update KIP,

Thanks
Eno

On 26 May 2017, at 16:16, Matthias J. Sax wrote:

"bad" for this case would mean, that we got a
`DeserializationException`. I am not sure if any other processing error
should be covered?

@Eno: this raises one question. Might it be better to allow for two
handlers instead of one? One for deserialization exception and one for
all other exceptions from user code?

Just a thought.


-Matthias

On 5/26/17 7:49 AM, Jim Jagielski wrote:

On May 26, 2017, at 5:13 AM, Eno Thereska wrote:

With regard to `DeserializationException`, do you think it might make
sense to have a "dead letter queue" as a feature to provide out-of-the-box?

We could provide a special topic where bad messages go to, and then
we'd have to add a config option for the user to provide a topic. Is that
what you're thinking?

For various definitions of "bad"??









Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-26 Thread Jan Filipiak

Hi Eno,

that does make a lot more sense to me. When you pop stuff out of a topic
you can at least put the coordinates (topic-partition, offset)
additionally into the log, which is probably kinda nice to just fetch it
from the CLI and check what's going on.
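Logging the coordinates of a skipped record, as suggested above, could look roughly like this. The class name, method name, message format, and the example topic "orders" are hypothetical; the only point taken from the thread is that topic, partition and offset are enough to locate the record again later.

```java
import java.util.Locale;

final class RecordCoordinatesSketch {

    // Build a log line that identifies the skipped record by topic, partition and offset.
    static String describe(String topic, int partition, long offset, Exception cause) {
        return String.format(Locale.ROOT, "skipped record at %s-%d@%d: %s",
                topic, partition, offset, cause.getMessage());
    }

    public static void main(String[] args) {
        // prints: skipped record at orders-3@42: bad payload
        System.out.println(describe("orders", 3, 42L, new IllegalArgumentException("bad payload")));
    }
}
```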


One additional question:

Is this handler only going to cover Serde exceptions, or MessageSet
iterator exceptions as well? I'm speaking of checksum errors. We can't rely on
the deserializer to properly throw when we hand it data with a bad
checksum, and checksum errors are the only bad pills I have seen in
production until this point.
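The concern above is that a checksum failure is a different event from a deserializer throwing. The general idea of verifying a stored checksum before handing bytes to a deserializer can be sketched with the JDK's `java.util.zip.CRC32`. This is only an illustration: the class and method names are hypothetical, and it does not reproduce Kafka's actual message format or where Kafka performs its CRC check.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

final class ChecksumCheckSketch {

    // Compute the CRC32 of a payload.
    static long crcOf(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    // A payload is intact only if its CRC matches the one stored alongside it.
    static boolean isIntact(byte[] payload, long expectedCrc) {
        return crcOf(payload) == expectedCrc;
    }

    public static void main(String[] args) {
        byte[] payload = "hello kafka".getBytes(StandardCharsets.UTF_8);
        long stored = crcOf(payload);

        byte[] corrupted = payload.clone();
        corrupted[0] ^= 0x01; // flip one bit to simulate bit-rot

        System.out.println(isIntact(payload, stored));   // prints true
        System.out.println(isIntact(corrupted, stored)); // prints false
    }
}
```

A deserializer given the corrupted bytes might still succeed and return a wrong value, which is why a handler that only sees deserializer exceptions may not cover this case.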


Best Jan


On 26.05.2017 17:31, Eno Thereska wrote:

Hi Jan,

You're right. I think I got carried away and broadened the scope of this KIP beyond its
original purpose. This handler will only be there for deserialization errors, i.e.,
"poison pills", and is not intended to be a catch-all handler for all sorts of
other problems (e.g., NPE exception in user code). Deserialization errors can happen
either when polling or when deserialising from a state store. So that narrows down the
scope of the KIP, will update it.

Thanks
Eno


On 26 May 2017, at 11:31, Jan Filipiak  wrote:

Hi

unfortunately no. Think about "caching" these records popping out of there, or
multi-step tasks (join, aggregate, repartition all in one go): the last repartitioner might
throw because it can't determine the partition, only because a get on the join store caused a
flush through the aggregates. This has nothing to do with a ConsumerRecord at all.
Especially not the one we most recently processed.

To be completely honest, anything but grinding to a halt is not appealing to me at
all. Sure, maybe lag monitoring will call me on Sunday, but I can at least be
confident it's working the rest of the time.

Best Jan

PS.:

Hope you get my point. I am mostly complaining about

public interface RecordExceptionHandler {
    /**
     * Inspect a record and the exception received
     */
    HandlerResponse handle(/* that guy here >>>>>>> */ ConsumerRecord<byte[], byte[]> record, Exception exception);
}

public enum HandlerResponse {
    /* continue with processing */
    CONTINUE(1),
    /* fail the processing and stop */
    FAIL(2);
}



On 26.05.2017 11:18, Eno Thereska wrote:

Thanks Jan,

The record passed to the handler will always be the problematic record. There 
are 2 cases/types of exceptions for the purposes of this KIP: 1) any exception 
during deserialization. The bad record + the exception (i.e. 
DeserializeException) will be passed to the handler. The handler will be able 
to tell this was a deserialization error.
2) any exception during processing of this record. So whenever a processor gets 
the record (after some caching, etc) it starts to process it, then it fails, 
then it will call the handler with this record.

Does that match your thinking?

Thanks,
Eno



On 26 May 2017, at 09:51, Jan Filipiak  wrote:

Hi,

quick question: From the KIP it doesn't quite make sense to me how that fits
with caching.
With caching the consumer record might not be at all related to some processor
throwing while processing.

Would it not make more sense to get the ProcessorName + object object for
processing, and
statestore or topic name + byte[] byte[] for serializers? Maybe passing in the
used serdes?

Best Jan



On 25.05.2017 11:47, Eno Thereska wrote:

Hi there,

I’ve added a KIP on improving exception handling in streams:
KIP-161: streams record processing exception handlers. 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+record+processing+exception+handlers
 
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-161:+streams+record+processing+exception+handlers>

Discussion and feedback is welcome, thank you.
Eno




Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-26 Thread Jan Filipiak

Hi

unfortunately no. Think about "caching" these records popping out of there,
or multi-step tasks (join, aggregate, repartition all in one go): the last
repartitioner might throw because it can't determine the partition, only
because a get on the join store caused a flush through the aggregates.
This has nothing to do with a ConsumerRecord at all. Especially not the
one we most recently processed.


To be completely honest, anything but grinding to a halt is not appealing
to me at all. Sure, maybe lag monitoring will call me on Sunday, but I can
at least be confident it's working the rest of the time.


Best Jan

PS.:

Hope you get my point. I am mostly complaining about

public interface RecordExceptionHandler {
    /**
     * Inspect a record and the exception received
     */
    HandlerResponse handle(/* that guy here >>>>>>> */ ConsumerRecord<byte[], byte[]> record, Exception exception);
}

public enum HandlerResponse {
    /* continue with processing */
    CONTINUE(1),
    /* fail the processing and stop */
    FAIL(2);
}



On 26.05.2017 11:18, Eno Thereska wrote:

Thanks Jan,

The record passed to the handler will always be the problematic record. There 
are 2 cases/types of exceptions for the purposes of this KIP: 1) any exception 
during deserialization. The bad record + the exception (i.e. 
DeserializeException) will be passed to the handler. The handler will be able 
to tell this was a deserialization error.
2) any exception during processing of this record. So whenever a processor gets 
the record (after some caching, etc) it starts to process it, then it fails, 
then it will call the handler with this record.

Does that match your thinking?

Thanks,
Eno



On 26 May 2017, at 09:51, Jan Filipiak  wrote:

Hi,

quick question: From the KIP it doesn't quite make sense to me how that fits
with caching.
With caching the consumer record might not be at all related to some processor
throwing while processing.

Would it not make more sense to get the ProcessorName + object object for
processing, and
statestore or topic name + byte[] byte[] for serializers? Maybe passing in the
used serdes?

Best Jan



On 25.05.2017 11:47, Eno Thereska wrote:

Hi there,

I’ve added a KIP on improving exception handling in streams:
KIP-161: streams record processing exception handlers. 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+record+processing+exception+handlers
 
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-161:+streams+record+processing+exception+handlers>

Discussion and feedback is welcome, thank you.
Eno




Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-26 Thread Jan Filipiak

Hi,

quick question: From the KIP it doesn't quite make sense to me how that
fits with caching.
With caching the consumer record might not be at all related to some
processor throwing while processing.


Would it not make more sense to get the ProcessorName + object object
for processing, and
statestore or topic name + byte[] byte[] for serializers? Maybe passing
in the used serdes?


Best Jan



On 25.05.2017 11:47, Eno Thereska wrote:

Hi there,

I’ve added a KIP on improving exception handling in streams:
KIP-161: streams record processing exception handlers. 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+record+processing+exception+handlers
 
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-161:+streams+record+processing+exception+handlers>

Discussion and feedback is welcome, thank you.
Eno




Re: Issue in Kafka running for few days

2017-04-30 Thread jan
I looked this up yesterday  when I read the grandparent, as my old
company ran two and I needed to know.
Your link is a bit ambiguous but it has a link to the zookeeper
Getting Started guide which says this:

"
For replicated mode, a minimum of three servers are required, and it
is strongly recommended that you have an odd number of servers. If you
only have two servers, then you are in a situation where if one of
them fails, there are not enough machines to form a majority quorum.
Two servers is inherently less stable than a single server, because
there are two single points of failure.
"

<https://zookeeper.apache.org/doc/r3.4.10/zookeeperStarted.html>
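The majority-quorum arithmetic behind the quoted passage can be worked through in a few lines. This is just the arithmetic of "strictly more than half", written as a sketch; the class and method names are made up for illustration.

```java
final class QuorumMathSketch {

    // Smallest number of servers that forms a strict majority of the ensemble.
    static int majority(int servers) {
        return servers / 2 + 1;
    }

    // How many servers can fail while a majority is still reachable.
    static int toleratedFailures(int servers) {
        return servers - majority(servers);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " servers: majority=" + majority(n)
                    + ", tolerated failures=" + toleratedFailures(n));
        }
    }
}
```

With 2 servers the majority is 2, so no failure is tolerated, the same as with 1 server but with two machines that can each take the ensemble down. With 4 servers one failure is tolerated, the same as with 3, which is the sense in which an even number buys no extra resilience.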

cheers

jan


On 30/04/2017, Michal Borowiecki  wrote:
> Svante, I don't share your opinion.
> Having an even number of zookeepers is not a problem in itself, it
> simply means you don't get any better resilience than if you had one
> fewer instance.
> Yes, it's not common or recommended practice, but you are allowed to
> have an even number of zookeepers and it's most likely not related to
> the problem at hand and does NOT need to be addressed first.
> https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_zkMulitServerSetup
>
> Abhit, I'm afraid the log snippet is not enough for me to help.
> Maybe someone else in the community with more experience can recognize
> the symptoms but in the meantime, if you haven't already done so, you
> may want to search for similar issues:
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20text%20~%20%22ZK%20expired%3B%20shut%20down%20all%20controller%22
>
> searching for text like "ZK expired; shut down all controller" or "No
> broker in ISR is alive for" or other interesting events from the log.
>
> Hope that helps,
> Michal
>
>
> On 26/04/17 21:40, Svante Karlsson wrote:
>> You are not supposed to run an even number of zookeepers. Fix that first
>>
>> On Apr 26, 2017 20:59, "Abhit Kalsotra"  wrote:
>>
>>> Any pointers please
>>>
>>>
>>> Abhi
>>>
>>> On Wed, Apr 26, 2017 at 11:03 PM, Abhit Kalsotra 
>>> wrote:
>>>
>>>> Hi *
>>>>
>>>> My kafka setup
>>>>
>>>>
>>>> * OS: Windows machine
>>>> * 6 broker nodes, 4 on one machine and 2 on the other machine
>>>> * ZK instance on (4 broker nodes machine) and another ZK on (2 broker
>>>> nodes machine)
>>>> * 2 topics with partition size = 50 and replication factor = 3
>>>>
>>>> I am producing on an average of around 500 messages / sec with each
>>>> message size close to 98 bytes...
>>>>
>>>> More or less the message rate stays constant throughout, but after
>>> running
>>>> the setup for close to 2 weeks , my Kafka cluster broke and this
>>>> happened
>>>> twice in a month.  Not able to understand what's the issue, Kafka gurus
>>>> please do share your inputs...
>>>>
>>>> the controller.log file at the time Kafka broke looks like
>>>>
>>>>
>>>>
>>>>
>>>> *[2017-04-26 12:03:34,998] INFO [Controller 0]: Broker failure callback
>>>> for 0,1,3,5,6 (kafka.controller.KafkaController)[2017-04-26
>>> 12:03:34,998]
>>>> INFO [Controller 0]: Removed ArrayBuffer() from list of shutting down
>>>> brokers. (kafka.controller.KafkaController)[2017-04-26 12:03:34,998]
>>> INFO
>>>> [Partition state machine on Controller 0]: Invoking state change to
>>>> OfflinePartition for partitions
>>>> [__consumer_offsets,19],[mytopic,11],[__consumer_
>>> offsets,30],[mytopicOLD,18],[mytopic,13],[__consumer_
>>> offsets,47],[mytopicOLD,26],[__consumer_offsets,29],[
>>> mytopicOLD,0],[__consumer_offsets,41],[mytopic,44],[
>>> mytopicOLD,38],[mytopicOLD,2],[__consumer_offsets,17],[__
>>> consumer_offsets,10],[mytopic,20],[mytopic,23],[mytopic,30],
>>> [__consumer_offsets,14],[__consumer_offsets,40],[mytopic,
>>> 31],[mytopicOLD,43],[mytopicOLD,19],[mytopicOLD,35]
>>> ,[__consumer_offsets,18],[mytopic,43],[__consumer_offsets,26],[__consumer_
>>> offsets,0],[mytopic,32],[__consumer_offsets,24],[
>>> mytopicOLD,3],[mytopic,2],[mytopic,3],[mytopicOLD,45],[
>>> mytopic,35],[__consumer_offsets,20],[mytopic,1],[
>>> mytopicOLD,33],[__consumer_offsets,5],[mytopicOLD,47],[__
>>> consumer_offsets,22],[mytopicOLD,8],[mytopic,33],[
>>> mytopic,36],[mytopicOLD,11],[mytopic,47],[myto

Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-19 Thread jan
@Robert Quinlivan: the producer is just the kafka-console-producer
shell that comes in the kafka/bin directory (kafka/bin/windows in my
case). Nothing special.
I'll try messing with acks because this problem is somewhat incidental
to what I'm trying to do which is see how big the log directory grows.

It's possible kafkacat or other producers would do a better job than
the console producer but I'll try that on linux as getting them
working on windows, meh.

thanks all

jan


On 18/04/2017, David Garcia  wrote:
> The “NewShinyProducer” is also deprecated.
>
> On 4/18/17, 5:41 PM, "David Garcia"  wrote:
>
> The console producer in the 0.10.0.0 release uses the old producer which
> doesn’t have “backoff”…it’s really just for testing simple producing:
>
> object ConsoleProducer {
>
>   def main(args: Array[String]) {
>
>     try {
>       val config = new ProducerConfig(args)
>       val reader =
>         Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
>       reader.init(System.in, getReaderProps(config))
>
>       val producer =
>         if(config.useOldProducer) {
>           new OldProducer(getOldProducerProps(config))
>         } else {
>           new NewShinyProducer(getNewProducerProps(config))
>         }
>
>
>
> On 4/18/17, 5:31 PM, "Robert Quinlivan"  wrote:
>
> I am curious how your producer is configured. The producer maintains an
> internal buffer of messages to be sent over to the broker. Is it possible
> you are terminating the producer code in your test before the buffer is
> exhausted?
>
> On Tue, Apr 18, 2017 at 5:29 PM, jan wrote:
>
> > Thanks to both of you. Some quick points:
> >
> > I'd expect there to be backpressure from the producer if the broker is
> > busy ie. the broker would not respond to the console producer if the
> > broker was too busy to accept more messages, and the producer would hang
> > on the socket. Alternatively I'd hope the console producer would have
> > the sense to back off and retry but clearly(?) not.
> > This behaviour is actually relevant to my old job so I need to know more.
> >
> > Perhaps the timeout mentioned in the error msg can just be upped?
> >
> > *Is* the claimed timeout relevant?
> > > Batch containing 8 record(s) expired due to timeout while requesting
> > > metadata from brokers for big_ptns1_repl1_nozip-0
> >
> > Why is the producer expiring records?
> >
> > But I'm surprised this happened because my setup is one machine with
> > everything running on it. No network. Also Kafka writes to the disk
> > without an fsync (or its equivalent on windows) which means it just
> > gets cached in ram before being lazily written to disk, and I've got
> > plenty of ram - 16GB ram vs 5GB of input file. Kafka adds its overhead
> > so it grows to ~8GB but still, it need not hit disk at all (and the
> > file goes into the windows memory, not java's).
> > Maybe it is GC holding things up but I dunno, GC even for a second or
> > two should not cause a socket failure, just delay the read, though I'm
> > not an expert on this *at all*.
> >
> > I'll go over the answers tomorrow more carefully but thanks anyway!
> >
> > cheers
> >
> > jan
> >
> > On 18/04/2017, Serega Sheypak  wrote:
> > >> err, isn't it supposed to? Isn't the loss of data a very serious error?
> > > Kafka can't fix networking issues like latencies, blinking, unavailability
> > > or any other weird stuff. Kafka promises you to persist data if data
> > > reaches Kafka. Data delivery responsibility to kafka is on your side. You
> > > fail to do it according to logs.
> > >
> > > 0.02% not 2%
> > > You should check broker logs to figure out what went wrong. All things
> > > happen on one machine as far as I understand. Maybe your brokers don't have
> > > enough mem and they stuck because of GC and don't respond to producer.
> > &

Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread jan
Thanks to both of you. Some quick points:

I'd expect there to be backpressure from the producer if the broker is
busy ie. the broker would not respond to the console producer if the
broker was too busy to accept more messages, and the producer would hang
on the socket. Alternatively I'd hope the console producer would have
the sense to back off and retry but clearly(?) not.
This behaviour is actually relevant to my old job so I need to know more.
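The "back off and retry" behaviour hoped for above can be sketched generically as follows. This is a plain-Java illustration with hypothetical names (`RetryWithBackoffSketch`, `retry`), not the console producer's code. The Java producer client has its own `retries` and `retry.backoff.ms` settings, and this sketch does not claim to mirror how they are implemented.

```java
import java.util.concurrent.Callable;

final class RetryWithBackoffSketch {

    // Run the action, retrying on failure with a delay that doubles after each failed attempt.
    static <T> T retry(Callable<T> action, int maxAttempts, long initialDelayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay); // back off before the next attempt
                    delay *= 2;
                }
            }
        }
        throw last; // every attempt failed: surface the last error instead of dropping data silently
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated send that fails twice (broker "busy") and then succeeds.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new java.io.IOException("broker busy");
            }
            return "sent";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints: sent after 3 attempts
    }
}
```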

Perhaps the timeout mentioned in the error msg can just be upped?

*Is* the claimed timeout relevant?
> Batch containing 8 record(s) expired due to timeout while requesting metadata 
> from brokers for big_ptns1_repl1_nozip-0

Why is the producer expiring records?

But I'm surprised this happened because my setup is one machine with
everything running on it. No network. Also Kafka writes to the disk
without an fsync (or its equivalent on windows) which means it just
gets cached in ram before being lazily written to disk, and I've got
plenty of ram - 16GB ram vs 5GB of input file. Kafka adds its overhead
so it grows to ~8GB but still, it need not hit disk at all (and the
file goes into the windows memory, not java's).
Maybe it is GC holding things up but I dunno, GC even for a second or
two should not cause a socket failure, just delay the read, though I'm
not an expert on this *at all*.

I'll go over the answers tomorrow more carefully but thanks anyway!

cheers

jan

On 18/04/2017, Serega Sheypak  wrote:
>> err, isn't it supposed to? Isn't the loss of data a very serious error?
> Kafka can't fix networking issues like latencies, blinking, unavailability
> or any other weird stuff. Kafka promises you to persist data if data
> reaches Kafka. Data delivery responsibility to kafka is on your side. You
> fail to do it according to logs.
>
> 0.02% not 2%
> You should check broker logs to figure out what went wrong. All things
> happen on one machine as far as I understand. Maybe your brokers don't have
> enough mem and they stuck because of GC and don't respond to producer.
> Async producer fails to send data. That is why you observe data loss on
> consumer side.
>
>
> 2017-04-18 23:32 GMT+02:00 jan :
>
>> Hi Serega,
>>
>> > data didn't reach producer. So why should data appear in consumer?
>>
>> err, isn't it supposed to? Isn't the loss of data a very serious error?
>>
>> > loss rate is more or less similar [...] Not so bad.
>>
>> That made me laugh at least.  Is kafka intended to be a reliable
>> message delivery system, or is a 2% data loss officially acceptable?
>>
>> I've been reading the other threads and one says windows is really not
>> supported, and certainly not for production. Perhaps that's the root
>> of it. Well I'm hoping to try it on linux shortly so I'll see if I can
>> replicate the issue but I would like to know whether it *should* work
>> in windows.
>>
>> cheers
>>
>> jan
>>
>> On 18/04/2017, Serega Sheypak  wrote:
>> > Hi,
>> >
>> > [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
>> > big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
>> > (org.apache.kafka.clients.
>> > producer.internals.ErrorLoggingCallback)
>> > org.apache.kafka.common.errors.TimeoutException: Batch containing 8
>> > record(s) expired due to timeout while requesting metadata from
>> > brokers for big_ptns1_repl1_nozip-0
>> >
>> > data didn't reach producer. So why should data appear in consumer?
>> > loss rate is more or less similar : 0.02 (130k / 5400mb) ~ 0.03% (150mb
>> > /
>> > 5000gb) Not so bad.
>> >
>> >
>> > 2017-04-18 21:46 GMT+02:00 jan :
>> >
>> >> Hi all, I'm something of a kafka n00b.
>> >> I posted the following in the  google newsgroup, haven't had a reply
>> >> or even a single read so I'll try here. My original msg, slightly
>> >> edited, was:
>> >>
>> >> 
>> >>
>> >> (windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
>> >> server, latest version of java)
>> >>
>> >> I've spent several days trying to sort out unexpected behaviour
>> >> involving kafka and the kafka console producer and consumer.
>> >>
>> >> If I set the console producer and console consumer to look at the
>> >> same topic then I can type lines into the producer window and see them
>> >> appear in the consumer window, so it works.
>> >>
>> >> If I try 

Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread jan
Hi Serega,

> data didn't reach producer. So why should data appear in consumer?

err, isn't it supposed to? Isn't the loss of data a very serious error?

> loss rate is more or less similar [...] Not so bad.

That made me laugh at least.  Is kafka intended to be a reliable
message delivery system, or is a 2% data loss officially acceptable?

I've been reading the other threads and one says windows is really not
supported, and certainly not for production. Perhaps that's the root
of it. Well I'm hoping to try it on linux shortly so I'll see if I can
replicate the issue but I would like to know whether it *should* work
in windows.

cheers

jan

On 18/04/2017, Serega Sheypak  wrote:
> Hi,
>
> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
> big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
> (org.apache.kafka.clients.
> producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.TimeoutException: Batch containing 8
> record(s) expired due to timeout while requesting metadata from
> brokers for big_ptns1_repl1_nozip-0
>
> data didn't reach producer. So why should data appear in consumer?
> loss rate is more or less similar : 0.02 (130k / 5400mb) ~ 0.03% (150mb /
> 5000gb) Not so bad.
>
>
> 2017-04-18 21:46 GMT+02:00 jan :
>
>> Hi all, I'm something of a kafka n00b.
>> I posted the following in the  google newsgroup, haven't had a reply
>> or even a single read so I'll try here. My original msg, slightly
>> edited, was:
>>
>> 
>>
>> (windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
>> server, latest version of java)
>>
>> I've spent several days trying to sort out unexpected behaviour
>> involving kafka and the kafka console producer and consumer.
>>
>> If I set the console producer and console consumer to look at the
>> same topic then I can type lines into the producer window and see them
>> appear in the consumer window, so it works.
>>
>> If I try to pipe in large amounts of data to the producer, some gets
>> lost and the producer reports errors eg.
>>
>> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
>> big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
>> (org.apache.kafka.clients.
>> producer.internals.ErrorLoggingCallback)
>> org.apache.kafka.common.errors.TimeoutException: Batch containing 8
>> record(s) expired due to timeout while requesting metadata from
>> brokers for big_ptns1_repl1_nozip-0
>>
>> I'm using as input a file, either Shakespeare's full works (about 5.4
>> meg ascii), or a much larger file of Shakespeare's full works
>> replicated 900 times to make it about 5GB. Lines are ascii and short,
>> and each line should be a single record when read in by the console
>> producer. I need to do some benchmarking on time and space and this
>> was my first try.
>>
>> As mentioned, data gets lost. I presume it is expected that any data
>> we pipe into the producer should arrive in the consumer, so if I do
>> this in one windows console:
>>
>> kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic
>> big_ptns1_repl1_nozip --zookeeper localhost:2181 >
>> F:\Users\me\Desktop\shakespear\single_all_shakespear_OUT.txt
>>
>> and this in another:
>>
>> kafka-console-producer.bat --broker-list localhost:9092  --topic
>> big_ptns1_repl1_nozip <
>> F:\Users\me\Desktop\shakespear\complete_works_no_bare_lines.txt
>>
>> then the output file "single_all_shakespear_OUT.txt" should be
>> identical to the input file "complete_works_no_bare_lines.txt" except
>> it's not. For the complete works (about 5.4 meg uncompressed) I lost
>> about 130K in the output.
>> For the replicated shakespeare, which is about 5GB, I lost about 150 meg.
>>
>> This can't be right surely and it's repeatable but happens at
>> different places in the file when errors start to be produced, it
>> seems.
>>
>> I've done this using all 3 versions of kafka in the 0.10.x.y branch
>> and I get the same problem (the above commands were using the 0.10.0.0
>> branch so they look a little obsolete but they are right for that
>> branch I think). It's cost me some days.
>> So, am I making a mistake, if so what?
>>
>> thanks
>>
>> jan
>>
>


possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread jan
Hi all, I'm something of a kafka n00b.
I posted the following in the  google newsgroup, haven't had a reply
or even a single read so I'll try here. My original msg, slightly
edited, was:



(windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
server, latest version of java)

I've spent several days trying to sort out unexpected behaviour
involving kafka and the kafka console producer and consumer.

If I set the console producer and console consumer to look at the
same topic then I can type lines into the producer window and see them
appear in the consumer window, so it works.

If I try to pipe in large amounts of data to the producer, some gets
lost and the producer reports errors eg.

[2017-04-17 18:14:05,868] ERROR Error when sending message to topic
big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
(org.apache.kafka.clients.
producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 8
record(s) expired due to timeout while requesting metadata from
brokers for big_ptns1_repl1_nozip-0

I'm using as input a file, either Shakespeare's full works (about 5.4
meg ascii), or a much larger file of Shakespeare's full works
replicated 900 times to make it about 5GB. Lines are ascii and short,
and each line should be a single record when read in by the console
producer. I need to do some benchmarking on time and space and this
was my first try.

As mentioned, data gets lost. I presume it is expected that any data
we pipe into the producer should arrive in the consumer, so if I do
this in one windows console:

kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic
big_ptns1_repl1_nozip --zookeeper localhost:2181 >
F:\Users\me\Desktop\shakespear\single_all_shakespear_OUT.txt

and this in another:

kafka-console-producer.bat --broker-list localhost:9092  --topic
big_ptns1_repl1_nozip <
F:\Users\me\Desktop\shakespear\complete_works_no_bare_lines.txt

then the output file "single_all_shakespear_OUT.txt" should be
identical to the input file "complete_works_no_bare_lines.txt" except
it's not. For the complete works (about 5.4 meg uncompressed) I lost
about 130K in the output.
For the replicated shakespeare, which is about 5GB, I lost about 150 meg.

This can't be right, surely. It's repeatable, but the place in the file
at which errors start to be produced seems to differ between runs.

I've done this using all 3 versions of kafka in the 0.10.x.y branch
and I get the same problem (the above commands were using the 0.10.0.0
branch so they look a little obsolete but they are right for that
branch I think). It's cost me some days.
So, am I making a mistake, if so what?

thanks

jan


ThoughtWorks Tech Radar: Assess Kafka Streams

2017-03-29 Thread Jan Filipiak

Regardless of how useful you find the tech radar:

Well deserved! Even though we all here agree that trial or adopt is in reach.

https://www.thoughtworks.com/radar/platforms/kafka-streams

Best Jan




Re: Implementing a non-key in Kafka Streams using the Processor API

2017-02-21 Thread Jan Filipiak

Hi,

yeah, the proposed solution is doable completely in the DSL (the only real
constraint is to not have a parent key with lots of children), except for the
lateral view, which is a pretty easy thing in PAPI.

Our own implementation is a mix of reusing DSL interfaces and using
reflection against KTableImpl to drop down to PAPI. That is probably one
limiting factor in why I am not that eager to share it publicly, because it's
kinda ugly. The development at the moment (removing many features from
PAPI) is very worrisome for me, so I should get moving on getting upstream
support.


Regarding the output key, we forced the user to pick a combined key
parent+child_id. This works out pretty nicely, as you get access to the
partition information in the partitioner, also in the delete cases. On
the receiving side you can use just a regular KTableSource to materialize
and have the parent key as prefix automatically. It will also do the
naturally correct thing if you update parent_id in the child table.
Upstream support would also be helpful, as the state stores are changelogged
even though we could use the intermediate topic for state store high
availability.
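
To make the combined-key idea concrete, here is a minimal sketch in plain Java collections. It is not our implementation and uses no Kafka Streams API: the class name, the "|" separator, the use of String.hashCode() for partitioning and the TreeMap standing in for a materialized store are all assumptions for illustration.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Plain-Java illustration; none of these names are Kafka Streams API.
final class CombinedKeySketch {

    // Hypothetical separator; assumed never to occur inside an id.
    static final String SEP = "|";

    // Output key of the join: parent id first, so it acts as a prefix.
    static String combinedKey(String parentId, String childId) {
        return parentId + SEP + childId;
    }

    // Partition on the parent part only, so every record for one parent,
    // deletes included, lands on the same partition.
    // String.hashCode() stands in for a real partitioner hash here.
    static int partitionFor(String combinedKey, int numPartitions) {
        String parentId = combinedKey.substring(0, combinedKey.indexOf(SEP));
        return Math.floorMod(parentId.hashCode(), numPartitions);
    }

    // Prefix scan over a sorted store: all entries whose key starts
    // with "<parentId>|".
    static SortedMap<String, String> childrenOf(TreeMap<String, String> store,
                                                String parentId) {
        String from = parentId + SEP;
        return store.subMap(from, from + Character.MAX_VALUE);
    }

    public static void main(String[] args) {
        TreeMap<String, String> store = new TreeMap<>();
        store.put(combinedKey("p1", "c1"), "a");
        store.put(combinedKey("p1", "c2"), "b");
        store.put(combinedKey("p10", "c1"), "c");
        System.out.println(childrenOf(store, "p1").keySet());
    }
}
```

Because the separator is part of the scan prefix, parent "p1" does not pick up the children of "p10".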


Best Jan

On 21.02.2017 20:15, Guozhang Wang wrote:

Jan,

Sure I would love to hear what you did for non-key joins. Last time we
chatted there are discussions on the ordering issue, that we HAVE TO
augment the join result stream keys as a combo of both, which may not be
elegant as used in the DSL.

For your proposed solution, it seems you did not do that on the DSL but at
the PAPI layer, right?

Guozhang

On Tue, Feb 21, 2017 at 6:05 AM, Jan Filipiak 
wrote:


Just a little note here:

if you can take all rows of the "children" table for each key into memory,
you can get away with using group_by and making a list of them. With this
aggregation the join is straightforward and you can use a lateral view
later to get to the same result. For this you could use the current DSL to
a greater extent.

Best Jan

On 21.02.2017 13:10, Frank Lyaruu wrote:


I've read that JIRA (although I don't understand every single thing), and I
got the feeling it is not exactly the same problem.
I am aware of the Global Tables, and I've tried that first, but I seem
unable to do what I need to do.

I'm replicating a relational database, and on a one-to-many relationship
I'd like to publish a joined message if either of the source streams
receives an update.

In the Global Table Wiki:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams

I see this:
"The GlobalKTable will only be used for doing lookups. That is, data
arriving in the GlobalKTable will not trigger the join. "

So how would I go about doing this?
regards, Frank



On Tue, Feb 21, 2017 at 10:38 AM, Eno Thereska 
wrote:

Hi Frank,

As far as I know the design in that wiki has been superseded by the Global
KTables design which is now coming in 0.10.2. Hence, the JIRAs that are
mentioned there (like KAFKA-3705). There are some extensive comments in
https://issues.apache.org/jira/browse/KAFKA-3705 illustrating why this
design is particularly challenging and why Global KTables was chosen
instead. I'm not sure if you still want to pursue that original design,
since it is not proven to work.

Guozhang, perhaps we need to add a note saying that Global KTables is the
new design?

Thanks
Eno

On 21 Feb 2017, at 07:35, Frank Lyaruu  wrote:

Hi all,

I'm trying to implement joining two Kafka tables using a 'remote' key,
basically as described here:

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins


Under the "Implementation Details" there is one line I don't know how to
do:

   1. First of all, we will repartition this KTable's stream, by key
   computed from the *mapper(K, V) → K1*, so that it is co-partitioned by
   the same key. The co-partition topic is partitioned on the new key,
   but the message key and value are unchanged, and log compaction is
   turned off.

How do I do that? I've been unable to find any documentation, I've looked
at the StreamPartitionAssignor, that seems relevant, but I could use some
help. Does anyone have an example?

regards, Frank









Re: Implementing a non-key in Kafka Streams using the Processor API

2017-02-21 Thread Jan Filipiak

Just a little note here:

if you can take all rows of the "children" table for each key into
memory, you can get away with using group_by and making a list of them. With
this aggregation the join is straightforward and you can use a lateral
view later to get to the same result. For this you could use the current
DSL to a greater extent.
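
Roughly, the three steps (group the children into a list per parent key, join key-to-key, then explode the list again like a lateral view) look like this in plain Java collections. This is only a sketch of the data flow, not Kafka Streams code; all class and method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-collections illustration; none of these names are Kafka Streams API.
final class GroupThenJoinSketch {

    // A child row carries its own id and the "remote" key of its parent.
    static final class Child {
        final String childId;
        final String parentId;
        final String payload;

        Child(String childId, String parentId, String payload) {
            this.childId = childId;
            this.parentId = parentId;
            this.payload = payload;
        }
    }

    // One joined output row per (parent, child) pair.
    static final class Joined {
        final String parentId;
        final String parentValue;
        final String childId;
        final String childPayload;

        Joined(String parentId, String parentValue, String childId, String childPayload) {
            this.parentId = parentId;
            this.parentValue = parentValue;
            this.childId = childId;
            this.childPayload = childPayload;
        }

        @Override
        public String toString() {
            return parentId + "/" + childId + ":" + parentValue + "+" + childPayload;
        }
    }

    // Step 1: the "group_by and make a list" aggregation, keyed by parent id.
    static Map<String, List<Child>> groupByParent(List<Child> children) {
        Map<String, List<Child>> grouped = new LinkedHashMap<>();
        for (Child c : children) {
            grouped.computeIfAbsent(c.parentId, k -> new ArrayList<>()).add(c);
        }
        return grouped;
    }

    // Steps 2 and 3: plain key-to-key join of parents with the grouped lists,
    // then the "lateral view" that explodes each list into one row per child.
    static List<Joined> joinAndExplode(Map<String, String> parents,
                                       Map<String, List<Child>> grouped) {
        List<Joined> out = new ArrayList<>();
        for (Map.Entry<String, String> p : parents.entrySet()) {
            List<Child> kids = grouped.getOrDefault(p.getKey(), Collections.<Child>emptyList());
            for (Child c : kids) {
                out.add(new Joined(p.getKey(), p.getValue(), c.childId, c.payload));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Child> children = new ArrayList<>();
        children.add(new Child("c1", "p1", "a"));
        children.add(new Child("c2", "p1", "b"));
        Map<String, String> parents = new LinkedHashMap<>();
        parents.put("p1", "P1");
        System.out.println(joinAndExplode(parents, groupByParent(children)));
    }
}
```

The memory caveat is visible in step 1: the whole list for one parent key lives in a single value.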


Best Jan

On 21.02.2017 13:10, Frank Lyaruu wrote:

I've read that JIRA (although I don't understand every single thing), and I
got the feeling it is not exactly the same problem.
I am aware of the Global Tables, and I've tried that first, but I seem
unable to do what I need to do.

I'm replicating a relational database, and on a one-to-many relationship
I'd like to publish a joined message if either of the source streams
receives an update.

In the Global Table Wiki:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams

I see this:
"The GlobalKTable will only be used for doing lookups. That is, data
arriving in the GlobalKTable will not trigger the join. "

So how would I go about doing this?
regards, Frank



On Tue, Feb 21, 2017 at 10:38 AM, Eno Thereska 
wrote:


Hi Frank,

As far as I know the design in that wiki has been superseded by the Global
KTables design which is now coming in 0.10.2. Hence, the JIRAs that are
mentioned there (like KAFKA-3705). There are some extensive comments in
https://issues.apache.org/jira/browse/KAFKA-3705 illustrating why this
design is particularly challenging and why Global KTables was chosen
instead. I'm not sure if you still want to pursue that original design,
since it is not proven to work.

Guozhang, perhaps we need to add a note saying that Global KTables is the
new design?

Thanks
Eno


On 21 Feb 2017, at 07:35, Frank Lyaruu  wrote:

Hi all,

I'm trying to implement joining two Kafka tables using a 'remote' key,
basically as described here:

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins

Under the "Implementation Details" there is one line I don't know how to
do:


   1. First of all, we will repartition this KTable's stream, by key
   computed from the *mapper(K, V) → K1*, so that it is co-partitioned by
   the same key. The co-partition topic is partitioned on the new key,
   but the message key and value are unchanged, and log compaction is
   turned off.


How do I do that? I've been unable to find any documentation, I've looked
at the StreamPartitionAssignor, that seems relevant, but I could use some
help. Does anyone have an example?

regards, Frank






Re: Implementing a non-key in Kafka Streams using the Processor API

2017-02-21 Thread Jan Filipiak

Hi,

yes the ticket is exactly about what you want to do. The lengthy 
discussion is mainly about what the key of the output KTable is.


@Guozhang would you be interested in seeing what we did so far?

best Jan

On 21.02.2017 13:10, Frank Lyaruu wrote:

I've read that JIRA (although I don't understand every single thing), and I
got the feeling it is not exactly the same problem.
I am aware of the Global Tables, and I've tried that first, but I seem
unable to do what I need to do.

I'm replicating a relational database, and on a one-to-many relationship
I'd like to publish a joined message if either of the source streams
receives an update.

In the Global Table Wiki:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams

I see this:
"The GlobalKTable will only be used for doing lookups. That is, data
arriving in the GlobalKTable will not trigger the join. "

So how would I go about doing this?
regards, Frank



On Tue, Feb 21, 2017 at 10:38 AM, Eno Thereska 
wrote:


Hi Frank,

As far as I know the design in that wiki has been superseded by the Global
KTables design which is now coming in 0.10.2. Hence, the JIRAs that are
mentioned there (like KAFKA-3705). There are some extensive comments in
https://issues.apache.org/jira/browse/KAFKA-3705 illustrating why this
design is particularly challenging and why Global KTables was chosen
instead. I'm not sure if you still want to pursue that original design,
since it is not proven to work.

Guozhang, perhaps we need to add a note saying that Global KTables is the
new design?

Thanks
Eno


On 21 Feb 2017, at 07:35, Frank Lyaruu  wrote:

Hi all,

I'm trying to implement joining two Kafka tables using a 'remote' key,
basically as described here:

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins

Under the "Implementation Details" there is one line I don't know how to
do:


   1. First of all, we will repartition this KTable's stream, by key
   computed from the *mapper(K, V) → K1*, so that it is co-partitioned by
   the same key. The co-partition topic is partitioned on the new key,
   but the message key and value are unchanged, and log compaction is
   turned off.


How do I do that? I've been unable to find any documentation, I've looked
at the StreamPartitionAssignor, that seems relevant, but I could use some
help. Does anyone have an example?

regards, Frank






Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-08 Thread Jan Filipiak

Hi,

Just my few thoughts:

Does it need to be JSON?
The old zkOffset tool had a nice format:
very easy to manipulate on the CLI, and
very powerful: it changes as many consumer groups/topics/partitions in one
go as you want.


maybe allow -1 and -2 to indicate earliest and latest reset regardless 
of what the group has as auto mechanism


I would definitely prefer a line-oriented format rather than JSON. I
ramped up my jq (https://stedolan.github.io/jq/) skills so I can do some
partition assignments, but it's no joy; better to use grep, awk ...
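
To show what I mean by line oriented, here is a rough sketch of a parser for one possible format, "group topic partition offset" separated by whitespace. The format itself, the class and all names are hypothetical and not what KIP-122 or the old tool define; the only thing taken from Kafka is the usual convention that -1 means latest and -2 means earliest.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical line format: "<group> <topic> <partition> <offset>".
final class OffsetResetLineSketch {

    static final long LATEST = -1L;   // Kafka's conventional "latest" sentinel
    static final long EARLIEST = -2L; // Kafka's conventional "earliest" sentinel

    static final class Reset {
        final String group;
        final String topic;
        final int partition;
        final long offset;

        Reset(String group, String topic, int partition, long offset) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Parses a single non-empty line; rejects anything that is not 4 fields
    // or that carries an offset below the EARLIEST sentinel.
    static Reset parseLine(String line) {
        String[] f = line.trim().split("\\s+");
        if (f.length != 4) {
            throw new IllegalArgumentException("expected 4 fields: " + line);
        }
        long offset = Long.parseLong(f[3]);
        if (offset < EARLIEST) {
            throw new IllegalArgumentException("invalid offset: " + line);
        }
        return new Reset(f[0], f[1], Integer.parseInt(f[2]), offset);
    }

    // Parses many lines, skipping blank lines and '#' comments, so the file
    // stays easy to produce and filter with grep and awk.
    static List<Reset> parse(List<String> lines) {
        List<Reset> out = new ArrayList<>();
        for (String line : lines) {
            String t = line.trim();
            if (t.isEmpty() || t.startsWith("#")) {
                continue;
            }
            out.add(parseLine(t));
        }
        return out;
    }

    public static void main(String[] args) {
        Reset r = parseLine("my-group my-topic 0 -2");
        System.out.println(r.group + " " + r.topic + " " + r.partition
                + (r.offset == EARLIEST ? " earliest" : " " + r.offset));
    }
}
```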

Best Jan

On 08.02.2017 03:43, Jorge Esteban Quilcate Otoya wrote:

Hi all,

I would like to propose a KIP to Add a tool to Reset Consumer Group Offsets.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+a+tool+to+Reset+Consumer+Group+Offsets

Please, take a look at the proposal and share your feedback.

Thanks,
Jorge.





Re: At Least Once semantics for Kafka Streams

2017-02-03 Thread Jan Filipiak

Hey,

with a little more effort you can try to make your stream application
idempotent, which may give you the same results. Say you want to aggregate
a KStream by some key. Instead of keeping the aggregate, you keep a Set of
raw values and then do the aggregate calculations with a map().

This is much more resource intensive, as you get quite some O(n^2)
state change logs etc., and the framework is not really friendly in helping
you with that. With caching you can hopefully spare quite a good chunk of
the quadratic properties, and it can act like exactly-once processing.
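
A minimal sketch of that idea in plain Java, outside of Kafka Streams: the state per key is the set of raw events and the aggregate is recomputed from it, so a replayed event changes nothing. The class, the Event type and the assumption that every event carries a unique eventId are mine for the example; a HashMap stands in for the state store.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Plain-Java illustration; not Kafka Streams API.
final class IdempotentAggregateSketch {

    // Assumption: each raw value carries an id that makes it unique, so a
    // redelivered event is equal to the one already stored.
    static final class Event {
        final String eventId;
        final long amount;

        Event(String eventId, long amount) {
            this.eventId = eventId;
            this.amount = amount;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Event && ((Event) o).eventId.equals(eventId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(eventId);
        }
    }

    // State: the Set of raw values per key instead of a running aggregate.
    private final Map<String, Set<Event>> rawByKey = new HashMap<>();

    // Adds the event (a no-op if it was seen before) and recomputes the sum
    // from the raw set. The recomputation is where the extra cost comes from.
    long apply(String key, Event event) {
        Set<Event> raw = rawByKey.computeIfAbsent(key, k -> new LinkedHashSet<>());
        raw.add(event);
        return raw.stream().mapToLong(e -> e.amount).sum();
    }

    public static void main(String[] args) {
        IdempotentAggregateSketch agg = new IdempotentAggregateSketch();
        agg.apply("k", new Event("e1", 5));
        agg.apply("k", new Event("e2", 7));
        // Redelivery of e1 after a failure leaves the sum unchanged.
        System.out.println(agg.apply("k", new Event("e1", 5)));
    }
}
```

A running sum would have counted the redelivered e1 twice; the set-based state does not.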



Best Jan

On 30.01.2017 05:13, Mahendra Kariya wrote:

Hey All,

I am new to Kafka streams. From the documentation
<http://docs.confluent.io/3.1.0/streams/architecture.html#processing-guarantees>,
it is pretty much clear that streams support at least once semantics. But I
couldn't find details about how this is supported. I am interested in
knowing the finer details / design of this.

Is there some documentation around this?
Is there some documentation around what semantics are followed by the
various Kafka streams examples
<https://github.com/confluentinc/examples/tree/3.1.x/kafka-streams>
available on Github? Do all of them follow at least once?


Thanks,
Mahendra





Re: kafka-consumer-offset-checker complaining about NoNode for X in zk

2017-02-02 Thread Jan Filipiak

Hi,

Sorry, and use the consumer group tool instead of the offset checker.


On 02.02.2017 20:08, Jan Filipiak wrote:

Hi,

If it's a Kafka Streams app, it's most likely going to store its offsets
in Kafka rather than ZooKeeper.

You can use the --new-consumer option to check for kafka stored offsets.

Best Jan


On 01.02.2017 21:14, Ara Ebrahimi wrote:

Hi,

For a subset of our topics we get this error:

$KAFKA_HOME/bin/kafka-consumer-offset-checker.sh --group 
argyle-streams --topic topic_name --zookeeper $ZOOKEEPERS
[2017-02-01 12:08:56,115] WARN WARNING: ConsumerOffsetChecker is 
deprecated and will be dropped in releases following 0.9.0. Use 
ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: 
KeeperErrorCode = NoNode for 
/consumers/streams-app/offsets/topic_name/2.


All topics are created via the same process but a few of them report 
this error.


When I check in zk there’s nothing under /consumers:

zookeeper-client -server $ZOOKEEPERS -cmd ls /consumers

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[]

But the kafka-consumer-offset-checker does work for many topics 
anyway, and it fails for a few.


Why does this happen? How can I fix it?

Thanks,
Ara.





This message is for the designated recipient only and may contain 
privileged, proprietary, or otherwise confidential information. If 
you have received it in error, please notify the sender immediately 
and delete the original. Any other use of the e-mail by you is 
prohibited. Thank you in advance for your cooperation.









Re: kafka-consumer-offset-checker complaining about NoNode for X in zk

2017-02-02 Thread Jan Filipiak

Hi,

If it's a Kafka Streams app, it's most likely going to store its offsets
in Kafka rather than ZooKeeper.

You can use the --new-consumer option to check for kafka stored offsets.

Best Jan


On 01.02.2017 21:14, Ara Ebrahimi wrote:

Hi,

For a subset of our topics we get this error:

$KAFKA_HOME/bin/kafka-consumer-offset-checker.sh --group argyle-streams --topic 
topic_name --zookeeper $ZOOKEEPERS
[2017-02-01 12:08:56,115] WARN WARNING: ConsumerOffsetChecker is deprecated and 
will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. 
(kafka.tools.ConsumerOffsetChecker$)
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: 
KeeperErrorCode = NoNode for /consumers/streams-app/offsets/topic_name/2.

All topics are created via the same process but a few of them report this error.

When I check in zk there’s nothing under /consumers:

zookeeper-client -server $ZOOKEEPERS -cmd ls /consumers

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[]

But the kafka-consumer-offset-checker does work for many topics anyway, and it 
fails for a few.

Why does this happen? How can I fix it?

Thanks,
Ara.











Re: Does KafkaStreams be configured to use multiple broker clusters in the same topology

2017-02-02 Thread Jan Filipiak

Sometimes I wake up cause I dreamed that this had gone down:

https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics




On 02.02.2017 19:07, Roger Vandusen wrote:

Ah, yes, I see your point and use case, thanks for the feedback.

On 2/2/17, 11:02 AM, "Damian Guy"  wrote:

 Hi Roger,
 
 The problem is that you can't do it async and still guarantee at-least-once
 delivery. For example:
 if your streams app looked something like this:
 
 builder.stream("input").mapValues(...).process(yourCustomerProcessSupplier);
 
 On the commit interval, kafka streams will commit the consumed offsets for
 the topic "input". Now if you do an async call in process, there is no
 guarantee that the message has been delivered. The broker might fail, there
 may be some other transient error. So you can end up dropping messages as
 the consumer has committed the offset of the source topic, but the receiver
 has not actually received it.
 
 Does that make sense?
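
The constraint described above can be modelled in a few lines of plain Java: a source offset is only safe to commit once every send for it and for all earlier offsets has been acknowledged. This is a sketch under assumptions, not Kafka or Kafka Streams API; CompletableFuture stands in for the future returned by an async send, and "offset" here means the last acknowledged record rather than the next-to-read position Kafka actually commits. Kafka Streams commits on its own interval and does not consult anything like this, which is why the blocking send is suggested.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Plain-Java model of the failure mode; none of this is Kafka API.
final class CommitAfterAckSketch {

    // Walks the in-flight sends in source-offset order and returns the highest
    // offset for which it and everything before it completed successfully.
    // A pending or failed send blocks all later offsets from being committed.
    static long safeToCommit(long lastCommitted,
                             TreeMap<Long, CompletableFuture<Void>> sendsByOffset) {
        long safe = lastCommitted;
        for (Map.Entry<Long, CompletableFuture<Void>> e : sendsByOffset.entrySet()) {
            CompletableFuture<Void> ack = e.getValue();
            if (!ack.isDone() || ack.isCompletedExceptionally()) {
                break;
            }
            safe = e.getKey();
        }
        return safe;
    }

    public static void main(String[] args) {
        TreeMap<Long, CompletableFuture<Void>> sends = new TreeMap<>();
        for (long offset = 0; offset < 3; offset++) {
            sends.put(offset, new CompletableFuture<Void>());
        }
        sends.get(0L).complete(null);
        sends.get(2L).complete(null);
        // Offset 1 is still in flight, so committing past offset 0 could
        // drop the record at offset 1 if the send later fails.
        System.out.println(safeToCommit(-1L, sends));
    }
}
```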
 
 Thanks,
 Damian
 
 On Thu, 2 Feb 2017 at 17:56 Roger Vandusen
 wrote:
 
 > Damian,
 >
 > We could lessen the producer.send(..).get() impact on throughput by simply
 > handing it off to another async worker component in our springboot app, any
 > feedback on that?
 >
 > -Roger
 >
 > On 2/2/17, 10:35 AM, "Damian Guy"  wrote:
 >
 > Hi, yes you could attach a custom processor that writes to another Kafka
 > cluster. The problem is going to be guaranteeing at least once delivery
 > without impacting throughput. To guarantee at least once you would need to
 > do a blocking send on every call to process, i.e., producer.send(..).get(),
 > this is going to have an impact on throughput, but i can't currently think
 > of another way of doing it (with the current framework) that will guarantee
 > at-least-once delivery.
 >
 > On Thu, 2 Feb 2017 at 17:26 Roger Vandusen <
 > roger.vandu...@ticketmaster.com>
 > wrote:
 >
 > > Thanks for the quick reply Damian.
 > >
 > > So the work-around would be to configure our source topologies with a
 > > processor component that would use another app component as a stand-alone
 > > KafkaProducer, let’s say an injected spring bean, configured to the other
 > > (sink) cluster, and then publish sink topic messages through this producer
 > > to the sink cluster?
 > >
 > > Sound like a solution? Have a better suggestion or any warnings about this
 > > approach?
 > >
 > > -Roger
 > >
 > >
 > > On 2/2/17, 10:10 AM, "Damian Guy"  wrote:
 > >
 > > Hi Roger,
 > >
 > > This is not currently supported and won't be available in 0.10.2.0.
 > > This has been discussed, but it doesn't look like there is a JIRA for it
 > > yet.
 > > Thanks,
 > > Damian
 > >
 > > On Thu, 2 Feb 2017 at 16:51 Roger Vandusen <
 > > roger.vandu...@ticketmaster.com>
 > > wrote:
 > >
 > > > We would like to source topics from one cluster and sink them to a
 > > > different cluster from the same topology.
 > > >
 > > > If this is not currently supported then is there a KIP/JIRA to track work
 > > > to support this in the future? 0.10.2.0?
 > > >
 > > > -Roger
 > > >
 > > >
 > >
 > >
 > >
 >
 >
 >
 





Re: [DISCUSS] KIP-114: KTable materialization and improved semantics

2017-01-30 Thread Jan Filipiak

Hi Eno,

thanks for splitting this into separate points. I want to add a few remarks
inline.


Best Jan

On 30.01.2017 12:19, Eno Thereska wrote:

So I think there are several important discussion threads that are emerging 
here. Let me try to tease them apart:

1. inconsistency in what is materialized and what is not, what is queryable and 
what is not. I think we all agree there is some inconsistency there and this 
will be addressed with any of the proposed approaches. Addressing the 
inconsistency is the point of the original KIP.

2. the exact API for materializing a KTable. We can specify 1) a "store name" (as we do today) or 
2) have a ".materialize[d]" call or 3) get a handle from a KTable ".getQueryHandle" or 4) 
have a builder construct. So we have discussed 4 options. It is important to remember in this discussion that 
IQ is not designed for just local queries, but also for distributed queries. In all cases an identifying 
name/id is needed for the store that the user is interested in querying. So we end up with a discussion on 
who provides the name, the user (as done today) or if it is generated automatically (as Jan suggests, as I 
understand it). If it is generated automatically we need a way to expose these auto-generated names to the 
users and link them to the KTables they care to query.
Hi, the last sentence is what I am currently arguing against. The user
would never see a string-type identifier name or anything. All he gets
is the queryHandle; if he executes a get(K), that will be an interactive
query get, with all the "finding the right servers that currently have a
copy of this underlying store" stuff going on. The nice part is that if
someone retrieves a queryHandle, you know that you have to materialize
(if you are not already), as queries will be coming. That takes away the
confusion mentioned in point 1, IMO.


3. The exact boundary between the DSL, that is the processing language, and the 
storage/IQ queries, and how we jump from one to the other. This is mostly for 
how we get a handle on a store (so it's related to point 2), rather than for 
how we query the store. I think we all agree that we don't want to limit ways 
one can query a store (e.g., using gets or range queries etc) and the query 
APIs are not in the scope of the DSL.
Does IQ work with ranges currently? The range query would have to be
started on all stores and then merged, maybe by the client. Range queries
currently force a flush to RocksDB, so I am sure you would get a performance
hit right there. Time windows might be okay, but I am not sure if the first
version should offer the user range access.


4. The nature of the DSL and whether it's declarative enough, or flexible 
enough. Damian made the point that he likes the builder pattern since users can 
specify, per KTable, things like caching and logging needs. His observation (as 
I understand it) is that the processor API (PAPI) is flexible but doesn't 
provide any help at all to users. The current DSL provides declarative 
abstractions, but it's not fine-grained enough. This point is much broader than 
the KIP, but discussing it in this KIP's context is ok, since we don't want to 
make small piecemeal changes and then realise we're not in the spot we want to 
be.
This is indeed much broader. My guess here is that's why both APIs
exist, and helping users switch back and forth might be a thing.


Feel free to pitch in if I have misinterpreted something.

Thanks
Eno



On 30 Jan 2017, at 10:22, Jan Filipiak  wrote:

Hi Eno,

I have a really hard time understanding why we can't. From my point of view
everything could be a super elegant DSL only, plus the public API for the
PAPI people as it already exists.

The above approach of implementing a .get(K) on KTable is foolish in my
opinion, as it would be too late to know that materialisation would be required.
But having an API that allows the user to indicate "I want to query this table"
and then wrapping, say, the table's processor name can work out really, really
nicely. The only obstacle I see is people not being willing to spend the
additional time on implementation and just wanting a quick-shot option to make
it work.

For me it would look like this:

table = builder.table()
filteredTable = table.filter()
// Does the materialisation; really all names are possible, but I'd rather
// hide the implication that it materializes
rawHandle = table.getQueryHandle()
// this would _not_ materialize again of course; the source or the aggregator
// would stay the only materialized processors
filteredTableHandle = filteredTable.getQueryHandle()
streams = new streams(builder)

This middle part is highly flexible; I could imagine forcing the user to do
something like this. This implies to the user that his streams need to be
running, instead of propagating the missing initialisation back by exceptions.
Also, whether the user is forced to pass the appropriate streams instance back
can change.
I think its
