[ https://issues.apache.org/jira/browse/SPARK-35126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-35126:
------------------------------------

    Assignee: (was: Apache Spark)

Execute jdbc cancellation method when jdbc load job is interrupted
------------------------------------------------------------------

                 Key: SPARK-35126
                 URL: https://issues.apache.org/jira/browse/SPARK-35126
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.1.1
         Environment: * Spark 3.1.1
                      * JDK 1.8.201
                      * Scala 2.12
                      * MySQL 5.7.31
                      * mysql-connector-java-5.1.32.jar / mysql-connector-java-8.0.32.jar
            Reporter: zhangrenhua
            Priority: Major
   Original Estimate: 2h
  Remaining Estimate: 2h

I have a long-running Spark service that continuously receives and runs Spark programs submitted by clients. One of these programs loads a JDBC table whose query SQL is very complicated; each execution takes a lot of time and resources. A client may interrupt such a job at any time, and I found that after the job is interrupted the database SELECT process is still executing and is never killed.

*Scene demonstration:*

1. Prepare two tables, SPARK_TEST1 and SPARK_TEST2, each with 1000 records.

2. Test code:
{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.concurrent.TimeUnit;

/**
 * jdbc load cancel test
 *
 * @author gavin
 * @create 2021/4/18 10:58
 */
public class JdbcLoadCancelTest {

    public static void main(String[] args) throws Exception {
        final SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("jdbc load test");
        sparkConf.setMaster("local[*]");
        final SparkContext sparkContext = new SparkContext(sparkConf);
        final SparkSession sparkSession = new SparkSession(sparkContext);

        // This is a sql that takes about a minute to execute
        String querySql = "select t1.*\n" +
                "from SPARK_TEST1 t1\n" +
                "left join SPARK_TEST1 t2 on 1=1\n" +
                "left join (select aa from SPARK_TEST1 limit 3) t3 on 1=1";

        // Specify job group information, with interruptOnCancel enabled
        final String jobGroup = "test";
        sparkContext.clearJobGroup();
        sparkContext.setJobGroup(jobGroup, "test", true);

        // Start an independent thread to run the jdbc load test logic
        new Thread(() -> {
            final Dataset<Row> table = sparkSession.read()
                    .format("jdbc")
                    .option("url", "jdbc:mysql://192.168.10.226:32320/test?useUnicode=true&characterEncoding=utf-8&useSSL=false")
                    .option("user", "root")
                    .option("password", "123456")
                    .option("query", querySql)
                    .load();
            // Print the first row
            System.out.println(table.limit(1).first());
        }).start();

        // Wait for the jdbc load job to start
        TimeUnit.SECONDS.sleep(10);

        // Cancel the job group that was just started
        sparkContext.cancelJobGroup(jobGroup);

        // Simulate a long-running service: do not stop the driver process,
        // so that it keeps waiting for new jobs
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }
}
{code}

3. View the MySQL processes:
{code:sql}
select * from information_schema.`PROCESSLIST` where info is not null;
{code}

Ten seconds after the program starts, the job group is cancelled, yet the database query process is still running and has not been killed.
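For reference, below is a minimal standalone sketch (not Spark code, and not necessarily how the data source should implement the fix; the class name, table name, and connection details are placeholders) of the cancellation behaviour this ticket asks for: calling java.sql.Statement#cancel() from another thread aborts the long-running server-side query, which is what the JDBC data source should trigger when the load job is interrupted.

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;

/**
 * Standalone sketch: cancel a long-running JDBC query from another thread.
 * This is not Spark code; it only illustrates the Statement#cancel() call
 * that should be issued when a jdbc load job is interrupted.
 */
public class StatementCancelSketch {

    public static void main(String[] args) throws Exception {
        // Placeholder connection details
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test?useSSL=false", "root", "123456");
        PreparedStatement stmt = conn.prepareStatement(
                "select t1.* from SPARK_TEST1 t1 left join SPARK_TEST1 t2 on 1=1");

        Thread queryThread = new Thread(() -> {
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    // consume rows
                }
            } catch (SQLException e) {
                // After cancel(), the driver reports that the query was interrupted
                System.out.println("Query aborted: " + e.getMessage());
            }
        });
        queryThread.start();

        // Let the query run for a while, as the Spark job does before cancellation
        TimeUnit.SECONDS.sleep(10);

        // Cancelling from another thread asks the database to kill the running query,
        // so its entry disappears from information_schema.PROCESSLIST
        stmt.cancel();

        queryThread.join();
        stmt.close();
        conn.close();
    }
}
{code}

On MySQL the cancelled query typically surfaces in the query thread as a "Query execution was interrupted" SQLException, and the corresponding row is gone from the PROCESSLIST query shown in step 3.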