Hey guys,
I'm having a hell of a time here. I've worked for days trying to get
this joining pipeline working. I thought I had it working last week,
but my jubilation was premature. The goal is to take data in from
five different topics and merge it into one enriched event (output
to a compacted topic). Can anybody spot what I'm doing wrong? The
order of the joins makes no difference. For example, I've swapped
the locationInput and vehicleReservedInput tables in the leftJoin
calls below, and I get the same result: the location part of the
enrichment works while the vehicleReserved part does not. I can't
think of how to restructure the topology without resorting to
building my own lower-level topology.
thanks,
brian
KTable<String, VehicleFinderData> fleetInput =
    builder.table(Serdes.String(),
                  vehicleFinderDataSerde, FLEET_TOPIC,
                  VEHICLE_ENRICHER_FLEET_STORE);
...
fleetInput.print("fleetInput");
locationInput.print("locationInput");
vehicleReservedInput.print("vehicleReservedInput");
vehicleReleasedInput.print("vehicleReleasedInput");
vehicleUsageEndedInput.print("vehicleUsageEndedInput");

KTable<String, VehicleFinderData> mergeStepOne =
    fleetInput.leftJoin(locationInput, VehicleFinderData::merge);
mergeStepOne.print("mergeStepOne");

KTable<String, VehicleFinderData> mergeStepTwo =
    mergeStepOne.leftJoin(vehicleReleasedInput, VehicleFinderData::merge);
mergeStepTwo.print("mergeStepTwo");

KTable<String, VehicleFinderData> mergeStepThree =
    mergeStepTwo.leftJoin(vehicleUsageEndedInput, VehicleFinderData::merge);
mergeStepThree.print("mergeStepThree");

KTable<String, VehicleFinderData> mergeStepFour =
    mergeStepThree.leftJoin(vehicleReservedInput, VehicleFinderData::merge);
mergeStepFour.print("mergeStepFour");
** Generate a location event **
[locationInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
Deserializing from topic VehicleEnricherFleetStore
Merge operation called
[mergeStepOne]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
Merge operation called
[mergeStepTwo]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
Merge operation called
[mergeStepThree]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
Merge operation called
[mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
** New event correctly serialized **
-------------------------------------------------------
** Generate a vehicleReserved event **
[vehicleReservedInput]: 93838671-e591-4849-ae12-6f30cb9ff7bd , ({snipped json value}<-null)
[mergeStepFour]: 93838671-e591-4849-ae12-6f30cb9ff7bd , (null<-null)
** NO EVENT **
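
For comparison, here is a minimal plain-Java sketch of the result one would expect from the four chained left joins for a single key. It is only a model, not Kafka Streams code. The class LeftJoinModel, the enrich helper, and the string-concatenating merge are hypothetical stand-ins (the real VehicleFinderData::merge is not shown in this post). It assumes table-table left-join semantics where a missing left-side record yields no result and a missing right-side record reaches the joiner as null.

```java
import java.util.HashMap;
import java.util.Map;

class LeftJoinModel {

    // Hypothetical stand-in for VehicleFinderData::merge.
    // In a left join the right-hand value may be null, so the joiner must tolerate it.
    static String merge(String left, String right) {
        return right == null ? left : left + "+" + right;
    }

    // Models the current value of mergeStepFour for one key, joining in the
    // same order as the topology: location, released, usageEnded, reserved.
    // Returns null when the left-most table (fleet) has no record for the key.
    static String enrich(String key,
                         Map<String, String> fleet,
                         Map<String, String> location,
                         Map<String, String> released,
                         Map<String, String> usageEnded,
                         Map<String, String> reserved) {
        String value = fleet.get(key);
        if (value == null) {
            return null; // nothing on the left side, so the left join emits nothing
        }
        value = merge(value, location.get(key));   // mergeStepOne
        value = merge(value, released.get(key));   // mergeStepTwo
        value = merge(value, usageEnded.get(key)); // mergeStepThree
        value = merge(value, reserved.get(key));   // mergeStepFour
        return value;
    }

    public static void main(String[] args) {
        String key = "93838671-e591-4849-ae12-6f30cb9ff7bd";
        Map<String, String> fleet = new HashMap<>();
        Map<String, String> location = new HashMap<>();
        Map<String, String> released = new HashMap<>();
        Map<String, String> usageEnded = new HashMap<>();
        Map<String, String> reserved = new HashMap<>();

        fleet.put(key, "fleet");

        // A location event arrives.
        location.put(key, "location");
        System.out.println(enrich(key, fleet, location, released, usageEnded, reserved));
        // prints fleet+location

        // A vehicleReserved event arrives.
        reserved.put(key, "reserved");
        System.out.println(enrich(key, fleet, location, released, usageEnded, reserved));
        // prints fleet+location+reserved
    }
}
```

In this model the vehicleReserved event produces an enriched value because the left-hand side still holds the earlier join result for that key. The (null<-null) line from mergeStepFour above is what the model returns only when the left-hand side has nothing for the key.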