[ https://issues.apache.org/jira/browse/IGNITE-11724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16956103#comment-16956103 ]

Andrey Aleksandrov commented on IGNITE-11724:
---------------------------------------------

[~nizhikov] the proposal is pretty simple.

We have methods that can throw exceptions, but this is not documented anywhere. 
These exceptions can be related to Ignite internals or to user logic.

1) Ignite exceptions. We should either catch them inside the integration OR 
declare them in the throws list, as was done for IgniteDataStreamer.addData 
(throws javax.cache.CacheException, IgniteInterruptedException, 
IllegalStateException, IgniteDataStreamerTimeoutException):

[https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/IgniteDataStreamer.html#addData-K-V-]
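
A minimal sketch of point 1, assuming no Ignite on the classpath: addDataStandIn 
is a hypothetical stand-in for streamer.addData so the pattern compiles on its 
own. The real call can throw the exceptions the javadoc above declares; the 
integration could catch them instead of letting them escape undocumented:

{code:java}
// Hypothetical stand-in for the streamer call; not an Ignite API.
public class CatchDeclaredExample {
    /** Fails the way a streamer on a stopping node might. */
    static void addDataStandIn(Object key, Object value) {
        if (key == null)
            throw new IllegalStateException("node is stopping");
    }

    /** Catches the runtime exception and turns it into a reportable result. */
    public static String safeAdd(Object key, Object value) {
        try {
            addDataStandIn(key, value);
            return "ok";
        }
        catch (IllegalStateException e) {
            // In the real integration this catch would cover the documented
            // list: CacheException, IgniteInterruptedException,
            // IllegalStateException, IgniteDataStreamerTimeoutException.
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(safeAdd(1, 1));
        System.out.println(safeAdd(null, 1));
    }
}
{code}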

2) User exceptions. Here we should provide the user with the implementation 
details, or at least document that exceptions can be thrown at this point. 
This can also be done by catching these exceptions and attaching them as the 
cause of an IgniteCheckedException that is added to the throws list of the 
savePairs API.
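
The wrapping idea from point 2 can be sketched without any Ignite dependency: 
WrappedException is a hypothetical stand-in for IgniteCheckedException, and 
savePair stands in for the savePairs API. User logic that throws is caught and 
re-thrown with the original exception attached as the cause:

{code:java}
public class WrapUserException {
    /** Hypothetical stand-in for IgniteCheckedException. */
    static class WrappedException extends Exception {
        WrappedException(String msg, Throwable cause) { super(msg, cause); }
    }

    /** Stand-in for the user-provided PairFunction logic. */
    interface UserFn { int apply(int v) throws Exception; }

    /** Declares one checked exception instead of leaking user exceptions. */
    static int savePair(int v, UserFn fn) throws WrappedException {
        try {
            return fn.apply(v);
        }
        catch (Exception e) {
            // Surface the user failure as the cause of the declared exception.
            throw new WrappedException("User logic failed for value " + v, e);
        }
    }

    public static void main(String[] args) {
        try {
            savePair(1, v -> { throw new IllegalStateException("some error"); });
        }
        catch (WrappedException e) {
            System.out.println(e.getCause().getMessage());
        }
    }
}
{code}

The caller then handles a single documented exception type and can still 
inspect getCause() for the original user error.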

Okay, I will create a separate ticket for this with a complete example that can 
be run in any environment, plus a description of the exceptions that should be 
handled.
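
The hang described in this issue comes from igniteContext.close(true) never 
running once savePairs throws. A sketch with plain stand-ins (FakeContext and 
the failing save step are hypothetical; no Ignite or Spark classes) shows the 
try/finally shape that guarantees the close on both paths:

{code:java}
public class CloseOnErrorExample {
    /** Hypothetical stand-in for JavaIgniteContext; records the close call. */
    static class FakeContext {
        boolean closed;
        void close(boolean shutdownClientNode) { closed = true; }
    }

    /** Runs the save step and closes the context even if the step throws. */
    public static boolean saveAndClose(FakeContext ctx, Runnable save) {
        try {
            save.run();
            return true;
        }
        catch (RuntimeException e) {
            return false;
        }
        finally {
            ctx.close(true); // runs on both the success and the error path
        }
    }
}
{code}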

> IgniteSpark integration forget to close the IgniteContext and stops the 
> client node in case if error during PairFunction logic 
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: IGNITE-11724
>                 URL: https://issues.apache.org/jira/browse/IGNITE-11724
>             Project: Ignite
>          Issue Type: Bug
>          Components: spark
>    Affects Versions: 2.8
>            Reporter: Andrey Aleksandrov
>            Assignee: Alexey Zinoviev
>            Priority: Major
>              Labels: await
>             Fix For: 2.8
>
>         Attachments: logs.txt
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> The following code can hang if the PairFunction logic throws an exception:
> {code:java}
> public class Example {
>     public static void main(String[] args) {
>         String configPath = "/home/andrei/BDP/big-data-accelerator/modules/gridgain-spark-loader-examples/config/client.xml";
> 
>         IgniteSparkSession igniteSession = IgniteSparkSession.builder()
>                 .appName("Spark Ignite catalog example")
>                 .master("local")
>                 .config("ignite.disableSparkSQLOptimization", true)
>                 .igniteConfig(configPath)
>                 .getOrCreate();
> 
>         JavaSparkContext sparkCtx = new JavaSparkContext(igniteSession.sparkContext());
> 
>         final JavaRDD<Row> records = sparkCtx.parallelize(Arrays.asList(new GenericRow()));
> 
>         JavaPairRDD<Integer, Integer> rdd_records = records.mapToPair(new PairFunction<Row, Integer, Integer>() {
>             @Override
>             public Tuple2<Integer, Integer> call(Row row) throws Exception {
>                 throw new IllegalStateException("some error");
>             }
>         });
> 
>         JavaIgniteContext<Integer, Integer> igniteContext = new JavaIgniteContext<>(sparkCtx, configPath);
> 
>         JavaIgniteRDD<Integer, Integer> igniteRdd = igniteContext.<Integer, Integer>fromCache("Person");
> 
>         igniteRdd.savePairs(rdd_records);
> 
>         igniteContext.close(true);
>     }
> }
> {code}
> Looks like the internal code below (the saveValues method) should also close 
> the IgniteContext in case of an unexpected exception, not only the data 
> streamer:
> {code:scala}
> try {
>     it.foreach(value ⇒ {
>         val key = affinityKeyFunc(value, node.orNull)
>         streamer.addData(key, value)
>     })
> }
> finally {
>     streamer.close()
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
