Re: ZK, Kafka, Streamclient not cleanly starting up (two issues)

2017-01-31 Thread Brian Krahmer

Hi Peter,

  We are doing heavy microservice development (many of the services 
communicate over Kafka), and a typical service has an integration test 
suite that runs under docker-compose.  We found the fastest way to get 
our service up and running is to disable topic auto-creation and use 
the topic-creation parameters in the docker-compose file.  We then have 
a check in the service startup that waits for the topics to be created 
before building topologies, and we likewise monitor the logs with some 
code in our JUnit tests to know when Kafka is fully ready.  Hope that helps.
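The startup check described above can be sketched as a generic poll loop. The actual topic-listing call (e.g. an admin/metadata request) is abstracted behind a `Supplier` here, since the real client code isn't shown; everything else is plain Java.

```java
import java.util.Set;
import java.util.function.Supplier;

// Generic poll loop illustrating the "wait for topics before building
// topologies" startup check. The topicLister stands in for whatever
// metadata call your client exposes (an assumption, not a Kafka API).
public class TopicReadyCheck {
    public static boolean waitForTopics(Supplier<Set<String>> topicLister,
                                        Set<String> required,
                                        int maxAttempts,
                                        long sleepMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (topicLister.get().containsAll(required)) {
                return true;                  // all required topics exist
            }
            try {
                Thread.sleep(sleepMillis);    // back off before retrying
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;                         // gave up; fail startup loudly
    }
}
```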


brian


On 31.01.2017 18:28, Peter Kopias wrote:

Hello.

  I've got a local virtual development environment, with:
  - kafka 0.10.1.1
  - java version "1.8.0_121"

  I don't need anything special, this is just for trial, so I set up zk and
kafka and the stream processor to use /tmp for data, log and state.

  It's not persistent, but I can always try new things, with no memory
effect at all. :)

  Issue1:
  - I coldstart zk (no previous history whatsoever)
  - I coldstart kafka (no previous history whatsoever)
  - Seems fine.
  - I coldstart my stream api node

My app writes these to stdout for the first 2 seconds:

[2017-01-31 12:49:16,062] WARN [StreamThread-1] Error while fetching
metadata with correlation id 1 :
{mousetracker-pixelprocessor-Count-repartition=LEADER_NOT_AVAILABLE,
mousetracker-ioclient2backend=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)

... >8 cut here    8<





Re: Re: Problem with processor API partition assignments

2017-01-09 Thread Brian Krahmer

Hi Damian,
  I started to pull together enough code to show my topology, and I have 
a feeling I see what the problem is.  Is it correct that, when 
configuring a processor, the sources attached to that processor are 
used to ensure the partitions are aligned for that task?


thanks,
brian

On 05.01.2017 13:37, Damian Guy wrote:

Hi Brian,

It might be helpful if you provide some code showing your Topology.

Thanks,
Damian

On Thu, 5 Jan 2017 at 10:59 Brian Krahmer  wrote:


Hey guys,

I'm fighting an issue where I can currently only run one instance of
my streams application: when other instances come up, the partition
reassignment looks to me to be incorrect.

I'm testing with docker-compose at the moment.  When I scale my
application to 3 instances and the 2nd and 3rd connect to Kafka, causing
a rebalance, I get the following assignment on one of my instances:

FleetData-0
FleetData-1
VehicleJourneyMapData-0
VehicleJourneyMapData-1
JourneyStarted-0
VehicleStateChanged-1
VehicleIgnitionData-0
VinVehicleMapData-1

As you can see, the assignments are clearly not symmetric, which causes
problems, as I'm essentially doing join operations.  All topics have 3
partitions in this testing scenario.  I'm using version 0.10.1.0.  Any
ideas?

thanks,
brian






Problem with processor API partition assignments

2017-01-05 Thread Brian Krahmer

Hey guys,

  I'm fighting an issue where I can currently only run one instance of 
my streams application: when other instances come up, the partition 
reassignment looks to me to be incorrect.


I'm testing with docker-compose at the moment.  When I scale my 
application to 3 instances and the 2nd and 3rd connect to Kafka, causing 
a rebalance, I get the following assignment on one of my instances:


FleetData-0
FleetData-1
VehicleJourneyMapData-0
VehicleJourneyMapData-1
JourneyStarted-0
VehicleStateChanged-1
VehicleIgnitionData-0
VinVehicleMapData-1

As you can see, the assignments are clearly not symmetric, which causes 
problems, as I'm essentially doing join operations.  All topics have 3 
partitions in this testing scenario.  I'm using version 0.10.1.0.  Any ideas?
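Joins require co-partitioned topics — every joined topic must have the same partition count, so that task N can pair partition N of each input. A quick sanity check along these lines (illustrative only, not a Kafka API) makes a mismatch obvious before the rebalance does:

```java
import java.util.Map;

// Sanity check for co-partitioning: topics that are joined together must
// all have the same partition count, otherwise tasks cannot line up the
// matching partitions. Illustrative helper, not part of the Kafka API.
public class CopartitionCheck {
    public static boolean copartitioned(Map<String, Integer> partitionCounts) {
        // At most one distinct partition count means all topics agree.
        return partitionCounts.values().stream().distinct().count() <= 1;
    }
}
```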


thanks,
brian



Problem with multiple joins in one topology

2016-12-07 Thread Brian Krahmer

Hey guys,

  I'm having a hell of a time here.  I've worked for days trying to get 
this joining pipeline working.  I thought I had it working last week, 
but my jubilation was premature.  The point was to take data in from 
five different topics and merge them together to obtain one enriched 
event (output to compacted topic).  Can anybody spot what I'm doing 
wrong?  The ordering makes no difference.  For example, I've switched 
the locationInput and the vehicleReservedInput inputs in the leftJoin 
calls below, and I get the same results.  The location part of the 
enrichment works while the vehicleReserved part does not.  I can't even 
think of how to restructure the topology without resorting to building 
my own lower-level topology.


thanks,
brian

KTable<String, VehicleFinderData> fleetInput =
    builder.table(Serdes.String(), vehicleFinderDataSerde,
                  FLEET_TOPIC, VEHICLE_ENRICHER_FLEET_STORE);

...
fleetInput.print("fleetInput");
locationInput.print("locationInput");
vehicleReservedInput.print("vehicleReservedInput");
vehicleReleasedInput.print("vehicleReleasedInput");
vehicleUsageEndedInput.print("vehicleUsageEndedInput");

KTable<String, VehicleFinderData> mergeStepOne =
    fleetInput.leftJoin(locationInput, VehicleFinderData::merge);
mergeStepOne.print("mergeStepOne");

KTable<String, VehicleFinderData> mergeStepTwo =
    mergeStepOne.leftJoin(vehicleReleasedInput, VehicleFinderData::merge);
mergeStepTwo.print("mergeStepTwo");

KTable<String, VehicleFinderData> mergeStepThree =
    mergeStepTwo.leftJoin(vehicleUsageEndedInput, VehicleFinderData::merge);
mergeStepThree.print("mergeStepThree");

KTable<String, VehicleFinderData> mergeStepFour =
    mergeStepThree.leftJoin(vehicleReservedInput, VehicleFinderData::merge);
mergeStepFour.print("mergeStepFour");

** Generate a location event **

[locationInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)

Deserializing from topic VehicleEnricherFleetStore
Merge operation called
[mergeStepOne]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)

Merge operation called
[mergeStepTwo]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)

Merge operation called
[mergeStepThree]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)

Merge operation called
[mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json 
value}<-null)


** New event correctly serialized **

---

** Generate a vehicleReserved event **

[vehicleReservedInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped 
json value}<-null)

[mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , (null<-null)

** NO EVENT **
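One thing worth checking in a chain of leftJoins is that the joiner tolerates a null on either side: in a KTable leftJoin the right-hand value can be null, and the `(null<-null)` output above suggests the merge is collapsing to null. A null-safe merge along these lines (hypothetical, since `VehicleFinderData::merge` isn't shown; strings stand in for the value type) keeps whichever side is present:

```java
// Hypothetical stand-in for VehicleFinderData::merge. In a KTable leftJoin
// the right-hand value can be null, so the joiner must not blindly combine
// or dereference it; here we fall back to whichever side is present.
public class NullSafeMerge {
    public static String merge(String left, String right) {
        if (right == null) return left;    // right side absent: keep left
        if (left == null) return right;    // left side absent: keep right
        return left + "+" + right;         // both present: combine (toy logic)
    }
}
```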




Re: Re: Topic discovery when supporting multiple kafka clusters

2016-12-06 Thread Brian Krahmer
You didn't mention anything about your current configuration, just that 
you are 'out of resources'.  Perhaps you misunderstand how to size your 
partitions per topic and how partition allocation works.  If your 
brokers are maxed out on CPU and you double the number of brokers while 
keeping the replica count the same, I would expect CPU usage to be 
nearly cut in half.  How many brokers do you have, how many topics, and 
how many partitions per topic?  What is your resource utilization for 
bandwidth, CPU, and memory?  How many consumers, on average, do you 
have for each topic?
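The scaling intuition above — double the brokers at the same replica count and per-broker load roughly halves — is just arithmetic over the replica placement, assuming replicas spread evenly:

```java
// Back-of-the-envelope check for the claim above: with a fixed total
// number of partition replicas, each added broker carries a smaller share.
public class BrokerLoad {
    public static int replicasPerBroker(int topics, int partitionsPerTopic,
                                        int replicationFactor, int brokers) {
        int totalReplicas = topics * partitionsPerTopic * replicationFactor;
        return totalReplicas / brokers;   // assumes an even spread
    }
}
```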


brian


On 06.12.2016 21:23, Yifan Ying wrote:

Hi Aseem, the concern is to create too many partitions in total in one
cluster no matter how many brokers I have in this cluster. I think the two
articles that I mentioned explain why too many partitions in one cluster
could cause issues.






Re: Streams - merging multiple topics

2016-11-22 Thread Brian Krahmer
Thanks Damian!  Based on your response, I finally got it working.  I did 
end up using left joins and added a final step that goes from table -> 
stream and then filters out nulls.
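The final step Brian describes — table to stream, then drop nulls — amounts to a filter over the change stream (in Streams code, roughly `toStream().filter((k, v) -> v != null)`). Since the actual topology isn't shown, the idea is modelled here with plain collections:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Models the "table -> stream, then filter out nulls" step: a leftJoin
// chain can emit records whose value is null, and those should not reach
// the output topic.
public class DropNulls {
    public static List<Map.Entry<String, String>> filterNulls(
            List<Map.Entry<String, String>> records) {
        return records.stream()
                      .filter(e -> e.getValue() != null)  // keep only real values
                      .collect(Collectors.toList());
    }
}
```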


thanks,
brian


On 21.11.2016 22:03, Damian Guy wrote:

Hi Brian,

It sounds like you might want do something like:

KTable inputOne = builder.table("input-one");
KTable inputTwo = builder.table("input-two");
KTable inputThree = builder.table("input-three");
ValueJoiner joiner1 = //...
ValueJoiner joiner2 = //...

inputOne.join(inputTwo, joiner1)
.join(inputThree, joiner2)
.to(outputTopic)

This would result in the join logic being triggered when any record arrives
in any of the input topics. There will be some de-duplication of results
written to the output topic. If you want immediate output then you will
want to set StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG = 0. You should
create and configure the output topic yourself before you run your streams
app (so you can make it compacted etc).

Thanks,
Damian
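Disabling the record cache so every update is forwarded immediately is a one-line config; `cache.max.bytes.buffering` is the property behind the `StreamsConfig` constant Damian mentions. A minimal sketch:

```java
import java.util.Properties;

// Streams config sketch for immediate (uncached) output, per the advice
// above: a 0-byte record cache forwards every update downstream instead
// of de-duplicating updates in the cache.
public class ImmediateOutputConfig {
    public static Properties build() {
        Properties props = new Properties();
        // Same property as StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }
}
```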






Streams - merging multiple topics

2016-11-21 Thread Brian Krahmer
Hey guys. I've been banging my head for about 3 days now trying to get a 
streams application working, with no luck. I've read through all of the 
docs and examples I can find, and I'm just not getting it.  I'm using 
0.10.1 and have worked quite a bit with the high-level consumer and 
producer. What I would like to do is stream from 3 different topics 
(all keyed by the same UUID, each with its own value type), and 
when a message comes in, write a new aggregated value to a 4th topic. 
Each of the 3 topics has components that make up the output (though one 
is required and is guaranteed to be generated before any of the others). 
I would also like the output topic to be compacted. My first doubt is 
whether I should use windowing or not; I want low-latency 
throughput. Second, it's not clear to me how I should put the pipeline 
together. Do I start with an empty KTable and stream all 3 topics into 
it using join operators? TIA for any help. This looks like a great 
technology, and we have multiple use cases of event enrichment that we'd 
like to tackle with this technique.


thanks, brian