Hello!

Where do you assign the kafkaStreamer field?

Regards,
--
Ilya Kasnacheev

Thu, 2 Jul 2020 at 18:46, Maxim Volkomorov <2201...@gmail.com>:

> Now I have disabled node filtering. Do you mean that I started
> KafkaStreamer in init()?
> I start KafkaStreamer like this:
>
> @Override
> public void execute(ServiceContext ctx) throws Exception {
>     log.info("KafkaStreamerService starting ...");
>     kafkaStreamer.start();
>     log.info("KafkaStreamerService started OK");
>
> In init() I only configure the KafkaStreamer parameters.
>
> @Override
> public void init(ServiceContext ctx) throws Exception {
>     log = ignite.log();
>
>     IgniteDataStreamer<String, String> stmr =
>         ignite.dataStreamer("kafkaCache");
>     stmr.allowOverwrite(true);
>     stmr.autoFlushFrequency(1000);
>
>     kafkaStreamer.setIgnite(ignite);
>     kafkaStreamer.setStreamer(stmr);
>     kafkaStreamer.setThreads(4);
>     ...
>
> If I have to avoid fat instances, how can I share the kafkaStreamer
> instance between init(), execute() and cancel()?
>
> Thu, 2 Jul 2020 at 16:53, Ilya Kasnacheev <ilya.kasnach...@gmail.com>:
>
>> Hello!
>>
>> You should probably start KafkaStreamer on the remote node when the
>> service is initialized (init()), instead of starting it in e.g. the
>> constructor and trying to send it to the remote node.
>>
>> Avoid putting fat instances in the fields of service/compute/predicate
>> classes.
>>
>> Regards,
>> --
>> Ilya Kasnacheev
>>
>> Thu, 2 Jul 2020 at 10:42, Maxim Volkomorov <2201...@gmail.com>:
>>
>>> I have 1 data node and 1 service with streaming.
>>> I have a filter for the service:
>>>
>>> <property name="nodeFilter">
>>>     <bean class="common.filters.KafkaStreamerServiceFilter"/>
>>> </property>
>>>
>>> public boolean apply(ClusterNode node) {
>>>     Boolean dataNode = node.attribute("kafkastreamer.service.node");
>>>
>>>     return dataNode != null && dataNode;
>>> }
>>>
>>> I get a marshalling error, java.io.NotSerializableException:
>>> org.apache.ignite.stream.kafka.KafkaStreamer, when using KafkaStreamer
>>> in my service:
>>>
>>> private KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>>
>>> I can only start the service with:
>>>
>>> private static KafkaStreamer<String, String> kafkaStreamer = new KafkaStreamer<>();
>>>
>>> Is it because Ignite is trying to transfer the KafkaStreamer instance
>>> between nodes?
>>>
>>> Log:
>>>
>>> [2020-07-02 10:27:47,552][ERROR][main][IgniteServiceProcessor] Failed to marshal service with configured marshaller [name=KafkaStreamerService, srvc=services.kafkastreamer.KafkaStreamerService@3dedb4a6, marsh=JdkMarshaller [clsFilter=null]]
>>> class org.apache.ignite.IgniteCheckedException: Failed to serialize object: services.kafkastreamer.KafkaStreamerService@3dedb4a6
>>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:102)
>>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:109)
>>>     at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.marshal(AbstractNodeNameAwareMarshaller.java:57)
>>>     at org.apache.ignite.internal.util.IgniteUtils.marshal(IgniteUtils.java:10386)
>>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.prepareServiceConfigurations(IgniteServiceProcessor.java:583)
>>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.staticallyConfiguredServices(IgniteServiceProcessor.java:1541)
>>>     at org.apache.ignite.internal.processors.service.IgniteServiceProcessor.collectJoiningNodeData(IgniteServiceProcessor.java:354)
>>>     at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$5.collect(GridDiscoveryManager.java:861)
>>>     at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.collectExchangeData(TcpDiscoverySpi.java:2032)
>>>     at org.apache.ignite.spi.discovery.tcp.ServerImpl.joinTopology(ServerImpl.java:1029)
>>>     at org.apache.ignite.spi.discovery.tcp.ServerImpl.spiStart(ServerImpl.java:427)
>>>     at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.spiStart(TcpDiscoverySpi.java:2099)
>>>     at org.apache.ignite.internal.managers.GridManagerAdapter.startSpi(GridManagerAdapter.java:299)
>>>     at org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.start(GridDiscoveryManager.java:943)
>>>     at org.apache.ignite.internal.IgniteKernal.startManager(IgniteKernal.java:1960)
>>>     at org.apache.ignite.internal.IgniteKernal.start(IgniteKernal.java:1276)
>>>     at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start0(IgnitionEx.java:2045)
>>>     at org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start(IgnitionEx.java:1703)
>>>     at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1117)
>>>     at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1035)
>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:921)
>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:820)
>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:690)
>>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:659)
>>>     at org.apache.ignite.Ignition.start(Ignition.java:346)
>>>     at app.KafkaStreamerServiceNodeStartup.main(KafkaStreamerServiceNodeStartup.java:40)
>>> Caused by: java.io.NotSerializableException: org.apache.ignite.stream.kafka.KafkaStreamer
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>     at org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:97)
>>>     ... 25 more
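
The "Caused by" part of the trace goes through java.io.ObjectOutputStream, so the effect can probably be reproduced with plain JDK serialization and no Ignite at all. The sketch below is only an illustration under that assumption: FatStreamer, EagerService and LazyService are hypothetical stand-ins (FatStreamer plays the role of KafkaStreamer), and init()/execute() here are ordinary methods, not the Ignite Service interface. It shows why a non-static, non-transient field breaks serialization, and one possible way to share the instance between lifecycle methods: keep the field transient and create the object in init(). A static field passes for the same reason, since default Java serialization skips static fields, but it is then shared by every instance in the JVM.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {
    /** Hypothetical stand-in for a heavy, non-serializable object such as KafkaStreamer. */
    static class FatStreamer {
        void start() { /* a real streamer would open consumers and threads here */ }
    }

    /** The field is created eagerly, so it becomes part of the serialized state. */
    static class EagerService implements Serializable {
        private FatStreamer streamer = new FatStreamer();
    }

    /** The field is transient and is created only in init(), on the node that runs the service. */
    static class LazyService implements Serializable {
        private transient FatStreamer streamer;

        void init() { streamer = new FatStreamer(); }  // create and configure here
        void execute() { streamer.start(); }           // same instance, shared through the field
        boolean initialized() { return streamer != null; }
    }

    /** Returns true if plain JDK serialization accepts the object. */
    static boolean canSerialize(Object obj) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("eager field serializable: " + canSerialize(new EagerService()));

        LazyService svc = new LazyService();
        System.out.println("transient field serializable: " + canSerialize(svc));

        svc.init();     // the fat object appears only after "deployment"
        svc.execute();
    }
}
```

In a real Ignite service the same idea would presumably mean declaring the kafkaStreamer field transient, constructing it inside init(ServiceContext), and stopping it in cancel(); that part is not verified here.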