Hi!

I've got some issues I'm struggling to debug properly:

I'm receiving two streams; each has a binary object as key and a binary
object as value. The keys are built with an extractor (the data coming from
Kafka has a String as key).
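
For reference, my extractor does roughly the following (a simplified
sketch; the key format, type name and class name are made up for
illustration, but the String-to-BinaryObject conversion is the relevant
part):

public class AccelerationPointKeyExtractor
        implements StreamSingleTupleExtractor<SinkRecord, Object, Object> {

    @Override public Map.Entry<Object, Object> extract(SinkRecord record) {
        // The Kafka key arrives as a plain String,
        // e.g. "<deviceId>;<measurementId>;<timestamp>".
        String[] parts = ((String)record.key()).split(";");

        // Build a BinaryObject key instead of passing the raw String through.
        BinaryObject key = Ignition.ignite().binary()
                .builder("AccelerationPointKey")
                .setField("deviceId", parts[0])
                .setField("measurementId", Long.parseLong(parts[1]))
                .setField("timestamp", Long.parseLong(parts[2]))
                .build();

        return new AbstractMap.SimpleEntry<>(key, record.value());
    }
}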
When I simply start my stack, everything runs fine. But when I store some
data in Kafka before starting the connectors, I run into the following
error (full stack trace: https://pastebin.com/9ykra6Ei ):

java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.ignite.binary.BinaryObject

It is thrown in the following function:

long startTimestamp = prevGpsValue.<Long>field("timestamp");
long endTimestamp = curGpsValue.<Long>field("timestamp");

IgniteCache<BinaryObject, BinaryObject> apCache =
        Ignition.ignite().cache("AccelerationPoint").withKeepBinary();

ScanQuery<BinaryObject, BinaryObject> scan = new ScanQuery<>(
        (IgniteBiPredicate<BinaryObject, BinaryObject>) (key, value) ->
                key.<Long>field("timestamp") >= startTimestamp
                        && key.<Long>field("timestamp") < endTimestamp
                        && key.<String>field("deviceId").equals(curGpsKey.<String>field("deviceId"))
                        && key.<Long>field("measurementId").equals(curGpsKey.<Long>field("measurementId"))
                        && !value.<Boolean>field("interpolated"));

scan.setLocal(true);

try (QueryCursor<Cache.Entry<BinaryObject, BinaryObject>> cursor = apCache.query(scan)) {
    for (Cache.Entry<BinaryObject, BinaryObject> entry : cursor) {
        interpolate(prevGpsValue, curGpsValue, entry.getKey());
    }
} catch (Exception e) {
    e.printStackTrace();
}


Apparently, the cursor is receiving a String where it expected a
BinaryObject, yet I can't figure out how that's even possible.
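
To track down the offending entries, my next step is to scan the cache
with raw Object types, so the ClassCastException can't occur and I can log
what the keys actually are. Something like this (untested sketch):

IgniteCache<Object, Object> rawCache =
        Ignition.ignite().cache("AccelerationPoint").withKeepBinary();

// Match every entry whose key is not a BinaryObject.
ScanQuery<Object, Object> diag = new ScanQuery<>(
        (IgniteBiPredicate<Object, Object>) (key, value) -> !(key instanceof BinaryObject));

try (QueryCursor<Cache.Entry<Object, Object>> cursor = rawCache.query(diag)) {
    for (Cache.Entry<Object, Object> entry : cursor)
        System.out.println("Non-binary key: " + entry.getKey()
                + " (" + entry.getKey().getClass().getName() + ")");
}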
Both streams have a continuous query listening to all incoming events, and
neither of them is throwing errors. Since the problem has to come from some
entry in the apCache, here's my CQ for those events:

IgniteCache<BinaryObject, BinaryObject> apCache =
        Ignition.ignite().cache("AccelerationPoint").withKeepBinary();

ContinuousQuery<BinaryObject, BinaryObject> continuousQuery = new ContinuousQuery<>();

continuousQuery.setLocalListener(evts -> {
    for (CacheEntryEvent<? extends BinaryObject, ? extends BinaryObject> e : evts) {
        processAccelerationPoint(e.getKey(), e.getValue());
    }
});

continuousQuery.setRemoteFilter(e -> e.getEventType() == EventType.CREATED);

continuousQuery.setLocal(true);

apCache.query(continuousQuery);


The processAccelerationPoint function called here relies on the entries
being BinaryObjects and modifies them as follows:

IgniteCache<BinaryObject, BinaryObject> accCache =
        Ignition.ignite().cache("AccelerationPoint").withKeepBinary();

if (dcmgMatrixMap.containsKey(accPointKey.<Long>field("measurementId"))) {

    accCache.<BinaryObject, BinaryObject>withKeepBinary().invoke(
            accPointKey, (CacheEntryProcessor<BinaryObject, BinaryObject, Object>) (entry, objects) -> {
                RealMatrix dcm_g = dcmgMatrixMap.get(entry.getValue().<Long>field("measurementId"));
                double[] accPointVector = dcm_g.operate(new double[]{
                        entry.getValue().<Double>field("ax"),
                        entry.getValue().<Double>field("ay"),
                        entry.getValue().<Double>field("az")});

                BinaryObjectBuilder builder = entry.getValue().toBuilder();

                double zMean = dcm_g.getEntry(2, 2);

                builder.setField("ax", accPointVector[0]);
                builder.setField("ay", accPointVector[1]);
                builder.setField("az", accPointVector[2] - zMean);

                builder.setField("calibrated", true);

                if (builder.<Boolean>getField("interpolated")) {
                    MetricStatus.dataLatency.add(System.currentTimeMillis()
                            - (Long) builder.getField("createdAt"));
                }

                entry.setValue(builder.build());

                return null;
            });
} else {
    calibrate(accPointKey, accPointValue);
}



I really have no clue what's going on, or how Kafka data could enter the
cache without being transformed by the extractor and without getting
caught by the CQs.
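
One more thing I might try is a cache interceptor that logs the class of
every key being written, to catch whoever is putting the raw Strings. A
sketch, assuming I can adjust the cache configuration:

CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>("AccelerationPoint");

// Log any put whose key is not a BinaryObject before it reaches the cache.
ccfg.setInterceptor(new CacheInterceptorAdapter<Object, Object>() {
    @Override public Object onBeforePut(Cache.Entry<Object, Object> entry, Object newVal) {
        if (!(entry.getKey() instanceof BinaryObject))
            System.err.println("Raw key written: " + entry.getKey()
                    + " (" + entry.getKey().getClass().getName() + ")");

        return newVal;
    }
});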
Any hints would be appreciated!

Best regards,
svonn