[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20657#discussion_r173383032

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -520,4 +520,16 @@ package object config {
     .checkValue(v => v > 0, "The threshold should be positive.")
     .createWithDefault(1000)

+  private[spark] val CREDENTIALS_RENEWAL_INTERVAL_RATIO =
+    ConfigBuilder("spark.security.credentials.renewalRatio")
+      .doc("Ratio of the credential's expiration time when Spark should fetch new credentials.")
+      .doubleConf
+      .createWithDefault(0.75d)
+
+  private[spark] val CREDENTIALS_RENEWAL_RETRY_WAIT =
+    ConfigBuilder("spark.security.credentials.retryWait")
+      .doc("How long to wait before retrying to fetch new credentials after a failure.")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("1h")
--- End diff --

Is this "1h" too big when the token expiration time is small (for example 8 hours, or even less)? In that case the next retry would simply fail.

---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
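To make the concern above concrete, here is a small self-contained sketch (the object and method names are illustrative, not Spark code) of how the 0.75 renewal ratio and the fixed 1h retry wait interact: once a short-lived token passes its renewal point, a fixed retry wait can exceed the time left before expiry.

```scala
// Illustrative sketch only; not the actual Spark implementation.
object RenewalMath {
  val renewalRatio = 0.75   // spark.security.credentials.renewalRatio default
  val retryWaitSec = 3600L  // spark.security.credentials.retryWait default ("1h")

  // Delay until the proactive renewal attempt, given the token's TTL.
  def renewalDelaySec(ttlSec: Long): Long = (ttlSec * renewalRatio).toLong

  // True when the fixed retry wait after a failed fetch would already
  // overshoot the token's remaining lifetime.
  def retryCanMissWindow(ttlSec: Long): Boolean =
    retryWaitSec > ttlSec - renewalDelaySec(ttlSec)
}
```

With an 8-hour token the renewal fires at the 6-hour mark, leaving a 2-hour window, so a single 1-hour retry still fits; with a 2-hour token only 30 minutes remain after the renewal point, and the 1-hour wait overshoots, which is the failure mode raised above.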
[GitHub] spark pull request #20778: [SPARK-23584][SQL] NewInstance should support int...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20778#discussion_r173382375

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
@@ -392,8 +392,44 @@ case class NewInstance(
     childrenResolved && !needOuterPointer
   }

-  override def eval(input: InternalRow): Any =
-    throw new UnsupportedOperationException("Only code-generated evaluation is supported.")
+  private lazy val constructor: (Seq[AnyRef]) => Any = {
+    val paramTypes = arguments.map { expr =>
+      CallMethodViaReflection.typeMapping.getOrElse(expr.dataType,
+        Seq(expr.dataType.asInstanceOf[ObjectType].cls))
+    }
+    val findConstructor = (types: Seq[Seq[Class[_]]]) => {
+      val constructorOption = cls.getConstructors.find { c =>
--- End diff --

ok, I'll re-check.
[GitHub] spark pull request #20657: [SPARK-23361][yarn] Allow AM to restart after ini...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20657#discussion_r173380826

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala ---
@@ -18,221 +18,160 @@ package org.apache.spark.deploy.yarn.security
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.AtomicReference

 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}

 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.ThreadUtils

 /**
- * The following methods are primarily meant to make sure long-running apps like Spark
- * Streaming apps can run without interruption while accessing secured services. The
- * scheduleLoginFromKeytab method is called on the AM to get the new credentials.
- * This method wakes up a thread that logs into the KDC
- * once 75% of the renewal interval of the original credentials used for the container
- * has elapsed. It then obtains new credentials and writes them to HDFS in a
- * pre-specified location - the prefix of which is specified in the sparkConf by
- * spark.yarn.credentials.file (so the file(s) would be named c-timestamp1-1, c-timestamp2-2 etc.
- * - each update goes to a new file, with a monotonically increasing suffix), also the
- * timestamp1, timestamp2 here indicates the time of next update for CredentialUpdater.
- * After this, the credentials are renewed once 75% of the new tokens renewal interval has elapsed.
+ * A manager tasked with periodically updating delegation tokens needed by the application.
  *
- * On the executor and driver (yarn client mode) side, the updateCredentialsIfRequired method is
- * called once 80% of the validity of the original credentials has elapsed. At that time the
- * executor finds the credentials file with the latest timestamp and checks if it has read those
- * credentials before (by keeping track of the suffix of the last file it read). If a new file has
- * appeared, it will read the credentials and update the currently running UGI with it. This
- * process happens again once 80% of the validity of this has expired.
+ * This manager is meant to make sure long-running apps (such as Spark Streaming apps) can run
+ * without interruption while accessing secured services. It periodically logs in to the KDC with
+ * user-provided credentials, and contacts all the configured secure services to obtain delegation
+ * tokens to be distributed to the rest of the application.
+ *
+ * This class will manage the kerberos login, by renewing the TGT when needed. Because the UGI API
+ * does not expose the TTL of the TGT, a configuration controls how often to check that a relogin is
+ * necessary. This is done reasonably often since the check is a no-op when the relogin is not yet
+ * needed. The check period can be overridden in the configuration.
+ *
+ * New delegation tokens are created once 75% of the renewal interval of the original tokens has
+ * elapsed. The new tokens are sent to the Spark driver endpoint once it's registered with the AM.
+ * The driver is tasked with distributing the tokens to other processes that might need them.
  */
 private[yarn] class AMCredentialRenewer(
     sparkConf: SparkConf,
-    hadoopConf: Configuration,
-    credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
+    hadoopConf: Configuration) extends Logging {

-  private var lastCredentialsFileSuffix = 0
+  private val principal = sparkConf.get(PRINCIPAL).get
+  private val keytab = sparkConf.get(KEYTAB).get
+  private val credentialManager = new YARNHadoopDelegationTokenManager(sparkConf, hadoopConf)

-  private val credentialRenewerThread: ScheduledExecutorService =
+  private val renewalExecutor: ScheduledExecutorService =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")

-  private val hadoopUtil = SparkHadoopUtil.get
+  private val driverRef =
[GitHub] spark issue #20757: [SPARK-23595][SQL] ValidateExternalType should support i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20757 Merged build finished. Test PASSed.
[GitHub] spark issue #20757: [SPARK-23595][SQL] ValidateExternalType should support i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20757 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88111/ Test PASSed.
[GitHub] spark issue #20757: [SPARK-23595][SQL] ValidateExternalType should support i...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20757 **[Test build #88111 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88111/testReport)** for PR 20757 at commit [`32a8dae`](https://github.com/apache/spark/commit/32a8daeb065cb575e04ffbb6dc05634228d61e8c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20779: [SPARK-23598][SQL] Make methods in BufferedRowIterator p...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20779 Under the current situation, I think we would have to create a very, very large query, since we have made codegen stable.
[GitHub] spark issue #20779: [SPARK-23598][SQL] Make methods in BufferedRowIterator p...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/20779 I'd rather avoid changing it, because by modifying it we can always trigger exceptions like the constant pool size limit. Can't we reproduce this without changing that value?
[GitHub] spark pull request #20759: Added description of checkpointInterval parameter
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/20759#discussion_r173379554

--- Diff: docs/ml-collaborative-filtering.md ---
@@ -19,6 +19,7 @@ by a small set of latent factors that can be used to predict missing entries.
 algorithm to learn these latent factors. The implementation in `spark.ml` has the
 following parameters:

+* *checkpointInterval* helps with recovery when nodes fail and StackOverflow exceptions caused by long lineage. **Will be silently ignored if *SparkContext.CheckpointDir* is not set.** (defaults to 10).
--- End diff --

the wording is a bit severe... do we have to mention node failure or StackOverflow (the latter should be rare anyway)? Also, is this list of params sorted in any way? Perhaps add checkpointInterval to the end?
[GitHub] spark issue #20779: [SPARK-23598][SQL] Make methods in BufferedRowIterator p...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20779 Good point. If we could set a specific value for `GENERATED_CLASS_SIZE_THRESHOLD`, we could add a test case. How do we make `GENERATED_CLASS_SIZE_THRESHOLD` non-final for testing? For example, by adding setter and getter methods.
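One possible shape for the test hook suggested above (purely illustrative; the real `GENERATED_CLASS_SIZE_THRESHOLD` lives in Spark's `CodeGenerator`, and the default below is a placeholder): keep the field private and expose a scoped override instead of a bare setter, so a test cannot leak a modified value.

```scala
// Illustrative sketch; not the actual CodeGenerator field.
object Thresholds {
  private val defaultThreshold = 1000000 // placeholder value
  @volatile private var current = defaultThreshold

  def generatedClassSizeThreshold: Int = current

  // Test-only hook: run `body` with a temporary threshold, then restore it
  // even if the body throws.
  def withThreshold[T](v: Int)(body: => T): T = {
    val old = current
    current = v
    try body finally current = old
  }
}
```

A test could then wrap a codegen invocation in `Thresholds.withThreshold(small) { ... }` without leaving the JVM-wide value changed for other suites.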
[GitHub] spark issue #20779: [SPARK-23598][SQL] Make methods in BufferedRowIterator p...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/20779 Use the example in the PR description?
[GitHub] spark issue #20779: [SPARK-23598][SQL] Make methods in BufferedRowIterator p...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/20779 @kiszk can we add a test case for this?
[GitHub] spark issue #20754: [SPARK-23287][CORE] Spark scheduler does not remove init...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20754 **[Test build #88117 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88117/testReport)** for PR 20754 at commit [`5a7224e`](https://github.com/apache/spark/commit/5a7224eba039d2c6421a710bdcc562c4b96f9876).
[GitHub] spark issue #20781: [SPARK-23637][YARN]Yarn might allocate more resource if ...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/20781 cc @vanzin @tgravescs @cloud-fan @djvulee Could you please help review this?
[GitHub] spark issue #20779: [SPARK-23598][SQL] Make methods in BufferedRowIterator p...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20779 Merged build finished. Test PASSed.
[GitHub] spark issue #20779: [SPARK-23598][SQL] Make methods in BufferedRowIterator p...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20779 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88110/ Test PASSed.
[GitHub] spark issue #20754: [SPARK-23287][CORE] Spark scheduler does not remove init...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/20754 Jenkins, ok to test
[GitHub] spark issue #20779: [SPARK-23598][SQL] Make methods in BufferedRowIterator p...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20779 **[Test build #88110 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88110/testReport)** for PR 20779 at commit [`e206fff`](https://github.com/apache/spark/commit/e206fffb0bb529f7cb030f6744ae97346cc0ca18). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19222 ping @cloud-fan
[GitHub] spark issue #20777: [SPARK-23615][ML][PYSPARK]Add maxDF Parameter to Python ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20777 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88115/ Test PASSed.
[GitHub] spark issue #20777: [SPARK-23615][ML][PYSPARK]Add maxDF Parameter to Python ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20777 Merged build finished. Test PASSed.
[GitHub] spark issue #20777: [SPARK-23615][ML][PYSPARK]Add maxDF Parameter to Python ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20777 **[Test build #88115 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88115/testReport)** for PR 20777 at commit [`c1aeac1`](https://github.com/apache/spark/commit/c1aeac15aedac20b927639d3c5a18982336a01f6). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #20782: [SPARK-23627][SQL] Provide isEmpty in DataS...
Github user goungoun closed the pull request at: https://github.com/apache/spark/pull/20782
[GitHub] spark issue #20780: [MINOR] [SQL] [TEST] Create table using `dataSourceName`...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20780 Merged build finished. Test PASSed.
[GitHub] spark issue #20780: [MINOR] [SQL] [TEST] Create table using `dataSourceName`...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20780 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88112/ Test PASSed.
[GitHub] spark issue #20780: [MINOR] [SQL] [TEST] Create table using `dataSourceName`...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20780 **[Test build #88112 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88112/testReport)** for PR 20780 at commit [`29fd5a8`](https://github.com/apache/spark/commit/29fd5a8f1b065f50d324389c2d796e93a3a8844d). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20781: [SPARK-23637][YARN]Yarn might allocate more resource if ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20781 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88116/ Test PASSed.
[GitHub] spark issue #20781: [SPARK-23637][YARN]Yarn might allocate more resource if ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20781 **[Test build #88116 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88116/testReport)** for PR 20781 at commit [`bd6f8a1`](https://github.com/apache/spark/commit/bd6f8a1dd608d95f6faf2758cbd36e6328f48eb3). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20781: [SPARK-23637][YARN]Yarn might allocate more resource if ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20781 Merged build finished. Test PASSed.
[GitHub] spark issue #20782: [SPARK-23627][SQL] Provide isEmpty in DataSet
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20782 Can one of the admins verify this patch?
[GitHub] spark issue #20747: [SPARK-23609][SQL][TEST]Test EnsureRequirements's test c...
Github user heary-cao commented on the issue: https://github.com/apache/spark/pull/20747 cc @cloud-fan
[GitHub] spark pull request #20782: [SPARK-23627][SQL] Provide isEmpty in DataS...
GitHub user goungoun opened a pull request: https://github.com/apache/spark/pull/20782 [SPARK-23627][SQL] Provide isEmpty in DataSet ## What changes were proposed in this pull request? This PR adds an `isEmpty` method to Dataset. ## How was this patch tested? Unit tests added. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/goungoun/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20782.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20782 commit c1488201b4bfb050402cc91da3c9619a5e4a0a6e Author: goungoun Date: 2018-03-08T14:59:20Z Update DatasetSuite.scala testsuit add commit 1c2211807cda14d891dfa1b82f713c3b24c53bab Author: goungoun Date: 2018-03-09T02:31:05Z Merge pull request #2 from goungoun/goungoun-patch-1 Update DatasetSuite.scala commit 7c8e87ce42fe2a4b1942f0d95660c93831c92730 Author: Goun Na Date: 2018-03-09T06:09:04Z SPARK-23627 provide isEmpty in DataSet commit a1debb997336e9a2edda98dca6c0c03c9a0e7c3c Author: Goun Na Date: 2018-03-09T06:10:29Z SPARK-23627 provide isEmpty in DataSet
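The PR body does not show the implementation, but the idea behind a Dataset-style `isEmpty` can be sketched with a toy type (the `TinyDataset` name and `fetch` parameter are hypothetical): fetch at most one element rather than counting everything.

```scala
// Hypothetical sketch, not the PR's actual code: `fetch(n)` stands in for a
// "take at most n rows" primitive, so emptiness needs only one row's work.
final class TinyDataset[T](fetch: Int => Seq[T]) {
  def isEmpty: Boolean = fetch(1).isEmpty // cheaper than count == 0
}
```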
[GitHub] spark issue #20779: [SPARK-23598][SQL] Make methods in BufferedRowIterator p...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/20779 Looks good.
[GitHub] spark issue #20781: [SPARK-23637][YARN]Yarn might allocate more resource if ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20781 **[Test build #88116 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88116/testReport)** for PR 20781 at commit [`bd6f8a1`](https://github.com/apache/spark/commit/bd6f8a1dd608d95f6faf2758cbd36e6328f48eb3).
[GitHub] spark issue #20781: [SPARK-23637][YARN]Yarn might allocate more resource if ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20781 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1422/ Test PASSed.
[GitHub] spark issue #20781: [SPARK-23637][YARN]Yarn might allocate more resource if ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20781 Merged build finished. Test PASSed.
[GitHub] spark pull request #20781: [SPARK-23637][YARN]Yarn might allocate more resou...
GitHub user jinxing64 opened a pull request: https://github.com/apache/spark/pull/20781 [SPARK-23637][YARN]Yarn might allocate more resource if a same executor is killed multiple times. ## What changes were proposed in this pull request? `YarnAllocator` uses `numExecutorsRunning` to track the number of running executors. `numExecutorsRunning` is used to check whether executors are missing and more need to be allocated. In the current code, `numExecutorsRunning` can become negative when the driver asks to kill the same idle executor multiple times. ## How was this patch tested? UT added You can merge this pull request into a Git repository by running: $ git pull https://github.com/jinxing64/spark SPARK-23637 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20781.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20781 commit bd6f8a1dd608d95f6faf2758cbd36e6328f48eb3 Author: jinxing Date: 2018-03-09T05:53:10Z [SPARK-23637][YARN]Yarn might allocate more resource if a same executor is killed multiple times.
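The bug described above can be modeled with a toy tracker (illustrative names, not `YarnAllocator`'s actual fields): decrementing a bare counter on every kill request goes negative when the same executor id is killed twice, while tracking ids in a set makes the duplicate kill a no-op.

```scala
import scala.collection.mutable

// Toy model of the race: a bare counter vs. id-based tracking.
class RunningTracker {
  private var naiveCount = 0
  private val running = mutable.Set[String]()

  def onStart(id: String): Unit = { naiveCount += 1; running += id }

  def onKill(id: String): Unit = {
    naiveCount -= 1 // goes negative on a duplicate kill request
    running -= id   // removing an absent id is a no-op, so this stays correct
  }

  def naive: Int = naiveCount
  def tracked: Int = running.size
}
```

A negative running count is exactly what makes the allocator think executors are "missing" and request replacements it does not need.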
[GitHub] spark issue #20776: [SPARK-23630][yarn] Allow user's hadoop conf customizati...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20776 LGTM.
[GitHub] spark pull request #20778: [SPARK-23584][SQL] NewInstance should support int...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20778#discussion_r173370038

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
@@ -392,8 +392,44 @@ case class NewInstance(
     childrenResolved && !needOuterPointer
   }

-  override def eval(input: InternalRow): Any =
-    throw new UnsupportedOperationException("Only code-generated evaluation is supported.")
+  private lazy val constructor: (Seq[AnyRef]) => Any = {
+    val paramTypes = arguments.map { expr =>
+      CallMethodViaReflection.typeMapping.getOrElse(expr.dataType,
+        Seq(expr.dataType.asInstanceOf[ObjectType].cls))
+    }
+    val findConstructor = (types: Seq[Seq[Class[_]]]) => {
+      val constructorOption = cls.getConstructors.find { c =>
--- End diff --

Can we directly call `cls.getConstructor` with the required `Class` objects to get it?
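For context on the suggestion: `Class.getConstructor` matches formal parameter types exactly, so a lookup that must also accept subtypes of the formal parameters needs a `find`-plus-assignability scan like the one in the diff. A self-contained sketch of that scan (illustrative, not the PR's exact code):

```scala
import java.lang.reflect.Constructor

// Find a public constructor whose formal parameter types can accept the given
// argument classes, widening to supertypes (unlike getConstructor's exact match).
def findConstructor(cls: Class[_], argTypes: Seq[Class[_]]): Option[Constructor[_]] =
  cls.getConstructors.find { c =>
    val formals = c.getParameterTypes
    formals.length == argTypes.length &&
      formals.zip(argTypes).forall { case (f, a) => f.isAssignableFrom(a) }
  }
```

For example, `java.util.ArrayList` has no constructor taking `HashSet` exactly, but the scan finds `ArrayList(Collection)` because `Collection.isAssignableFrom(HashSet)` holds; `getConstructor(classOf[java.util.HashSet[_]])` would throw.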
[GitHub] spark issue #20777: [SPARK-23615][ML][PYSPARK]Add maxDF Parameter to Python ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20777 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1421/ Test PASSed.
[GitHub] spark issue #20777: [SPARK-23615][ML][PYSPARK]Add maxDF Parameter to Python ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20777 **[Test build #88115 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88115/testReport)** for PR 20777 at commit [`c1aeac1`](https://github.com/apache/spark/commit/c1aeac15aedac20b927639d3c5a18982336a01f6).
[GitHub] spark issue #20777: [SPARK-23615][ML][PYSPARK]Add maxDF Parameter to Python ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20777 Merged build finished. Test PASSed.
[GitHub] spark pull request #20777: [SPARK-23615][ML][PYSPARK]Add maxDF Parameter to ...
Github user huaxingao commented on a diff in the pull request: https://github.com/apache/spark/pull/20777#discussion_r173369004

--- Diff: python/pyspark/ml/feature.py ---
@@ -465,26 +522,26 @@ class CountVectorizer(JavaEstimator, HasInputCol, HasOutputCol, JavaMLReadable,
                       " Default False", typeConverter=TypeConverters.toBoolean)

     @keyword_only
-    def __init__(self, minTF=1.0, minDF=1.0, vocabSize=1 << 18, binary=False, inputCol=None,
-                 outputCol=None):
+    def __init__(self, minTF=1.0, minDF=1.0, maxDF=2 ** 63 - 1, vocabSize=1 << 18, binary=False,
--- End diff --

Thank you very much for the comments. Will make changes.
[GitHub] spark issue #20771: [SPARK-23587][SQL] Add interpreted execution for MapObje...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20771 **[Test build #88114 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88114/testReport)** for PR 20771 at commit [`9144287`](https://github.com/apache/spark/commit/914428729dae101c369f29c245c8b8755fe15f43).
[GitHub] spark issue #20771: [SPARK-23587][SQL] Add interpreted execution for MapObje...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20771 Merged build finished. Test PASSed.
[GitHub] spark issue #20771: [SPARK-23587][SQL] Add interpreted execution for MapObje...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20771 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1420/ Test PASSed.
[GitHub] spark pull request #20780: [MINOR] [SQL] [TEST] Create table using `dataSour...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/20780#discussion_r173366467

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala ---
@@ -335,7 +335,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
   test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") {
     withTable("t") {
-      sql("CREATE TABLE t(i INT) USING parquet")
+      sql(s"CREATE TABLE t(i INT) USING $dataSourceName")
       intercept[AnalysisException] {
         testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).saveAsTable("t")
       }
--- End diff --

Could you explicitly add an error message check here?
[GitHub] spark issue #20779: [SPARK-23598][SQL] Make methods in BufferedRowIterator p...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20779 cc: @hvanhovell @mgaido91
[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20771#discussion_r173364839

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala ---
@@ -501,12 +502,22 @@ case class LambdaVariable(
     value: String,
     isNull: String,
     dataType: DataType,
-    nullable: Boolean = true) extends LeafExpression
-  with Unevaluable with NonSQLExpression {
+    nullable: Boolean = true) extends LeafExpression with NonSQLExpression {
+
+  // Interpreted execution of `LambdaVariable` always get the 0-index element from input row.
+  override def eval(input: InternalRow): Any = {
+    assert(input.numFields == 1,
+      "The input row of interpreted LambdaVariable should have only 1 field.")
+    input.get(0, dataType)
--- End diff --

You mean like this?

```scala
lazy val accessor: InternalRow => Any = dataType match {
  case IntegerType => (inputRow) => inputRow.getInt(0)
  case LongType => (inputRow) => inputRow.getLong(0)
  ...
}

override def eval(input: InternalRow): Any = accessor(input)
```
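The snippet in that comment is a fragment; a runnable, self-contained version of the same pattern (with toy stand-ins for `DataType` and `InternalRow`, not Catalyst's types) shows the point: the type-specific accessor is resolved once, lazily, so each `eval` is a single indirect call instead of a pattern match per row.

```scala
// Toy stand-ins to keep this self-contained; Catalyst's real types differ.
sealed trait DType
case object IntT extends DType
case object LongT extends DType

class Var(dataType: DType) {
  // Resolved once on first use, not on every eval.
  lazy val accessor: Array[Any] => Any = dataType match {
    case IntT  => row => row(0).asInstanceOf[Int]
    case LongT => row => row(0).asInstanceOf[Long]
  }
  def eval(row: Array[Any]): Any = accessor(row)
}
```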
[GitHub] spark issue #20756: [SPARK-23593][SQL] Add interpreted execution for Initial...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20756 **[Test build #88113 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88113/testReport)** for PR 20756 at commit [`1a78334`](https://github.com/apache/spark/commit/1a783346a14e93e5dd755185071ca2e93360f387). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20756: [SPARK-23593][SQL] Add interpreted execution for Initial...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20756 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1419/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20756: [SPARK-23593][SQL] Add interpreted execution for Initial...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20756 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20756#discussion_r173363964 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1261,8 +1261,39 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp override def children: Seq[Expression] = beanInstance +: setters.values.toSeq override def dataType: DataType = beanInstance.dataType - override def eval(input: InternalRow): Any = -throw new UnsupportedOperationException("Only code-generated evaluation is supported.") + private lazy val resolvedSetters = { +val ObjectType(beanClass) = beanInstance.dataType +setters.map { + case (name, expr) => +// Looking for known type mapping first, then using Class attached in `ObjectType`. +// Finally also looking for general `Object`-type parameter for generic methods. +val paramTypes = CallMethodViaReflection.typeMapping.getOrElse(expr.dataType, --- End diff -- If we want to expand the support, maybe we can have another PR to expand it in `CallMethodViaReflection` and merge those variables. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
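The lookup order described in the diff comment above — a known type mapping first, then the class carried by `ObjectType`, finally a generic `Object`-typed parameter — boils down to trying several candidate parameter types against `getMethod`. A minimal, self-contained Java sketch of that fallback (hypothetical helper, not Spark's implementation):

```java
import java.lang.reflect.Method;

public class SetterLookup {
    public static class Bean {
        private int x;
        public void setX(int x) { this.x = x; }
        public int getX() { return x; }
    }

    // Hypothetical helper: try each candidate parameter type in order,
    // mirroring "known type mapping first, then Object" from the diff.
    static Method resolveSetter(Class<?> cls, String name, Class<?>... candidates)
            throws NoSuchMethodException {
        for (Class<?> c : candidates) {
            try {
                return cls.getMethod(name, c);
            } catch (NoSuchMethodException e) {
                // fall through to the next candidate type
            }
        }
        throw new NoSuchMethodException(name + " not found for any candidate type");
    }

    public static void main(String[] args) throws Exception {
        Bean b = new Bean();
        // int.class matches first; Object.class would be the generic fallback.
        Method m = resolveSetter(Bean.class, "setX", int.class, Object.class);
        m.invoke(b, 42);  // the boxed Integer argument is unboxed to int
        System.out.println(b.getX());
    }
}
```

The fallback matters because `getMethod` requires an exact parameter-type match: a setter declared as `setX(int)` is only found via `int.class`, not `Integer.class`.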
[GitHub] spark pull request #20753: [SPARK-23582][SQL] StaticInvoke should support in...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20753#discussion_r173361869 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala --- @@ -95,6 +162,21 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(createExternalRow, Row.fromSeq(Seq(1, "x")), InternalRow.fromSeq(Seq())) } + // This is an alternative version of `checkEvaluation` to compare results --- End diff -- If this PR is merged first, I'll remove this in my PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20753: [SPARK-23582][SQL] StaticInvoke should support in...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/20753#discussion_r173361648 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -188,8 +189,30 @@ case class StaticInvoke( override def nullable: Boolean = needNullCheck || returnNullable override def children: Seq[Expression] = arguments - override def eval(input: InternalRow): Any = -throw new UnsupportedOperationException("Only code-generated evaluation is supported.") + override def eval(input: InternalRow): Any = { +val args = arguments.map(e => e.eval(input).asInstanceOf[Object]) +val argClasses = CallMethodViaReflection.expressionJavaClasses(arguments) +val cls = if (staticObject.getName == objectName) { + staticObject +} else { + Utils.classForName(objectName) +} +val method = cls.getDeclaredMethod(functionName, argClasses : _*) +if (needNullCheck && args.exists(_ == null)) { + // return null if one of arguments is null + null +} else { + val ret = method.invoke(null, args: _*) + + if (CodeGenerator.defaultValue(dataType) == "null") { +ret + } else { +// cast a primitive value using Boxed class +val boxedClass = CallMethodViaReflection.typeBoxedJavaMapping(dataType) --- End diff -- Good catch. Once the discussions related to `typeJavaMapping`, `typeBoxedJavaMapping`, and the others are settled, I will address this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
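The boxed-class handling discussed in the diff above rests on a plain JVM fact: `Method.invoke` always returns primitive results boxed, so an interpreted `StaticInvoke` gets back an `Integer`, `Long`, etc. rather than a raw primitive. A minimal, self-contained illustration (using `Math.abs`, not Spark code):

```java
import java.lang.reflect.Method;

public class StaticInvokeDemo {
    public static void main(String[] args) throws Exception {
        // Reflectively invoke a static method; pass null as the receiver.
        Method abs = Math.class.getDeclaredMethod("abs", int.class);
        Object ret = abs.invoke(null, -5);
        // The primitive int result comes back boxed as java.lang.Integer.
        System.out.println(ret.getClass().getSimpleName() + " " + ret);
    }
}
```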
[GitHub] spark issue #20757: [SPARK-23595][SQL] ValidateExternalType should support i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20757 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20780: [MINOR] [SQL] [TEST] Create table using `dataSourceName`...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20780 **[Test build #88112 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88112/testReport)** for PR 20780 at commit [`29fd5a8`](https://github.com/apache/spark/commit/29fd5a8f1b065f50d324389c2d796e93a3a8844d). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20780: [MINOR] [SQL] [TEST] Create table using `dataSourceName`...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20780 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1417/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20757: [SPARK-23595][SQL] ValidateExternalType should support i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20757 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1418/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20780: [MINOR] [SQL] [TEST] Create table using `dataSourceName`...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20780 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20780: [MINOR] [SQL] [TEST] Create table using `dataSour...
GitHub user jiangxb1987 opened a pull request: https://github.com/apache/spark/pull/20780 [MINOR] [SQL] [TEST] Create table using `dataSourceName` in `HadoopFsRelationTest` ## What changes were proposed in this pull request? This PR fixes a minor issue in `HadoopFsRelationTest`: the table should be created using `dataSourceName` instead of `parquet`. The issue doesn't affect correctness, but it produces a misleading error message when the test fails. ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jiangxb1987/spark dataSourceName Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20780.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20780 commit 29fd5a8f1b065f50d324389c2d796e93a3a8844d Author: Xingbo Jiang Date: 2018-03-09T03:58:37Z create table using dataSourceName --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20778: [SPARK-23584][SQL] NewInstance should support int...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20778#discussion_r173361185 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala --- @@ -138,4 +154,40 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(decodeUsingSerializer, null, InternalRow.fromSeq(Seq(null))) } } + + // This is an alternative version of `checkEvaluation` to compare results --- End diff -- This function was copied from #20757, so I'll remove it later. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20757: [SPARK-23595][SQL] ValidateExternalType should support i...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20757 **[Test build #88111 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88111/testReport)** for PR 20757 at commit [`32a8dae`](https://github.com/apache/spark/commit/32a8daeb065cb575e04ffbb6dc05634228d61e8c). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20757: [SPARK-23595][SQL] ValidateExternalType should support i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20757 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20757: [SPARK-23595][SQL] ValidateExternalType should su...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/20757#discussion_r173361092 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala --- @@ -121,6 +121,19 @@ object ScalaReflection extends ScalaReflection { case _ => false } + def classForNativeTypeOf(dt: DataType): Class[_] = dt match { --- End diff -- @hvanhovell How about the latest commit? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20757: [SPARK-23595][SQL] ValidateExternalType should support i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20757 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1416/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20779: [SPARK-23598][SQL] Make methods in BufferedRowIterator p...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20779 **[Test build #88110 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88110/testReport)** for PR 20779 at commit [`e206fff`](https://github.com/apache/spark/commit/e206fffb0bb529f7cb030f6744ae97346cc0ca18). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20779: [SPARK-23598][SQL] Make methods in BufferedRowIterator p...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20779 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20779: [SPARK-23598][SQL] Make methods in BufferedRowIterator p...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20779 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1415/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20778: [SPARK-23584][SQL] NewInstance should support interprete...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20778 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88108/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20779: [SPARK-23598][SQL] Make methods in BufferedRowIte...
GitHub user kiszk opened a pull request: https://github.com/apache/spark/pull/20779 [SPARK-23598][SQL] Make methods in BufferedRowIterator public to avoid compilation for a large query ## What changes were proposed in this pull request? This PR fixes a compilation error for large queries whose generated code has split classes. The issue is that `append()`, `stopEarly()`, and other methods are not accessible from split classes that are not subclasses of `BufferedRowIterator`. This PR fixes the issue by making them `public`. Before applying the PR, running the attached program with `CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD=-1` throws the following exception. ``` test("SPARK-23598") { // When set -1 to CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD, an exception is thrown val df_pet_age = Seq((8, "bat"), (15, "mouse"), (5, "horse")).toDF("age", "name") df_pet_age.groupBy("name").avg("age").show() } ``` Exception: ``` 19:40:52.591 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable 19:41:32.319 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.shouldStop()Z from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1.agg_doAggregateWithKeys$(generated.java:203) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:160) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:616) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) ... ``` Generated code (line 195 calls `stopEarly()`). 
``` /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage1(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=1 /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; /* 008 */ private scala.collection.Iterator[] inputs; /* 009 */ private boolean agg_initAgg; /* 010 */ private boolean agg_bufIsNull; /* 011 */ private double agg_bufValue; /* 012 */ private boolean agg_bufIsNull1; /* 013 */ private long agg_bufValue1; /* 014 */ private agg_FastHashMap agg_fastHashMap; /* 015 */ private org.apache.spark.unsafe.KVIterator agg_fastHashMapIter; /* 016 */ private org.apache.spark.unsafe.KVIterator agg_mapIter; /* 017 */ private org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap agg_hashMap; /* 018 */ private org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter; /* 019 */ private scala.collection.Iterator inputadapter_input; /* 020 */ private boolean agg_agg_isNull11; /* 021 */ private boolean agg_agg_isNull25; /* 022 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] agg_mutableStateArray1 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[2]; /* 023 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] agg_mutableStateArray2 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2]; /* 024 */ private UnsafeRow[] agg_mutableStateArray = new UnsafeRow[2]; /* 025 */ /* 026 */ public GeneratedIteratorForCodegenStage1(Object[] references) { /* 027 */ this.references = references; /* 028 */ } /* 029 */ /* 030 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 031 */ partitionIndex = index; /* 032 */ this.inputs = inputs; /* 033 */ /* 034 */ agg_fastHashMap = new
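The `IllegalAccessError` in the PR description comes from the JVM's accessibility rule for non-public members: a class in a different package may only access a `protected` member through its own subtype, which the split generated classes (siblings of the `BufferedRowIterator` subclass, not subclasses themselves) cannot satisfy. The same rule can be triggered reflectively with a stock JDK method — a minimal illustration of the access check, not Spark code:

```java
import java.lang.reflect.Method;
import java.util.Date;

public class AccessDemo {
    public static void main(String[] args) throws Exception {
        // Object.clone() is protected, like BufferedRowIterator's helper
        // methods were before this PR. AccessDemo is outside java.lang and is
        // not invoking clone() on its own subtype, so the JVM denies access --
        // the same rule that produces the IllegalAccessError for split classes.
        Method clone = Object.class.getDeclaredMethod("clone");
        try {
            clone.invoke(new Date());
        } catch (IllegalAccessException e) {
            System.out.println("denied: " + e.getClass().getSimpleName());
        }
        // Making the member public, as this PR does, removes the restriction.
    }
}
```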
[GitHub] spark issue #20778: [SPARK-23584][SQL] NewInstance should support interprete...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20778 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20778: [SPARK-23584][SQL] NewInstance should support interprete...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20778 **[Test build #88108 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88108/testReport)** for PR 20778 at commit [`077a0bf`](https://github.com/apache/spark/commit/077a0bf1f66b1224f75880622237add12e9f23db). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20705: [SPARK-23553][TESTS] Tests should not assume the ...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/20705#discussion_r173358166 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala --- @@ -852,52 +846,52 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv (from to to).map(i => i -> s"str$i").toDF("c1", "c2") } -withTable("insertParquet") { - createDF(0, 9).write.format("parquet").saveAsTable("insertParquet") +withTable("t") { + createDF(0, 9).write.saveAsTable("t") checkAnswer( -sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"), +sql("SELECT p.c1, p.c2 FROM t p WHERE p.c1 > 5"), (6 to 9).map(i => Row(i, s"str$i"))) intercept[AnalysisException] { -createDF(10, 19).write.format("parquet").saveAsTable("insertParquet") +createDF(10, 19).write.saveAsTable("t") } - createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet") + createDF(10, 19).write.mode(SaveMode.Append).saveAsTable("t") checkAnswer( -sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"), +sql("SELECT p.c1, p.c2 FROM t p WHERE p.c1 > 5"), (6 to 19).map(i => Row(i, s"str$i"))) - createDF(20, 29).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet") + createDF(20, 29).write.mode(SaveMode.Append).saveAsTable("t") checkAnswer( -sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"), +sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 5 AND p.c1 < 25"), (6 to 24).map(i => Row(i, s"str$i"))) intercept[AnalysisException] { -createDF(30, 39).write.saveAsTable("insertParquet") +createDF(30, 39).write.saveAsTable("t") } - createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("insertParquet") + createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("t") checkAnswer( -sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"), +sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 5 AND p.c1 < 35"), (6 to 34).map(i => Row(i, s"str$i"))) - createDF(40, 49).write.mode(SaveMode.Append).insertInto("insertParquet") + createDF(40, 49).write.mode(SaveMode.Append).insertInto("t") checkAnswer( -sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"), +sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 5 AND p.c1 < 45"), (6 to 44).map(i => Row(i, s"str$i"))) - createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("insertParquet") + createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("t") checkAnswer( -sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"), +sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 51 AND p.c1 < 55"), (52 to 54).map(i => Row(i, s"str$i"))) - createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("insertParquet") + createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("t") checkAnswer( -sql("SELECT p.c1, c2 FROM insertParquet p"), +sql("SELECT p.c1, c2 FROM t p"), (50 to 59).map(i => Row(i, s"str$i"))) - createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("insertParquet") + createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("t") checkAnswer( -sql("SELECT p.c1, c2 FROM insertParquet p"), +sql("SELECT p.c1, c2 FROM t p"), (70 to 79).map(i => Row(i, s"str$i"))) --- End diff -- That is because this PR minimally changed only the test cases causing failures. We cannot generalize all test cases in one huge one-shot PR covering all modules. That would make it difficult to backport the other commits. The main goal of this PR is improving testability for new data sources. For example, `SPARK-8156: create table to specific database by 'use dbname'` writes to parquet, but reads with SQL, not by `read.parquet`. So, it doesn't fail. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20767 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20767 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88109/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20767 **[Test build #88109 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88109/testReport)** for PR 20767 at commit [`0a838c1`](https://github.com/apache/spark/commit/0a838c190ab75de2fc1ba9bd37ff8df37287d185). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20757: [SPARK-23595][SQL] ValidateExternalType should support i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20757 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20757: [SPARK-23595][SQL] ValidateExternalType should support i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20757 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88107/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20757: [SPARK-23595][SQL] ValidateExternalType should support i...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20757 **[Test build #88107 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88107/testReport)** for PR 20757 at commit [`66141ea`](https://github.com/apache/spark/commit/66141eac6fc74acb0279af990689f1800b0a1ed3). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20766: [BackPort][SPARK-23490][SQL] Check storage.locati...
Github user gengliangwang closed the pull request at: https://github.com/apache/spark/pull/20766 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20767 **[Test build #88109 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88109/testReport)** for PR 20767 at commit [`0a838c1`](https://github.com/apache/spark/commit/0a838c190ab75de2fc1ba9bd37ff8df37287d185). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20767 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1414/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20767: [SPARK-23623] [SS] Avoid concurrent use of cached consum...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20767 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20767#discussion_r173351789 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends Logging { } } - def releaseKafkaConsumer( - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): Unit = { -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] -val topicPartition = new TopicPartition(topic, partition) -val key = CacheKey(groupId, topicPartition) + private def releaseKafkaConsumer( +topicPartition: TopicPartition, +kafkaParams: ju.Map[String, Object]): Unit = { +val key = new CacheKey(topicPartition, kafkaParams) synchronized { val consumer = cache.get(key) if (consumer != null) { -consumer.inuse = false +if (consumer.markedForClose) { + consumer.close() + cache.remove(key) +} else { + consumer.inuse = false +} } else { logWarning(s"Attempting to release consumer that does not exist") --- End diff -- This should not be the case. We do not remove any consumer from the cache while it is being used. So the scenario that you mentioned should not happen. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
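The release logic in the diff above — close a consumer on release only if it was marked for close while in use, otherwise return it to the cache — can be sketched in isolation. A minimal, single-threaded Java model with hypothetical names (not the actual `KafkaDataConsumer` code), showing why release never sees a consumer removed while in use:

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerCacheDemo {
    static class Consumer {
        boolean inuse = true;
        boolean markedForClose = false;
        boolean closed = false;
        void close() { closed = true; }
    }

    private final Map<String, Consumer> cache = new HashMap<>();

    synchronized Consumer acquire(String key) {
        Consumer c = cache.computeIfAbsent(key, k -> new Consumer());
        c.inuse = true;
        return c;
    }

    synchronized void release(String key) {
        Consumer c = cache.get(key);
        if (c == null) {
            System.out.println("WARN: releasing consumer that does not exist");
        } else if (c.markedForClose) {
            c.close();        // deferred close happens only at release time
            cache.remove(key);
        } else {
            c.inuse = false;  // returned to the cache for reuse
        }
    }

    public static void main(String[] args) {
        ConsumerCacheDemo cc = new ConsumerCacheDemo();
        Consumer c = cc.acquire("topic-0");
        c.markedForClose = true;  // e.g. invalidated while still in use
        cc.release("topic-0");
        System.out.println(c.closed);                      // closed on release
        System.out.println(cc.cache.containsKey("topic-0")); // and evicted
    }
}
```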
[GitHub] spark issue #20744: [SPARK-23608][CORE][WebUI] Add synchronization in SHS be...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20744 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20744: [SPARK-23608][CORE][WebUI] Add synchronization in SHS be...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20744 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88105/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20744: [SPARK-23608][CORE][WebUI] Add synchronization in SHS be...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20744 **[Test build #88105 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88105/testReport)** for PR 20744 at commit [`45f66a4`](https://github.com/apache/spark/commit/45f66a4568fa778be452d1093af8bb344b89b0c8). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20756#discussion_r173346317 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1261,8 +1261,39 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp override def children: Seq[Expression] = beanInstance +: setters.values.toSeq override def dataType: DataType = beanInstance.dataType - override def eval(input: InternalRow): Any = -throw new UnsupportedOperationException("Only code-generated evaluation is supported.") + private lazy val resolvedSetters = { +val ObjectType(beanClass) = beanInstance.dataType +setters.map { + case (name, expr) => +// Looking for known type mapping first, then using Class attached in `ObjectType`. +// Finally also looking for general `Object`-type parameter for generic methods. +val paramTypes = CallMethodViaReflection.typeMapping.getOrElse(expr.dataType, --- End diff -- Is there any special reason it only supports basically primitives and string? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20770: [SPARK-23626][CORE] DAGScheduler blocked due to JobSubmi...
Github user markhamstra commented on the issue: https://github.com/apache/spark/pull/20770 @squito is the master of DAGSchedulerSuite, and can provide you the best advice on changing or adding to the existing DAGSchedulerSuite. I'll be back from skiing next week and try to find some time to look at this. Hopefully @kayousterhout can find some time too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20776: [SPARK-23630][yarn] Allow user's hadoop conf customizati...
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20776

Merged build finished. Test PASSed.

---
[GitHub] spark issue #20776: [SPARK-23630][yarn] Allow user's hadoop conf customizati...
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20776

Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/88104/

---
[GitHub] spark issue #20776: [SPARK-23630][yarn] Allow user's hadoop conf customizati...
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20776

**[Test build #88104 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/88104/testReport)** for PR 20776 at commit [`5abac2c`](https://github.com/apache/spark/commit/5abac2ca3ff906be80ae1f5de7deea21bcdefb19).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.

---
[GitHub] spark pull request #20705: [SPARK-23553][TESTS] Tests should not assume the ...
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/20705#discussion_r173331220

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---
@@ -591,7 +591,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   }

   test("Pre insert nullability check (ArrayType)") {
-    withTable("arrayInParquet") {
+    withTable("array") {
--- End diff --

It would be good, maybe in a future cleanup, to replace all these repeated string literals (e.g., "array" appears 7 times, "map" 7 times) with a named variable.

---
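The cleanup suggested above is straightforward: hoist the repeated table-name literal into one value so every use stays consistent and a rename is a one-line change. A minimal sketch, with `tableName` as an illustrative name (not from the patch):

```scala
// Name the repeated literal once; every statement below derives from it,
// so changing the table name cannot leave a stale copy behind.
val tableName = "array"

val statements = Seq(
  s"CREATE TABLE $tableName (a ARRAY<INT>)",
  s"INSERT INTO $tableName SELECT ARRAY(1, 2)",
  s"DROP TABLE $tableName"
)
```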
[GitHub] spark pull request #20705: [SPARK-23553][TESTS] Tests should not assume the ...
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/20705#discussion_r173332327

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---
@@ -852,52 +846,52 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       (from to to).map(i => i -> s"str$i").toDF("c1", "c2")
     }

-    withTable("insertParquet") {
-      createDF(0, 9).write.format("parquet").saveAsTable("insertParquet")
+    withTable("t") {
+      createDF(0, 9).write.saveAsTable("t")
       checkAnswer(
-        sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
+        sql("SELECT p.c1, p.c2 FROM t p WHERE p.c1 > 5"),
         (6 to 9).map(i => Row(i, s"str$i")))

       intercept[AnalysisException] {
-        createDF(10, 19).write.format("parquet").saveAsTable("insertParquet")
+        createDF(10, 19).write.saveAsTable("t")
       }
-      createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
+      createDF(10, 19).write.mode(SaveMode.Append).saveAsTable("t")
       checkAnswer(
-        sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
+        sql("SELECT p.c1, p.c2 FROM t p WHERE p.c1 > 5"),
         (6 to 19).map(i => Row(i, s"str$i")))

-      createDF(20, 29).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
+      createDF(20, 29).write.mode(SaveMode.Append).saveAsTable("t")
       checkAnswer(
-        sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"),
+        sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 5 AND p.c1 < 25"),
         (6 to 24).map(i => Row(i, s"str$i")))

       intercept[AnalysisException] {
-        createDF(30, 39).write.saveAsTable("insertParquet")
+        createDF(30, 39).write.saveAsTable("t")
       }
-      createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("insertParquet")
+      createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("t")
       checkAnswer(
-        sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
+        sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 5 AND p.c1 < 35"),
         (6 to 34).map(i => Row(i, s"str$i")))

-      createDF(40, 49).write.mode(SaveMode.Append).insertInto("insertParquet")
+      createDF(40, 49).write.mode(SaveMode.Append).insertInto("t")
       checkAnswer(
-        sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
+        sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 5 AND p.c1 < 45"),
         (6 to 44).map(i => Row(i, s"str$i")))

-      createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("insertParquet")
+      createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("t")
       checkAnswer(
-        sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"),
+        sql("SELECT p.c1, c2 FROM t p WHERE p.c1 > 51 AND p.c1 < 55"),
         (52 to 54).map(i => Row(i, s"str$i")))

-      createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("insertParquet")
+      createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("t")
       checkAnswer(
-        sql("SELECT p.c1, c2 FROM insertParquet p"),
+        sql("SELECT p.c1, c2 FROM t p"),
         (50 to 59).map(i => Row(i, s"str$i")))

-      createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("insertParquet")
+      createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("t")
       checkAnswer(
-        sql("SELECT p.c1, c2 FROM insertParquet p"),
+        sql("SELECT p.c1, c2 FROM t p"),
         (70 to 79).map(i => Row(i, s"str$i")))
--- End diff --

Curious about why the test named "SPARK-8156: create table to specific database by 'use dbname'" still has a hard-coded format of parquet. Is it perhaps testing functionality that is orthogonal to the format? I changed the hard-coded format to json, orc, and csv, and the test passed each time. The same applies to the test "SPARK-23459: Improve error message when specified unknown column in partition columns" in org.apache.spark.sql.sources.SaveLoadSuite.

---
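The test above cycles through Spark's save-mode semantics against an existing table: the default (ErrorIfExists) throws, Append concatenates, Overwrite replaces, and Ignore silently keeps existing data. Those behaviors can be sketched without Spark, using a plain map as a stand-in catalog (illustrative names; `Mode` mirrors `org.apache.spark.sql.SaveMode`):

```scala
import scala.collection.mutable

// Stand-in for Spark's SaveMode enum.
object Mode extends Enumeration { val ErrorIfExists, Append, Overwrite, Ignore = Value }

// A mutable map standing in for the table catalog; rows are just Ints here.
val catalog = mutable.Map[String, Seq[Int]]()

def saveAsTable(name: String, rows: Seq[Int], mode: Mode.Value): Unit =
  catalog.get(name) match {
    case Some(existing) => mode match {
      case Mode.ErrorIfExists => throw new IllegalStateException(s"table $name already exists")
      case Mode.Append        => catalog(name) = existing ++ rows  // concatenate
      case Mode.Overwrite     => catalog(name) = rows              // replace
      case Mode.Ignore        => ()                                // keep existing data
    }
    case None => catalog(name) = rows  // table absent: every mode just creates it
  }
```

This is exactly the sequence of expectations the `checkAnswer` calls encode, minus the storage format — which is why the choice of parquet vs. json/orc/csv is orthogonal to what the test verifies.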
[GitHub] spark issue #20774: [SPARK-23549][SQL] Cast to timestamp when comparing time...
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20774

cc: @gatorsmile

---
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r173341089

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
     }
   }

-  def releaseKafkaConsumer(
-      topic: String,
-      partition: Int,
-      kafkaParams: ju.Map[String, Object]): Unit = {
-    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-    val topicPartition = new TopicPartition(topic, partition)
-    val key = CacheKey(groupId, topicPartition)
+  private def releaseKafkaConsumer(
+      topicPartition: TopicPartition,
+      kafkaParams: ju.Map[String, Object]): Unit = {
+    val key = new CacheKey(topicPartition, kafkaParams)

     synchronized {
       val consumer = cache.get(key)
       if (consumer != null) {
-        consumer.inuse = false
+        if (consumer.markedForClose) {
+          consumer.close()
+          cache.remove(key)
+        } else {
+          consumer.inuse = false
+        }
       } else {
         logWarning(s"Attempting to release consumer that does not exist")
--- End diff --

Aah, the warning was misleading. I will add comments to clarify that.

---
[GitHub] spark pull request #20767: [SPARK-23623] [SS] Avoid concurrent use of cached...
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20767#discussion_r173340037

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
     }
   }

-  def releaseKafkaConsumer(
-      topic: String,
-      partition: Int,
-      kafkaParams: ju.Map[String, Object]): Unit = {
-    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-    val topicPartition = new TopicPartition(topic, partition)
-    val key = CacheKey(groupId, topicPartition)
+  private def releaseKafkaConsumer(
+      topicPartition: TopicPartition,
+      kafkaParams: ju.Map[String, Object]): Unit = {
+    val key = new CacheKey(topicPartition, kafkaParams)

     synchronized {
       val consumer = cache.get(key)
       if (consumer != null) {
-        consumer.inuse = false
+        if (consumer.markedForClose) {
+          consumer.close()
+          cache.remove(key)
+        } else {
+          consumer.inuse = false
+        }
       } else {
         logWarning(s"Attempting to release consumer that does not exist")
       }
     }
   }

-  /**
-   * Removes (and closes) the Kafka Consumer for the given topic, partition and group id.
-   */
-  def removeKafkaConsumer(
-      topic: String,
-      partition: Int,
-      kafkaParams: ju.Map[String, Object]): Unit = {
-    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-    val topicPartition = new TopicPartition(topic, partition)
-    val key = CacheKey(groupId, topicPartition)
-
-    synchronized {
-      val removedConsumer = cache.remove(key)
-      if (removedConsumer != null) {
-        removedConsumer.close()
-      }
-    }
-  }

   /**
    * Get a cached consumer for groupId, assigned to topic and partition.
    * If matching consumer doesn't already exist, will be created using kafkaParams.
    */
-  def getOrCreate(
-      topic: String,
-      partition: Int,
-      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = synchronized {
-    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-    val topicPartition = new TopicPartition(topic, partition)
-    val key = CacheKey(groupId, topicPartition)
-
-    // If this is reattempt at running the task, then invalidate cache and start with
-    // a new consumer
+  def acquire(
+      topicPartition: TopicPartition,
+      kafkaParams: ju.Map[String, Object],
+      useCache: Boolean): KafkaDataConsumer = synchronized {
+    val key = new CacheKey(topicPartition, kafkaParams)
+    val existingInternalConsumer = cache.get(key)
+
+    lazy val newNonCachedConsumer =
+      new NonCachedKafkaDataConsumer(new InternalKafkaConsumer(topicPartition, kafkaParams))
+
     if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-      removeKafkaConsumer(topic, partition, kafkaParams)
-      val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
-      consumer.inuse = true
-      cache.put(key, consumer)
-      consumer
-    } else {
-      if (!cache.containsKey(key)) {
-        cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams))
+      // If this is reattempt at running the task, then invalidate cache and start with
+      // a new consumer
+      if (existingInternalConsumer != null) {
--- End diff --

This is indeed better. What I was doing was always deferring the close to a later point, but that would let the consumer be used one more time before being closed.

---
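The acquire/release lifecycle being discussed — invalidate a cached consumer on task reattempt, closing it eagerly if idle but only *marking* it if another thread still holds it, so that release closes it instead of returning it to the pool — can be sketched without Kafka. All names here (`FakeConsumer`, `acquire`, `release`) are illustrative stand-ins, not Spark's classes:

```scala
import java.util.concurrent.ConcurrentHashMap

// Stand-in for a pooled Kafka consumer.
class FakeConsumer {
  @volatile var inuse = false
  @volatile var markedForClose = false
  @volatile var closed = false
  def close(): Unit = closed = true
}

val cache = new ConcurrentHashMap[String, FakeConsumer]()
val lock = new Object

def acquire(key: String, attemptNumber: Int): FakeConsumer = lock.synchronized {
  val existing = cache.get(key)
  if (attemptNumber >= 1 && existing != null) {
    if (existing.inuse) existing.markedForClose = true  // busy: defer close to release()
    else { existing.close(); cache.remove(key) }        // idle: invalidate eagerly
  }
  cache.get(key) match {
    case c if c != null && !c.inuse =>
      c.inuse = true; c                                 // reuse the idle cached consumer
    case _ =>
      val c = new FakeConsumer                          // otherwise hand out a fresh one,
      c.inuse = true                                    // non-cached if the slot is taken
      if (!cache.containsKey(key)) cache.put(key, c)
      c
  }
}

def release(key: String, consumer: FakeConsumer): Unit = lock.synchronized {
  if (consumer.markedForClose) {
    consumer.close()
    if (cache.get(key) eq consumer) cache.remove(key)   // evict only if still the cached one
  } else {
    consumer.inuse = false
  }
}
```

The point of the mark-then-close-on-release pattern is exactly what the comment states: closing at acquire time prevents the stale consumer from being used one more time before it is torn down.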
[GitHub] spark pull request #20753: [SPARK-23582][SQL] StaticInvoke should support in...
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/20753#discussion_r173339081

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala ---
@@ -95,6 +162,21 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(createExternalRow, Row.fromSeq(Seq(1, "x")), InternalRow.fromSeq(Seq()))
   }

+  // This is an alternative version of `checkEvaluation` to compare results
--- End diff --

This code is intentionally copied for testing from https://github.com/apache/spark/pull/20757

---
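The idea behind such an alternative `checkEvaluation` is to run the same expression through two independent evaluation paths — interpreted and code-generated — and require identical results. A minimal sketch of that testing pattern, with a toy expression tree and a Scala closure standing in for the generated-code path (all names illustrative):

```scala
// A toy expression tree.
sealed trait Expr
case class Lit(v: Int) extends Expr
case class Add(l: Expr, r: Expr) extends Expr

// Path 1: interpreted evaluation by walking the tree on every call.
def interpret(e: Expr): Int = e match {
  case Lit(v)    => v
  case Add(l, r) => interpret(l) + interpret(r)
}

// Path 2: "compiled" evaluation — the tree is translated once into a
// closure, standing in for Spark's generated-code path.
def compile(e: Expr): () => Int = e match {
  case Lit(v)    => () => v
  case Add(l, r) =>
    val (cl, cr) = (compile(l), compile(r))
    () => cl() + cr()
}

// Assert both paths agree with the expected value, like the alternative
// checkEvaluation compares interpreted vs. codegen results.
def checkBothPaths(e: Expr, expected: Int): Unit = {
  assert(interpret(e) == expected, "interpreted path mismatch")
  assert(compile(e)() == expected, "compiled path mismatch")
}
```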