[jira] [Commented] (SPARK-31437) Try assigning tasks to existing executors by which required resources in ResourceProfile are satisfied
[ https://issues.apache.org/jira/browse/SPARK-31437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084552#comment-17084552 ] Hongze Zhang commented on SPARK-31437:
--
Thanks [~tgraves]. I see your point about keeping them tied. Actually I was thinking of something like this:
1. Break ResourceProfile up into ExecutorSpec and ResourceProfile;
2. ResourceProfile's structure remains as is;
3. ExecutorSpec includes only the executor's resource requirements; the task resource requirements are removed;
4. An ExecutorSpec is required to allocate new executor instances from the scheduler backend;
5. As in the current solution, the user specifies a ResourceProfile for an RDD, and tasks are then scheduled onto executors that were allocated using the matching ExecutorSpec;
6. Each time a ResourceProfile arrives, an ExecutorSpec is created or selected according to one of several strategies.

Strategy types:
s1. Always create a new ExecutorSpec;
s2. If the executor resource requirements in the ResourceProfile are satisfied by an existing ExecutorSpec, reuse the existing one;
s3. ...

bq. My etl tasks uses 8 cores, my ml tasks use 8 cores and 4 cpus. How do I keep my etl tasks from running on the ML executors without wasting resources?

With strategy s1 alone, everything works as in the current implementation.

> Try assigning tasks to existing executors by which required resources in
> ResourceProfile are satisfied
> --
>
> Key: SPARK-31437
> URL: https://issues.apache.org/jira/browse/SPARK-31437
> Project: Spark
> Issue Type: Improvement
> Components: Scheduler
> Affects Versions: 3.1.0
> Reporter: Hongze Zhang
> Priority: Major
>
> By the change in [PR|https://github.com/apache/spark/pull/27773] of
> SPARK-29154, submitted tasks are scheduled onto executors only if resource
> profile IDs strictly match. As a result, Spark always starts new executors for
> customized ResourceProfiles.
> This limitation makes working with process-local jobs unfriendly. E.g.
Task
> cores have been increased from 1 to 4 in a new stage while the executor has 8
> slots; it is expected that 2 new tasks can run on the existing executor,
> but Spark starts new executors for the new ResourceProfile. This behavior is
> unnecessary.
--
This message was sent by Atlassian Jira (v8.3.4#803005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
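The create-or-reuse decision sketched in the comment above can be modeled in a few lines. Note that ExecutorSpec, Profile, and selectSpec below are hypothetical stand-ins for the proposal, not Spark's actual classes:

```scala
// Illustrative model of strategies s1/s2 above. ExecutorSpec and Profile are
// hypothetical sketches of the proposed split, not Spark types.
case class ExecutorSpec(cores: Int, memoryMb: Int, gpus: Int = 0)
case class Profile(taskCores: Int, executor: ExecutorSpec)

// s2: reuse an existing spec that satisfies the profile's executor needs;
// fall back to s1 (a fresh spec) when none does.
def selectSpec(existing: Seq[ExecutorSpec], p: Profile): ExecutorSpec =
  existing.find { s =>
    s.cores >= p.executor.cores &&
    s.memoryMb >= p.executor.memoryMb &&
    s.gpus >= p.executor.gpus
  }.getOrElse(p.executor)

val existing = Seq(ExecutorSpec(cores = 8, memoryMb = 8192))
val p = Profile(taskCores = 4, executor = ExecutorSpec(cores = 4, memoryMb = 4096))
val chosen = selectSpec(existing, p)
// The 8-slot executor is reused; with 4-core tasks it can run 2 at once,
// matching the scenario in the issue description.
val slots = chosen.cores / p.taskCores
```

Under s2 the existing 8-core executor satisfies the new profile's 4-core executor requirement, so no new executor is allocated; under s1 the fallback branch would always be taken, reproducing today's behavior.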
[jira] [Updated] (SPARK-31457) spark jdbc read hive created the wrong PreparedStatement
[ https://issues.apache.org/jira/browse/SPARK-31457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] daile updated SPARK-31457:
--
Attachment: hivejdbc3.png

> spark jdbc read hive created the wrong PreparedStatement
>
> Key: SPARK-31457
> URL: https://issues.apache.org/jira/browse/SPARK-31457
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.3.2, 3.1.0
> Environment: spark 2.3.2
> hive 2.1.1
>
> Reporter: daile
> Priority: Major
> Attachments: hivejdbc2.png, hivejdbc3.png, sparkhivejdbc.png
>
> {code:java}
> val res = spark
>   .read
>   .format("jdbc")
>   .option("url", "jdbc:hive2://host:1/default")
>   .option("dbtable", "user_info2")
>   .option("driver", "org.apache.hive.jdbc.HiveDriver")
>   .option("user", "")
>   .option("password", "")
>   .load()
> res.show(){code}
> This gets the wrong result:
> +--+--+---+
> |user_info2.age|user_info2.sex|user_info2.birthday|
> +--+--+---+
> |user_info2.age|user_info2.sex|user_info2.birthday|
> |user_info2.age|user_info2.sex|user_info2.birthday|
> |user_info2.age|user_info2.sex|user_info2.birthday|
> |user_info2.age|user_info2.sex|user_info2.birthday|
> |user_info2.age|user_info2.sex|user_info2.birthday|
> |user_info2.age|user_info2.sex|user_info2.birthday|
> |user_info2.age|user_info2.sex|user_info2.birthday|
> |user_info2.age|user_info2.sex|user_info2.birthday|
> |user_info2.age|user_info2.sex|user_info2.birthday|
> |user_info2.age|user_info2.sex|user_info2.birthday|
> +--+--+---+
[jira] [Updated] (SPARK-31457) spark jdbc read hive created the wrong PreparedStatement
[ https://issues.apache.org/jira/browse/SPARK-31457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] daile updated SPARK-31457:
--
Attachment: hivejdbc2.png

> spark jdbc read hive created the wrong PreparedStatement
[jira] [Updated] (SPARK-31457) spark jdbc read hive created the wrong PreparedStatement
[ https://issues.apache.org/jira/browse/SPARK-31457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] daile updated SPARK-31457:
--
Attachment: sparkhivejdbc.png

> spark jdbc read hive created the wrong PreparedStatement
[jira] [Created] (SPARK-31457) spark jdbc read hive created the wrong PreparedStatement
daile created SPARK-31457:
-
Summary: spark jdbc read hive created the wrong PreparedStatement
Key: SPARK-31457
URL: https://issues.apache.org/jira/browse/SPARK-31457
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.3.2, 3.1.0
Environment: spark 2.3.2
hive 2.1.1
Reporter: daile

{code:java}
val res = spark
  .read
  .format("jdbc")
  .option("url", "jdbc:hive2://host:1/default")
  .option("dbtable", "user_info2")
  .option("driver", "org.apache.hive.jdbc.HiveDriver")
  .option("user", "")
  .option("password", "")
  .load()
res.show(){code}
This gets the wrong result:
+--+--+---+
|user_info2.age|user_info2.sex|user_info2.birthday|
+--+--+---+
|user_info2.age|user_info2.sex|user_info2.birthday|
|user_info2.age|user_info2.sex|user_info2.birthday|
|user_info2.age|user_info2.sex|user_info2.birthday|
|user_info2.age|user_info2.sex|user_info2.birthday|
|user_info2.age|user_info2.sex|user_info2.birthday|
|user_info2.age|user_info2.sex|user_info2.birthday|
|user_info2.age|user_info2.sex|user_info2.birthday|
|user_info2.age|user_info2.sex|user_info2.birthday|
|user_info2.age|user_info2.sex|user_info2.birthday|
|user_info2.age|user_info2.sex|user_info2.birthday|
+--+--+---+
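A plausible reading of this bug: Spark's generic JDBC dialect quotes column identifiers with double quotes, but HiveQL parses double-quoted tokens as string literals, so the generated SELECT returns the literal column names on every row. The sketch below illustrates only the quoting difference — buildSelect is a hypothetical helper, not Spark's internal code. One known workaround is registering a custom JdbcDialect whose quoteIdentifier emits backticks, Hive's identifier syntax:

```scala
// Hypothetical sketch of how the generated SELECT differs by identifier
// quoting. buildSelect is illustrative, not Spark's internal code.
def buildSelect(cols: Seq[String], table: String, quote: String => String): String =
  s"SELECT ${cols.map(quote).mkString(",")} FROM $table"

val cols = Seq("user_info2.age", "user_info2.sex")

// Generic JDBC dialect: double quotes. HiveQL reads these as string
// literals, so every fetched row is just the column names themselves.
val generic = buildSelect(cols, "user_info2", c => "\"" + c + "\"")

// Hive quotes identifiers with backticks; a custom JdbcDialect overriding
// quoteIdentifier could emit this form instead.
val backticked = buildSelect(cols, "user_info2", c => s"`$c`")
```

Executing the double-quoted form against Hive yields one constant-string tuple per row, which matches the all-identical result table shown above.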
[jira] [Created] (SPARK-31456) If a shutdown hook is added with priority Integer.MIN_VALUE, it is supposed to be called last, but it gets called before other positive-priority shutdown hooks
Xiaolei Liu created SPARK-31456:
---
Summary: If a shutdown hook is added with priority Integer.MIN_VALUE, it is supposed to be called last, but it gets called before other positive-priority shutdown hooks
Key: SPARK-31456
URL: https://issues.apache.org/jira/browse/SPARK-31456
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.4.5
Environment: macOS Mojave 10.14.6
Reporter: Xiaolei Liu

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala

ShutdownHookManager uses the method below for the comparison:

override def compareTo(other: SparkShutdownHook): Int = {
  other.priority - priority
}

This causes (Int)(25 - Integer.MIN_VALUE) < 0 because the subtraction overflows, so the shutdown hook with priority Integer.MIN_VALUE is not called last.
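The overflow can be demonstrated in isolation. The two functions below are a sketch mirroring the comparator's subtraction and an overflow-safe alternative, not the actual Spark patch:

```scala
// Sketch of the bug: the comparator's plain subtraction wraps around for
// extreme priorities. A hook with priority Int.MinValue should sort last,
// but 25 - Int.MinValue overflows to a negative Int, inverting the order.
def brokenCompare(priority: Int, otherPriority: Int): Int =
  otherPriority - priority // mirrors `other.priority - priority`

def fixedCompare(priority: Int, otherPriority: Int): Int =
  Integer.compare(otherPriority, priority) // overflow-safe comparison

val broken = brokenCompare(Int.MinValue, 25) // negative: wrong ordering
val fixed  = fixedCompare(Int.MinValue, 25)  // positive: MinValue sorts last
```

Replacing the subtraction with Integer.compare (or Ordering.Int) avoids the wraparound without changing the intended ordering for in-range priorities.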
[jira] [Commented] (SPARK-31455) Fix rebasing of not-existed dates/timestamps
[ https://issues.apache.org/jira/browse/SPARK-31455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084501#comment-17084501 ] Wenchen Fan commented on SPARK-31455: - also https://github.com/apache/spark/pull/28225 > Fix rebasing of not-existed dates/timestamps > > > Key: SPARK-31455 > URL: https://issues.apache.org/jira/browse/SPARK-31455 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Wenchen Fan >Assignee: Maxim Gekk >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-31455) Fix rebasing of not-existed timestamps
[ https://issues.apache.org/jira/browse/SPARK-31455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-31455. - Fix Version/s: 3.0.0 Resolution: Fixed Issue resolved by pull request 28227 [https://github.com/apache/spark/pull/28227] > Fix rebasing of not-existed timestamps > -- > > Key: SPARK-31455 > URL: https://issues.apache.org/jira/browse/SPARK-31455 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Wenchen Fan >Assignee: Maxim Gekk >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31455) Fix rebasing of not-existed dates/timestamps
[ https://issues.apache.org/jira/browse/SPARK-31455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan updated SPARK-31455: Summary: Fix rebasing of not-existed dates/timestamps (was: Fix rebasing of not-existed timestamps) > Fix rebasing of not-existed dates/timestamps > > > Key: SPARK-31455 > URL: https://issues.apache.org/jira/browse/SPARK-31455 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Wenchen Fan >Assignee: Maxim Gekk >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31455) Fix rebasing of not-existed timestamps
Wenchen Fan created SPARK-31455: --- Summary: Fix rebasing of not-existed timestamps Key: SPARK-31455 URL: https://issues.apache.org/jira/browse/SPARK-31455 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.0.0 Reporter: Wenchen Fan Assignee: Maxim Gekk -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
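For background on what a "not-existed" date is: java.time uses the proleptic Gregorian calendar, while the legacy hybrid Julian/Gregorian calendar skips 1582-10-05 through 1582-10-14 at the cutover, so rebasing between the two systems must decide what those dates map to. A minimal JDK-only illustration (this is context for the issue, not the Spark fix itself):

```scala
import java.text.SimpleDateFormat
import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar}

// Proleptic Gregorian calendar: 1582-10-10 is a perfectly valid date.
val proleptic = LocalDate.of(1582, 10, 10)

// Hybrid Julian/Gregorian calendar: the ten days after 1582-10-04 do not
// exist, and the lenient GregorianCalendar resolves them by rolling forward.
val hybrid = new GregorianCalendar(1582, Calendar.OCTOBER, 10)
val fmt = new SimpleDateFormat("yyyy-MM-dd")
val rebased = fmt.format(hybrid.getTime) // resolves to 1582-10-20
```

The same mismatch exists for timestamps that fall into a time-zone transition, which is why the rebasing logic needs explicit handling for dates and timestamps that do not exist in the target calendar.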
[jira] [Assigned] (SPARK-15616) CatalogRelation should fallback to HDFS size of partitions that are involved in Query if statistics are not available.
[ https://issues.apache.org/jira/browse/SPARK-15616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li reassigned SPARK-15616:
---
Assignee: Hu Fuwang

> CatalogRelation should fallback to HDFS size of partitions that are involved
> in Query if statistics are not available.
> --
>
> Key: SPARK-15616
> URL: https://issues.apache.org/jira/browse/SPARK-15616
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Lianhui Wang
> Assignee: Hu Fuwang
> Priority: Major
> Fix For: 3.0.0
>
> Currently, if some partitions of a partitioned table are used in a join
> operation, we rely on the Metastore-reported table size to decide whether the
> operation can be converted to a broadcast join.
> If a Filter can prune some partitions, Hive prunes them before deciding
> whether to use a broadcast join, based on the HDFS size of the partitions
> involved in the query. Spark SQL needs the same capability, which can improve
> join performance for partitioned tables.
[jira] [Created] (SPARK-31454) An optimized K-Means based on DenseMatrix and GEMM
Xiaochang Wu created SPARK-31454: Summary: An optimized K-Means based on DenseMatrix and GEMM Key: SPARK-31454 URL: https://issues.apache.org/jira/browse/SPARK-31454 Project: Spark Issue Type: Improvement Components: ML Affects Versions: 3.0.0 Reporter: Xiaochang Wu The main computations in K-Means are calculating distances between individual points and center points. Currently K-Means implementation is vector-based which can't take advantage of optimized native BLAS libraries. When the original points are represented as dense vectors, our approach is to modify the original input data structures to a DenseMatrix-based one by grouping several points together. The original distance calculations can be translated into a Matrix multiplication then optimized native GEMM routines (Intel MKL, OpenBLAS etc.) can be used. This approach can also work with sparse vectors despite having larger memory consumption when translating sparse vectors to dense matrix. Our preliminary benchmark shows this DenseMatrix+GEMM approach can boost the training performance by *3.5x* with Intel MKL, looks very promising! To minimize end user impact, proposed changes are to use config parameters to control if turn on this implementation without modifying public interfaces. Parameter rowsPerMatrix is used to control how many points are grouped together to build a DenseMatrix. 
An example:

$ spark-submit --master $SPARK_MASTER \
  --conf "spark.ml.kmeans.matrixImplementation.enabled=true" \
  --conf "spark.ml.kmeans.matrixImplementation.rowsPerMatrix=5000" \
  --class org.apache.spark.examples.ml.KMeansExample

Several code changes are made in the "spark.ml" namespace, as we think "spark.mllib" is in maintenance mode; some are duplications from spark.mllib needed to use private definitions in the same package:
- Modified: KMeans.scala
- Added: KMeansMatrixImpl.scala
- Duplicated: DistanceMeasure.scala, LocalKMeans.scala

If this general idea is accepted by the community, we are willing to contribute our code upstream, polish the implementation according to feedback, and produce benchmarks.
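The algebra behind the proposal: for a block of points X and the centers C, squared Euclidean distances expand as ||x - c||^2 = ||x||^2 + ||c||^2 - 2(x . c), so the cross terms for all point/center pairs form one matrix product X * C^T, which a native BLAS GEMM computes in a single call. A plain-Scala stand-in for the identity (no BLAS, names are illustrative):

```scala
// Squared-distance identity underlying the DenseMatrix+GEMM approach.
def sqNorm(v: Array[Double]): Double = v.map(x => x * x).sum
def dot(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map { case (x, y) => x * y }.sum

// Each entry is ||x||^2 + ||c||^2 - 2 * (x . c); the dot-product term is
// exactly the (point, center) entry of X * C^T that GEMM would produce.
def pairwiseSqDist(points: Array[Array[Double]],
                   centers: Array[Array[Double]]): Array[Array[Double]] =
  points.map(x => centers.map(c => sqNorm(x) + sqNorm(c) - 2.0 * dot(x, c)))

val pts = Array(Array(0.0, 0.0), Array(3.0, 4.0))
val ctr = Array(Array(0.0, 0.0), Array(3.0, 0.0))
val d = pairwiseSqDist(pts, ctr)
```

Grouping rowsPerMatrix points per block amortizes the norm computations and lets MKL/OpenBLAS handle the dominant cross-term, which is where the claimed 3.5x speedup would come from.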
[jira] [Resolved] (SPARK-31428) Document Common Table Expression in SQL Reference
[ https://issues.apache.org/jira/browse/SPARK-31428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Takeshi Yamamuro resolved SPARK-31428. -- Fix Version/s: 3.0.0 Assignee: Huaxin Gao Resolution: Fixed Resolved by https://github.com/apache/spark/pull/28196 > Document Common Table Expression in SQL Reference > - > > Key: SPARK-31428 > URL: https://issues.apache.org/jira/browse/SPARK-31428 > Project: Spark > Issue Type: Sub-task > Components: Documentation, SQL >Affects Versions: 3.0.0 >Reporter: Huaxin Gao >Assignee: Huaxin Gao >Priority: Major > Fix For: 3.0.0 > > > Document Common Table Expression in SQL Reference -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-31380) Peak Execution Memory Quantile is not displayed in Spark History Server UI
[ https://issues.apache.org/jira/browse/SPARK-31380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084431#comment-17084431 ] Baohe Zhang commented on SPARK-31380:
-
Did you run your application with Spark 3? I tested it in my local 3.0 build, and the Spark History Server UI can display non-zero Peak Execution Memory.
!image-2020-04-15-18-16-18-254.png!

> Peak Execution Memory Quantile is not displayed in Spark History Server UI
> --
>
> Key: SPARK-31380
> URL: https://issues.apache.org/jira/browse/SPARK-31380
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, Web UI
> Affects Versions: 3.0.0
> Reporter: Srinivas Rishindra Pothireddi
> Priority: Major
> Attachments: image-2020-04-15-18-16-18-254.png
>
> Peak Execution Memory Quantile is displayed correctly in the regular Spark UI.
> If the same application is viewed in the Spark History Server UI, Peak
> Execution Memory is always displayed as zero.
> The Spark event log for the application seems to contain Peak Execution
> Memory (under the tag "internal.metrics.peakExecutionMemory") correctly.
> However, this is not reflected in the History Server UI.
> *Steps to produce non-zero Peak Execution Memory*
> spark.range(0, 20).map\{x => (x , x % 20)}.toDF("a", "b").createOrReplaceTempView("fred")
> spark.range(0, 20).map\{x => (x , x + 1)}.toDF("a", "b").createOrReplaceTempView("phil")
> sql("select p.*, f.* from phil p join fred f on f.b = p.b").count
[jira] [Updated] (SPARK-31380) Peak Execution Memory Quantile is not displayed in Spark History Server UI
[ https://issues.apache.org/jira/browse/SPARK-31380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Baohe Zhang updated SPARK-31380:
Attachment: image-2020-04-15-18-16-18-254.png

> Peak Execution Memory Quantile is not displayed in Spark History Server UI
[jira] [Comment Edited] (SPARK-31236) Spark error while consuming data from Kinesis direct end point
[ https://issues.apache.org/jira/browse/SPARK-31236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084419#comment-17084419 ] Thukarama Prabhu edited comment on SPARK-31236 at 4/15/20, 10:45 PM:
-
Looks like this requires major code changes. The Spark API currently uses the KCL 1.x implementation (not currently supported by AWS). We need to migrate to KCL 2.x. [https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#worker-migration]

was (Author: pthukar): Looks like this requires major code changes. Spark API currently uses KCL 1.x implementation. We need to migrate to KCL 2.x. [https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#worker-migration]

> Spark error while consuming data from Kinesis direct end point
> --
>
> Key: SPARK-31236
> URL: https://issues.apache.org/jira/browse/SPARK-31236
> Project: Spark
> Issue Type: Bug
> Components: DStreams, Java API
> Affects Versions: 2.4.5
> Reporter: Thukarama Prabhu
> Priority: Critical
>
> Here is a summary of the issue I am experiencing when using the Kinesis direct
> URL for consuming data with Spark.
> *Kinesis direct URL:* > [https://kinesis-ae1.hdw.r53.deap.tv|https://kinesis-ae1.hdw.r53.deap.tv/] > (Failing with Credential should be scoped to a valid region, not 'ae1') > *Kinesis default URL:* > [https://kinesis.us-east-1.amazonaws.com|https://kinesis.us-east-1.amazonaws.com/] > (Working) > Spark code for consuming data > SparkAWSCredentials credentials = > commonService.getSparkAWSCredentials(kinApp.propConfig); > KinesisInputDStream kinesisStream = KinesisInputDStream.builder() > .streamingContext(jssc) > .checkpointAppName(applicationName) > .streamName(streamName) > .endpointUrl(endpointURL) > .regionName(regionName) > > .initialPosition(KinesisInitialPositions.fromKinesisInitialPosition(initPosition)) > .checkpointInterval(checkpointInterval) > .kinesisCredentials(credentials) > .storageLevel(StorageLevel.MEMORY_AND_DISK_2()).build(); > > Spark version 2.4.4 > > org.apache.spark > spark-streaming-kinesis-asl_2.11 > 2.4.5 > > > com.amazonaws > amazon-kinesis-client > 1.13.3 > > > com.amazonaws > aws-java-sdk > 1.11.747 > > > The spark application works fine when I use default URL but fails when I > change to direct URL with below error. The direct URL works when I try to > publish to direct kinesis URL. Issue only when I try to consume data. > > 2020-03-24 08:43:40,650 ERROR - Caught exception while sync'ing Kinesis > shards and leases > com.amazonaws.services.kinesis.model.AmazonKinesisException: Credential > should be scoped to a valid region, not 'ae1'. 
(Service: AmazonKinesis; > Status Code: 400; Error Code: InvalidSignatureException; Request ID: > fb43b636-8ce2-ec77-adb7-a8ead9e038c2) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1383) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1359) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698) > at > com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680) > at > com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544) > at > com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524) > at > com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809) > at > com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776) > at > com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765) > at > com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1557) > at > com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528) > at >
[jira] [Commented] (SPARK-31236) Spark error while consuming data from Kinesis direct end point
[ https://issues.apache.org/jira/browse/SPARK-31236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084419#comment-17084419 ] Thukarama Prabhu commented on SPARK-31236:
--
Looks like this requires major code changes. Spark API currently uses KCL 1.x implementation. We need to migrate to KCL 2.x. [https://docs.aws.amazon.com/streams/latest/dev/kcl-migration.html#worker-migration]

> Spark error while consuming data from Kinesis direct end point
[jira] [Assigned] (SPARK-31399) Closure cleaner broken in Scala 2.12
[ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li reassigned SPARK-31399: --- Assignee: Kris Mok > Closure cleaner broken in Scala 2.12 > > > Key: SPARK-31399 > URL: https://issues.apache.org/jira/browse/SPARK-31399 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.5, 3.0.0 >Reporter: Wenchen Fan >Assignee: Kris Mok >Priority: Blocker > > The `ClosureCleaner` only support Scala functions and it uses the following > check to catch closures > {code} > // Check whether a class represents a Scala closure > private def isClosure(cls: Class[_]): Boolean = { > cls.getName.contains("$anonfun$") > } > {code} > This doesn't work in 3.0 any more as we upgrade to Scala 2.12 and most Scala > functions become Java lambdas. > As an example, the following code works well in Spark 2.4 Spark Shell: > {code} > scala> :pa > // Entering paste mode (ctrl-D to finish) > import org.apache.spark.sql.functions.lit > case class Foo(id: String) > val col = lit("123") > val df = sc.range(0,10,1,1).map { _ => Foo("") } > // Exiting paste mode, now interpreting. > import org.apache.spark.sql.functions.lit > defined class Foo > col: org.apache.spark.sql.Column = 123 > df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[5] at map at :20 > {code} > But fails in 3.0 > {code} > scala> :pa > // Entering paste mode (ctrl-D to finish) > import org.apache.spark.sql.functions.lit > case class Foo(id: String) > val col = lit("123") > val df = sc.range(0,10,1,1).map { _ => Foo("") } > // Exiting paste mode, now interpreting. 
> org.apache.spark.SparkException: Task not serializable > at > org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396) > at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386) > at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159) > at org.apache.spark.SparkContext.clean(SparkContext.scala:2371) > at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) > at org.apache.spark.rdd.RDD.map(RDD.scala:421) > ... 39 elided > Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column > Serialization stack: > - object not serializable (class: org.apache.spark.sql.Column, value: > 123) > - field (class: $iw, name: col, type: class org.apache.spark.sql.Column) > - object (class $iw, $iw@2d87ac2b) > - element of array (index: 0) > - array (class [Ljava.lang.Object;, size 1) > - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, > type: class [Ljava.lang.Object;) > - object (class java.lang.invoke.SerializedLambda, > SerializedLambda[capturingClass=class $iw, > functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, > implementation=invokeStatic > $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, > instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1]) > - writeReplace data (class: java.lang.invoke.SerializedLambda) > - object (class $Lambda$2438/170049100, $Lambda$2438/170049100@d6b8c43) > at > org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) > at > org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) > at > 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393) > ... 47 more > {code} > **Apache Spark 2.4.5 with Scala 2.12** > {code} > Welcome to > __ > / __/__ ___ _/ /__ > _\ \/ _ \/ _ `/ __/ '_/ >/___/ .__/\_,_/_/ /_/\_\ version 2.4.5 > /_/ > Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242) > Type in expressions to have them evaluated. > Type :help for more information. > scala> :pa > // Entering paste mode (ctrl-D to finish) > import org.apache.spark.sql.functions.lit > case class Foo(id: String) > val col = lit("123") > val df = sc.range(0,10,1,1).map { _ => Foo("") } > // Exiting paste mode, now interpreting. > org.apache.spark.SparkException: Task not serializable > at > org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403) > at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393) > at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162) > at org.apache.spark.SparkContext.clean(SparkContext.scala:2326) > at
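The description above notes that under Scala 2.12 closures compile to Java lambdas, so the `$anonfun$` name check no longer matches. A rough, self-contained sketch of how such lambdas can be recognized reflectively — this is an illustration of the 2.12 situation, not Spark's actual fix: serializable indy lambdas carry a synthetic `writeReplace` method that returns a `java.lang.invoke.SerializedLambda`.

```scala
import java.lang.invoke.SerializedLambda

// Pre-2.12 check: Scala closures were anonymous classes named like Outer$$anonfun$1.
def isAnonfunClosure(cls: Class[_]): Boolean =
  cls.getName.contains("$anonfun$")

// 2.12-style probe (illustrative): serializable Java lambdas expose a synthetic
// writeReplace method that returns a java.lang.invoke.SerializedLambda.
// Note: setAccessible on a lambda class may be restricted on JDK 9+ modules;
// this works on the JDK 8 setups these Spark versions target.
def isIndyLambda(closure: AnyRef): Boolean =
  try {
    val m = closure.getClass.getDeclaredMethod("writeReplace")
    m.setAccessible(true)
    m.invoke(closure).isInstanceOf[SerializedLambda]
  } catch {
    case _: NoSuchMethodException => false
  }

val f: Int => Int = x => x + 1 // compiles to a Java lambda under Scala 2.12

println(isAnonfunClosure(f.getClass)) // false on 2.12: the old name check misses it
```

On Scala 2.11 the same function literal would have been an `$anonfun$` class and the old check would return true, which is exactly the gap this ticket describes.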
[jira] [Commented] (SPARK-31399) Closure cleaner broken in Scala 2.12
[ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084356#comment-17084356 ] Xiao Li commented on SPARK-31399: - [~rednaxelafx] will help this ticket and do more investigation.
[jira] [Commented] (SPARK-31423) DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC
[ https://issues.apache.org/jira/browse/SPARK-31423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084308#comment-17084308 ] Maxim Gekk commented on SPARK-31423: [~bersprockets] I think we should take the next valid date for any non-existent dates; see the linked PR. > DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC > -- > > Key: SPARK-31423 > URL: https://issues.apache.org/jira/browse/SPARK-31423 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0, 3.1.0 >Reporter: Bruce Robbins >Priority: Major > > There is a range of days (1582-10-05 to 1582-10-14) for which DATEs and > TIMESTAMPS are changed when stored in ORC. The value is off by 10 days. > For example: > {noformat} > scala> val df = sql("select cast('1582-10-14' as DATE) dt") > df: org.apache.spark.sql.DataFrame = [dt: date] > scala> df.show // seems fine > +--+ > |dt| > +--+ > |1582-10-14| > +--+ > scala> df.write.mode("overwrite").orc("/tmp/funny_orc_date") > scala> spark.read.orc("/tmp/funny_orc_date").show // off by 10 days > +--+ > |dt| > +--+ > |1582-10-24| > +--+ > scala> > {noformat} > ORC has the same issue with TIMESTAMPS: > {noformat} > scala> val df = sql("select cast('1582-10-14 00:00:00' as TIMESTAMP) ts") > df: org.apache.spark.sql.DataFrame = [ts: timestamp] > scala> df.show // seems fine > +---+ > | ts| > +---+ > |1582-10-14 00:00:00| > +---+ > scala> df.write.mode("overwrite").orc("/tmp/funny_orc_timestamp") > scala> spark.read.orc("/tmp/funny_orc_timestamp").show(truncate=false) // off > by 10 days > +---+ > |ts | > +---+ > |1582-10-24 00:00:00| > +---+ > scala> > {noformat} > However, when written to Parquet or Avro, DATES and TIMESTAMPs for this range > do not change. 
> {noformat} > scala> val df = sql("select cast('1582-10-14' as DATE) dt") > df: org.apache.spark.sql.DataFrame = [dt: date] > scala> df.write.mode("overwrite").parquet("/tmp/funny_parquet_date") > scala> spark.read.parquet("/tmp/funny_parquet_date").show // reflects > original value > +--+ > |dt| > +--+ > |1582-10-14| > +--+ > scala> val df = sql("select cast('1582-10-14' as DATE) dt") > df: org.apache.spark.sql.DataFrame = [dt: date] > scala> df.write.mode("overwrite").format("avro").save("/tmp/funny_avro_date") > scala> spark.read.format("avro").load("/tmp/funny_avro_date").show // > reflects original value > +--+ > |dt| > +--+ > |1582-10-14| > +--+ > scala> > {noformat} > It's unclear to me whether ORC is behaving correctly or not, as this is how > Spark 2.4 works with DATEs and TIMESTAMPs in general (and also how Spark 3.x > works with DATEs and TIMESTAMPs in general when > {{spark.sql.legacy.timeParserPolicy}} is set to {{LEGACY}}). In Spark 2.4, > DATEs and TIMESTAMPs in this range don't exist: > {noformat} > scala> sql("select cast('1582-10-14' as DATE) dt").show // the same cast done > in Spark 2.4 > +--+ > |dt| > +--+ > |1582-10-24| > +--+ > scala> > {noformat} > I assume the following snippet is relevant (from the Wikipedia entry on the > Gregorian calendar): > {quote}To deal with the 10 days' difference (between calendar and > reality)[Note 2] that this drift had already reached, the date was advanced > so that 4 October 1582 was followed by 15 October 1582 > {quote} > Spark 3.x should treat DATEs and TIMESTAMPS in this range consistently, and > probably based on spark.sql.legacy.timeParserPolicy (or some other config) > rather than file format. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-31423) DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC
[ https://issues.apache.org/jira/browse/SPARK-31423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084288#comment-17084288 ] Bruce Robbins commented on SPARK-31423: --- Thanks. It seems we can either - close this as "not a bug", or - I can file an ORC Jira (later this week, after I fiddle with the library a bit) and mark this as blocked on the ORC Jira.
[jira] [Commented] (SPARK-31423) DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC
[ https://issues.apache.org/jira/browse/SPARK-31423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084234#comment-17084234 ] Wenchen Fan commented on SPARK-31423: - I hope the ORC community can figure this out and switch to/support the standard proleptic Gregorian calendar. One thing we can do is to check the behavior of Hive 3.x, as Hive also switches to the proleptic Gregorian calendar and should have the same issue.
[jira] [Commented] (SPARK-31371) FileStreamSource: Decide seen files on the checksum, instead of filename.
[ https://issues.apache.org/jira/browse/SPARK-31371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084223#comment-17084223 ] Gabor Somogyi commented on SPARK-31371: --- There was a similar feature request before, and the conclusion was not to add it. In short: a file must be created atomically, and after that its content must not change. I couldn't find the jira or PR; maybe it was closed. AFAIK [~zsxwing] was there... > FileStreamSource: Decide seen files on the checksum, instead of filename. > - > > Key: SPARK-31371 > URL: https://issues.apache.org/jira/browse/SPARK-31371 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.5, 3.0.0 >Reporter: Prashant Sharma >Priority: Major > > At the moment, structured streaming's file source ignores updates to files it has processed earlier. However, for reasons beyond our control, software may update the same file with new data. A case in point is rolling logs, where the latest log file is always e.g. log.txt and the rolled logs are log-1.txt, etc. > So supporting this would not be special-casing but supporting a genuine use case.
[jira] [Created] (SPARK-31453) Error while converting JavaRDD to Dataframe
Sachit Sharma created SPARK-31453: - Summary: Error while converting JavaRDD to Dataframe Key: SPARK-31453 URL: https://issues.apache.org/jira/browse/SPARK-31453 Project: Spark Issue Type: Bug Components: Java API Affects Versions: 2.4.5 Reporter: Sachit Sharma Please refer to this: [https://stackoverflow.com/questions/61172007/error-while-converting-javardd-to-dataframe]
[jira] [Commented] (SPARK-31423) DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC
[ https://issues.apache.org/jira/browse/SPARK-31423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084218#comment-17084218 ] Bruce Robbins commented on SPARK-31423: --- OK, so this is a case of a limitation of the ORC library, and there is not much we can do about it in Spark (other than throwing a user-overridable exception when this happens). Therefore, some users might see a case like this:
{noformat}
val df = sql("select cast('1582-10-14' as DATE) dt")
df.write.mode("overwrite").parquet("/tmp/dateparquet")
df.write.mode("overwrite").format("avro").save("/tmp/dateavro")
df.write.mode("overwrite").orc("/tmp/dateorc")
val dfParquet = spark.read.parquet("/tmp/dateparquet")
val dfAvro = spark.read.format("avro").load("/tmp/dateavro")
val dfOrc = spark.read.orc("/tmp/dateorc")

scala> dfParquet.join(dfAvro, "dt").count // can join to avro
res4: Long = 1

scala> dfParquet.join(dfOrc, "dt").count // doesn't find it in orc
res5: Long = 0

scala>
{noformat}
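The 10-day shift can be reproduced without ORC at all, purely from the two calendar systems involved: `java.time` uses the proleptic Gregorian calendar, while `java.util.GregorianCalendar` uses the hybrid Julian/Gregorian calendar in which 1582-10-05 through 1582-10-14 never existed. A small sketch (assuming UTC throughout):

```scala
import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}

// Proleptic Gregorian calendar (java.time): 1582-10-14 is a valid date.
val proleptic = LocalDate.of(1582, 10, 14)

// Hybrid Julian/Gregorian calendar (java.util): before the 1582-10-15
// cutover, dates are read as Julian, and Julian 1582-10-14 falls 10 days
// later on the proleptic Gregorian timeline.
val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
cal.clear()
cal.set(1582, Calendar.OCTOBER, 14)
val hybridEpochDay = Math.floorDiv(cal.getTimeInMillis, 86400000L)

println(hybridEpochDay - proleptic.toEpochDay) // 10: the same shift as the ORC round trip
```

This is the same 10-day gap the Wikipedia quote below describes: 4 October 1582 (Julian) was followed directly by 15 October 1582 (Gregorian).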
[jira] [Assigned] (SPARK-25440) Dump query execution info to a file
[ https://issues.apache.org/jira/browse/SPARK-25440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li reassigned SPARK-25440: --- Assignee: Maxim Gekk > Dump query execution info to a file > --- > > Key: SPARK-25440 > URL: https://issues.apache.org/jira/browse/SPARK-25440 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.1 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Minor > Fix For: 3.0.0 > > > The output of explain() doesn't contain full information and in some cases can be truncated. Besides that, it saves the info to an in-memory string, which can cause OOM. This ticket aims to solve the problem by dumping info about query execution to a file. We need to add a new method to queryExecution.debug that accepts a path to a file.
[jira] [Created] (SPARK-31452) Do not create partition spec for 0-size partitions
Wenchen Fan created SPARK-31452: --- Summary: Do not create partition spec for 0-size partitions Key: SPARK-31452 URL: https://issues.apache.org/jira/browse/SPARK-31452 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.1.0 Reporter: Wenchen Fan Assignee: Wenchen Fan
[jira] [Created] (SPARK-31451) Kafka connector does not retry in case of RetriableException
Chaoran Yu created SPARK-31451: -- Summary: Kafka connector does not retry in case of RetriableException Key: SPARK-31451 URL: https://issues.apache.org/jira/browse/SPARK-31451 Project: Spark Issue Type: Bug Components: DStreams Affects Versions: 2.4.5 Reporter: Chaoran Yu The Spark DStream API uses spark-streaming-kafka-0-10 to talk to Kafka. The method in the connector code that's responsible for committing offsets, commitAll, calls commitAsync in the Kafka client to commit the offsets. commitAsync tries to find the group coordinator and sends the commits on success, or throws a RetriableCommitFailedException on failure and doesn't retry. This behavior was introduced in KAFKA-4034. The reason for not attempting a retry was given there as: "we don't want recursive retries which can cause offset commits to arrive out of order". From the Spark side, though, we should be able to retry when running into a RetriableException. The issue of potentially committing offsets out of order can be addressed by keeping a monotonically increasing sequence number, incremented every time a commit happens, and including this number in the callback function of commitAsync.
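The sequence-number idea described above can be sketched as follows. This is an illustrative model, not the connector's actual code: every commit attempt is tagged with a monotonically increasing number, a failed attempt retries only while it is still the newest one, and a stale attempt can never overwrite a newer commit, so retries cannot land out of order. The `GuardedCommitter` name and `send` callback are hypothetical; `send` stands in for `KafkaConsumer.commitAsync` plus its callback result.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical guard around an asynchronous commit call.
class GuardedCommitter(send: Map[Int, Long] => Boolean, maxRetries: Int = 3) {
  private val seq = new AtomicLong(0L)
  private var lastAppliedSeq = -1L
  var committed: Map[Int, Long] = Map.empty // partition -> committed offset

  def commit(offsets: Map[Int, Long]): Unit = {
    val mySeq = seq.incrementAndGet()
    var attempts = 0
    var ok = false
    // Retry only while this is still the newest commit attempt.
    while (!ok && attempts <= maxRetries && mySeq == seq.get()) {
      ok = send(offsets)
      attempts += 1
    }
    synchronized {
      // Never let a stale attempt overwrite a newer commit.
      if (ok && mySeq > lastAppliedSeq) {
        lastAppliedSeq = mySeq
        committed = offsets
      }
    }
  }
}

// A send that fails once transiently still lands the commit exactly once.
var calls = 0
val committer = new GuardedCommitter(_ => { calls += 1; calls >= 2 })
committer.commit(Map(0 -> 100L))
println(committer.committed) // Map(0 -> 100)
```

Because an older in-flight attempt stops retrying as soon as a newer commit starts, this sidesteps the out-of-order concern cited in KAFKA-4034.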
[jira] [Commented] (SPARK-31437) Try assigning tasks to existing executors by which required resources in ResourceProfile are satisfied
[ https://issues.apache.org/jira/browse/SPARK-31437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084086#comment-17084086 ] Thomas Graves commented on SPARK-31437: --- There are multiple reasons they are tied together for the first implementation:
1) It is the way it works now (the user specifies an executor requirement and a task requirement together), and it's a much smaller change code-wise, with less complexity. Get something in and working, see how it's used, and improve as needed. I've had a hard enough time getting this feature reviewed as is; making it more complex would have made that much harder.
2) You have to have a way to say what your executor requirements are. I thought about letting the user specify just the task requirements, but you still need a way to specify the executor requirements as well, either in terms of the task requirements (i.e. I want 3 tasks to fit on one executor) or separately, since there are things that don't fit into task requirements, like overhead memory and other confs (to be added later).
3) Resource waste, as already discussed. One of the main use cases we targeted here is the ETL-to-ML use case. If you start putting ETL tasks on nodes with GPUs that don't use the GPU, that gets expensive, as you are wasting the GPU. I understand your use case is different, but that wasn't the main target for the first implementation. This is an RDD API, and I would have expected much of the ETL to be more dataset/dataframe based. If you specify them separately, I see that as potentially a huge waste of resources.
4) I think this is much easier for the user to reason about in most cases. They know exactly what they get, and don't have to worry about making sure they have requested executors that meet the task requirements, or figuring out how many resources they are wasting because they didn't configure it properly.
_"At the same time we can still have the opportunity to keep the overall logic simple: we can choose one strategy from several to create ResourceProfile from incoming ResourceRequest."_ I don't understand what you mean by this. I think overall you would have to be more specific on a proposal for decoupling them, and then on how coupling would work. Let's say I have my use case where I have ETL -> ML. My ETL tasks use 8 cores; my ML tasks use 8 cores and 4 GPUs. How do I keep my ETL tasks from running on the ML executors without wasting resources? > Try assigning tasks to existing executors by which required resources in > ResourceProfile are satisfied > -- > > Key: SPARK-31437 > URL: https://issues.apache.org/jira/browse/SPARK-31437 > Project: Spark > Issue Type: Improvement > Components: Scheduler >Affects Versions: 3.1.0 >Reporter: Hongze Zhang >Priority: Major > > By the change in [PR|https://github.com/apache/spark/pull/27773] of SPARK-29154, submitted tasks are scheduled onto executors only if resource profile IDs strictly match. As a result, Spark always starts new executors for customized ResourceProfiles. > This limitation makes working with process-local jobs unfriendly. E.g. task cores have been increased from 1 to 4 in a new stage, and the executor has 8 slots; it is expected that 2 new tasks could run on the existing executor, but Spark starts new executors for the new ResourceProfile. The behavior is unnecessary.
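The trade-off debated in this thread — strategy s1 (always allocate new executors, matching today's behavior and avoiding waste) versus strategy s2 (reuse an existing executor whose resources already satisfy the new profile) — can be made concrete with a toy model. All types and names below are hypothetical illustrations, not Spark's API:

```scala
// Toy model of the s1/s2 selection strategies; ExecSpec is a hypothetical
// stand-in for the proposed ExecutorSpec, not a Spark class.
case class ExecSpec(cores: Int, resources: Map[String, Int])

def satisfies(existing: ExecSpec, required: ExecSpec): Boolean =
  existing.cores >= required.cores &&
    required.resources.forall { case (name, amt) =>
      existing.resources.getOrElse(name, 0) >= amt
    }

// s1: always allocate a fresh spec (today's behavior; no sharing).
def s1(required: ExecSpec, existing: Seq[ExecSpec]): ExecSpec = required

// s2: reuse an existing spec that already satisfies the requirements.
def s2(required: ExecSpec, existing: Seq[ExecSpec]): ExecSpec =
  existing.find(satisfies(_, required)).getOrElse(required)

val mlExecutor  = ExecSpec(cores = 8, resources = Map("gpu" -> 4))
val etlRequired = ExecSpec(cores = 8, resources = Map.empty)

// Under s2 the ETL work lands on the GPU executor (the waste concern raised
// above); under s1 it gets a fresh, GPU-free executor.
println(s2(etlRequired, Seq(mlExecutor)) == mlExecutor)  // true
println(s1(etlRequired, Seq(mlExecutor)) == etlRequired) // true
```

This makes the ETL/ML question concrete: s2 answers the "reuse existing executors" goal of this ticket, while s1 answers the "don't waste the GPU" concern, which is why the comment proposes making the strategy selectable.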
[jira] [Commented] (SPARK-30364) The spark-streaming-kafka-0-10_2.11 test cases are failing on ppc64le
[ https://issues.apache.org/jira/browse/SPARK-30364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084033#comment-17084033 ] Nick Hryhoriev commented on SPARK-30364: I have the same issue with a Spark app on Mac OS with Spark 2.4.3. The issue appears only when I extend Spark metrics with a custom Source. Is there any known way to avoid it? > The spark-streaming-kafka-0-10_2.11 test cases are failing on ppc64le > - > > Key: SPARK-30364 > URL: https://issues.apache.org/jira/browse/SPARK-30364 > Project: Spark > Issue Type: Test > Components: Build, DStreams >Affects Versions: 2.4.0 > Environment: os: rhel 7.6 > arch: ppc64le >Reporter: AK97 >Priority: Major > > I have been trying to build Apache Spark on rhel_7.6/ppc64le; however, the spark-streaming-kafka-0-10_2.11 test cases are failing with the following error: > {code} > [ERROR] > /opt/spark/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala:85: > Symbol 'term org.eclipse' is missing from the classpath. > This symbol is required by 'method > org.apache.spark.metrics.MetricsSystem.getServletHandlers'. > Make sure that term eclipse is in your classpath and check for conflicting > dependencies with `-Ylog-classpath`. > A full rebuild may help if 'MetricsSystem.class' was compiled against an > incompatible version of org. > [ERROR] testUtils.sendMessages(topic, data.toArray) >^ > {code} > Would like some help understanding the cause. I am running it on a high-end VM with good connectivity.
[jira] [Resolved] (SPARK-31394) Support for Kubernetes NFS volume mounts
[ https://issues.apache.org/jira/browse/SPARK-31394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun resolved SPARK-31394. --- Fix Version/s: 3.1.0 Resolution: Fixed Issue resolved by pull request 27364 [https://github.com/apache/spark/pull/27364] > Support for Kubernetes NFS volume mounts > > > Key: SPARK-31394 > URL: https://issues.apache.org/jira/browse/SPARK-31394 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 3.1.0 >Reporter: Seongjin Cho >Assignee: Seongjin Cho >Priority: Major > Fix For: 3.1.0 > > > Kubernetes supports various kinds of volumes, but Spark for Kubernetes > supports only EmptyDir/HostDir/PVC. By adding support for NFS, we can use > Spark for Kubernetes with NFS storage. > NFS could be used using PVC when we want to use some clean new empty disk > space, but in order to use files in existing NFS shares, we need to use NFS > volume mounts. > RP link: [https://github.com/apache/spark/pull/27364] -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-31394) Support for Kubernetes NFS volume mounts
[ https://issues.apache.org/jira/browse/SPARK-31394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun reassigned SPARK-31394: - Assignee: Seongjin Cho > Support for Kubernetes NFS volume mounts > > > Key: SPARK-31394 > URL: https://issues.apache.org/jira/browse/SPARK-31394 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 3.1.0 >Reporter: Seongjin Cho >Assignee: Seongjin Cho >Priority: Major > > Kubernetes supports various kinds of volumes, but Spark for Kubernetes > supports only EmptyDir/HostDir/PVC. By adding support for NFS, we can use > Spark for Kubernetes with NFS storage. > NFS could be used using PVC when we want to use some clean new empty disk > space, but in order to use files in existing NFS shares, we need to use NFS > volume mounts. > RP link: [https://github.com/apache/spark/pull/27364] -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-31437) Try assigning tasks to existing executors by which required resources in ResourceProfile are satisfied
[ https://issues.apache.org/jira/browse/SPARK-31437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083959#comment-17083959 ] Hongze Zhang commented on SPARK-31437: -- Thanks [~tgraves]; I also found some related discussion here [1]. I am glad to take this on, but I am still in the phase of understanding the concepts and code. Back to the topic: so far, one of my questions is, what is the point of binding task resource requirements to executor instances? Can we break the consolidated ResourceProfile up into something like an executor's ResourceProfile and a task's ResourceRequest? That way we could remove the strict ID requirement, and issues like this one would be easy to deal with. At the same time we can still keep the overall logic simple: we can choose one strategy from several to create a ResourceProfile from an incoming ResourceRequest. The strictest strategy is exactly the same as what we are doing currently, and we can use looser strategies for cases like mine. [1] https://github.com/apache/spark/pull/26284#issuecomment-553481259 > Try assigning tasks to existing executors by which required resources in > ResourceProfile are satisfied > -- > > Key: SPARK-31437 > URL: https://issues.apache.org/jira/browse/SPARK-31437 > Project: Spark > Issue Type: Improvement > Components: Scheduler >Affects Versions: 3.1.0 >Reporter: Hongze Zhang >Priority: Major > > By the change in [PR|https://github.com/apache/spark/pull/27773] of > SPARK-29154, submitted tasks are scheduled onto executors only if resource > profile IDs strictly match. As a result Spark always starts new executors for > customized ResourceProfiles. > This limitation makes working with process-local jobs unfriendly. E.g. Task > cores has been increased from 1 to 4 in a new stage, and executor has 8 > slots, it is expected that 2 new tasks can be run on the existing executor > but Spark starts new executors for new ResourceProfile. 
> The behavior is unnecessary.
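The spec-selection strategies proposed in this thread (s1: always create a new spec; s2: reuse an existing spec that satisfies the request) could look roughly like this — a hypothetical sketch, not Spark code; `ExecutorSpec` and `select_spec` are invented names standing in for the proposed split:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ExecutorSpec:
    cores: int
    memory_mb: int

    def satisfies(self, required: "ExecutorSpec") -> bool:
        # An existing spec satisfies a request if it is at least as large
        # in every resource dimension.
        return self.cores >= required.cores and self.memory_mb >= required.memory_mb

def select_spec(required, existing, strategy="s2"):
    if strategy == "s1":
        return required                      # always allocate a new spec
    if strategy == "s2":
        for spec in existing:
            if spec.satisfies(required):     # reuse a spec that already fits
                return spec
        return required
    raise ValueError(f"unknown strategy: {strategy}")
```

Under s1 the behavior matches the current strict-ID implementation; s2 is the looser strategy that would let the etl/ML workloads above share executors when their resource needs overlap.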
[jira] [Updated] (SPARK-31429) Add additional fields in ExpressionDescription for more granular category in documentation
[ https://issues.apache.org/jira/browse/SPARK-31429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Takeshi Yamamuro updated SPARK-31429: - Issue Type: Improvement (was: Bug) > Add additional fields in ExpressionDescription for more granular category in > documentation > -- > > Key: SPARK-31429 > URL: https://issues.apache.org/jira/browse/SPARK-31429 > Project: Spark > Issue Type: Improvement > Components: Documentation, SQL >Affects Versions: 3.0.0 >Reporter: Huaxin Gao >Priority: Major > > Add additional fields in ExpressionDescription so we can have more granular > category in function documentation. For example, we want to group window > function into finer categories such as ranking functions and analytic > functions. > See Hyukjin's comment below for more details; > https://github.com/apache/spark/pull/28170#issuecomment-611917191 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26385) YARN - Spark Stateful Structured streaming HDFS_DELEGATION_TOKEN not found in cache
[ https://issues.apache.org/jira/browse/SPARK-26385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083937#comment-17083937 ] Zhou Jiashuai commented on SPARK-26385: --- [~gsomogyi] There is no unusual log detected in application logs except "Token can't be found" reported by [~stud3nt]. My job stops working with the same stack error as [~stud3nt]. > YARN - Spark Stateful Structured streaming HDFS_DELEGATION_TOKEN not found in > cache > --- > > Key: SPARK-26385 > URL: https://issues.apache.org/jira/browse/SPARK-26385 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 > Environment: Hadoop 2.6.0, Spark 2.4.0 >Reporter: T M >Priority: Major > > > Hello, > > I have Spark Structured Streaming job which is runnning on YARN(Hadoop 2.6.0, > Spark 2.4.0). After 25-26 hours, my job stops working with following error: > {code:java} > 2018-12-16 22:35:17 ERROR > org.apache.spark.internal.Logging$class.logError(Logging.scala:91): Query > TestQuery[id = a61ce197-1d1b-4e82-a7af-60162953488b, runId = > a56878cf-dfc7-4f6a-ad48-02cf738ccc2f] terminated with error > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): > token (token for REMOVED: HDFS_DELEGATION_TOKEN owner=REMOVED, renewer=yarn, > realUser=, issueDate=1544903057122, maxDate=1545507857122, > sequenceNumber=10314, masterKeyId=344) can't be found in cache at > org.apache.hadoop.ipc.Client.call(Client.java:1470) at > org.apache.hadoop.ipc.Client.call(Client.java:1401) at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) > at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source) at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:752) > at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.lang.reflect.Method.invoke(Method.java:498) at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source) at > org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1977) at > org.apache.hadoop.fs.Hdfs.getFileStatus(Hdfs.java:133) at > org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1120) at > org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1116) at > org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) at > org.apache.hadoop.fs.FileContext.getFileStatus(FileContext.java:1116) at > org.apache.hadoop.fs.FileContext$Util.exists(FileContext.java:1581) at > org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.exists(CheckpointFileManager.scala:326) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:142) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply$mcV$sp(MicroBatchExecution.scala:544) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:542) > at > 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at >
[jira] [Updated] (SPARK-31394) Support for Kubernetes NFS volume mounts
[ https://issues.apache.org/jira/browse/SPARK-31394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-31394: -- Affects Version/s: (was: 3) 3.1.0 > Support for Kubernetes NFS volume mounts > > > Key: SPARK-31394 > URL: https://issues.apache.org/jira/browse/SPARK-31394 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 3.1.0 >Reporter: Seongjin Cho >Priority: Minor > > Kubernetes supports various kinds of volumes, but Spark for Kubernetes > supports only EmptyDir/HostDir/PVC. By adding support for NFS, we can use > Spark for Kubernetes with NFS storage. > NFS could be used using PVC when we want to use some clean new empty disk > space, but in order to use files in existing NFS shares, we need to use NFS > volume mounts. > RP link: [https://github.com/apache/spark/pull/27364] -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31394) Support for Kubernetes NFS volume mounts
[ https://issues.apache.org/jira/browse/SPARK-31394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-31394: -- Shepherd: (was: Dongjoon Hyun) > Support for Kubernetes NFS volume mounts > > > Key: SPARK-31394 > URL: https://issues.apache.org/jira/browse/SPARK-31394 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 3.1.0 >Reporter: Seongjin Cho >Priority: Major > > Kubernetes supports various kinds of volumes, but Spark for Kubernetes > supports only EmptyDir/HostDir/PVC. By adding support for NFS, we can use > Spark for Kubernetes with NFS storage. > NFS could be used using PVC when we want to use some clean new empty disk > space, but in order to use files in existing NFS shares, we need to use NFS > volume mounts. > RP link: [https://github.com/apache/spark/pull/27364] -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-31394) Support for Kubernetes NFS volume mounts
[ https://issues.apache.org/jira/browse/SPARK-31394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-31394: -- Priority: Major (was: Minor) > Support for Kubernetes NFS volume mounts > > > Key: SPARK-31394 > URL: https://issues.apache.org/jira/browse/SPARK-31394 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 3.1.0 >Reporter: Seongjin Cho >Priority: Major > > Kubernetes supports various kinds of volumes, but Spark for Kubernetes > supports only EmptyDir/HostDir/PVC. By adding support for NFS, we can use > Spark for Kubernetes with NFS storage. > NFS could be used using PVC when we want to use some clean new empty disk > space, but in order to use files in existing NFS shares, we need to use NFS > volume mounts. > RP link: [https://github.com/apache/spark/pull/27364] -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-31450) Make ExpressionEncoder thread safe
Herman van Hövell created SPARK-31450: - Summary: Make ExpressionEncoder thread safe Key: SPARK-31450 URL: https://issues.apache.org/jira/browse/SPARK-31450 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.0.0 Reporter: Herman van Hövell Assignee: Herman van Hövell ExpressionEncoder is currently not thread-safe because it contains stateful objects that are required for converting objects to internal rows and vice versa. We have been working around this by (excessively) cloning ExpressionEncoders, which is not free. I propose that we move the stateful bits of the expression encoder into two helper classes that will take care of the conversions.
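The fix direction described above — keep the encoder itself stateless and hand each caller its own conversion helper — can be illustrated in Python. All names here are hypothetical; the real change is in Scala's ExpressionEncoder:

```python
class Encoder:
    """Stateless description of a conversion; safe to share across threads."""

    def __init__(self, fields):
        self.fields = tuple(fields)

    def create_serializer(self):
        # Each caller gets a private helper holding the mutable scratch state,
        # so concurrent threads never touch each other's buffers.
        return _Serializer(self.fields)

class _Serializer:
    def __init__(self, fields):
        self.fields = fields
        self._scratch = []          # per-instance mutable state

    def to_row(self, obj):
        self._scratch.clear()
        for f in self.fields:
            self._scratch.append(getattr(obj, f))
        return tuple(self._scratch)
```

Because `create_serializer` is cheap, callers no longer need to clone the whole encoder just to get isolated mutable state.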
[jira] [Commented] (SPARK-27296) Efficient User Defined Aggregators
[ https://issues.apache.org/jira/browse/SPARK-27296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083901#comment-17083901 ] Wenchen Fan commented on SPARK-27296: - This feature is to speed up UDAF by using Aggregator, but not to improve Aggregator performance. > Efficient User Defined Aggregators > --- > > Key: SPARK-27296 > URL: https://issues.apache.org/jira/browse/SPARK-27296 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL, Structured Streaming >Affects Versions: 2.3.3, 2.4.0, 3.0.0 >Reporter: Erik Erlandson >Assignee: Erik Erlandson >Priority: Major > Labels: performance, usability > Fix For: 3.0.0 > > > Spark's UDAFs appear to be serializing and de-serializing to/from the > MutableAggregationBuffer for each row. This gist shows a small reproducing > UDAF and a spark shell session: > [https://gist.github.com/erikerlandson/3c4d8c6345d1521d89e0d894a423046f] > The UDAF and its companion UDT are designed to count the number of times > that ser/de is invoked for the aggregator. The spark shell session > demonstrates that it is executing ser/de on every row of the data frame. > Note, Spark's pre-defined aggregators do not have this problem, as they are > based on an internal aggregating trait that does the correct thing and only > calls ser/de at points such as partition boundaries, presenting final > results, etc. > This is a major problem for UDAFs, as it means that every UDAF is doing a > massive amount of unnecessary work per row, including but not limited to Row > object allocations. For a more realistic UDAF having its own non-trivial > internal structure it is obviously that much worse.
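The per-row ser/de overhead the report describes can be modeled with a counter — an illustrative Python model of the two execution styles, not the Spark API:

```python
class CountingBuffer:
    """Aggregation state that tracks how often it is (de)serialized."""
    serde_calls = 0

    def __init__(self, total=0):
        self.total = total

    def serialize(self):
        CountingBuffer.serde_calls += 1
        return self.total

    @classmethod
    def deserialize(cls, data):
        cls.serde_calls += 1
        return cls(data)

def udaf_style_sum(rows):
    # Mimics the reported UDAF behaviour: ser/de around *every* row update.
    state = CountingBuffer()
    for r in rows:
        buf = CountingBuffer.deserialize(state.serialize())
        buf.total += r
        state = buf
    return state.total

def aggregator_style_sum(rows):
    # Mimics an Aggregator: update in place, serialize once at the boundary.
    state = CountingBuffer()
    for r in rows:
        state.total += r
    return state.serialize()
```

Both produce the same sum, but the UDAF-style loop performs two serde calls per row while the aggregator-style loop performs one per partition — the gap the linked gist measures.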
[jira] [Issue Comment Deleted] (SPARK-31226) SizeBasedCoalesce logic error
[ https://issues.apache.org/jira/browse/SPARK-31226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sathyaprakash Govindasamy updated SPARK-31226: -- Comment: was deleted (was: [~angerszhuuu] Could you please share more details on this issue?) > SizeBasedCoalesce logic error > - > > Key: SPARK-31226 > URL: https://issues.apache.org/jira/browse/SPARK-31226 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: angerszhu >Priority: Minor > > In a Spark UT, > SizeBasedCoalesce's logic is wrong
[jira] [Commented] (SPARK-31226) SizeBasedCoalesce logic error
[ https://issues.apache.org/jira/browse/SPARK-31226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083890#comment-17083890 ] Sathyaprakash Govindasamy commented on SPARK-31226: --- [~angerszhuuu] Could you please share more details on this issue? > SizeBasedCoalesce logic error > - > > Key: SPARK-31226 > URL: https://issues.apache.org/jira/browse/SPARK-31226 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.0, 3.0.0 >Reporter: angerszhu >Priority: Minor > > In a Spark UT, > SizeBasedCoalesce's logic is wrong
[jira] [Updated] (SPARK-31447) DATE_PART functions produces incorrect result
[ https://issues.apache.org/jira/browse/SPARK-31447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sathyaprakash Govindasamy updated SPARK-31447: -- Priority: Minor (was: Major) > DATE_PART functions produces incorrect result > - > > Key: SPARK-31447 > URL: https://issues.apache.org/jira/browse/SPARK-31447 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3 >Reporter: Sathyaprakash Govindasamy >Priority: Minor > > Spark does not extract correct date part from calendar interval. Below is one > example for extracting day from calendar interval > {code:java} > spark.sql("SELECT EXTRACT(DAY FROM (cast('2020-01-15 00:00:00' as timestamp) > - cast('2020-01-01 00:00:00' as timestamp)))").show{code} > ++ > |date_part('DAY', subtracttimestamps(CAST('2020-01-15 00:00:00' AS > TIMESTAMP), CAST('2020-01-01 00:00:00' AS TIMESTAMP)))| > ++ > | 0| > ++ > Actual output 0 days > Correct output 14 days > This is because SubtractTimestamps expression calculates difference and > populates only microseconds field. months and days field are set to zero > {code:java} > new CalendarInterval(months=0, days=0, microseconds=end.asInstanceOf[Long] - > start.asInstanceOf[Long]){code} > https://github.com/apache/spark/blob/2c5d489679ba3814973680d65853877664bcd931/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala#L2211 > But ExtractIntervalDays expression retrieves days information from days field > in CalendarInterval and returns zero. > {code:java} > def getDays(interval: CalendarInterval): Int = { > interval.days > }{code} > https://github.com/apache/spark/blob/2c5d489679ba3814973680d65853877664bcd931/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala#L73 -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
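The root cause described in the report can be modeled directly — a minimal Python model of the two expressions involved, not the actual Catalyst code:

```python
from dataclasses import dataclass

MICROS_PER_DAY = 24 * 60 * 60 * 1_000_000

@dataclass
class CalendarInterval:
    months: int
    days: int
    microseconds: int

def subtract_timestamps(end_us: int, start_us: int) -> CalendarInterval:
    # SubtractTimestamps populates only the microseconds field;
    # months and days are left at zero.
    return CalendarInterval(months=0, days=0, microseconds=end_us - start_us)

def extract_days(interval: CalendarInterval) -> int:
    # ExtractIntervalDays reads only the `days` field, hence the 0 result
    # even when the microseconds correspond to a 14-day difference.
    return interval.days
```

A fix would need either SubtractTimestamps to normalize microseconds into the days field, or the DAY extraction to account for the microseconds component.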
[jira] [Commented] (SPARK-26385) YARN - Spark Stateful Structured streaming HDFS_DELEGATION_TOKEN not found in cache
[ https://issues.apache.org/jira/browse/SPARK-26385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083888#comment-17083888 ] Gabor Somogyi commented on SPARK-26385: --- [~zjiash] Please don't forget to attach full driver AND executor logs... > YARN - Spark Stateful Structured streaming HDFS_DELEGATION_TOKEN not found in > cache > --- > > Key: SPARK-26385 > URL: https://issues.apache.org/jira/browse/SPARK-26385 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 > Environment: Hadoop 2.6.0, Spark 2.4.0 >Reporter: T M >Priority: Major > > > Hello, > > I have Spark Structured Streaming job which is runnning on YARN(Hadoop 2.6.0, > Spark 2.4.0). After 25-26 hours, my job stops working with following error: > {code:java} > 2018-12-16 22:35:17 ERROR > org.apache.spark.internal.Logging$class.logError(Logging.scala:91): Query > TestQuery[id = a61ce197-1d1b-4e82-a7af-60162953488b, runId = > a56878cf-dfc7-4f6a-ad48-02cf738ccc2f] terminated with error > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): > token (token for REMOVED: HDFS_DELEGATION_TOKEN owner=REMOVED, renewer=yarn, > realUser=, issueDate=1544903057122, maxDate=1545507857122, > sequenceNumber=10314, masterKeyId=344) can't be found in cache at > org.apache.hadoop.ipc.Client.call(Client.java:1470) at > org.apache.hadoop.ipc.Client.call(Client.java:1401) at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) > at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source) at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:752) > at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source) at > org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1977) at > org.apache.hadoop.fs.Hdfs.getFileStatus(Hdfs.java:133) at > org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1120) at > org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1116) at > org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) at > org.apache.hadoop.fs.FileContext.getFileStatus(FileContext.java:1116) at > org.apache.hadoop.fs.FileContext$Util.exists(FileContext.java:1581) at > org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.exists(CheckpointFileManager.scala:326) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:142) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply$mcV$sp(MicroBatchExecution.scala:544) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:542) > at > 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) > at >
[jira] [Commented] (SPARK-26385) YARN - Spark Stateful Structured streaming HDFS_DELEGATION_TOKEN not found in cache
[ https://issues.apache.org/jira/browse/SPARK-26385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083871#comment-17083871 ] Zhou Jiashuai commented on SPARK-26385: --- We use yarn-cluster mode and I will set "log4j.logger.org.apache.spark.deploy.yarn.security.AMCredentialRenewer=DEBUG" to collect more logs. Many thanks for your help. > YARN - Spark Stateful Structured streaming HDFS_DELEGATION_TOKEN not found in > cache > --- > > Key: SPARK-26385 > URL: https://issues.apache.org/jira/browse/SPARK-26385 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 > Environment: Hadoop 2.6.0, Spark 2.4.0 >Reporter: T M >Priority: Major > > > Hello, > > I have Spark Structured Streaming job which is runnning on YARN(Hadoop 2.6.0, > Spark 2.4.0). After 25-26 hours, my job stops working with following error: > {code:java} > 2018-12-16 22:35:17 ERROR > org.apache.spark.internal.Logging$class.logError(Logging.scala:91): Query > TestQuery[id = a61ce197-1d1b-4e82-a7af-60162953488b, runId = > a56878cf-dfc7-4f6a-ad48-02cf738ccc2f] terminated with error > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): > token (token for REMOVED: HDFS_DELEGATION_TOKEN owner=REMOVED, renewer=yarn, > realUser=, issueDate=1544903057122, maxDate=1545507857122, > sequenceNumber=10314, masterKeyId=344) can't be found in cache at > org.apache.hadoop.ipc.Client.call(Client.java:1470) at > org.apache.hadoop.ipc.Client.call(Client.java:1401) at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) > at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source) at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:752) > at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.lang.reflect.Method.invoke(Method.java:498) at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source) at > org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1977) at > org.apache.hadoop.fs.Hdfs.getFileStatus(Hdfs.java:133) at > org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1120) at > org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1116) at > org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) at > org.apache.hadoop.fs.FileContext.getFileStatus(FileContext.java:1116) at > org.apache.hadoop.fs.FileContext$Util.exists(FileContext.java:1581) at > org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.exists(CheckpointFileManager.scala:326) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:142) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply$mcV$sp(MicroBatchExecution.scala:544) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:542) > at > 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) > at > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) > at >
[jira] [Commented] (SPARK-26646) Flaky test: pyspark.mllib.tests.test_streaming_algorithms StreamingLogisticRegressionWithSGDTests.test_training_and_prediction
[ https://issues.apache.org/jira/browse/SPARK-26646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083856#comment-17083856 ] Jungtaek Lim commented on SPARK-26646: -- Looks like still happening on master branch (3.1.0-SNAPSHOT) https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121232 > Flaky test: pyspark.mllib.tests.test_streaming_algorithms > StreamingLogisticRegressionWithSGDTests.test_training_and_prediction > -- > > Key: SPARK-26646 > URL: https://issues.apache.org/jira/browse/SPARK-26646 > Project: Spark > Issue Type: Test > Components: MLlib, PySpark >Affects Versions: 3.0.0 >Reporter: Hyukjin Kwon >Assignee: L. C. Hsieh >Priority: Major > Fix For: 3.0.0 > > > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/101356/console > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/101358/console > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/101254/console > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/100941/console > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/100327/console > {code} > == > FAIL: test_training_and_prediction > (pyspark.mllib.tests.test_streaming_algorithms.StreamingLogisticRegressionWithSGDTests) > Test that the model improves on toy data with no. 
of batches > -- > Traceback (most recent call last): > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/mllib/tests/test_streaming_algorithms.py", > line 367, in test_training_and_prediction > self._eventually(condition, timeout=60.0) > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/mllib/tests/test_streaming_algorithms.py", > line 69, in _eventually > lastValue = condition() > File > "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/mllib/tests/test_streaming_algorithms.py", > line 362, in condition > self.assertGreater(errors[1] - errors[-1], 0.3) > AssertionError: -0.070062 not greater than 0.3 > -- > Ran 13 tests in 198.327s > FAILED (failures=1, skipped=1) > Had test failures in pyspark.mllib.tests.test_streaming_algorithms with > python3.4; see logs. > {code} > It apparently became less flaky after increasing the time at SPARK-26275 but > looks now it became flacky due to unexpected results. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
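The `_eventually` helper these flaky tests rely on is essentially a poll-until-timeout loop; a minimal standalone sketch (illustrative only, not the PySpark test code):

```python
import time

def eventually(condition, timeout=60.0, interval=0.1, catch_assertions=False):
    """Repeatedly evaluate `condition` until it returns truthy or time runs out.
    If catch_assertions is set, the last AssertionError is re-raised on timeout."""
    deadline = time.monotonic() + timeout
    last_error = None
    while time.monotonic() < deadline:
        try:
            value = condition()
            if value:
                return value
            last_error = None
        except AssertionError as e:
            if not catch_assertions:
                raise
            last_error = e
        time.sleep(interval)
    if last_error is not None:
        raise last_error
    raise TimeoutError("condition not met within %.1fs" % timeout)
```

Retrying masks slow convergence but not nondeterministic convergence: when the streamed model simply fails to improve (as in the `-0.070062 not greater than 0.3` failure above), every poll fails and the timeout is exhausted.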
[jira] [Commented] (SPARK-29222) Flaky test: pyspark.mllib.tests.test_streaming_algorithms.StreamingLinearRegressionWithTests.test_parameter_convergence
[ https://issues.apache.org/jira/browse/SPARK-29222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083857#comment-17083857 ] Jungtaek Lim commented on SPARK-29222: -- Still happening on master (3.1.0-SNAPSHOT) https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121232 > Flaky test: > pyspark.mllib.tests.test_streaming_algorithms.StreamingLinearRegressionWithTests.test_parameter_convergence > --- > > Key: SPARK-29222 > URL: https://issues.apache.org/jira/browse/SPARK-29222 > Project: Spark > Issue Type: Test > Components: MLlib, Tests >Affects Versions: 3.0.0 >Reporter: Jungtaek Lim >Priority: Minor > > [https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/111237/testReport/] > {code:java} > Error Message > 7 != 10 > StacktraceTraceback (most recent call last): > File > "/home/jenkins/workspace/SparkPullRequestBuilder@2/python/pyspark/mllib/tests/test_streaming_algorithms.py", > line 429, in test_parameter_convergence > self._eventually(condition, catch_assertions=True) > File > "/home/jenkins/workspace/SparkPullRequestBuilder@2/python/pyspark/mllib/tests/test_streaming_algorithms.py", > line 74, in _eventually > raise lastValue > File > "/home/jenkins/workspace/SparkPullRequestBuilder@2/python/pyspark/mllib/tests/test_streaming_algorithms.py", > line 65, in _eventually > lastValue = condition() > File > "/home/jenkins/workspace/SparkPullRequestBuilder@2/python/pyspark/mllib/tests/test_streaming_algorithms.py", > line 425, in condition > self.assertEqual(len(model_weights), len(batches)) > AssertionError: 7 != 10 >{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-29137) Flaky test: pyspark.mllib.tests.test_streaming_algorithms.StreamingLinearRegressionWithTests.test_train_prediction
[ https://issues.apache.org/jira/browse/SPARK-29137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083855#comment-17083855 ] Jungtaek Lim edited comment on SPARK-29137 at 4/15/20, 6:58 AM: Still valid on latest master (3.1.0-SNAPSHOT). https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121229 https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121231 https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121232 was (Author: kabhwan): Still valid on latest master (3.1.0-SNAPSHOT). https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121229 > Flaky test: > pyspark.mllib.tests.test_streaming_algorithms.StreamingLinearRegressionWithTests.test_train_prediction > -- > > Key: SPARK-29137 > URL: https://issues.apache.org/jira/browse/SPARK-29137 > Project: Spark > Issue Type: Bug > Components: MLlib, Tests >Affects Versions: 3.0.0 >Reporter: Jungtaek Lim >Priority: Major > > [https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/110686/testReport/] > {code:java} > Traceback (most recent call last): > File > "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests/test_streaming_algorithms.py", > line 503, in test_train_prediction > self._eventually(condition) > File > "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests/test_streaming_algorithms.py", > line 69, in _eventually > lastValue = condition() > File > "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests/test_streaming_algorithms.py", > line 498, in condition > self.assertGreater(errors[1] - errors[-1], 2) > AssertionError: 1.672640157855923 not greater than 2 {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-29137) Flaky test: pyspark.mllib.tests.test_streaming_algorithms.StreamingLinearRegressionWithTests.test_train_prediction
[ https://issues.apache.org/jira/browse/SPARK-29137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083855#comment-17083855 ] Jungtaek Lim commented on SPARK-29137: -- Still valid on latest master (3.1.0-SNAPSHOT). https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/121229
[jira] [Created] (SPARK-31449) Is there a difference between JDK and Spark's time zone offset calculation
Maxim Gekk created SPARK-31449: -- Summary: Is there a difference between JDK and Spark's time zone offset calculation Key: SPARK-31449 URL: https://issues.apache.org/jira/browse/SPARK-31449 Project: Spark Issue Type: Question Components: SQL Affects Versions: 2.4.5 Reporter: Maxim Gekk Spark 2.4 calculates time zone offsets from wall clock timestamp using `DateTimeUtils.getOffsetFromLocalMillis()` (see https://github.com/apache/spark/blob/branch-2.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala#L1088-L1118): {code:scala} private[sql] def getOffsetFromLocalMillis(millisLocal: Long, tz: TimeZone): Long = { var guess = tz.getRawOffset // the actual offset should be calculated based on milliseconds in UTC val offset = tz.getOffset(millisLocal - guess) if (offset != guess) { guess = tz.getOffset(millisLocal - offset) if (guess != offset) { // fallback to do the reverse lookup using java.sql.Timestamp // this should only happen near the start or end of DST val days = Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt val year = getYear(days) val month = getMonth(days) val day = getDayOfMonth(days) var millisOfDay = (millisLocal % MILLIS_PER_DAY).toInt if (millisOfDay < 0) { millisOfDay += MILLIS_PER_DAY.toInt } val seconds = (millisOfDay / 1000L).toInt val hh = seconds / 3600 val mm = seconds / 60 % 60 val ss = seconds % 60 val ms = millisOfDay % 1000 val calendar = Calendar.getInstance(tz) calendar.set(year, month - 1, day, hh, mm, ss) calendar.set(Calendar.MILLISECOND, ms) guess = (millisLocal - calendar.getTimeInMillis()).toInt } } guess } {code} Meanwhile, JDK's GregorianCalendar uses special methods of ZoneInfo, see https://github.com/AdoptOpenJDK/openjdk-jdk8u/blob/aa318070b27849f1fe00d14684b2a40f7b29bf79/jdk/src/share/classes/java/util/GregorianCalendar.java#L2795-L2801: {code:java} if (zone instanceof ZoneInfo) { ((ZoneInfo)zone).getOffsetsByWall(millis, zoneOffsets); } else { int gmtOffset = isFieldSet(fieldMask, 
ZONE_OFFSET) ? internalGet(ZONE_OFFSET) : zone.getRawOffset(); zone.getOffsets(millis - gmtOffset, zoneOffsets); } {code} We need to investigate whether there are any differences in results between the two approaches.
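Both code paths exist to resolve the same underlying ambiguity: during a DST fall-back transition, a single wall-clock time corresponds to two different UTC offsets, which is why a guess-and-verify step (or ZoneInfo's wall-clock lookup) is needed at all. The ambiguity itself can be illustrated in plain Python (3.9+, `zoneinfo`); this demonstrates only the problem, not either implementation:

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo  # Python 3.9+; requires the system tz database

tz = ZoneInfo("America/Los_Angeles")

# 2020-11-01 01:30 local wall-clock time occurs twice: once before the
# fall-back transition (PDT, UTC-7) and once after it (PST, UTC-8).
# `fold` selects which of the two instants is meant.
first = datetime(2020, 11, 1, 1, 30, fold=0, tzinfo=tz)   # earlier occurrence
second = datetime(2020, 11, 1, 1, 30, fold=1, tzinfo=tz)  # later occurrence

assert first.utcoffset() == timedelta(hours=-7)   # PDT
assert second.utcoffset() == timedelta(hours=-8)  # PST
```

Any divergence between Spark's reverse lookup and `ZoneInfo.getOffsetsByWall` would most likely surface for exactly these ambiguous wall-clock times (and for nonexistent spring-forward ones).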
[jira] [Commented] (SPARK-26385) YARN - Spark Stateful Structured streaming HDFS_DELEGATION_TOKEN not found in cache
[ https://issues.apache.org/jira/browse/SPARK-26385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083849#comment-17083849 ] Jungtaek Lim commented on SPARK-26385: -- You may need to share the entire set of log messages logged under "org.apache.spark.deploy.yarn.security.AMCredentialRenewer" (note: it runs on the AM) so that we can confirm whether the token renewal is scheduled and executed properly. It would also be helpful to make clear which mode you use: yarn-client or yarn-cluster. > YARN - Spark Stateful Structured streaming HDFS_DELEGATION_TOKEN not found in > cache > --- > > Key: SPARK-26385 > URL: https://issues.apache.org/jira/browse/SPARK-26385 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.4.0 > Environment: Hadoop 2.6.0, Spark 2.4.0 >Reporter: T M >Priority: Major > > > Hello, > > I have a Spark Structured Streaming job which is running on YARN (Hadoop 2.6.0, > Spark 2.4.0). After 25-26 hours, my job stops working with the following error: > {code:java} > 2018-12-16 22:35:17 ERROR > org.apache.spark.internal.Logging$class.logError(Logging.scala:91): Query > TestQuery[id = a61ce197-1d1b-4e82-a7af-60162953488b, runId = > a56878cf-dfc7-4f6a-ad48-02cf738ccc2f] terminated with error > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): > token (token for REMOVED: HDFS_DELEGATION_TOKEN owner=REMOVED, renewer=yarn, > realUser=, issueDate=1544903057122, maxDate=1545507857122, > sequenceNumber=10314, masterKeyId=344) can't be found in cache at > org.apache.hadoop.ipc.Client.call(Client.java:1470) at > org.apache.hadoop.ipc.Client.call(Client.java:1401) at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) > at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source) at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:752) > at
sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source) at > org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1977) at > org.apache.hadoop.fs.Hdfs.getFileStatus(Hdfs.java:133) at > org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1120) at > org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1116) at > org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) at > org.apache.hadoop.fs.FileContext.getFileStatus(FileContext.java:1116) at > org.apache.hadoop.fs.FileContext$Util.exists(FileContext.java:1581) at > org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.exists(CheckpointFileManager.scala:326) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:142) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply$mcV$sp(MicroBatchExecution.scala:544) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554) > at > 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:542) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) > at > org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) > at >
[jira] [Created] (SPARK-31448) Difference in Storage Levels used in cache() and persist() for pyspark dataframes
Abhishek Dixit created SPARK-31448: -- Summary: Difference in Storage Levels used in cache() and persist() for pyspark dataframes Key: SPARK-31448 URL: https://issues.apache.org/jira/browse/SPARK-31448 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.4.3 Reporter: Abhishek Dixit There is a difference in the default storage level *MEMORY_AND_DISK* between PySpark and Scala. *Scala*: StorageLevel(true, true, false, true) *Pyspark:* StorageLevel(True, True, False, False) *Problem Description:* Calling *df.cache()* on a PySpark dataframe directly invokes the Scala method cache(), so the storage level used is StorageLevel(true, true, false, true). But calling *df.persist()* on a PySpark dataframe sets newStorageLevel=StorageLevel(true, true, false, false) inside PySpark and then invokes the Scala function persist(newStorageLevel). *Possible Fix:* Invoke the PySpark function persist inside the PySpark function cache instead of calling the Scala function directly. I can raise a PR for this fix if someone can confirm that this is a bug and that the possible fix is the correct approach.
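The flag difference can be made concrete with a small model. The namedtuple below is a stand-in for illustration only, not the real `pyspark.StorageLevel` class; the field order `(useDisk, useMemory, useOffHeap, deserialized)` matches the constructor values quoted in the issue:

```python
from collections import namedtuple

# Illustrative stand-in for StorageLevel; not the real pyspark class.
StorageLevel = namedtuple("StorageLevel", "useDisk useMemory useOffHeap deserialized")

scala_memory_and_disk = StorageLevel(True, True, False, True)     # deserialized
pyspark_memory_and_disk = StorageLevel(True, True, False, False)  # serialized

# df.cache() delegates straight to the JVM, so it ends up with the Scala
# default, while a bare df.persist() passes the PySpark default down --
# the two calls therefore cache with different (de)serialization behavior.
assert scala_memory_and_disk != pyspark_memory_and_disk
assert scala_memory_and_disk.deserialized != pyspark_memory_and_disk.deserialized
```

The difference matters in practice: a deserialized cache trades memory footprint for faster access, so `cache()` and a no-argument `persist()` can show different memory usage for the same dataframe.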
[jira] [Resolved] (SPARK-31443) Perf regression of toJavaDate
[ https://issues.apache.org/jira/browse/SPARK-31443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-31443. - Fix Version/s: 3.0.0 Resolution: Fixed Issue resolved by pull request 28212 [https://github.com/apache/spark/pull/28212] > Perf regression of toJavaDate > - > > Key: SPARK-31443 > URL: https://issues.apache.org/jira/browse/SPARK-31443 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.0.0 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Major > Fix For: 3.0.0 > > > DateTimeBenchmark shows the regression > Spark 2.4.6-SNAPSHOT at the PR [https://github.com/MaxGekk/spark/pull/27] > {code:java} > OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux > 4.15.0-1063-aws > Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz > To/from Java's date-time: Best Time(ms) Avg Time(ms) > Stdev(ms) Rate(M/s) Per Row(ns) Relative > > From java.sql.Date 559 603 > 38 8.9 111.8 1.0X > Collect dates 2306 3221 > 1558 2.2 461.1 0.2X > {code} > Current master: > {code:java} > OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux > 4.15.0-1063-aws > Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz > To/from Java's date-time: Best Time(ms) Avg Time(ms) > Stdev(ms) Rate(M/s) Per Row(ns) Relative > > From java.sql.Date 1052 1130 > 73 4.8 210.3 1.0X > Collect dates 3251 4943 > 1624 1.5 650.2 0.3X > {code} > If we subtract the time to prepare the DATE column: > * Spark 2.4.6-SNAPSHOT is (461.1 - 111.8) = 349.3 ns/row > * master is (650.2 - 210.3) = 439.9 ns/row > The regression of toJavaDate in master against Spark 2.4.6-SNAPSHOT is (439.9 - > 349.3)/349.3 = ~26%
[jira] [Assigned] (SPARK-31443) Perf regression of toJavaDate
[ https://issues.apache.org/jira/browse/SPARK-31443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-31443: --- Assignee: Maxim Gekk
[jira] [Commented] (SPARK-31423) DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC
[ https://issues.apache.org/jira/browse/SPARK-31423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083828#comment-17083828 ] Wenchen Fan commented on SPARK-31423: - We probably have a bug in picking the next valid date, but the behavior itself is expected: Spark always picks the next valid date {code} scala> sql("select date'1990-9-31'").show +-----------------+ |DATE '1990-10-01'| +-----------------+ | 1990-10-01| +-----------------+ {code} It's a bit odd that this date is valid in Spark but invalid in ORC, because the calendars differ. I'm OK with failing in this case, with a config that allows users to write it anyway by picking the next valid date. > DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC > -- > > Key: SPARK-31423 > URL: https://issues.apache.org/jira/browse/SPARK-31423 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0, 3.1.0 >Reporter: Bruce Robbins >Priority: Major > > There is a range of days (1582-10-05 to 1582-10-14) for which DATEs and > TIMESTAMPS are changed when stored in ORC. The value is off by 10 days.
> For example: > {noformat} > scala> val df = sql("select cast('1582-10-14' as DATE) dt") > df: org.apache.spark.sql.DataFrame = [dt: date] > scala> df.show // seems fine > +--+ > |dt| > +--+ > |1582-10-14| > +--+ > scala> df.write.mode("overwrite").orc("/tmp/funny_orc_date") > scala> spark.read.orc("/tmp/funny_orc_date").show // off by 10 days > +--+ > |dt| > +--+ > |1582-10-24| > +--+ > scala> > {noformat} > ORC has the same issue with TIMESTAMPS: > {noformat} > scala> val df = sql("select cast('1582-10-14 00:00:00' as TIMESTAMP) ts") > df: org.apache.spark.sql.DataFrame = [ts: timestamp] > scala> df.show // seems fine > +---+ > | ts| > +---+ > |1582-10-14 00:00:00| > +---+ > scala> df.write.mode("overwrite").orc("/tmp/funny_orc_timestamp") > scala> spark.read.orc("/tmp/funny_orc_timestamp").show(truncate=false) // off > by 10 days > +---+ > |ts | > +---+ > |1582-10-24 00:00:00| > +---+ > scala> > {noformat} > However, when written to Parquet or Avro, DATES and TIMESTAMPs for this range > do not change. > {noformat} > scala> val df = sql("select cast('1582-10-14' as DATE) dt") > df: org.apache.spark.sql.DataFrame = [dt: date] > scala> df.write.mode("overwrite").parquet("/tmp/funny_parquet_date") > scala> spark.read.parquet("/tmp/funny_parquet_date").show // reflects > original value > +--+ > |dt| > +--+ > |1582-10-14| > +--+ > scala> val df = sql("select cast('1582-10-14' as DATE) dt") > df: org.apache.spark.sql.DataFrame = [dt: date] > scala> df.write.mode("overwrite").format("avro").save("/tmp/funny_avro_date") > scala> spark.read.format("avro").load("/tmp/funny_avro_date").show // > reflects original value > +--+ > |dt| > +--+ > |1582-10-14| > +--+ > scala> > {noformat} > It's unclear to me whether ORC is behaving correctly or not, as this is how > Spark 2.4 works with DATEs and TIMESTAMPs in general (and also how Spark 3.x > works with DATEs and TIMESTAMPs in general when > {{spark.sql.legacy.timeParserPolicy}} is set to {{LEGACY}}). 
In Spark 2.4, > DATEs and TIMESTAMPs in this range don't exist: > {noformat} > scala> sql("select cast('1582-10-14' as DATE) dt").show // the same cast done > in Spark 2.4 > +--+ > |dt| > +--+ > |1582-10-24| > +--+ > scala> > {noformat} > I assume the following snippet is relevant (from the Wikipedia entry on the > Gregorian calendar): > {quote}To deal with the 10 days' difference (between calendar and > reality)[Note 2] that this drift had already reached, the date was advanced > so that 4 October 1582 was followed by 15 October 1582 > {quote} > Spark 3.x should treat DATEs and TIMESTAMPS in this range consistently, and > probably based on spark.sql.legacy.timeParserPolicy (or some other config) > rather than file format. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
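The 10-day shift comes down to which calendar the stored date is interpreted in. A small Python sketch of the proleptic-Gregorian side (Python's `datetime`, like Spark 3.x, uses the proleptic Gregorian calendar; the hybrid-calendar behavior is described in comments, since Python has no built-in hybrid calendar):

```python
from datetime import date

# In the proleptic Gregorian calendar, 1582-10-05..1582-10-14 are ordinary
# valid dates, so the span from 10-04 to 10-15 covers eleven days:
d = date(1582, 10, 14)
assert (date(1582, 10, 15) - date(1582, 10, 4)).days == 11

# In the hybrid Julian/Gregorian calendar used by java.sql.Date (and hence by
# Spark 2.4 and the ORC reader/writer), 1582-10-04 is followed directly by
# 1582-10-15, so those ten dates do not exist; resolving them forward produces
# the 10-day shift (1582-10-14 -> 1582-10-24) shown in the issue.
```

This is why the same stored value round-trips unchanged through Parquet/Avro (read back with the writer's calendar assumptions) but appears shifted through ORC.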
[jira] [Commented] (SPARK-31423) DATES and TIMESTAMPS for a certain range are off by 10 days when stored in ORC
[ https://issues.apache.org/jira/browse/SPARK-31423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17083822#comment-17083822 ] Wenchen Fan commented on SPARK-31423: - The ORC file format spec doesn't specify the calendar, but the ORC library (reader and writer) uses Java 7 Date/Timestamp, which implies the hybrid Julian calendar.