RE: Question about data distribution
Yes - so apparently I was just observing an 'unlucky' distribution, if placement is purely chance-based. Since the real data will consist of thousands of measurements it should be fine as it is, thanks! Best regards svonn -- Sent from: http://apache-ignite-users.70518.x6.nabble.com/
RE: Question about data distribution
Hi! The class for my keys looks like this:

    private String deviceId;
    @AffinityKeyMapped
    private long measurementId;
    private long timestamp;

    public IgniteKey(String deviceId, long measurementId, long timestamp) {
        this.deviceId = deviceId;
        this.measurementId = measurementId;
        this.timestamp = timestamp;
    }

One device can have multiple measurements, but any calculation currently only requires other entries from the same measurement, so only the measurementId should be relevant. One measurement contains 100k-200k entries in one stream, and 500-1000 in the other; both streams use the same key class. Whenever a new measurementId arrives I print some output on the node it's being processed on, and I've had the following case:

Measurement 1 (short M1) -> node1
M2 -> node1
M3 -> node2
M4 -> node1
M5 -> node1
M6 -> node1

I expected that already M2 would be placed on node2. Performance-wise I don't think either node is close to its limit; I'm not sure if that's also relevant. Due to the 5 min expiry policy I can end up with one node holding ~1 million cache entries while the other has 0.

- svonn
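For context: this kind of lumpy placement is expected when only a handful of distinct affinity keys exist. Each measurementId hashes to one of many partitions, and the partitions are split between the two nodes, so with six measurements a 5/1 split is well within chance. A minimal, Ignite-independent sketch of the idea (Ignite's real RendezvousAffinityFunction is more elaborate; partitionFor and nodeFor are illustrative names, not Ignite API):

```java
public class AffinitySketch {
    // Hash the affinity field (measurementId) to one of N partitions.
    // Illustrative only; Ignite's RendezvousAffinityFunction is more involved.
    public static int partitionFor(long measurementId, int partitions) {
        return Math.floorMod(Long.hashCode(measurementId), partitions);
    }

    // Partitions are divided among nodes; here a simple round-robin split.
    public static int nodeFor(int partition, int nodes) {
        return partition % nodes;
    }

    public static void main(String[] args) {
        int partitions = 1024, nodes = 2;
        for (long m = 1; m <= 6; m++) {
            int p = partitionFor(m, partitions);
            System.out.println("M" + m + " -> partition " + p + " -> node " + nodeFor(p, nodes));
        }
    }
}
```

With thousands of measurementIds the hash spreads them roughly evenly; with six, any split can happen. The real mapping can be inspected at runtime via `ignite.affinity("cacheName").mapKeyToNode(key)`.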
Question about data distribution
Hi! I have two server nodes and I've set up an AffinityKey mapping via some ID. I'm streaming data from Kafka to Ignite; in my test data, about 5 min worth of data belongs to one ID, then the data for the next ID starts (real data will mostly come in parallel). The cache I'm streaming into has about a 30 min expiration policy. I've noticed that the data gets very unevenly distributed: one node sometimes gets 9 IDs to work with, while the other works on only a single ID. Is that due to the fact that they aren't arriving simultaneously? Can this behaviour be adjusted? Best regards svonn
Re: ClassCastException Issues
Hi! I actually fixed the issue, even though I'm still not 100% sure why it caused this: the Kafka connector has a setting called "tasks.max", which I had set to a number higher than 1. After setting tasks.max=1 I can process all the data I want without any issues - I assume the connector somehow can't use the extractor for any additional tasks. - svonn
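For readers hitting the same thing, the setting lives in the connector's properties file. A hypothetical sink configuration (only tasks.max is the point here; the name and topic are placeholders):

```properties
# connector properties (illustrative; only tasks.max is the relevant fix)
name=ignite-acceleration-sink
connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
topics=acceleration
# More than one task caused the extractor to be skipped in this setup;
# limiting to a single task worked around it.
tasks.max=1
```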
ClassCastException Issues
Hi! I've got some issues I'm struggling to debug properly: I'm receiving two streams, each with a binary object as key and a binary object as value. The keys are built with an extractor (the data coming from Kafka has a String as key). When I simply start my stack, everything runs fine - but when I store some data in Kafka before starting the connectors, I run into the following error (FULL STACKTRACE: https://pastebin.com/9ykra6Ei ):

java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.ignite.binary.BinaryObject

For the following function:

    long startTimestamp = prevGpsValue.field("timestamp");
    long endTimestamp = curGpsValue.field("timestamp");
    IgniteCache<BinaryObject, BinaryObject> apCache =
        Ignition.ignite().cache("AccelerationPoint").withKeepBinary();
    ScanQuery<BinaryObject, BinaryObject> scan = new ScanQuery<>(
        (IgniteBiPredicate<BinaryObject, BinaryObject>) (key, value) ->
            (key.<Long>field("timestamp") >= startTimestamp && key.<Long>field("timestamp") < endTimestamp)
            && key.field("deviceId").equals(curGpsKey.field("deviceId"))
            && key.field("measurementId").equals(curGpsKey.field("measurementId"))
            && !value.<Boolean>field("interpolated")
    );
    scan.setLocal(true);
    try (QueryCursor<Cache.Entry<BinaryObject, BinaryObject>> cursor = apCache.query(scan)) {
        for (Cache.Entry<BinaryObject, BinaryObject> entry : cursor) {
            interpolate(prevGpsValue, curGpsValue, entry.getKey());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }

Apparently, the cursor receives a String where it expected a BinaryObject - yet I can't figure out how that's even possible. Both streams have a continuous query listening to all incoming events, and neither of them is throwing errors.
Since it has to be some entry in the apCache, here's my CQ for those events:

    IgniteCache<BinaryObject, BinaryObject> apCache =
        Ignition.ignite().cache("AccelerationPoint").withKeepBinary();
    ContinuousQuery<BinaryObject, BinaryObject> continuousQuery = new ContinuousQuery<>();
    continuousQuery.setLocalListener(evts -> {
        for (CacheEntryEvent e : evts) {
            processAccelerationPoint(e.getKey(), e.getValue());
        }
    });
    continuousQuery.setRemoteFilter(e -> e.getEventType() == EventType.CREATED);
    continuousQuery.setLocal(true);
    apCache.query(continuousQuery);

The function processAccelerationPoint called here relies on the entries being BinaryObjects and modifies them as follows:

    IgniteCache<BinaryObject, BinaryObject> accCache =
        Ignition.ignite().cache("AccelerationPoint").withKeepBinary();
    if (dcmgMatrixMap.containsKey(accPointKey.field("measurementId"))) {
        accCache.<BinaryObject, BinaryObject>withKeepBinary().invoke(
            accPointKey,
            (CacheEntryProcessor<BinaryObject, BinaryObject, Object>) (entry, objects) -> {
                RealMatrix dcm_g = dcmgMatrixMap.get(entry.getValue().field("measurementId"));
                double[] accPointVector = dcm_g.operate(new double[]{
                    entry.getValue().field("ax"),
                    entry.getValue().field("ay"),
                    entry.getValue().field("az")});
                BinaryObjectBuilder builder = entry.getValue().toBuilder();
                double zMean = dcm_g.getEntry(2, 2);
                builder.setField("ax", accPointVector[0]);
                builder.setField("ay", accPointVector[1]);
                builder.setField("az", accPointVector[2] - zMean);
                builder.setField("calibrated", true);
                if (builder.<Boolean>getField("interpolated")) {
                    MetricStatus.dataLatency.add(System.currentTimeMillis() - (Long) builder.getField("createdAt"));
                }
                entry.setValue(builder.build());
                return null;
            });
    } else {
        calibrate(accPointKey, accPointValue);
    }

I really have no clue what's going on, or how Kafka data could enter the cache without being transformed by the extractor and without getting caught by the CQs. Any hints would be appreciated!
Best regards, svonn
Re: Question about persisting stream processing results
Hi, It's less about a specific code snippet, more about a conceptual question: how to keep a time window of 5 minutes off-heap/in-memory while storing anything older on the hard drive. @Stan: That sounds like a possible solution! I'll just have to figure out how to catch, process and delete those properly. The docs for expiry policies say:

"Eager TTL - Entries that are expired can be removed from cache either eagerly or when they are touched by different cache operations. If there is at least one cache configured with eager TTL enabled, Ignite will create a single thread to clean up expired entries in the background."

That "or when they are touched by different cache operations" sounds like I could disable eager TTL and just set up:

    ignite.events().localListen(MyListener, EventType.EVT_CACHE_OBJECT_EXPIRED);

and it would delete the cache entries after executing the listener - is that correct? - Svonn
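Independent of how Ignite's expiry events end up being wired, the underlying idea, keeping a short in-memory window and persisting whatever falls out of it, can be sketched without Ignite at all. A plain-Java sketch (ExpiringWindow, the 5-minute window, and the in-memory "persisted" list standing in for disk writes are all assumptions for illustration):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpiringWindow {
    private final long windowMillis;
    private final Map<String, long[]> live = new LinkedHashMap<>(); // key -> {timestamp, value}
    private final List<String> persisted = new ArrayList<>();       // stand-in for disk writes

    public ExpiringWindow(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    public void put(String key, long timestamp, long value) {
        live.put(key, new long[]{timestamp, value});
    }

    // Called periodically (Ignite's eager-TTL cleanup thread plays this role):
    // move everything older than the window out of memory and "persist" it.
    public void sweep(long now) {
        Iterator<Map.Entry<String, long[]>> it = live.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, long[]> e = it.next();
            if (now - e.getValue()[0] > windowMillis) {
                persisted.add(e.getKey());
                it.remove();
            }
        }
    }

    public int liveCount() { return live.size(); }
    public List<String> persistedKeys() { return persisted; }
}
```

In Ignite terms, the sweep step would be replaced by reacting to EVT_CACHE_OBJECT_EXPIRED (or a write-behind store), with the listener writing the expired entry out before it disappears from memory.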
Question about persisting stream processing results
Hi! I'm receiving two streams of events; stream one (1) is basically only used as a basis for interpolating and putting data into stream two (2). Whenever an element in stream (1) arrives, the local listener of my ContinuousQuery starts searching for the previous element belonging to the same group. More specifically, it's a ScanQuery that compares some IDs and searches for the element whose timestamp is bigger than the current one minus 1500 ms while being smaller than the current timestamp. Currently, I want to persist stream (2) while keeping performance stable. What's the best way to do that? Simply activating Ignite persistence sounds like it just starts moving data from off-heap RAM to the hard drive when RAM space is shrinking. However, if I understood it correctly, it will still query those elements for all my stream processing queries. So trying to find the previous element of stream (1), or all elements that lie between two elements in stream (2), would become slower and slower the longer the task runs. The incoming data is relevant for about 5 minutes, so I tried using an expiration policy. This keeps the performance stable, but I'm not sure how to persist the expired data properly. Also, for calibration purposes, I'm generating a Map to store and apply calibration to elements - when I activate the expiry policy, I start running into NullPointerExceptions after about 5 minutes. Is the policy also deleting the Map? Best regards Svonn
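For reference, a 5-minute created-expiry policy can be declared in the cache's Spring XML configuration roughly like this (a sketch; the cache name and surrounding configuration are assumptions, and the factory is the standard JCache javax.cache.expiry API that Ignite accepts):

```xml
<bean class="org.apache.ignite.configuration.CacheConfiguration">
    <property name="name" value="AccelerationPoint"/>
    <!-- Entries expire 5 minutes after creation -->
    <property name="expiryPolicyFactory">
        <bean class="javax.cache.expiry.CreatedExpiryPolicy" factory-method="factoryOf">
            <constructor-arg>
                <bean class="javax.cache.expiry.Duration">
                    <constructor-arg value="MINUTES"/>
                    <constructor-arg value="5"/>
                </bean>
            </constructor-arg>
        </bean>
    </property>
    <!-- Background cleanup thread removes expired entries proactively -->
    <property name="eagerTtl" value="true"/>
</bean>
```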
Re: How to do 'stream processing' and more questions of an Ignite newbie
Hi, Thanks a lot! I finally managed to get my interpolation up and running. Most of my questions were caused by confusion due to weird peer class loading errors. The errors I encountered while having peer class loading enabled were apparently caused by classes not getting redeployed on the Ignite nodes; hence, I kept running into the very same errors despite editing the code. From what I've read at https://apacheignite.readme.io/docs/deployment-modes it should work with the standard 'shared' mode, so I'm not quite sure why it didn't update the classes. I'll do some further testing with 'continuous' mode; as of now I need to restart my nodes to redeploy the class. For now, my only remaining question is the following. Here's a method from my class:

    Ignition.setClientMode(true);
    Ignite ignite = Ignition.ignite();
    IgniteCache<BinaryObject, BinaryObject> accCache =
        Ignition.ignite().cache("AccelerationPoint").withKeepBinary();

    // (1)
    double interpolationProgress =
        ((double) (accPoint.getKey().<Long>field("timestamp") - prevGps.<Long>field("timestamp")))
        / ((double) (curGps.<Long>field("timestamp") - prevGps.<Long>field("timestamp")));
    double interpolatedLat = prevGps.<Double>field("lat")
        + interpolationProgress * (curGps.<Double>field("lat") - prevGps.<Double>field("lat"));
    double interpolatedLon = prevGps.<Double>field("lon")
        + interpolationProgress * (curGps.<Double>field("lon") - prevGps.<Double>field("lon"));

    // (2)
    System.out.println("Previous lat: " + prevGps.field("lat")
        + " ; Interpolated lat: " + interpolatedLat
        + " ; Next lat: " + curGps.field("lat"));

    // (3)
    accCache.<BinaryObject, BinaryObject>withKeepBinary().invoke(
        accPoint.getKey(),
        new CacheEntryProcessor<BinaryObject, BinaryObject, Object>() {
            public Object process(MutableEntry<BinaryObject, BinaryObject> entry, Object... objects)
                    throws EntryProcessorException {
                // Create builder from the old value.
                BinaryObjectBuilder bldr = entry.getValue().toBuilder();
                // Update the fields in the builder.
                bldr.setField("lat", interpolatedLat);
                bldr.setField("lon", interpolatedLon);
                // Set new value to the entry.
                entry.setValue(bldr.build());
                return null;
            }
        });

I'm still trying to grasp how Ignite 'decides' where to run code. For (3) I know that it runs directly on the Ignite node. (2) is printed in my IDE, so it has to be executed on the client node. Where is (1) being calculated? If it's on the client node, what's the best way to transfer that processing to the nodes? If it's on the nodes already, how does it 'know' where to do it? Best regards and thanks a lot for the help! Svonn
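On the where-does-it-run question: (1) and (2) are ordinary Java statements executing in whichever JVM invoked the method, here the client; only the EntryProcessor in (3) is serialized and executed on the server node that owns the key. The arithmetic itself, pulled out as a plain function for reference (Interpolate and lerp are illustrative names):

```java
public class Interpolate {
    // Linear interpolation of a coordinate between two GPS fixes,
    // proportional to where the acceleration timestamp falls between them.
    public static double lerp(long prevTs, long curTs, long ts, double prevVal, double curVal) {
        double progress = (double) (ts - prevTs) / (double) (curTs - prevTs);
        return prevVal + progress * (curVal - prevVal);
    }

    public static void main(String[] args) {
        // Halfway between fixes at lat 10.0 and 20.0:
        System.out.println(lerp(0L, 1000L, 500L, 10.0, 20.0)); // prints 15.0
    }
}
```

To push the whole computation to where the data lives, the usual routes are `ignite.compute().affinityRun(cacheName, key, runnable)` or keeping the logic entirely inside the EntryProcessor / continuous-query remote filter, so the client only triggers the work.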
Re: How to do 'stream processing' and more questions of an Ignite newbie
.messages.TcpDiscoveryCustomEventMessage.message(TcpDiscoveryCustomEventMessage.java:81) at org.apache.ignite.spi.discovery.tcp.ServerImpl$RingMessageWorker.notifyDiscoveryListener(ServerImpl.java:5460) at org.apache.ignite.spi.discovery.tcp.ServerImpl$RingMessageWorker.processCustomMessage(ServerImpl.java:5346) at org.apache.ignite.spi.discovery.tcp.ServerImpl$RingMessageWorker.processMessage(ServerImpl.java:2656) at org.apache.ignite.spi.discovery.tcp.ServerImpl$RingMessageWorker.processMessage(ServerImpl.java:2447) at org.apache.ignite.spi.discovery.tcp.ServerImpl$MessageWorkerAdapter.body(ServerImpl.java:6648) at org.apache.ignite.spi.discovery.tcp.ServerImpl$RingMessageWorker.body(ServerImpl.java:2533) at org.apache.ignite.spi.IgniteSpiThread.run(IgniteSpiThread.java:62) Caused by: java.lang.ClassNotFoundException: de.tudresden.inf.streambench.ignite.tasks.Interpolation at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:8497) at org.apache.ignite.marshaller.jdk.JdkMarshallerObjectInputStream.resolveClass(JdkMarshallerObjectInputStream.java:54) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1678) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1518) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at 
java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler.readExternal(CacheContinuousQueryHandler.java:1163) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2076) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2025) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.ignite.internal.processors.continuous.StartRequestData.readExternal(StartRequestData.java:260) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2076) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2025) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.ignite.marshaller.jdk.JdkMarshaller.unmarshal0(JdkMarshaller.java:121) ... 12 more 3. - This question might already by answered by answering 1 and 2 - I don't understand how deployment in Docker is supposed to work yet. When enabling peer class loading, I was able to run the above code when running the logic from 'getPreviousGpsPoint' directly within the listener / initial query cursor. I encountered two issues here: 4.1. 
Before refactoring the logic into a method, the scan query would find too many results - from what I've seen it didn't matter which timestamp bounds I scanned for. 4.2. After refactoring it into the method it was treated differently - it looked like it tried to run directly on the nodes, where it failed because it couldn't cast my class "Interpolation" to IgniteBiPredicate. Since I had no clue what the hell it was trying to do, I turned off peer class loading - spooky stuff. 4.3. Closely linked to all of that: how would you run the continuous query on the server nodes instead of the client node? Currently it would try to process everything directly on the client node, wouldn't it? Best regards Svonn
How to do 'stream processing' and more questions of an Ignite newbie
Hello! I've finally overcome the hurdle of getting Kafka Connect to work with Apache Ignite. For people reading this who have issues with it: at the end of the mail I'm posting my currently used Converter and Extractor. Connecting multiple topics to Ignite was possible after setting different REST ports and offset files for all connectors. As of now, I have two different caches. The first one contains acceleration data; for my key I'm using a custom object containing the deviceId, measurementId and timestamp. Here's a sample from ignitevisorcmd:

    | o.a.i.i.binary.BinaryObjectImpl | de.tudresden.inf.streambench.ignite.cyfacedata.IgniteKey [hash=-2052950622, deviceId=c7ba0238-4f4c-4237-a42a-89e25d146d9c, measurementId=4, timestamp=1502738916842] |
    | o.a.i.i.binary.BinaryObjectImpl | de.tudresden.inf.streambench.ignite.cyfacedata.AccelerationPoint [hash=1103982604, ax=-3.6176388, ay=4.76925, az=8.710106, deviceId=c7ba0238-4f4c-4237-a42a-89e25d146d9c, measurementId=4, timestamp=1502738916842] |

The second cache contains GPS locations, a similar key/value case. Since I have about 200 AccelerationPoints for each GpsPoint, I want to do some interpolation as the first task; later there will be more tasks like removing gravity effects from the acceleration data etc. While I know that this should be doable with Ignite, I simply can't understand the documentation well enough to find a starting point. Continuous queries, service and compute grids and so on all sound neat, yet I'm having a hard time telling which feature I'm supposed to look at and where to find examples more complex than adding 1+1. In case my issue isn't clear enough yet, here are more specific tasks I'm struggling to solve: 1. On each Ignite node, do the following for each AccelerationPoint (that ever arrives): 2. Find the previous/next GpsPoint according to the following rules: same deviceId, same measurementId, closest timestamp (positive/negative).
All the required data is available in both key and value in this case. 3. Is there any way to ensure that GPS and acceleration points that share the same measurementId and deviceId are located on the same node? 4. Since the data is constantly being streamed, I suppose it's best to remove any finished AccelerationPoint after interpolating it (into a new cache) while keeping the GPS data - is there a feature that already does that for me? I'd be super happy about any help, information, and links to better examples. Currently, I'm not really sure how to continue. I'll add some code samples for an Extractor, a Converter, as well as my config.xml; any suggestions and tips for those are also appreciated! Bonus question: I'm working with a docker-compose file that starts Kafka, ZooKeeper, the test data producer as well as two instances of Ignite servers as nodes. I'm currently running the task in IntelliJ with the same config XML, except that I have client mode set to true. How would you deploy the finished product? Do you start another container containing the instructions as a client node?
Thanks in advance and best regards, Sven
__
My 'toConnectData' for the Kafka Connect AccelerationPointConverter:

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        AccelerationPoint ap;
        try {
            final AccelerationPointDeserializer accelerationPointDeserializer = new AccelerationPointDeserializer();
            ap = accelerationPointDeserializer.deserialize(topic, (byte[]) value);
        } catch (Exception e) {
            throw new DataException("Error deserializing AccelerationPoint in Kafka Connect");
        }
        // String deviceId, long measurementId, long timestamp, double ax, double ay, double az
        Schema apSchema = SchemaBuilder.struct()
            .name("de.tudresden.inf.streambench.ignite.deserializer.AccelerationPointDeserializer")
            .version(1).doc("A schema for AccelerationPoints")
            .field("deviceId", Schema.STRING_SCHEMA)
            .field("measurementId", Schema.INT64_SCHEMA)
            .field("timestamp", Schema.INT64_SCHEMA) // timestamp is a long, so INT64 (not INT16)
            .field("ax", Schema.FLOAT64_SCHEMA)
            .field("ay", Schema.FLOAT64_SCHEMA)
            .field("az", Schema.FLOAT64_SCHEMA)
            .build();
        return new SchemaAndValue(apSchema, ap);
    }

___
My AccelerationPointExtractor:

    public class AccelerationPointExtractor implements StreamSingleTupleExtractor {
        @Override
        public Map.Entry extract(SinkRecord msg) {
            String key = (String) msg.key();
            AccelerationPoint accelerationPoint = (AccelerationPoint) msg.value();
            String deviceId = ((String) msg.key()).split(":")[0];
            long measurementId = Long.parseLong(((String) msg.key()).split(":")[1]);
            accelerationPoint.setDeviceId(deviceId);
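On question 3 above: collocation of the two caches can be driven by building both keys around the same affinity field. A plain-Java sketch of such a key class (MeasurementKey is an illustrative name; in Ignite the measurementId field would carry the @AffinityKeyMapped annotation from org.apache.ignite.cache.affinity so both caches hash on it):

```java
import java.util.Objects;

public class MeasurementKey {
    private final String deviceId;
    // In Ignite, annotate this field with @AffinityKeyMapped so GPS and
    // acceleration entries sharing a measurementId land on the same node.
    private final long measurementId;
    private final long timestamp;

    public MeasurementKey(String deviceId, long measurementId, long timestamp) {
        this.deviceId = deviceId;
        this.measurementId = measurementId;
        this.timestamp = timestamp;
    }

    // equals/hashCode cover all fields so each point stays a distinct key,
    // while affinity (in Ignite) would use only measurementId.
    @Override public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MeasurementKey)) return false;
        MeasurementKey k = (MeasurementKey) o;
        return measurementId == k.measurementId
            && timestamp == k.timestamp
            && Objects.equals(deviceId, k.deviceId);
    }

    @Override public int hashCode() {
        return Objects.hash(deviceId, measurementId, timestamp);
    }
}
```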
Re: Student Blog about Apache Ignite & Questions how to efficiently handle data
I solved the key issue with a singleTupleExtractor - for both GpsPoints and AccelerationPoints I'm simply adding a hash value over the object.
Re: Student Blog about Apache Ignite & Questions how to efficiently handle data
Apparently I messed up the blog link, here's the proper one: https://streambench.wordpress.com/category/apache-ignite/ Best Regards Sven
Student Blog about Apache Ignite & Questions how to efficiently handle data
Hello! I noticed that this community is pretty active, so there might be some people interested in this: for a university project, we're trying to compare different stream processing engines, and I decided to use Apache Ignite. Since our professorship hasn't really worked with most of those engines, we're supposed to write a blog about our progress - mostly focused on the stuff that doesn't work. So if you're interested in following a newbie struggling with Apache Ignite, you might like this: https://wordpress.com/post/streambench.wordpress.com/1310 I am always super happy about recommendations, tips, and experience I can get from others, so don't hesitate with feedback!
__
Now, to the question part. The following data is to be streamed to Ignite:

Measurement: A measurement is basically the biggest entity, containing both AccelerationPoints and GpsPoints. The key sent by our producer consists of the deviceId concatenated with a measurementId. This key is unique and works as the primary key.

GpsPoint: A GpsPoint belongs to a Measurement and has the very same key, deviceId:measurementId. Its value (serialized as byte[] in Kafka; I currently convert it to a binary object for Ignite) contains a timestamp, which in combination with the key provided by Kafka could probably be enough for a primary key.

AccelerationPoint: Similar to a GpsPoint, only that we get about 200 of those per GpsPoint - they need to be interpolated later.

The issue now is that Kafka Connect provides the deviceId:measurementId key schema for all three entities - therefore I'm either running into key conflicts or I'm forced to overwrite the data already in the cache. How do I deal with those key issues? I was thinking about either adding the timestamp from the value to the key, or just adding any new entity to a list of entities with the same key.
For both of those approaches, I don't really know how to do it properly, since it has to happen somewhere between consuming the data from Kafka and writing it to Ignite. Any help would be appreciated! Best regards, Sven
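The 'add the timestamp from the value to the key' idea amounts to parsing the deviceId:measurementId Kafka key and combining it with the value's timestamp inside the extractor, before the put into Ignite. The parsing step, sketched standalone (KeyParts and splitKey are illustrative helpers, not Ignite or Kafka API):

```java
public class KeyParts {
    public final String deviceId;
    public final long measurementId;

    private KeyParts(String deviceId, long measurementId) {
        this.deviceId = deviceId;
        this.measurementId = measurementId;
    }

    // Kafka delivers keys shaped "deviceId:measurementId"; split once, reuse both parts.
    public static KeyParts splitKey(String kafkaKey) {
        int sep = kafkaKey.lastIndexOf(':');
        if (sep < 0)
            throw new IllegalArgumentException("Expected deviceId:measurementId, got " + kafkaKey);
        return new KeyParts(kafkaKey.substring(0, sep),
                            Long.parseLong(kafkaKey.substring(sep + 1)));
    }
}
```

The extractor would then build the Ignite key from deviceId, measurementId, and the timestamp read from the deserialized value, making each point unique while keeping the measurement grouping.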
Re: Issues with multiple kafka connectors, questions regarding ignite caches
I think I figured it out now: using the same worker for three different topics might be nonsense either way, at least in my case - all three topics require different converters, therefore I'll need three different connectors. To start multiple instances of connect-standalone, I need to specify the REST port in the worker properties as well as the file to store the offsets. The following two lines were missing:

    rest.port=#differentport#
    offset.storage.file.filename=/tmp/#differentname#.offsets

I suspect that this will also work with distributed mode; I'll have to try that later. The issue we encountered simply prevents people from using the option of starting multiple connectors for the same worker property - which probably doesn't find much use anyway. Best regards Sven
Re: Issues with multiple kafka connectors, questions regarding ignite caches
Hi, Apparently the very same bug also affects standalone Kafka Connect: when I add multiple connectors it will start the last one given, shut that one down (resulting in another 'Data streamer has been closed' exception) and then start the next one - so there's always only a single one running. I'm not quite sure how I'm supposed to stream different Kafka topics into different Ignite caches. Any help would be greatly appreciated. Best regards, Svonn
Issues with multiple kafka connectors, questions regarding ignite caches
ntln(next); } }

which results in the error:

>Exception in thread "main" javax.cache.CacheException: class org.apache.ignite.IgniteCheckedException: Query >execution failed:
... a lot of stack trace that doesn't look too interesting (to me) ... until:
>Caused by: java.lang.ClassCastException: [B cannot be cast to [Ljava.lang.Byte;

at the line I marked with (*). The value is stored as byte[] as well, so I am not sure what I am doing wrong. I am really in need of some more knowledge about why that stuff is not working; I was not very successful at finding it anywhere. For question 1) I couldn't find a single up-to-date example - no clue how you'd use the single/multipleTupleExtractor, for instance (especially in a Docker environment). Debugging and reading through thousands of log lines is really tedious. Question 2) is probably something I'm simply lacking experience in; it would be really helpful to get advice so I don't need to try all possibilities. For question 3) I might be missing something obvious; I couldn't figure it out for quite some time now, though. I would be super happy if someone could give me more information!
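On the `[B cannot be cast to [Ljava.lang.Byte;` part: `[B` is the JVM name for the primitive array byte[], while `[Ljava.lang.Byte;` is the boxed Byte[], and the two array types are unrelated, so a likely cause is that the cache or query was declared with a Byte[] type parameter while the values are actually stored as byte[]. If an explicit conversion is ever needed, it has to be element-wise (ByteArrays/box are illustrative names):

```java
import java.util.Arrays;

public class ByteArrays {
    // byte[] ([B) and Byte[] ([Ljava.lang.Byte;) cannot be cast to each other;
    // boxing has to happen per element.
    public static Byte[] box(byte[] raw) {
        Byte[] boxed = new Byte[raw.length];
        for (int i = 0; i < raw.length; i++)
            boxed[i] = raw[i];
        return boxed;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(box(new byte[]{1, 2, 3})));
    }
}
```

Usually the simpler fix is to declare the cache with the primitive array type, e.g. IgniteCache<String, byte[]>, so no cast or boxing is needed at all.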
Best regards, Svonn

P.S.: Here's my docker-compose.yml; I am using a volume to provide the config files (test.xml) and .jars for the container:

    ---
    version: '3.3'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper
        hostname: zookeeper
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
        container_name: zookeeper
      broker:
        image: confluentinc/cp-enterprise-kafka
        hostname: broker
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:9092'
          KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
          CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
          CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
          CONFLUENT_METRICS_ENABLE: 'true'
          CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
        container_name: broker
      schema_registry:
        image: confluentinc/cp-schema-registry
        hostname: schema_registry
        depends_on:
          - zookeeper
          - broker
        ports:
          - "8081:8081"
        environment:
          SCHEMA_REGISTRY_HOST_NAME: schema_registry
          SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
        container_name: schemaregistry
      connect:
        image: confluentinc/cp-kafka-connect
        hostname: connect
        depends_on:
          - zookeeper
          - broker
          - schema_registry
        ports:
          - "8083:8083"
        environment:
          CONNECT_BOOTSTRAP_SERVERS: 'broker:9092'
          CONNECT_REST_ADVERTISED_HOST_NAME: connect
          CONNECT_REST_PORT: 8083
          CONNECT_GROUP_ID: compose-connect-group
          CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
          CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_OFFSET_FLUSH_INTERVAL_MS: 1
          CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
          CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
          CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
          CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
          CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
          CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.converters.ByteArrayConverter
          CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
          CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
          CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
          CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
        volumes:
          - /usr/local/connector:/etc/kafka-connect/jars
        container_name: connect
      control-center:
        image: confluentinc/cp-enterprise-control-center
        hostname: control-center
        depends_on:
          - zookeeper
          - broker
          - schema_registry
          - connect
        ports:
          - "9021:9021"
        environment:
          CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9092'
          CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
          CONTROL_CENTER_REPLICATION_FACTOR: 1
          CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
          CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
          CONFLUENT_METRICS_TOPIC_REPLICATION: 1
          PORT: 9021
        container_name: controlcenter
      test-data-producer:
        build: test-data-producer/
        depends_on:
          - broker
          - zookeeper
        command: (starts the producer)
      ignite:
        image: "apacheignite/ignite:2.3.0"
        environment:
          - "CONFIG_URI=/usr/local/connector/test.xml"
        volumes:
          - /usr/local/connector:/usr/local/connector
        container_name: ignite