diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java
index 6a994b49d3a..42b3e60a758 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskSingleSpillMapOutputWriter.java
@@ -21,6 +21,8 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
 import org.apache.spark.shuffle.IndexShuffleBlockResolver;
 import org.apache.spark.shuffle.api.SingleSpillShuffleMapOutputWriter;
 import org.apache.spark.util.Utils;
@@ -49,8 +51,12 @@ public class LocalDiskSingleSpillMapOutputWriter
     // The map spill file already has the proper format, and it contains all of the partition data.
     // So just transfer it directly to the destination without any merging.
     File outputFile = blockResolver.getDataFile(shuffleId, mapId);
+
     File tempFile = Utils.tempFileWith(outputFile);
     Files.move(mapSpillFile.toPath(), tempFile.toPath());
+    TaskContext taskContext = TaskContext$.MODULE$.get();
+    if (taskContext != null && taskContext.isOuterTablePart1()) {
+      // Record the committed and temp shuffle files so they can be deleted later to simulate
+      // loss of this map output (see TaskContext.deleteShuffleFile()).
+      taskContext.setShuffleFile(new File[]{outputFile, tempFile});
+    }
     blockResolver
       .writeMetadataFileAndCommit(shuffleId, mapId, partitionLengths, checksums, tempFile);
   }
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index a660bccd2e6..12864a9f225 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -295,6 +295,21 @@ private class ShuffleStatus(
    * remove outputs which are served by an external shuffle server (if one exists).
    */
   def removeOutputsByFilter(f: BlockManagerId => Boolean): Unit = withWriteLock {
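+    // (Repro patch) Intentionally made a no-op so routine invalidation no longer clears map
+    // outputs; removeOutputsByFilterX below keeps the original behaviour for the one caller that
+    // still needs it (unregisterAllMapAndMergeOutput). Original body kept for reference: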
+    /*
+    for (mapIndex <- mapStatuses.indices) {
+      val currentMapStatus = mapStatuses(mapIndex)
+      if (currentMapStatus != null && f(currentMapStatus.location)) {
+        _numAvailableMapOutputs -= 1
+        mapIdToMapIndex.remove(currentMapStatus.mapId)
+        mapStatusesDeleted(mapIndex) = currentMapStatus
+        mapStatuses(mapIndex) = null
+        invalidateSerializedMapOutputStatusCache()
+      }
+    } */
+  }
+
+  def removeOutputsByFilterX(f: BlockManagerId => Boolean): Unit = withWriteLock {
+
     for (mapIndex <- mapStatuses.indices) {
       val currentMapStatus = mapStatuses(mapIndex)
       if (currentMapStatus != null && f(currentMapStatus.location)) {
@@ -857,7 +872,7 @@ private[spark] class MapOutputTrackerMaster(
   def unregisterAllMapAndMergeOutput(shuffleId: Int): Unit = {
     shuffleStatuses.get(shuffleId) match {
       case Some(shuffleStatus) =>
-        shuffleStatus.removeOutputsByFilter(x => true)
+        shuffleStatus.removeOutputsByFilterX(x => true)
         shuffleStatus.removeMergeResultsByFilter(x => true)
         shuffleStatus.removeShuffleMergerLocations()
         incrementEpoch()
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 30d772bd62d..0d552715d96 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -3266,7 +3266,7 @@ object SparkContext extends Logging {
     import SparkMasterRegex._
 
     // When running locally, don't try to re-execute tasks on failure.
-    val MAX_LOCAL_TASK_FAILURES = 1
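+    // (Repro patch) ...except that this reproduction relies on re-executing failed tasks, so
+    // allow up to 4 attempts even in local mode.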
+    val MAX_LOCAL_TASK_FAILURES = 4
 
     // Ensure that default executor's resources satisfies one or more tasks requirement.
     // This function is for cluster managers that don't set the executor cores config, for
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 5384fd86a8f..81fdcac0fd7 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -17,20 +17,33 @@
 
 package org.apache.spark
 
-import java.io.Closeable
+import java.io.{Closeable, File}
 import java.util.Properties
-
 import org.apache.spark.annotation.{DeveloperApi, Evolving, Since}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.resource.ResourceInformation
-import org.apache.spark.scheduler.Task
+import org.apache.spark.scheduler.{ResultTask, Task}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.{AccumulatorV2, TaskCompletionListener, TaskFailureListener}
 
 
 object TaskContext {
+
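+  // Test-only state for reproducing shuffle-file loss: shuffleFileToDelete holds the files
+  // recorded by setShuffleFile, failResult arms the ShuffleMapTask hook, and resultTaskCount
+  // counts result tasks that have started running.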
+  private var shuffleFileToDelete: Option[Array[File]] = None
+  private var failResult = false
+  var resultTaskCount: Int = 0
+
+  def setFailResult(): Unit = {
+    this.failResult = true
+  }
+
+  def unsetFailResult(): Unit = {
+    this.failResult = false
+  }
+
+  def getFailResult: Boolean = this.failResult
+
   /**
    * Return the currently active TaskContext. This can be called inside of
    * user functions to access contextual information about running tasks.
@@ -71,6 +84,19 @@ object TaskContext {
     new TaskContextImpl(0, 0, 0, 0, 0, 1,
       null, new Properties, null, TaskMetrics.empty, 1)
   }
+
+  // Deletes any shuffle files recorded via setShuffleFile, plus leftover temp_shuffle files under
+  // the hard-coded local repro directory, to simulate loss of map output.
+  def deleteShuffleFile(): Unit = {
+    import scala.sys.process._
+    val delete = this.shuffleFileToDelete.nonEmpty
+    this.shuffleFileToDelete.foreach(_.foreach { f =>
+      assert(Process(Seq("rm", "-rf", f.getPath)).! == 0)
+    })
+    if (delete) {
+      // The glob has to be expanded by a shell: Process(Seq(...)) execs rm directly, so the
+      // pattern would be passed to rm as a literal path (and "rm -rf" on a missing path exits 0).
+      assert(Process(Seq("bash", "-c", "rm -rf /private/tmp/bug/blockmgr*/**/temp_shuffle*")).! == 0)
+    }
+    this.shuffleFileToDelete = None
+  }
 }
 
 
@@ -82,6 +108,8 @@ object TaskContext {
  * }}}
  */
 abstract class TaskContext extends Serializable {
+
+  private var shuffleFile: Option[File] = None
   // Note: TaskContext must NOT define a get method. Otherwise it will prevent the Scala compiler
   // from generating a static get method (based on the companion object's get method).
 
@@ -89,6 +117,20 @@ abstract class TaskContext extends Serializable {
 
   // Note: getters in this class are defined with parentheses to maintain backward compatibility.
 
+
+  // Records this task's shuffle output files in the companion object so that a later result task
+  // can delete them via TaskContext.deleteShuffleFile().
+  def setShuffleFile(files: Array[File]): Unit = {
+    TaskContext.shuffleFileToDelete = Some(files)
+  }
+
+  // Set on the map task that scans partition 1 of the outer table (see ShuffleMapTask).
+  private var isOuterTable: Boolean = false
+
+  def setIsOuterTablePart1(): Unit = {
+    isOuterTable = true
+  }
+
+  def isOuterTablePart1: Boolean = this.isOuterTable
+
   /**
    * Returns true if the task has completed.
    */
@@ -168,7 +210,18 @@ abstract class TaskContext extends Serializable {
       // context too within run(). If that's the case, kill the thread before it starts executing
       // the actual task.
       killTaskIfInterrupted()
-      task.runTask(this)
+      task match {
+        case r: ResultTask[_, _] => TaskContext.synchronized {
+          TaskContext.resultTaskCount += 1
+          if (r.partitionId == 1) {
+            // Delete the recorded shuffle files just before the result task for partition 1 runs,
+            // simulating loss of the map output it is about to read.
+            TaskContext.deleteShuffleFile()
+          }
+        }
+        case _ => TaskContext.resultTaskCount = 0
+      }
+      task.runTask(this)
     } catch {
       case e: Throwable =>
         // Catch all errors; run task failure and completion callbacks, and rethrow the exception.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 5dda7afc3eb..0d20f4459ab 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1700,7 +1700,7 @@ package object config {
         "map-side aggregation and there are at most this many reduce partitions")
       .version("1.1.1")
       .intConf
-      .createWithDefault(200)
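+      // (Repro patch) Lowered to 1, presumably to force the sort-based shuffle writer (and its
+      // single-spill output path, where the shuffle files are recorded) even for tiny shuffles.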
+      .createWithDefault(1)
 
   private[spark] val SHUFFLE_MANAGER =
     ConfigBuilder("spark.shuffle.manager")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 05bcafdb14d..6a7a6b7d50b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -103,6 +103,12 @@ private[spark] class ShuffleMapTask(
     } else {
       context.taskAttemptId()
     }
+
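+    // (Repro hook) When a forced failure has been armed, tag the map task that scans partition 1
+    // of the outer table; this assumes the outer table's data files contain "outer" in their
+    // paths (matched via FilePartition.toString). The tag makes the shuffle writer record its
+    // output files for later deletion.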
+    if (TaskContext.getFailResult && partition.toString.contains("outer") &&
+      partitionId == 1) {
+      TaskContext.get().setIsOuterTablePart1()
+      TaskContext.unsetFailResult()
+    }
     dep.shuffleWriterProcessor.write(
       rdd.iterator(partition, context),
       dep,
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/api/src/main/scala/org/apache/spark/sql/Encoders.scala
index 4957d76af9a..feb3b0ae5e3 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/Encoders.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/Encoders.scala
@@ -261,7 +261,7 @@ object Encoders {
     TransformingEncoder(classTag[T], BinaryEncoder, provider)
   }
 
-  private[sql] def tupleEncoder[T](encoders: Encoder[_]*): Encoder[T] = {
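+  // (Repro patch) Visibility widened from private[sql], presumably so the reproduction test can
+  // build tuple encoders directly.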
+  def tupleEncoder[T](encoders: Encoder[_]*): Encoder[T] = {
     ProductEncoder.tuple(encoders.map(agnosticEncoderFor(_))).asInstanceOf[Encoder[T]]
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 6e19a1d6bbc..5cc7ff41f8a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -316,7 +316,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
    * Returns an expression that will produce a valid partition ID(i.e. non-negative and is less
    * than numPartitions) based on hashing expressions.
    */
-  def partitionIdExpression: Expression = Pmod(new Murmur3Hash(expressions), Literal(numPartitions))
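+  // (Repro patch) Bypass murmur3 hashing and use the first key expression modulo numPartitions,
+  // presumably to make the row-to-partition mapping easy to predict.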
+  def partitionIdExpression: Expression = Pmod(expressions.head, Literal(numPartitions))
 
   override protected def withNewChildrenInternal(
     newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7b560002ede..fce1a67d826 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -815,7 +815,7 @@ object SQLConf {
         s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'), to avoid too many small tasks.")
       .version("3.0.0")
       .booleanConf
-      .createWithDefault(true)
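+      // (Repro patch) presumably disabled so AQE does not coalesce away the shuffle partition
+      // the reproduction targets.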
+      .createWithDefault(false)
 
   val COALESCE_PARTITIONS_PARALLELISM_FIRST =
     buildConf("spark.sql.adaptive.coalescePartitions.parallelismFirst")
@@ -1741,7 +1741,7 @@ object SQLConf {
         "side. This could help to eliminate unnecessary shuffles")
       .version("3.4.0")
       .booleanConf
-      .createWithDefault(true)
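+      // (Repro patch) presumably disabled so this optimization cannot eliminate the shuffle the
+      // reproduction depends on.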
+      .createWithDefault(false)
 
   val V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED =
     buildConf("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 226debc9764..86d03fcd4dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -618,12 +618,15 @@ case class FileSourceScanExec(
   // Note that some vals referring the file-based relation are lazy intentionally
   // so that this plan can be canonicalized on executor side too. See SPARK-23731.
   override lazy val supportsColumnar: Boolean = {
-    val conf = relation.sparkSession.sessionState.conf
+  /*  val conf = relation.sparkSession.sessionState.conf
     // Only output columnar if there is WSCG to read it.
     val requiredWholeStageCodegenSettings =
       conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, schema)
     requiredWholeStageCodegenSettings &&
       relation.fileFormat.supportBatch(relation.sparkSession, schema)
+
+   */
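+    // (Repro patch) Force row-based scans, presumably to keep the repro on the regular
+    // row-shuffle code path instead of the columnar one.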
+    false
   }
 
   private lazy val needsUnsafeRowConversion: Boolean = {
@@ -668,6 +671,7 @@ case class FileSourceScanExec(
         toUnsafe.initialize(index)
         iter.map { row =>
           numOutputRows += 1
+        //  println(s"row iter ${row.getString(1)}")
           toUnsafe(row)
         }
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
index 50af845c37c..fa05bcea7c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
@@ -51,6 +51,9 @@ case class FilePartition(index: Int, files: Array[PartitionedFile])
       case (host, numBytes) => host
     }.toArray
   }
+
+  override def toString(): String =
+    s"partition index = $index, files = ${files.map(_.filePath.toString).mkString(", ")}"
 }
 
 object FilePartition extends Logging {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index b8348cefe7c..0d461da269c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -66,6 +66,7 @@ trait SharedSparkSessionBase
 
   protected def sparkConf = {
     val conf = new SparkConf()
+      .setMaster("local[2]")
       .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
       .set(UNSAFE_EXCEPTION_ON_MEMORY_LEAK, true)
       .set(SQLConf.CODEGEN_FALLBACK.key, "false")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 91c6ac6f96e..980cb1155f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.internal.{SessionState, SessionStateBuilder, SQLConf
  * A special `SparkSession` prepared for testing.
  */
 private[spark] class TestSparkSession(sc: SparkContext) extends SparkSession(sc) { self =>
-  def this(sparkConf: SparkConf, maxLocalTaskFailures: Int = 1, numCores: Int = 2) = {
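+  // (Repro patch) Default to 4 task attempts, matching SparkContext's local-mode change, so
+  // induced failures can be retried in tests.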
+  def this(sparkConf: SparkConf, maxLocalTaskFailures: Int = 4, numCores: Int = 2) = {
     this(new SparkContext(s"local[$numCores,$maxLocalTaskFailures]", "test-sql-context",
       sparkConf.set("spark.sql.testkey", "true")))
   }
