Hello!

init() will be called before execute() and stop().

To be extra sure, you can check for null.

Regards,
-- 
Ilya Kasnacheev


чт, 2 июл. 2020 г. в 19:06, Maxim Volkomorov <2201...@gmail.com>:

> Ok, if i put an assignment to init(), how will i start it in execute()
> method, and stop in canсel()? This example was taken here
> http://apache-ignite-users.70518.x6.nabble.com/Kindly-tell-me-where-to-find-these-jar-files-td12649.html
>
>
> чт, 2 июл. 2020 г. в 18:57, Ilya Kasnacheev <ilya.kasnach...@gmail.com>:
>
>> Hello!
>>
>> You should do the assignment in init() method.
>>
>> Don't create KafkaStreamer before service is sent over and is ready for
>> initialization.
>>
>> Regards,
>> --
>> Ilya Kasnacheev
>>
>>
>> чт, 2 июл. 2020 г. в 18:54, Maxim Volkomorov <2201...@gmail.com>:
>>
>>> public class KafkaStreamerService implements Service {
>>>
>>>     public static final String SERVICE_NAME = "KafkaStreamerService";
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     @IgniteInstanceResource
>>>     private Ignite ignite;
>>>     private KafkaStreamer<String, String> kafkaStreamer = new
>>> KafkaStreamer<>();
>>>     private IgniteLogger log;
>>>
>>> чт, 2 июл. 2020 г. в 18:51, Ilya Kasnacheev <ilya.kasnach...@gmail.com>:
>>>
>>>> Hello!
>>>>
>>>> Where do you assign kafkaStreamer field?
>>>>
>>>> Regards,
>>>> --
>>>> Ilya Kasnacheev
>>>>
>>>>
>>>> чт, 2 июл. 2020 г. в 18:46, Maxim Volkomorov <2201...@gmail.com>:
>>>>
>>>>>     Now i disabled node Filtering. You mean i started KafkaStreamer in
>>>>> init()?
>>>>> I started KafkaStreamer like:
>>>>>
>>>>> @Override
>>>>>     public void execute(ServiceContext ctx) throws Exception {
>>>>>         log.info("KafkaStreamerService starting ...");
>>>>>         kafkaStreamer.start();
>>>>>         log.info("KafkaStreamerService started OK");
>>>>>
>>>>> In init() i only configure KafkaStreamer parameters.
>>>>>
>>>>> @Override
>>>>>     public void init(ServiceContext ctx) throws Exception {
>>>>>         log = ignite.log();
>>>>>
>>>>>         IgniteDataStreamer<String, String> stmr =
>>>>> ignite.dataStreamer("kafkaCache");
>>>>>         stmr.allowOverwrite(true);
>>>>>         stmr.autoFlushFrequency(1000);
>>>>>
>>>>>         kafkaStreamer.setIgnite(ignite);
>>>>>         kafkaStreamer.setStreamer(stmr);
>>>>>         kafkaStreamer.setThreads(4);
>>>>> ...
>>>>>
>>>>>
>>>>> If i have to avoid fat instances, how i can share kafkaStreamer
>>>>> instance between init(), execute() and cancel()?
>>>>>
>>>>>
>>>>> чт, 2 июл. 2020 г. в 16:53, Ilya Kasnacheev <ilya.kasnach...@gmail.com
>>>>> >:
>>>>>
>>>>>> Hello!
>>>>>>
>>>>>> You should probably start KafkaStreamer on remote node when the
>>>>>> service is initialized (init()), instead of starting it in e.g. 
>>>>>> constructor
>>>>>> and trying to send it to remote node.
>>>>>>
>>>>>> Avoid putting fat instances in the fields of
>>>>>> service/compute/predicate classes.
>>>>>>
>>>>>> Regards,
>>>>>> --
>>>>>> Ilya Kasnacheev
>>>>>>
>>>>>>
>>>>>> чт, 2 июл. 2020 г. в 10:42, Maxim Volkomorov <2201...@gmail.com>:
>>>>>>
>>>>>>> I have 1 DataNod and 1 Service with streaming. I have a filter for
>>>>>>> service:
>>>>>>>
>>>>>>> <property name="nodeFilter">
>>>>>>> <bean class="common.filters.KafkaStreamerServiceFilter"/>
>>>>>>> </property>
>>>>>>>
>>>>>>> public boolean apply(ClusterNode node) {
>>>>>>> Boolean dataNode = node.attribute("kafkastreamer.service.node");
>>>>>>>
>>>>>>> return dataNode != null && dataNode;
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> I have a marshalling error java.io.NotSerializableException:
>>>>>>> org.apache.ignite.stream.kafka.KafkaStreamer using KafkaStreamer in my
>>>>>>> Service:
>>>>>>>
>>>>>>> private KafkaStreamer<String, String> kafkaStreamer = new
>>>>>>> KafkaStreamer<>();
>>>>>>>
>>>>>>> I only can start service with:
>>>>>>>
>>>>>>> private static KafkaStreamer<String, String> kafkaStreamer = new
>>>>>>> KafkaStreamer<>();
>>>>>>>
>>>>>>> Is it because Ignite trying data transfer KafkaStreamer instance
>>>>>>> between nodes?
>>>>>>>
>>>>>>> Log:
>>>>>>>
>>>>>>> [2020-07-02 10:27:47,552][ERROR][main][IgniteServiceProcessor]
>>>>>>> Failed to marshal service with configured marshaller
>>>>>>> [name=KafkaStreamerService,
>>>>>>> srvc=services.kafkastreamer.KafkaStreamerService@3dedb4a6,
>>>>>>> marsh=JdkMarshaller [clsFilter=null]]
>>>>>>> class org.apache.ignite.IgniteCheckedException: Failed to serialize
>>>>>>> object: services.kafkastreamer.KafkaStreamerService@3dedb4a6
>>>>>>> at
>>>>>>> org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:102)
>>>>>>> at
>>>>>>> org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:109)
>>>>>>> at
>>>>>>> org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.marshal(AbstractNodeNameAwareMarshaller.java:57)
>>>>>>> at
>>>>>>> org.apache.ignite.internal.util.IgniteUtils.marshal(IgniteUtils.java:10386)
>>>>>>> at
>>>>>>> org.apache.ignite.internal.processors.service.IgniteServiceProcessor.prepareServiceConfigurations(IgniteServiceProcessor.java:583)
>>>>>>> at
>>>>>>> org.apache.ignite.internal.processors.service.IgniteServiceProcessor.staticallyConfiguredServices(IgniteServiceProcessor.java:1541)
>>>>>>> at
>>>>>>> org.apache.ignite.internal.processors.service.IgniteServiceProcessor.collectJoiningNodeData(IgniteServiceProcessor.java:354)
>>>>>>> at
>>>>>>> org.apache.ignite.internal.managers.discovery.GridDiscoveryManager$5.collect(GridDiscoveryManager.java:861)
>>>>>>> at
>>>>>>> org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.collectExchangeData(TcpDiscoverySpi.java:2032)
>>>>>>> at
>>>>>>> org.apache.ignite.spi.discovery.tcp.ServerImpl.joinTopology(ServerImpl.java:1029)
>>>>>>> at
>>>>>>> org.apache.ignite.spi.discovery.tcp.ServerImpl.spiStart(ServerImpl.java:427)
>>>>>>> at
>>>>>>> org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.spiStart(TcpDiscoverySpi.java:2099)
>>>>>>> at
>>>>>>> org.apache.ignite.internal.managers.GridManagerAdapter.startSpi(GridManagerAdapter.java:299)
>>>>>>> at
>>>>>>> org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.start(GridDiscoveryManager.java:943)
>>>>>>> at
>>>>>>> org.apache.ignite.internal.IgniteKernal.startManager(IgniteKernal.java:1960)
>>>>>>> at
>>>>>>> org.apache.ignite.internal.IgniteKernal.start(IgniteKernal.java:1276)
>>>>>>> at
>>>>>>> org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start0(IgnitionEx.java:2045)
>>>>>>> at
>>>>>>> org.apache.ignite.internal.IgnitionEx$IgniteNamedInstance.start(IgnitionEx.java:1703)
>>>>>>> at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1117)
>>>>>>> at
>>>>>>> org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1035)
>>>>>>> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:921)
>>>>>>> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:820)
>>>>>>> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:690)
>>>>>>> at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:659)
>>>>>>> at org.apache.ignite.Ignition.start(Ignition.java:346)
>>>>>>> at
>>>>>>> app.KafkaStreamerServiceNodeStartup.main(KafkaStreamerServiceNodeStartup.java:40)
>>>>>>> Caused by: java.io.NotSerializableException:
>>>>>>> org.apache.ignite.stream.kafka.KafkaStreamer
>>>>>>> at
>>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>>> at
>>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>>> at
>>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>>>>>> at
>>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>>>>> at
>>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>>>>> at
>>>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>>>>> at
>>>>>>> org.apache.ignite.marshaller.jdk.JdkMarshaller.marshal0(JdkMarshaller.java:97)
>>>>>>> ... 25 more
>>>>>>>
>>>>>>

Reply via email to