Hey guys,

I'm having a hell of a time here. I've spent days trying to get this join pipeline working. I thought I had it working last week, but my jubilation was premature. The goal is to take data from five different topics and merge it into one enriched event, which is output to a compacted topic. Can anybody spot what I'm doing wrong?

The order of the joins makes no difference. For example, when I swap locationInput and vehicleReservedInput in the leftJoin calls below, I get the same results: the location part of the enrichment works and the vehicleReserved part does not. I can't think of a way to restructure the topology short of building my own lower-level topology.

thanks,
brian


KTable<String, VehicleFinderData> fleetInput = builder.table(Serdes.String(), vehicleFinderDataSerde, FLEET_TOPIC, VEHICLE_ENRICHER_FLEET_STORE);
...
fleetInput.print("fleetInput");
locationInput.print("locationInput");
vehicleReservedInput.print("vehicleReservedInput");
vehicleReleasedInput.print("vehicleReleasedInput");
vehicleUsageEndedInput.print("vehicleUsageEndedInput");

// Chain of left joins: each step merges one more input into the fleet record for the same key.
KTable<String, VehicleFinderData> mergeStepOne = fleetInput.leftJoin(locationInput, VehicleFinderData::merge);
mergeStepOne.print("mergeStepOne");
KTable<String, VehicleFinderData> mergeStepTwo = mergeStepOne.leftJoin(vehicleReleasedInput, VehicleFinderData::merge);
mergeStepTwo.print("mergeStepTwo");
KTable<String, VehicleFinderData> mergeStepThree = mergeStepTwo.leftJoin(vehicleUsageEndedInput, VehicleFinderData::merge);
mergeStepThree.print("mergeStepThree");
KTable<String, VehicleFinderData> mergeStepFour = mergeStepThree.leftJoin(vehicleReservedInput, VehicleFinderData::merge);
mergeStepFour.print("mergeStepFour");

** Generate a location event **

[locationInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
Deserializing from topic VehicleEnricherFleetStore
Merge operation called
[mergeStepOne]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
Merge operation called
[mergeStepTwo]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
Merge operation called
[mergeStepThree]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
Merge operation called
[mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)

** New event correctly serialized **

-------------------------------------------------------

** Generate a vehicleReserved event **

[vehicleReservedInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
[mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , (null<-null)

** NO EVENT **
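
For what it's worth, here is how I understand KTable-KTable leftJoin semantics for a single key, written as a plain-Java sketch with no Kafka dependency. Everything in it (LeftJoinSketch, the String values, this merge) is made up for illustration and is not my real code. It only assumes that a left join yields null for a key that has no left-side value. If that assumption is right, mergeStepFour is behaving as if mergeStepThree had no value for the key, even though the location event above had just produced one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

public class LeftJoinSketch {

    // Hypothetical model of a table-table left join for one key (an assumption,
    // not Kafka Streams code): no left value means a null result; otherwise the
    // joiner is called with the left value and the right value, which may be null.
    public static <V> V leftJoin(Map<String, V> left, Map<String, V> right,
                                 String key, BinaryOperator<V> joiner) {
        V leftValue = left.get(key);
        if (leftValue == null) {
            return null;
        }
        return joiner.apply(leftValue, right.get(key));
    }

    // Hypothetical stand-in for VehicleFinderData::merge: keep the base value
    // when there is no update, otherwise combine the two.
    public static String merge(String base, String update) {
        return update == null ? base : base + "+" + update;
    }

    public static void main(String[] args) {
        Map<String, String> mergeStepThree = new HashMap<>();
        Map<String, String> vehicleReserved = new HashMap<>();
        vehicleReserved.put("vehicle-1", "reserved");

        // Left side empty for this key: the join result is null.
        System.out.println(leftJoin(mergeStepThree, vehicleReserved, "vehicle-1", LeftJoinSketch::merge));

        // Left side present: the joiner runs and produces a merged value.
        mergeStepThree.put("vehicle-1", "fleet");
        System.out.println(leftJoin(mergeStepThree, vehicleReserved, "vehicle-1", LeftJoinSketch::merge));
    }
}
```

The second case is what I expected to see at mergeStepFour after the vehicleReserved event; the first case is what the output looks like.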
