Hi! I've got some issues I'm struggling to debug properly:
I'm receiving two streams; each has a binary object as key and a binary object as value. The keys are built with an Extractor (the data coming from Kafka has a String as key). When I simply start my stack, everything runs fine. When I store some data in Kafka before starting the connectors, I run into the following error (full stack trace: https://pastebin.com/9ykra6Ei):

    java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.ignite.binary.BinaryObject

It is thrown in the following function:

    long startTimestamp = prevGpsValue.<Long>field("timestamp");
    long endTimestamp = curGpsValue.<Long>field("timestamp");

    IgniteCache<BinaryObject, BinaryObject> apCache = Ignition.ignite().cache("AccelerationPoint").withKeepBinary();

    ScanQuery<BinaryObject, BinaryObject> scan = new ScanQuery<>(
        (IgniteBiPredicate<BinaryObject, BinaryObject>) (key, value) ->
            (key.<Long>field("timestamp") >= startTimestamp
                && key.<Long>field("timestamp") < endTimestamp)
            && key.<String>field("deviceId").equals(curGpsKey.<String>field("deviceId"))
            && key.<Long>field("measurementId").equals(curGpsKey.<Long>field("measurementId"))
            && !value.<Boolean>field("interpolated")
    );
    scan.setLocal(true);

    try (QueryCursor<Cache.Entry<BinaryObject, BinaryObject>> cursor = apCache.query(scan)) {
        for (Cache.Entry<BinaryObject, BinaryObject> entry : cursor) {
            interpolate(prevGpsValue, curGpsValue, entry.getKey());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }

Apparently, the cursor is receiving a String where it expected a BinaryObject, yet I can't figure out how that's even possible. Both streams have a continuous query listening to all incoming events, and neither of them is throwing errors.
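
To make the failure mode concrete, here is a minimal plain-Java sketch with no Ignite involved. Everything in it is hypothetical: `FakeBinary` is only a stand-in for BinaryObject, and `pollutedCache` and `scanThrowsClassCast` are made-up names. It assumes (unconfirmed for my setup) that a raw String key ended up in a structure that is merely declared with binary keys. Since Java generics are erased at runtime, nothing complains on insert; the cast only fails once a typed lambda receives the key, which has the same shape as the exception above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

final class ErasureDemo {
    /** Hypothetical stand-in for BinaryObject; not an Ignite type. */
    static final class FakeBinary {
        final long timestamp;
        FakeBinary(long timestamp) { this.timestamp = timestamp; }
    }

    /** Returns a map declared with FakeBinary keys that actually holds one String key. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Map<FakeBinary, FakeBinary> pollutedCache() {
        Map raw = new HashMap();
        // The String key slips in through the raw view; no runtime check happens here.
        raw.put("device-1", new FakeBinary(42L));
        // Unchecked cast: compiles with a warning and succeeds at runtime because of erasure.
        return (Map<FakeBinary, FakeBinary>) raw;
    }

    /** Applies a typed filter to every entry, roughly like a scan predicate would. */
    static boolean scanThrowsClassCast() {
        Map<FakeBinary, FakeBinary> cache = pollutedCache();
        BiPredicate<FakeBinary, FakeBinary> filter = (key, value) -> key.timestamp >= 0L;
        try {
            for (Map.Entry<FakeBinary, FakeBinary> e : cache.entrySet()) {
                // The String key is cast to FakeBinary only when the lambda is invoked.
                filter.test(e.getKey(), e.getValue());
            }
            return false;
        } catch (ClassCastException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("ClassCastException thrown: " + scanThrowsClassCast());
    }
}
```

If this assumption holds, the scan predicate is just the first place that touches the key as a typed object, so the bad entry could have been written much earlier without any error.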
Since it has to be some entry in the apCache, here's my CQ for those events:

    IgniteCache<BinaryObject, BinaryObject> apCache = Ignition.ignite().cache("AccelerationPoint").withKeepBinary();

    ContinuousQuery<BinaryObject, BinaryObject> continuousQuery = new ContinuousQuery<>();
    continuousQuery.setLocalListener(evts -> {
        for (CacheEntryEvent<? extends BinaryObject, ? extends BinaryObject> e : evts) {
            processAccelerationPoint(e.getKey(), e.getValue());
        }
    });
    continuousQuery.setRemoteFilter(e -> e.getEventType() == EventType.CREATED);
    continuousQuery.setLocal(true);
    apCache.query(continuousQuery);

The function processAccelerationPoint called here relies on the key and value being BinaryObjects and modifies them as follows:

    IgniteCache<BinaryObject, BinaryObject> accCache = Ignition.ignite().cache("AccelerationPoint").withKeepBinary();

    if (dcmgMatrixMap.containsKey(accPointKey.<Long>field("measurementId"))) {
        accCache.<BinaryObject, BinaryObject>withKeepBinary().invoke(
            accPointKey,
            (CacheEntryProcessor<BinaryObject, BinaryObject, Object>) (entry, objects) -> {
                RealMatrix dcm_g = dcmgMatrixMap.get(entry.getValue().<Long>field("measurementId"));
                double[] accPointVector = dcm_g.operate(new double[]{
                    entry.getValue().<Double>field("ax"),
                    entry.getValue().<Double>field("ay"),
                    entry.getValue().<Double>field("az")});

                BinaryObjectBuilder builder = entry.getValue().toBuilder();
                double zMean = dcm_g.getEntry(2, 2);
                builder.setField("ax", accPointVector[0]);
                builder.setField("ay", accPointVector[1]);
                builder.setField("az", accPointVector[2] - zMean);
                builder.setField("calibrated", true);

                if (builder.getField("interpolated")) {
                    MetricStatus.dataLatency.add(System.currentTimeMillis() - (Long) builder.getField("createdAt"));
                }

                entry.setValue(builder.build());
                return null;
            });
    } else {
        calibrate(accPointKey, accPointValue);
    }

I really have no clue what's going on, or whether Kafka data can somehow enter the cache without being transformed by the extractor and without being caught by the CQs.
Any hints would be appreciated!

Best regards,
svonn