Re: ZK, Kafka, Streamclient not cleanly starting up (two issues)
Hi Peter,

We are doing heavy microservice development (many of the services communicate over Kafka), and a typical service has an integration test suite that runs under docker-compose. We found the fastest way to get our service up and running is to disable topic auto-creation and use the topic-creation parameters in the Docker file. We then have a check in the service startup that waits for the topics to be created before building topologies, and we can likewise monitor the logs with some code in our JUnit tests to know when Kafka is fully ready.

Hope that helps.

brian

On 31.01.2017 18:28, Peter Kopias wrote:
> Hello.
>
> I've got a local virtual development environment, with:
> - kafka 0.10.1.1
> - java version "1.8.0_121"
>
> I don't need anything special, this is just for trial, so I set up zk and
> kafka and the stream processor to use /tmp for data, log and state. It's
> not persistent, but I can always try new things, with no memory effect at
> all. :)
>
> Issue 1:
> - I coldstart zk (no previous history whatsoever)
> - I coldstart kafka (no previous history whatsoever)
> - Seems fine.
> - I coldstart my stream api node
>
> My app writes these to stdout for the first 2 seconds:
>
> [2017-01-31 12:49:16,062] WARN [StreamThread-1] Error while fetching metadata with correlation id 1 : {mousetracker-pixelprocessor-Count-repartition=LEADER_NOT_AVAILABLE, mousetracker-ioclient2backend=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
> ... >8 cut here 8<
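A minimal sketch of the kind of startup check described above, polling the broker's topic metadata until the expected topics exist (the bootstrap address, timeout, and wait interval are placeholders, not from the original setup):

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    public class TopicReadinessCheck {

        // Block until every expected topic appears in the broker metadata,
        // or fail after maxWaitMs. Run this before building any topologies.
        public static void awaitTopics(String bootstrapServers,
                                       List<String> expectedTopics,
                                       long maxWaitMs) throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            long deadline = System.currentTimeMillis() + maxWaitMs;
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                while (System.currentTimeMillis() < deadline) {
                    // listTopics() fetches the current cluster metadata
                    if (consumer.listTopics().keySet().containsAll(expectedTopics)) {
                        return; // all topics exist; safe to start the streams app
                    }
                    Thread.sleep(500);
                }
            }
            throw new IllegalStateException("Topics not available after "
                    + maxWaitMs + "ms: " + expectedTopics);
        }

        public static void main(String[] args) throws InterruptedException {
            awaitTopics("localhost:9092",
                    Arrays.asList("mousetracker-ioclient2backend"), 30_000);
        }
    }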
Re: Re: Problem with processor API partition assignments
Hi Damian,

I started to pull together enough code to show my topology, and I have a feeling I see what the problem is. Is it correct that, when configuring a processor, Streams uses the sources attached to that processor to ensure the partitions are aligned for that task?

thanks,
brian

On 05.01.2017 13:37, Damian Guy wrote:
> Hi Brian,
>
> It might be helpful if you provide some code showing your Topology.
>
> Thanks,
> Damian
>
> On Thu, 5 Jan 2017 at 10:59 Brian Krahmer wrote:
>> Hey guys,
>>
>> I'm fighting an issue where I can currently only run one instance of my
>> streams application, because when other instances come up, the partition
>> reassignment looks to me to be incorrect. I'm testing with docker-compose
>> at the moment. When I scale my application to 3 instances and the 2nd and
>> 3rd connect to kafka, causing a rebalance, I get the following assignment
>> on one of my instances:
>>
>> FleetData-0
>> FleetData-1
>> VehicleJourneyMapData-0
>> VehicleJourneyMapData-1
>> JourneyStarted-0
>> VehicleStateChanged-1
>> VehicleIgnitionData-0
>> VinVehicleMapData-1
>>
>> As you can see, the assignments are clearly not symmetric, which causes
>> problems, as I'm essentially doing join operations. All topics have 3
>> partitions in this testing scenario. I'm using version 0.10.1.0. Any ideas?
>>
>> thanks,
>> brian
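For reference, a minimal Processor API sketch of the pattern in question, using placeholder processor logic and only two of the topics: the sources wired into a single processor form one task group, so their topics must be co-partitioned.

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    public class CoPartitionExample {

        // Placeholder processor; real logic would join the two inputs.
        static class MergeProcessor extends AbstractProcessor<String, String> {
            @Override
            public void process(String key, String value) {
                context().forward(key, value);
            }
        }

        public static TopologyBuilder build() {
            TopologyBuilder builder = new TopologyBuilder();
            // Both sources feed the same processor, so Streams assigns matching
            // partitions of both topics to one task (FleetData-0 together with
            // VehicleJourneyMapData-0, and so on). That alignment only works if
            // the topics have identical partition counts.
            builder.addSource("fleet-source", "FleetData");
            builder.addSource("journey-source", "VehicleJourneyMapData");
            builder.addProcessor("merge", MergeProcessor::new,
                    "fleet-source", "journey-source");
            return builder;
        }
    }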
Problem with processor API partition assignments
Hey guys,

I'm fighting an issue where I can currently only run one instance of my streams application, because when other instances come up, the partition reassignment looks to me to be incorrect. I'm testing with docker-compose at the moment. When I scale my application to 3 instances and the 2nd and 3rd connect to kafka, causing a rebalance, I get the following assignment on one of my instances:

FleetData-0
FleetData-1
VehicleJourneyMapData-0
VehicleJourneyMapData-1
JourneyStarted-0
VehicleStateChanged-1
VehicleIgnitionData-0
VinVehicleMapData-1

As you can see, the assignments are clearly not symmetric, which causes problems, as I'm essentially doing join operations. All topics have 3 partitions in this testing scenario. I'm using version 0.10.1.0. Any ideas?

thanks,
brian
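One quick way to verify the co-partitioning assumption in a setup like this, before starting the application, is to compare partition counts across the join topics. A sketch using a plain consumer and the topic names from the assignment above (the bootstrap address is a placeholder):

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Properties;
    import java.util.Set;

    public class CoPartitionCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // Joins require co-partitioned inputs: every topic involved
                // must have the same number of partitions.
                Set<Integer> counts = new HashSet<>();
                for (String topic : Arrays.asList("FleetData", "VehicleJourneyMapData",
                        "JourneyStarted", "VehicleStateChanged",
                        "VehicleIgnitionData", "VinVehicleMapData")) {
                    counts.add(consumer.partitionsFor(topic).size());
                }
                if (counts.size() != 1) {
                    throw new IllegalStateException(
                            "Topics are not co-partitioned: " + counts);
                }
            }
        }
    }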
Problem with multiple joins in one topology
Hey guys,

I'm having a hell of a time here. I've worked for days trying to get this joining pipeline working. I thought I had it working last week, but my jubilation was premature. The point is to take data in from five different topics and merge it together to obtain one enriched event (output to a compacted topic). Can anybody spot what I'm doing wrong? The ordering makes no difference: for example, I've switched the locationInput and vehicleReservedInput inputs in the leftJoin calls below, and I get the same results. The location part of the enrichment works while the vehicleReserved part does not. I can't even think of how to restructure the topology without resorting to building my own lower-level topology.

thanks,
brian

KTable<String, VehicleFinderData> fleetInput = builder.table(Serdes.String(),
        vehicleFinderDataSerde, FLEET_TOPIC, VEHICLE_ENRICHER_FLEET_STORE);
...
fleetInput.print("fleetInput");
locationInput.print("locationInput");
vehicleReservedInput.print("vehicleReservedInput");
vehicleReleasedInput.print("vehicleReleasedInput");
vehicleUsageEndedInput.print("vehicleUsageEndedInput");

KTable<String, VehicleFinderData> mergeStepOne =
        fleetInput.leftJoin(locationInput, VehicleFinderData::merge);
mergeStepOne.print("mergeStepOne");
KTable<String, VehicleFinderData> mergeStepTwo =
        mergeStepOne.leftJoin(vehicleReleasedInput, VehicleFinderData::merge);
mergeStepTwo.print("mergeStepTwo");
KTable<String, VehicleFinderData> mergeStepThree =
        mergeStepTwo.leftJoin(vehicleUsageEndedInput, VehicleFinderData::merge);
mergeStepThree.print("mergeStepThree");
KTable<String, VehicleFinderData> mergeStepFour =
        mergeStepThree.leftJoin(vehicleReservedInput, VehicleFinderData::merge);
mergeStepFour.print("mergeStepFour");

** Generate a location event **

[locationInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
Deserializing from topic VehicleEnricherFleetStore
Merge operation called
[mergeStepOne]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
Merge operation called
[mergeStepTwo]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
Merge operation called
[mergeStepThree]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
Merge operation called
[mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)

** New event correctly serialized **

---

** Generate a vehicleReserved event **

[vehicleReservedInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
[mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , (null<-null)

** NO EVENT **
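One thing worth checking in a chain like this: a KTable leftJoin invokes the joiner with a null right-hand value whenever the other table has no entry for the key, so a merge function that assumes both sides are non-null can poison the rest of the chain. A null-safe wrapper might look like the sketch below (purely illustrative; it assumes merge is a static method taking two VehicleFinderData arguments):

    import org.apache.kafka.streams.kstream.ValueJoiner;

    // Illustrative null-safe joiner: leftJoin passes null for the right-hand
    // value when that table has no entry for the key, so handle it explicitly
    // instead of delegating straight to merge.
    ValueJoiner<VehicleFinderData, VehicleFinderData, VehicleFinderData> safeMerge =
            (left, right) -> right == null ? left : VehicleFinderData.merge(left, right);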
Re: Re: Topic discovery when supporting multiple kafka clusters
You didn't mention anything about your current configuration, just that you are 'out of resources'. Perhaps you misunderstand how to size your partitions per topic and how partition allocation works. If your brokers are maxed out on CPU and you double the number of brokers while keeping the replica count the same, I would expect CPU usage to be nearly cut in half, since the same set of partition replicas gets spread across twice as many machines. How many brokers do you have, how many topics, and how many partitions per topic? What is your resource utilization for bandwidth, CPU, and memory? How many consumers, on average, do you have for each topic?

brian

On 06.12.2016 21:23, Yifan Ying wrote:
> Hi Aseem, the concern is creating too many partitions in total in one
> cluster, no matter how many brokers I have in the cluster. I think the two
> articles that I mentioned explain why too many partitions in one cluster
> could cause issues.
Re: Streams - merging multiple topics
Thanks Damian! Based on your response, I finally got it working. I did end up using left joins, and added a final step that goes from table -> stream and then filters out nulls.

thanks,
brian

On 21.11.2016 22:03, Damian Guy wrote:
> Hi Brian,
>
> It sounds like you might want to do something like:
>
> KTable inputOne = builder.table("input-one");
> KTable inputTwo = builder.table("input-two");
> KTable inputThree = builder.table("input-three");
> ValueJoiner joiner1 = //...
> ValueJoiner joiner2 = //...
> inputOne.join(inputTwo, joiner1)
>         .join(inputThree, joiner2)
>         .to(outputTopic);
>
> This would result in the join logic being triggered when any record arrives
> in any of the input topics. There will be some de-duplication of results
> written to the output topic. If you want immediate output then you will
> want to set StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG = 0. You should
> create and configure the output topic yourself before you run your streams
> app (so you can make it compacted etc).
>
> Thanks,
> Damian
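A sketch of that final table -> stream -> filter step, dropping the nulls that left joins can produce (the merged table, serde, and topic name are placeholders):

    // Sketch: convert the joined KTable to a KStream, drop null values
    // produced by the left joins, and write the result to the output topic.
    merged.toStream()
          .filter((key, value) -> value != null)
          .to(Serdes.String(), valueSerde, "enriched-output");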
Streams - merging multiple topics
Hey guys,

I've been banging my head for about 3 days now trying to get a streams application working, with no luck. I've read through all of the docs and examples I can find and just am not getting it. I'm using 0.10.1 and have worked quite a bit with the high-level consumer and producer.

What I would like to do is stream from 3 different topics (all keyed by the same uuid, with each having its own value type), and when a message comes in, compute a new aggregated value and write it to a 4th topic. Each of the 3 topics has components that make up the output (though one is required and is guaranteed to be generated before any of the others). I would also like the output topic to be compacted.

My first doubt is whether I should use windowing or not; I want low-latency throughput. Second, it's not clear to me how I should put the pipeline together. Do I start with an empty KTable and stream all 3 topics into it using join operators?

TIA for any help. This looks like a great technology, and we have multiple use cases of event enrichment that we'd like to do with this technique.

thanks,
brian