Hello! You should do the assignment in init() method.
Don't create KafkaStreamer before service is sent over and is ready for initialization. Regards, -- Ilya Kasnacheev чт, 2 июл. 2020 г. в 18:54, Maxim Volkomorov <2201...@gmail.com>: > public class KafkaStreamerService implements Service { > > public static final String SERVICE_NAME = "KafkaStreamerService"; > private static final long serialVersionUID = 1L; > > @IgniteInstanceResource > private Ignite ignite; > private KafkaStreamer<String, String> kafkaStreamer = new > KafkaStreamer<>(); > private IgniteLogger log; > > чт, 2 июл. 2020 г. в 18:51, Ilya Kasnacheev <ilya.kasnach...@gmail.com>: > >> Hello! >> >> Where do you assign kafkaStreamer field? >> >> Regards, >> -- >> Ilya Kasnacheev >> >> >> чт, 2 июл. 2020 г. в 18:46, Maxim Volkomorov <2201...@gmail.com>: >> >>> Now i disabled node Filtering. You mean i started KafkaStreamer in >>> init()? >>> I started KafkaStreamer like: >>> >>> @Override >>> public void execute(ServiceContext ctx) throws Exception { >>> log.info("KafkaStreamerService starting ..."); >>> kafkaStreamer.start(); >>> log.info("KafkaStreamerService started OK"); >>> >>> In init() i only configure KafkaStreamer parameters. >>> >>> @Override >>> public void init(ServiceContext ctx) throws Exception { >>> log = ignite.log(); >>> >>> IgniteDataStreamer<String, String> stmr = >>> ignite.dataStreamer("kafkaCache"); >>> stmr.allowOverwrite(true); >>> stmr.autoFlushFrequency(1000); >>> >>> kafkaStreamer.setIgnite(ignite); >>> kafkaStreamer.setStreamer(stmr); >>> kafkaStreamer.setThreads(4); >>> ... >>> >>> >>> If i have to avoid fat instances, how i can share kafkaStreamer instance >>> between init(), execute() and cancel()? >>> >>> >>> чт, 2 июл. 2020 г. в 16:53, Ilya Kasnacheev <ilya.kasnach...@gmail.com>: >>> >>>> Hello! >>>> >>>> You should probably start KafkaStreamer on remote node when the service >>>> is initialized (init()), instead of starting it in e.g. constructor and >>>> trying to send it to remote node. >>>> >>>> Avoid putting fat instances in the fields of service/compute/predicate >>>> classes. >>>> >>>> Regards, >>>> -- >>>> Ilya Kasnacheev >>>> >>>> >>>> чт, 2 июл. 2020 г. в 10:42, Maxim Volkomorov <2201...@gmail.com>: >>>> >>>>> I have 1 DataNod and 1 Service with streaming. I have a filter for >>>>> service: >>>>> >>>>> <property name="nodeFilter"> >>>>> <bean class="common.filters.KafkaStreamerServiceFilter"/> >>>>> </property> >>>>> >>>>> public boolean apply(ClusterNode node) { >>>>> Boolean dataNode = node.attribute("kafkastreamer.service.node"); >>>>> >>>>> return dataNode != null && dataNode; >>>>> } >>>>> >>>>> >>>>> I have a marshalling error java.io.NotSerializableException: >>>>> org.apache.ignite.stream.kafka.KafkaStreamer using KafkaStreamer in my >>>>> Service: >>>>> >>>>> private KafkaStreamer<String, String> kafkaStreamer = new >>>>> KafkaStreamer<>(); >>>>> >>>>> I only can start service with: >>>>> >>>>> private static KafkaStreamer<String, String> kafkaStreamer = new >>>>> KafkaStreamer<>(); >>>>> >>>>> Is it because Ignite trying data transfer KafkaStreamer instance >>>>> between nodes? >>>>> >>>>> Log: >>>>> >>>>> [2020-07-02 10:27:47,552][ERROR][main][IgniteServiceProcessor] Failed >>>>> to marshal service with configured marshaller [name=KafkaStreamerService, >>>>> srvc=services.kafkastreamer.KafkaStreamerService@3dedb4a6, >>>>> marsh=JdkMarshaller [clsFilter=null]] >>>>> class org.apache.ignite.IgniteCheckedException: Failed to serialize >>>>> object: services.kafkastreamer.KafkaStreamerService@3dedb4a6 >>>>> at >>>>> org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:102) >>>>> at >>>>> org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:109) >>>>> at >>>>> org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.marshal(AbstractNodeNameAwareMarshaller.java:57) >>>>> at >>>>> org.apache.ignite.internal.util.IgniteUtils.marshal(IgniteUtils.java:10386) >>>>> at >>>>> org.apache.ignite.internal.processors.service.IgniteServiceProcessor.prepareServiceConfigurations(IgniteServiceProcessor.java:583) >>>>> at >>>>> org.apache.ignite.internal.processors.service.IgniteServiceProcessor.staticallyConfiguredServices(IgniteServiceProcessor.java:1541) >>>>> at >>>>> org.apache.ignite.internal.processors.service.IgniteServiceProcessor.collectJoiningNodeData(IgniteServiceProcessor.java:354) >>>>> at >>>>> org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$5.collect(GridDiscoveryManager.java:861) >>>>> at >>>>> org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.collectExchangeData(TcpDiscoverySpi.java:2032) >>>>> at >>>>> org.apache.ignite.spi.discovery.tcp.ServerImpl.joinTopology(ServerImpl.java:1029) >>>>> at >>>>> org.apache.ignite.spi.discovery.tcp.ServerImpl.spiStart(ServerImpl.java:427) >>>>> at >>>>> org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.spiStart(TcpDiscoverySpi.java:2099) >>>>> at >>>>> org.apache.ignite.internal.managers.GridManagerAdapter.startSpi(GridManagerAdapter.java:299) >>>>> at >>>>> org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.start(GridDiscoveryManager.java:943) >>>>> at >>>>> org.apache.ignite.internal.IgniteKernal.startManager(IgniteKernal.java:1960) >>>>> at >>>>> org.apache.ignite.internal.IgniteKernal.start(IgniteKernal.java:1276) >>>>> at >>>>> org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start0(IgnitionEx.java:2045) >>>>> at >>>>> org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start(IgnitionEx.java:1703) >>>>> at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1117) >>>>> at >>>>> org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1035) >>>>> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:921) >>>>> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:820) >>>>> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:690) >>>>> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:659) >>>>> at org.apache.ignite.Ignition.start(Ignition.java:346) >>>>> at >>>>> app.KafkaStreamerServiceNodeStartup.main(KafkaStreamerServiceNodeStartup.java:40) >>>>> Caused by: java.io.NotSerializableException: >>>>> org.apache.ignite.stream.kafka.KafkaStreamer >>>>> at >>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) >>>>> at >>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) >>>>> at >>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) >>>>> at >>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) >>>>> at >>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) >>>>> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) >>>>> at >>>>> org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:97) >>>>> ... 25 more >>>>> >>>>