Hello!

init() will be called before execute() and cancel().
To be extra sure, you can check for null.

Regards,
--
Ilya Kasnacheev


Thu, Jul 2, 2020 at 19:06, Maxim Volkomorov <2201...@gmail.com>:

> OK, if I put the assignment in init(), how will I start it in the execute()
> method and stop it in cancel()? This example was taken from here:
> http://apache-ignite-users.70518.x6.nabble.com/Kindly-tell-me-where-to-find-these-jar-files-td12649.html
>
>
> Thu, Jul 2, 2020 at 18:57, Ilya Kasnacheev <ilya.kasnach...@gmail.com>:
>
>> Hello!
>>
>> You should do the assignment in the init() method.
>>
>> Don't create KafkaStreamer before the service is sent over and is ready for
>> initialization.
>>
>> Regards,
>> --
>> Ilya Kasnacheev
>>
>>
>> Thu, Jul 2, 2020 at 18:54, Maxim Volkomorov <2201...@gmail.com>:
>>
>>> public class KafkaStreamerService implements Service {
>>>
>>>     public static final String SERVICE_NAME = "KafkaStreamerService";
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     @IgniteInstanceResource
>>>     private Ignite ignite;
>>>     private KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>>     private IgniteLogger log;
>>>
>>> Thu, Jul 2, 2020 at 18:51, Ilya Kasnacheev <ilya.kasnach...@gmail.com>:
>>>
>>>> Hello!
>>>>
>>>> Where do you assign the kafkaStreamer field?
>>>>
>>>> Regards,
>>>> --
>>>> Ilya Kasnacheev
>>>>
>>>>
>>>> Thu, Jul 2, 2020 at 18:46, Maxim Volkomorov <2201...@gmail.com>:
>>>>
>>>>> I have now disabled node filtering. You mean I started KafkaStreamer in
>>>>> init()?
>>>>> I started KafkaStreamer like this:
>>>>>
>>>>>     @Override
>>>>>     public void execute(ServiceContext ctx) throws Exception {
>>>>>         log.info("KafkaStreamerService starting ...");
>>>>>         kafkaStreamer.start();
>>>>>         log.info("KafkaStreamerService started OK");
>>>>>
>>>>> In init() I only configure KafkaStreamer parameters.
>>>>>
>>>>>     @Override
>>>>>     public void init(ServiceContext ctx) throws Exception {
>>>>>         log = ignite.log();
>>>>>
>>>>>         IgniteDataStreamer<String, String> stmr =
>>>>>             ignite.dataStreamer("kafkaCache");
>>>>>         stmr.allowOverwrite(true);
>>>>>         stmr.autoFlushFrequency(1000);
>>>>>
>>>>>         kafkaStreamer.setIgnite(ignite);
>>>>>         kafkaStreamer.setStreamer(stmr);
>>>>>         kafkaStreamer.setThreads(4);
>>>>>         ...
>>>>>
>>>>>
>>>>> If I have to avoid fat instances, how can I share the kafkaStreamer
>>>>> instance between init(), execute() and cancel()?
>>>>>
>>>>>
>>>>> Thu, Jul 2, 2020 at 16:53, Ilya Kasnacheev <ilya.kasnach...@gmail.com>:
>>>>>
>>>>>> Hello!
>>>>>>
>>>>>> You should probably start KafkaStreamer on the remote node when the
>>>>>> service is initialized (init()), instead of creating it in e.g. the
>>>>>> constructor and trying to send it to the remote node.
>>>>>>
>>>>>> Avoid putting fat instances in the fields of
>>>>>> service/compute/predicate classes.
>>>>>>
>>>>>> Regards,
>>>>>> --
>>>>>> Ilya Kasnacheev
>>>>>>
>>>>>>
>>>>>> Thu, Jul 2, 2020 at 10:42, Maxim Volkomorov <2201...@gmail.com>:
>>>>>>
>>>>>>> I have 1 data node and 1 service with streaming.
>>>>>>> I have a filter for the service:
>>>>>>>
>>>>>>> <property name="nodeFilter">
>>>>>>>     <bean class="common.filters.KafkaStreamerServiceFilter"/>
>>>>>>> </property>
>>>>>>>
>>>>>>> public boolean apply(ClusterNode node) {
>>>>>>>     Boolean dataNode = node.attribute("kafkastreamer.service.node");
>>>>>>>
>>>>>>>     return dataNode != null && dataNode;
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> I get a marshalling error, java.io.NotSerializableException:
>>>>>>> org.apache.ignite.stream.kafka.KafkaStreamer, when using KafkaStreamer
>>>>>>> in my service:
>>>>>>>
>>>>>>> private KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>>>>>>
>>>>>>> I can only start the service with:
>>>>>>>
>>>>>>> private static KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>>>>>>
>>>>>>> Is it because Ignite is trying to transfer the KafkaStreamer instance
>>>>>>> between nodes?
>>>>>>>
>>>>>>> Log:
>>>>>>>
>>>>>>> [2020-07-02 10:27:47,552][ERROR][main][IgniteServiceProcessor] Failed to marshal service with configured marshaller [name=KafkaStreamerService, srvc=services.kafkastreamer.KafkaStreamerService@3dedb4a6, marsh=JdkMarshaller [clsFilter=null]]
>>>>>>> class org.apache.ignite.IgniteCheckedException: Failed to serialize object: services.kafkastreamer.KafkaStreamerService@3dedb4a6
>>>>>>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:102)
>>>>>>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:109)
>>>>>>>     at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.marshal(AbstractNodeNameAwareMarshaller.java:57)
>>>>>>>     at org.apache.ignite.internal.util.IgniteUtils.marshal(IgniteUtils.java:10386)
>>>>>>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.prepareServiceConfigurations(IgniteServiceProcessor.java:583)
>>>>>>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.staticallyConfiguredServices(IgniteServiceProcessor.java:1541)
>>>>>>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.collectJoiningNodeData(IgniteServiceProcessor.java:354)
>>>>>>>     at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$5.collect(GridDiscoveryManager.java:861)
>>>>>>>     at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.collectExchangeData(TcpDiscoverySpi.java:2032)
>>>>>>>     at org.apache.ignite.spi.discovery.tcp.ServerImpl.joinTopology(ServerImpl.java:1029)
>>>>>>>     at org.apache.ignite.spi.discovery.tcp.ServerImpl.spiStart(ServerImpl.java:427)
>>>>>>>     at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.spiStart(TcpDiscoverySpi.java:2099)
>>>>>>>     at org.apache.ignite.internal.managers.GridManagerAdapter.startSpi(GridManagerAdapter.java:299)
>>>>>>>     at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.start(GridDiscoveryManager.java:943)
>>>>>>>     at org.apache.ignite.internal.IgniteKernal.startManager(IgniteKernal.java:1960)
>>>>>>>     at org.apache.ignite.internal.IgniteKernal.start(IgniteKernal.java:1276)
>>>>>>>     at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start0(IgnitionEx.java:2045)
>>>>>>>     at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start(IgnitionEx.java:1703)
>>>>>>>     at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1117)
>>>>>>>     at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1035)
>>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:921)
>>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:820)
>>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:690)
>>>>>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:659)
>>>>>>>     at org.apache.ignite.Ignition.start(Ignition.java:346)
>>>>>>>     at app.KafkaStreamerServiceNodeStartup.main(KafkaStreamerServiceNodeStartup.java:40)
>>>>>>> Caused by: java.io.NotSerializableException: org.apache.ignite.stream.kafka.KafkaStreamer
>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:97)
>>>>>>>     ... 25 more
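
For anyone finding this thread later: the advice above (assign the field in init(), start it in execute(), stop it in cancel() with a null check) can be tried out without Ignite on the classpath. What follows is only a plain-JDK sketch of the serialization behaviour, not Ignite code. FakeStreamer is a hypothetical stand-in for KafkaStreamer, and EagerService / LazyService are hypothetical stand-ins for a class implementing Service. The transient modifier is an extra precaution that was not mentioned in the thread; a non-transient field that is still null when the service is marshalled would most likely serialize as well.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {

    /** Hypothetical stand-in for KafkaStreamer: deliberately NOT Serializable. */
    static class FakeStreamer {
        private boolean running;

        void start() { running = true; }
        void stop() { running = false; }
        boolean isRunning() { return running; }
    }

    /** Mirrors the failing service: the field initializer runs on the sending node. */
    static class EagerService implements Serializable {
        private static final long serialVersionUID = 1L;

        // Already holds a non-serializable object when the service is marshalled.
        private FakeStreamer streamer = new FakeStreamer();
    }

    /** Mirrors the advice in the thread: the field is only assigned in init(). */
    static class LazyService implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient is an assumption of this sketch: the field is never written out.
        private transient FakeStreamer streamer;

        void init() { streamer = new FakeStreamer(); }

        void execute() { streamer.start(); }

        void cancel() {
            // Null check "to be extra sure", as suggested in the thread.
            if (streamer != null) {
                streamer.stop();
            }
        }

        boolean isRunning() { return streamer != null && streamer.isRunning(); }
    }

    /** Returns true if plain JDK serialization of obj succeeds. */
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("eager serializable: " + canSerialize(new EagerService()));

        LazyService svc = new LazyService();
        System.out.println("lazy serializable:  " + canSerialize(svc));

        // Same instance is shared by init(), execute() and cancel() through the field.
        svc.init();
        svc.execute();
        System.out.println("running after execute(): " + svc.isRunning());
        svc.cancel();
        System.out.println("running after cancel():  " + svc.isRunning());
    }
}
```

The static field workaround from the original message gets past the marshaller for the same reason (static fields are not serialized), but the streamer is then still created on the node that deploys the service and is shared by every instance in that JVM, so assigning in init() is the cleaner option.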