[jira] [Created] (SPARK-26391) Spark Streaming Kafka with Offset Gaps
Rishabh created SPARK-26391: --- Summary: Spark Streaming Kafka with Offset Gaps Key: SPARK-26391 URL: https://issues.apache.org/jira/browse/SPARK-26391 Project: Spark Issue Type: Question Components: Spark Core, Structured Streaming Affects Versions: 2.4.0 Reporter: Rishabh I have an app that uses Kafka Streams to pull data from the `input` topic and push to the `output` topic with `processing.guarantee=exactly_once`. Due to `exactly_once`, gaps (transaction markers) are created in Kafka. Let's call this app `kafka-streamer`. Now I have another app that listens to this output topic (actually multiple topics matched by a Pattern/Regex) and processes the data using [https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html]. Let's call this app `spark-streamer`. Due to the gaps, the first thing that happens is that Spark Streaming fails. To fix this I enabled `spark.streaming.kafka.allowNonConsecutiveOffsets=true` in the Spark config before creating the StreamingContext. Now let's look at the issues I faced when starting `spark-streamer`: # Even though there are new offsets to be polled/consumed, another message has to be pushed to the topic partition before processing can start. If I start the app (with messages in the queue waiting to be polled) and don't push any message to the topic, the code times out after the default 120ms and throws an exception. # It doesn't fetch the last record; it fetches records only up to the second-to-last. This means that to poll/process the last record, yet another message has to be pushed. This is a problem for us since `spark-streamer` listens to multiple topics (based on a pattern) and there might be a topic with low throughput whose data should still make it to Spark for processing. # In general, if no data/message is pushed, it dies after the default 120ms polling timeout.
In the limited time I had, I tried going through the spark-streaming-kafka code and was only able to find an answer to the third problem, which is this - [https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala#L178] My questions are: # Why do we throw an exception in `compactedNext()` if no data is polled? # I wasn't able to figure out why the first and second issues happen; it would be great if somebody could point out a solution or the reason behind the behaviour. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
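The gap behaviour described above can be sketched with a toy Python model (illustrative only: the dict-based log and `visible_records` are hypothetical, not Spark or Kafka code). Transaction markers occupy offsets but are never returned to the consumer, so an offset range contains fewer records than simple offset arithmetic suggests, which is exactly what `allowNonConsecutiveOffsets` handling has to cope with:

```python
# Toy model of a Kafka partition written with exactly-once transactions:
# offsets holding None stand for transaction markers, which consume an
# offset but are invisible to consumers.

def visible_records(log, start, end):
    """Return (offset, value) pairs in [start, end), skipping gap offsets."""
    return [(o, v) for o, v in sorted(log.items()) if start <= o < end and v is not None]

# Offsets 2 and 5 are transaction markers -> gaps for the consumer.
log = {0: "a", 1: "b", 2: None, 3: "c", 4: "d", 5: None, 6: "e"}

# The range [0, 7) spans 7 offsets but yields only 5 consumable records,
# so a reader that assumes consecutive offsets miscounts the batch.
recs = visible_records(log, 0, 7)
assert len(recs) == 5
assert (7 - 0) != len(recs)
```

This is only a model of the offset arithmetic, not of the poll/timeout behaviour the reporter hits.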
[jira] [Updated] (SPARK-26385) YARN - Spark Stateful Structured streaming HDFS_DELEGATION_TOKEN not found in cache
[ https://issues.apache.org/jira/browse/SPARK-26385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] T M updated SPARK-26385: Description: Hello, I have a Spark Structured Streaming job which is running on YARN (Hadoop 2.6.0, Spark 2.4.0). After 25-26 hours, my job stops working with the following error: {code:java} 2018-12-16 22:35:17 ERROR org.apache.spark.internal.Logging$class.logError(Logging.scala:91): Query TestQuery[id = a61ce197-1d1b-4e82-a7af-60162953488b, runId = a56878cf-dfc7-4f6a-ad48-02cf738ccc2f] terminated with error org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (token for REMOVED: HDFS_DELEGATION_TOKEN owner=REMOVED, renewer=yarn, realUser=, issueDate=1544903057122, maxDate=1545507857122, sequenceNumber=10314, masterKeyId=344) can't be found in cache at org.apache.hadoop.ipc.Client.call(Client.java:1470) at org.apache.hadoop.ipc.Client.call(Client.java:1401) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy9.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:752) at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy10.getFileInfo(Unknown Source) at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1977) at org.apache.hadoop.fs.Hdfs.getFileStatus(Hdfs.java:133) at org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1120) at org.apache.hadoop.fs.FileContext$14.next(FileContext.java:1116) at 
org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) at org.apache.hadoop.fs.FileContext.getFileStatus(FileContext.java:1116) at org.apache.hadoop.fs.FileContext$Util.exists(FileContext.java:1581) at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.exists(CheckpointFileManager.scala:326) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:142) at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply$mcV$sp(MicroBatchExecution.scala:544) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:542) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:542) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189){code} It is important to note that I already tried the usual fix for this kind of problem: {code:java} --conf "spark.hadoop.fs.hdfs.impl.disable.cache=true" {code}
[jira] [Created] (SPARK-26392) Cancel pending allocate requests by taking locality preference into account
wuyi created SPARK-26392: Summary: Cancel pending allocate requests by taking locality preference into account Key: SPARK-26392 URL: https://issues.apache.org/jira/browse/SPARK-26392 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 2.4.0 Reporter: wuyi Fix For: 2.4.1 Right now, we cancel pending allocate requests in their sending order. I think we can take locality preference into account when doing this, to minimize the impact on task locality preference.
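The idea in this improvement can be illustrated with a short Python sketch (the dict-based request model and `pick_cancellations` are hypothetical, not the YARN allocator API): when shrinking the set of pending container requests, cancel the requests that matter least for locality first, so requests matching hosts where tasks are actually waiting survive longest.

```python
# Illustrative sketch of locality-aware cancellation of pending requests.

def pick_cancellations(pending, num_to_cancel, hosts_with_tasks):
    """Return the requests to cancel, least locality-relevant first."""
    def relevance(req):
        # 0 = no preferred host, 1 = preferred host has no pending tasks,
        # 2 = preferred host has tasks waiting for data locality.
        host = req.get("host")
        if host is None:
            return 0
        return 2 if host in hosts_with_tasks else 1
    ranked = sorted(pending, key=relevance)
    return ranked[:num_to_cancel]

pending = [{"id": 1, "host": "nodeA"}, {"id": 2, "host": None}, {"id": 3, "host": "nodeB"}]
# Only nodeA has tasks waiting, so the locality-free request and the
# nodeB request are cancelled before the nodeA request.
cancelled = pick_cancellations(pending, 2, hosts_with_tasks={"nodeA"})
assert [r["id"] for r in cancelled] == [2, 3]
```

Cancelling in plain sending order, by contrast, could drop the nodeA request even though a task prefers that host.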
[jira] [Commented] (SPARK-26392) Cancel pending allocate requests by taking locality preference into account
[ https://issues.apache.org/jira/browse/SPARK-26392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723855#comment-16723855 ] ASF GitHub Bot commented on SPARK-26392: Ngone51 opened a new pull request #23344: [SPARK-26392][YARN] Cancel pending allocate requests by taking locality preference into a… URL: https://github.com/apache/spark/pull/23344 …ccount ## What changes were proposed in this pull request? Right now, we cancel pending allocate requests in their sending order. I think we can take locality preference into account when doing this, to minimize the impact on task locality preference. ## How was this patch tested? N.A. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Cancel pending allocate requests by taking locality preference into account > --- > > Key: SPARK-26392 > URL: https://issues.apache.org/jira/browse/SPARK-26392 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 2.4.0 >Reporter: wuyi >Priority: Minor > Labels: patch > Fix For: 2.4.1 > > > Right now, we cancel pending allocate requests in their sending order. I think > we can take locality preference into account when doing this, to minimize the impact on task > locality preference.
[jira] [Assigned] (SPARK-26392) Cancel pending allocate requests by taking locality preference into account
[ https://issues.apache.org/jira/browse/SPARK-26392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26392: Assignee: Apache Spark > Cancel pending allocate requests by taking locality preference into account > --- > > Key: SPARK-26392 > URL: https://issues.apache.org/jira/browse/SPARK-26392 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 2.4.0 >Reporter: wuyi >Assignee: Apache Spark >Priority: Minor > Labels: patch > Fix For: 2.4.1 > > > Right now, we cancel pending allocate requests in their sending order. I think > we can take locality preference into account when doing this, to minimize the impact on task > locality preference.
[jira] [Assigned] (SPARK-26392) Cancel pending allocate requests by taking locality preference into account
[ https://issues.apache.org/jira/browse/SPARK-26392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26392: Assignee: (was: Apache Spark) > Cancel pending allocate requests by taking locality preference into account > --- > > Key: SPARK-26392 > URL: https://issues.apache.org/jira/browse/SPARK-26392 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 2.4.0 >Reporter: wuyi >Priority: Minor > Labels: patch > Fix For: 2.4.1 > > > Right now, we cancel pending allocate requests in their sending order. I think > we can take locality preference into account when doing this, to minimize the impact on task > locality preference.
[jira] [Commented] (SPARK-26384) CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled
[ https://issues.apache.org/jira/browse/SPARK-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723886#comment-16723886 ] ASF GitHub Bot commented on SPARK-26384: MaxGekk opened a new pull request #23345: [SPARK-26384][SQL] Propagate SQL configs for CSV schema inferring URL: https://github.com/apache/spark/pull/23345 ## What changes were proposed in this pull request? Currently, SQL configs are not propagated to executors while schema inferring in CSV datasource. For example, changing of `spark.sql.legacy.timeParser.enabled` does not impact on inferring timestamp types. In the PR, I propose to fix the issue by wrapping schema inferring action using `SQLExecution.withSQLConfPropagated`. ## How was this patch tested? Added logging to `TimestampFormatter`: ```patch -object TimestampFormatter { +object TimestampFormatter extends Logging { def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = { if (SQLConf.get.legacyTimeParserEnabled) { + logError("LegacyFallbackTimestampFormatter is being used") new LegacyFallbackTimestampFormatter(format, timeZone, locale) } else { + logError("Iso8601TimestampFormatter is being used") new Iso8601TimestampFormatter(format, timeZone, locale) } } ``` and run the command in `spark-shell`: ```shell $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true ``` ```scala scala> Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo") scala> spark.read.option("inferSchema", "true").option("header", "false").option("timestampFormat", "|MM|dd").csv("/tmp/foo").printSchema() 18/12/18 10:47:27 ERROR TimestampFormatter: LegacyFallbackTimestampFormatter is being used root |-- _c0: timestamp (nullable = true) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled > - > > Key: SPARK-26384 > URL: https://issues.apache.org/jira/browse/SPARK-26384 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Maxim Gekk >Priority: Major > > Starting from the commit > [https://github.com/apache/spark/commit/f982ca07e80074bdc1e3b742c5e21cf368e4ede2] > , add logging like in the comment > https://github.com/apache/spark/pull/23150#discussion_r242021998 and run: > {code:shell} > $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true > {code} > and in the shell: > {code:scala} > scala> spark.conf.get("spark.sql.legacy.timeParser.enabled") > res0: String = true > scala> > Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo") > scala> spark.read.option("inferSchema", "true").option("header", > "false").option("timestampFormat", "|MM|dd").csv("/tmp/foo").printSchema() > 18/12/17 12:11:47 ERROR TimestampFormatter: Iso8601TimestampFormatter is > being used > root > |-- _c0: timestamp (nullable = true) > {code}
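The fix's core idea (wrapping the schema-inference action so the driver's SQL configs travel to the executor side) can be sketched in plain Python. This is a toy stand-in, not Spark's API: `with_conf_propagated`, `get_conf`, and `infer_formatter` are hypothetical names mimicking the role of `SQLExecution.withSQLConfPropagated`.

```python
# Toy illustration: capture the driver-side config and install it around
# the executor-side function, instead of letting the executor fall back
# to defaults during schema inference.
import threading

_conf = threading.local()

def get_conf():
    # Default config, as an executor would see without propagation.
    return getattr(_conf, "value", {"legacyTimeParserEnabled": False})

def with_conf_propagated(captured, fn, *args):
    """Run fn with the captured config active, restoring the previous one after."""
    prev = getattr(_conf, "value", None)
    _conf.value = captured
    try:
        return fn(*args)
    finally:
        if prev is None:
            del _conf.value
        else:
            _conf.value = prev

def infer_formatter():
    return "Legacy" if get_conf()["legacyTimeParserEnabled"] else "Iso8601"

# Without propagation the "executor" sees the default formatter; with it,
# the driver's setting takes effect, matching the PR's logged behaviour.
assert infer_formatter() == "Iso8601"
driver_conf = {"legacyTimeParserEnabled": True}
assert with_conf_propagated(driver_conf, infer_formatter) == "Legacy"
```

The restore step in `finally` matters: the setting must not leak past the wrapped action.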
[jira] [Assigned] (SPARK-26384) CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled
[ https://issues.apache.org/jira/browse/SPARK-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26384: Assignee: (was: Apache Spark) > CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled > - > > Key: SPARK-26384 > URL: https://issues.apache.org/jira/browse/SPARK-26384 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Maxim Gekk >Priority: Major > > Starting from the commit > [https://github.com/apache/spark/commit/f982ca07e80074bdc1e3b742c5e21cf368e4ede2] > , add logging like in the comment > https://github.com/apache/spark/pull/23150#discussion_r242021998 and run: > {code:shell} > $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true > {code} > and in the shell: > {code:scala} > scala> spark.conf.get("spark.sql.legacy.timeParser.enabled") > res0: String = true > scala> > Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo") > scala> spark.read.option("inferSchema", "true").option("header", > "false").option("timestampFormat", "|MM|dd").csv("/tmp/foo").printSchema() > 18/12/17 12:11:47 ERROR TimestampFormatter: Iso8601TimestampFormatter is > being used > root > |-- _c0: timestamp (nullable = true) > {code}
[jira] [Assigned] (SPARK-26384) CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled
[ https://issues.apache.org/jira/browse/SPARK-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26384: Assignee: Apache Spark > CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled > - > > Key: SPARK-26384 > URL: https://issues.apache.org/jira/browse/SPARK-26384 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Maxim Gekk >Assignee: Apache Spark >Priority: Major > > Starting from the commit > [https://github.com/apache/spark/commit/f982ca07e80074bdc1e3b742c5e21cf368e4ede2] > , add logging like in the comment > https://github.com/apache/spark/pull/23150#discussion_r242021998 and run: > {code:shell} > $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true > {code} > and in the shell: > {code:scala} > scala> spark.conf.get("spark.sql.legacy.timeParser.enabled") > res0: String = true > scala> > Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo") > scala> spark.read.option("inferSchema", "true").option("header", > "false").option("timestampFormat", "|MM|dd").csv("/tmp/foo").printSchema() > 18/12/17 12:11:47 ERROR TimestampFormatter: Iso8601TimestampFormatter is > being used > root > |-- _c0: timestamp (nullable = true) > {code}
[jira] [Updated] (SPARK-26377) java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/SPARK-26377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] pavan updated SPARK-26377: -- Description: Hi, I am using Spark's Kafka direct stream with SubscribePattern, with initial offsets for topics matching a pattern. On running the Spark job on the job server, I get the following exception and the job is terminated. Kafka Params: "bootstrap.servers" -> credentials.getBrokers, "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[ByteArrayDeserializer], "enable.auto.commit" -> (false: java.lang.Boolean) "group.id" -> "abc" API: KafkaUtils.createDirectStream(streamingContext, PreferConsistent, SubscribePattern[K, V](regexPattern, allKafkaParams, offsets), perPartitionConfig) Error Log: { "duration": "33.523 secs", "classPath": "com.appiot.dataingestion.DataIngestionJob", "startTime": "2018-12-15T18:28:08.207Z", "context": "c~1d750906-1fa7-44f9-a258-04963ac53150~9dc097e3-bf0f-432c-9c27-68c41a4009cd", "result": { "message": "java.lang.IllegalStateException: No current assignment for partition com-cibigdata2.v1.iot.raw_timeseries-0", "errorClass": "java.lang.RuntimeException", "stack": "java.lang.RuntimeException: java.lang.IllegalStateException: No current assignment for partition com-cibigdata2.v1.iot.raw_timeseries-0\n\tat org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)\n\tat org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)\n\tat org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)\n\tat org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:160)\n\tat org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:159)\n\tat scala.collection.Iterator$class.foreach(Iterator.scala:893)\n\tat scala.collection.AbstractIterator.foreach(Iterator.scala:1336)\n\tat 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)\n\tat scala.collection.AbstractIterable.foreach(Iterable.scala:54)\n\tat org.apache.spark.streaming.kafka010.SubscribePattern.onStart(ConsumerStrategy.scala:159)\n\tat org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)\n\tat org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)\n\tat org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)\n\tat scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)\n\tat scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)\n\tat scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)\n\tat scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)\n\tat scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)\n\tat scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)\n\tat scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)\n\tat scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)\n\tat scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\n\tat ... run in separate thread using org.apache.spark.util.ThreadUtils ... 
()\n\tat org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)\n\tat org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)\n\tat com.sap.appiot.dataingestion.DataIngestionJob$.runJob(DataIngestionJob.scala:182)\n\tat com.sap.appiot.dataingestion.DataIngestionJob$.runJob(DataIngestionJob.scala:24)\n\tat spark.jobserver.SparkJobBase$class.runJob(SparkJob.scala:31)\n\tat com.appiot.dataingestion.DataIngestionJob$.runJob(DataIngestionJob.scala:24)\n\tat com.appiot.dataingestion.DataIngestionJob$.runJob(DataIngestionJob.scala:24)\n\tat spark.jobserver.JobManagerActor$$anonfun$getJobFuture$4.apply(JobManagerActor.scala:594)\n\tat scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)\n\tat scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)\n\tat java.util.concurrent.Th
[jira] [Commented] (SPARK-26377) java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/SPARK-26377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723908#comment-16723908 ] pavan commented on SPARK-26377: --- Hi Hyun, I didn't try 2.4.0. I am raising this as a bug because I am continuously facing the issue. Will try it. Thanks, Pavan > java.lang.IllegalStateException: No current assignment for partition > > > Key: SPARK-26377 > URL: https://issues.apache.org/jira/browse/SPARK-26377 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.2.1 >Reporter: pavan >Priority: Critical > > Hi, > I am using Spark's Kafka direct stream with SubscribePattern, with initial > offsets for topics matching a pattern. On running the Spark job on the job server, > I get the following exception and the job is terminated. > Kafka Params: > "bootstrap.servers" -> credentials.getBrokers, > "key.deserializer" -> classOf[StringDeserializer], > "value.deserializer" -> classOf[ByteArrayDeserializer], > "enable.auto.commit" -> (false: java.lang.Boolean) > "group.id" -> "abc" > API: > KafkaUtils.createDirectStream(streamingContext, PreferConsistent, > SubscribePattern[K, V](regexPattern, allKafkaParams, offsets), > perPartitionConfig) > > Error Log: > { "duration": "33.523 secs", "classPath": > "com.appiot.dataingestion.DataIngestionJob", "startTime": > "2018-12-15T18:28:08.207Z", "context": > "c~1d750906-1fa7-44f9-a258-04963ac53150~9dc097e3-bf0f-432c-9c27-68c41a4009cd", > "result": > { "message": "java.lang.IllegalStateException: No current assignment for > partition com-cibigdata2.v1.iot.raw_timeseries-0", "errorClass": > "java.lang.RuntimeException", "stack": "java.lang.RuntimeException: > java.lang.IllegalStateException: No current assignment for partition > com-cibigdata2.v1.iot.raw_timeseries-0\n\tat > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)\n\tat > > 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)\n\tat > > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)\n\tat > > org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:160)\n\tat > > org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:159)\n\tat > scala.collection.Iterator$class.foreach(Iterator.scala:893)\n\tat > scala.collection.AbstractIterator.foreach(Iterator.scala:1336)\n\tat > scala.collection.IterableLike$class.foreach(IterableLike.scala:72)\n\tat > scala.collection.AbstractIterable.foreach(Iterable.scala:54)\n\tat > org.apache.spark.streaming.kafka010.SubscribePattern.onStart(ConsumerStrategy.scala:159)\n\tat > > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)\n\tat > > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)\n\tat > > org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat > > org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat > > scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)\n\tat > > scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)\n\tat > > scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)\n\tat > > scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)\n\tat > > scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat > scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat > scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)\n\tat > scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)\n\tat > > 
scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)\n\tat > > scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)\n\tat > > scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)\n\tat > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat > > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\n\tat > ... run in separate thread using org.apache.spark.util.ThreadUtils ... > ()\n\tat > org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)\n\tat > > org.apache.spark.streaming.Stream
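The failure mode in the stack trace above (seek called on a pattern-subscribed consumer before any partitions are assigned) can be reproduced with a minimal Python stand-in. `ToyConsumer` is hypothetical, not the Kafka client API; it only mimics the rule that seeking is legal solely for currently assigned partitions, which is why passing initial offsets with SubscribePattern can blow up before the first rebalance.

```python
# Minimal model: pattern subscription assigns partitions lazily (on
# poll/rebalance), so an eager seek finds no current assignment.

class ToyConsumer:
    def __init__(self):
        self.assignment = set()

    def subscribe_pattern(self, pattern):
        # Assignment happens later, during poll/rebalance, not here.
        self.pattern = pattern

    def seek(self, partition, offset):
        if partition not in self.assignment:
            raise RuntimeError(f"No current assignment for partition {partition}")
        self.position = (partition, offset)

c = ToyConsumer()
c.subscribe_pattern("com-.*")
try:
    c.seek("com-cibigdata2.v1.iot.raw_timeseries-0", 42)
    raised = False
except RuntimeError:
    raised = True
assert raised

# Once a rebalance has assigned the partition, the same seek succeeds.
c.assignment.add("com-cibigdata2.v1.iot.raw_timeseries-0")
c.seek("com-cibigdata2.v1.iot.raw_timeseries-0", 42)
assert c.position == ("com-cibigdata2.v1.iot.raw_timeseries-0", 42)
```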
[jira] [Created] (SPARK-26393) Different behaviors of date_add when calling it inside expr
Ahmed Kamal` created SPARK-26393: Summary: Different behaviors of date_add when calling it inside expr Key: SPARK-26393 URL: https://issues.apache.org/jira/browse/SPARK-26393 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.3.2 Reporter: Ahmed Kamal` When calling date_add from pyspark.sql.functions directly, without using expr, like this: {code:java} df.withColumn("added", F.date_add(F.to_date(F.lit('1998-9-26')), F.col('days'))).toPandas(){code} it raises `TypeError: Column is not iterable`, because it only takes a number, not a column. But when I use it inside an expr, like this: {code:java} df.withColumn("added", F.expr("date_add(to_date('1998-9-26'), days)")).toPandas() {code} it works fine. Shouldn't they behave the same way? I think it's logical to accept a column here as well. A Python notebook to demonstrate: https://gist.github.com/AhmedKamal20/fec10337e815baa44f115d307e3b07eb
[jira] [Updated] (SPARK-26393) Different behaviors of date_add when calling it inside expr
[ https://issues.apache.org/jira/browse/SPARK-26393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Kamal` updated SPARK-26393: - Description: When calling date_add from pyspark.sql.functions directly, without using expr, like this: {code:java} df.withColumn("added", F.date_add(F.to_date(F.lit('1998-9-26')), F.col('days'))).toPandas(){code} it raises `TypeError: Column is not iterable`, because it only takes a number, not a column. But when I use it inside an expr, like this: {code:java} df.withColumn("added", F.expr("date_add(to_date('1998-9-26'), days)")).toPandas(){code} it works fine. Shouldn't they behave the same way? I think it's logical to accept a column here as well. A Python notebook to demonstrate: [https://gist.github.com/AhmedKamal20/fec10337e815baa44f115d307e3b07eb] was: When calling date_add from pyspark.sql.functions directly, without using expr, like this: {code:java} df.withColumn("added", F.date_add(F.to_date(F.lit('1998-9-26')), F.col('days'))).toPandas(){code} it raises `TypeError: Column is not iterable`, because it only takes a number, not a column. But when I use it inside an expr, like this: {code:java} df.withColumn("added", F.expr("date_add(to_date('1998-9-26'), days)")).toPandas() {code} it works fine. Shouldn't they behave the same way? I think it's logical to accept a column here as well. 
A Python notebook to demonstrate: https://gist.github.com/AhmedKamal20/fec10337e815baa44f115d307e3b07eb > Different behaviors of date_add when calling it inside expr > --- > > Key: SPARK-26393 > URL: https://issues.apache.org/jira/browse/SPARK-26393 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.2 >Reporter: Ahmed Kamal` >Priority: Minor > > When calling date_add from pyspark.sql.functions directly, without using expr, > like this: > {code:java} > df.withColumn("added", F.date_add(F.to_date(F.lit('1998-9-26')), > F.col('days'))).toPandas(){code} > it raises `TypeError: Column is not iterable`, > because it only takes a number, not a column. > But when I use it inside an expr, like this: > {code:java} > df.withColumn("added", F.expr("date_add(to_date('1998-9-26'), > days)")).toPandas(){code} > it works fine. > Shouldn't they behave the same way? > I think it's logical to accept a column here as well. > A Python notebook to demonstrate: > [https://gist.github.com/AhmedKamal20/fec10337e815baa44f115d307e3b07eb]
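The semantics the reporter is asking for can be shown with a plain-Python stand-in using only the standard library (this `date_add` is a toy, not the PySpark function): the second argument is naturally a per-row value, so accepting a column of day counts, as the SQL `expr` path already does, is a reasonable expectation.

```python
# Toy date_add: start date plus an integer number of days, applied per row.
from datetime import date, timedelta

def date_add(start, days):
    return start + timedelta(days=days)

rows = [1, 7, 30]  # hypothetical values of the `days` column
added = [date_add(date(1998, 9, 26), d) for d in rows]
assert added[0] == date(1998, 9, 27)
assert added[2] == date(1998, 10, 26)
```

In PySpark 2.3.x only the `F.expr("date_add(...)")` form evaluates `days` per row; the direct `F.date_add` signature accepts a literal int for the second argument.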
[jira] [Updated] (SPARK-19228) inferSchema function processed csv date column as string and "dateFormat" DataSource option is ignored
[ https://issues.apache.org/jira/browse/SPARK-19228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon updated SPARK-19228: - Labels: (was: easyfix) > inferSchema function processed csv date column as string and "dateFormat" > DataSource option is ignored > -- > > Key: SPARK-19228 > URL: https://issues.apache.org/jira/browse/SPARK-19228 > Project: Spark > Issue Type: Bug > Components: Input/Output, SQL >Affects Versions: 2.1.0 >Reporter: Sergey Rubtsov >Priority: Major > Original Estimate: 6h > Remaining Estimate: 6h > > The current FastDateFormat parser can't properly parse dates and timestamps and > does not meet ISO 8601. > For example, I need to process user.csv like this: > {code:java} > id,project,started,ended > sergey.rubtsov,project0,12/12/2012,10/10/2015 > {code} > When I add date format options: > {code:java} > Dataset users = spark.read().format("csv").option("mode", > "PERMISSIVE").option("header", "true") > .option("inferSchema", > "true").option("dateFormat", > "dd/MM/yyyy").load("src/main/resources/user.csv"); > users.printSchema(); > {code} > the expected schema should be > {code:java} > root > |-- id: string (nullable = true) > |-- project: string (nullable = true) > |-- started: date (nullable = true) > |-- ended: date (nullable = true) > {code} > but the actual result is: > {code:java} > root > |-- id: string (nullable = true) > |-- project: string (nullable = true) > |-- started: string (nullable = true) > |-- ended: string (nullable = true) > {code} > This means the date is processed as a string and the "dateFormat" option is ignored. 
> If I add the option > {code:java} > .option("timestampFormat", "dd/MM/yyyy") > {code} > the result is: > {code:java} > root > |-- id: string (nullable = true) > |-- project: string (nullable = true) > |-- started: timestamp (nullable = true) > |-- ended: timestamp (nullable = true) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
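The inference behavior described above can be sketched in plain Python. The helper and its name are hypothetical, not Spark's CSVInferSchema API: for each field, try numeric types first, then the user-supplied dateFormat, and only then fall back to string — which is what the reporter expected `inferSchema` plus `dateFormat` to do.

```python
from datetime import datetime

def infer_field_type(field, date_format="dd/MM/yyyy"):
    # Translate the Java-style pattern into Python's strptime directives
    # (only the tokens used in this example are handled).
    py_format = (date_format.replace("dd", "%d")
                            .replace("MM", "%m")
                            .replace("yyyy", "%Y"))
    # Numeric types take precedence, as in CSV schema inference.
    for parse, inferred in ((int, "integer"), (float, "double")):
        try:
            parse(field)
            return inferred
        except ValueError:
            pass
    # Then try the user's date format before giving up on string.
    try:
        datetime.strptime(field, py_format)
        return "date"
    except ValueError:
        return "string"
```

With this ordering, the `started`/`ended` columns from user.csv would infer as `date` instead of `string`.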
[jira] [Commented] (SPARK-19228) inferSchema function processed csv date column as string and "dateFormat" DataSource option is ignored
[ https://issues.apache.org/jira/browse/SPARK-19228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723960#comment-16723960 ] ASF GitHub Bot commented on SPARK-19228: HyukjinKwon closed pull request #21363: [SPARK-19228][SQL] Migrate on Java 8 time from FastDateFormat for meet the ISO8601 URL: https://github.com/apache/spark/pull/21363 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 80f15053005ff..9eaf6a2862a0f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util import java.sql.{Date, Timestamp} import java.text.{DateFormat, SimpleDateFormat} +import java.time.LocalDateTime +import java.time.temporal.ChronoField import java.util.{Calendar, Locale, TimeZone} import java.util.concurrent.ConcurrentHashMap import java.util.function.{Function => JFunction} @@ -143,6 +145,12 @@ object DateTimeUtils { millisLocal - getOffsetFromLocalMillis(millisLocal, timeZone) } + def dateTimeToMicroseconds(localDateTime: LocalDateTime, timeZone: TimeZone): Long = { +val microOfSecond = localDateTime.getLong(ChronoField.MICRO_OF_SECOND) +val epochSecond = localDateTime.atZone(timeZone.toZoneId).toInstant.getEpochSecond +epochSecond * 1000000L + microOfSecond + } + def dateToString(days: SQLDate): String = getThreadLocalDateFormat.format(toJavaDate(days)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index cbf6106697f30..cd1b7395b97d5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -19,11 +19,14 @@ package org.apache.spark.sql.catalyst.util import java.sql.{Date, Timestamp} import java.text.SimpleDateFormat +import java.time.LocalDateTime +import java.time.format.DateTimeFormatter import java.util.{Calendar, Locale, TimeZone} import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.unsafe.types.UTF8String +import org.junit.Assert.assertEquals class DateTimeUtilsSuite extends SparkFunSuite { @@ -645,6 +648,18 @@ class DateTimeUtilsSuite extends SparkFunSuite { } } + test("Java 8 LocalDateTime to microseconds") { +val nanos = "2015-05-09 00:10:23.999750987" +var formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS") +val localDateTimeInNanos = LocalDateTime.parse(nanos, formatter) +val timeInMicros = dateTimeToMicroseconds(localDateTimeInNanos, TimeZonePST) +assertEquals(1431155423999750L, timeInMicros) +val micros = "2015-05-09 00:10:23.999750" +formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS") +val localDateTimeInMicros = LocalDateTime.parse(micros, formatter) +assertEquals(timeInMicros, dateTimeToMicroseconds(localDateTimeInMicros, TimeZonePST)) + } + test("daysToMillis and millisToDays") { val c = Calendar.getInstance(TimeZonePST) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala index a585cbed2551b..6239f5666cd4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala @@ -90,6 +90,7 @@ private[csv] object CSVInferSchema { // DecimalTypes have different precisions and scales, so we try to find the common type. findTightestCommonType(typeSoFar, tryParseDecimal(field, options)).getOrElse(StringType) case DoubleType => tryParseDouble(field, options) +case DateType => tryParseDate(field, options) case TimestampType => tryParseTimestamp(field, options) case BooleanType => tryParseBoolean(field, options) case StringType => StringType @@ -140,14 +141,23 @@ private[csv] object CSVInferSchema { private def tryParseDouble(field: String, options: CSVOptions): DataType = { if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field, options)) { DoubleT
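The arithmetic of the `dateTimeToMicroseconds` helper in the diff above — epoch seconds scaled to microseconds plus the intra-second microsecond field — can be mirrored in Python to sanity-check it. This is an illustrative mirror, not Spark code:

```python
from datetime import datetime, timezone

def datetime_to_micros(local_dt, tz=timezone.utc):
    # Epoch seconds of the wall-clock time interpreted in the given
    # zone, scaled by 1,000,000, plus the sub-second microseconds —
    # the same two terms the Scala helper adds.
    epoch_second = int(local_dt.replace(tzinfo=tz).timestamp())
    return epoch_second * 1_000_000 + local_dt.microsecond
```

Note the two terms are independent: the zone offset only affects the epoch-second part, never the sub-second part.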
[jira] [Created] (SPARK-26394) Annotation error for Utils.timeStringAsMs
Jackey Lee created SPARK-26394: -- Summary: Annotation error for Utils.timeStringAsMs Key: SPARK-26394 URL: https://issues.apache.org/jira/browse/SPARK-26394 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.4.0 Reporter: Jackey Lee Utils.timeStringAsMs() parses a time string to milliseconds, but its annotation says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use." Thus, microseconds should be changed to milliseconds. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
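The fix is documentation-only, but the conversion itself is easy to sketch. This is a hypothetical re-implementation in Python, not Spark's actual `timeStringAsMs` code, showing why the result is milliseconds:

```python
import re

# Unit suffix -> milliseconds multiplier (units assumed for illustration).
UNIT_TO_MS = {"us": 0.001, "ms": 1, "s": 1000,
              "m": 60_000, "min": 60_000,
              "h": 3_600_000, "d": 86_400_000}

def time_string_as_ms(text):
    # Parse "<number><unit>" such as "50s", "100ms", or "250us";
    # a bare number defaults to milliseconds.
    match = re.fullmatch(r"(\d+)\s*([a-z]+)?", text.strip().lower())
    if not match:
        raise ValueError(f"invalid time string: {text}")
    value, unit = int(match.group(1)), match.group(2) or "ms"
    return int(value * UNIT_TO_MS[unit])
```

Note that "250us" rounds down to 0 ms — a millisecond-valued result, which is exactly why the annotation's "microseconds" wording was wrong.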
[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724011#comment-16724011 ] Jackey Lee commented on SPARK-24630: [~jackylk] Sorry for the late reply. I haven't considered manipulating the streaming job, which currently runs directly after starting until the end of the application, similar to a Command. SQLStreaming could also be easily supported if methods are later able to manipulate and process the current Command execution. Can you show me how to deal with it? Currently, SQLStreaming supports the Table API, so we can use the Table API to show/desc stream tables. > SPIP: Support SQLStreaming in Spark > --- > > Key: SPARK-24630 > URL: https://issues.apache.org/jira/browse/SPARK-24630 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.2.0, 2.2.1 >Reporter: Jackey Lee >Priority: Minor > Labels: SQLStreaming > Attachments: SQLStreaming SPIP.pdf > > > At present, KafkaSQL, Flink SQL (which is actually based on Calcite), > SQLStream, and StormSQL all provide a streaming SQL interface, with which users > with little knowledge about streaming can easily develop a stream > processing model. In Spark, we can also support a SQL API based on > Structured Streaming. > To support SQL Streaming, there are two key points: > 1. The parser should be able to parse streaming-type SQL. > 2. The analyzer should be able to map metadata information to the corresponding > Relation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26081) Do not write empty files by text datasources
[ https://issues.apache.org/jira/browse/SPARK-26081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724034#comment-16724034 ] ASF GitHub Bot commented on SPARK-26081: asfgit closed pull request #23341: [SPARK-26081][SQL][FOLLOW-UP] Use foreach instead of misuse of map (for Unit) URL: https://github.com/apache/spark/pull/23341 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index f7d8a9e1042d5..f4f139d180058 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -189,5 +189,5 @@ private[csv] class CsvOutputWriter( gen.write(row) } - override def close(): Unit = univocityGenerator.map(_.close()) + override def close(): Unit = univocityGenerator.foreach(_.close()) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala index 3042133ee43aa..40f55e7068010 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala @@ -190,5 +190,5 @@ private[json] class JsonOutputWriter( gen.writeLineEnding() } - override def close(): Unit = jacksonGenerator.map(_.close()) + override def close(): Unit = jacksonGenerator.foreach(_.close()) } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala index 01948ab25d63c..0607f7b3c0d4a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala @@ -153,7 +153,7 @@ class TextOutputWriter( private var outputStream: Option[OutputStream] = None override def write(row: InternalRow): Unit = { -val os = outputStream.getOrElse{ +val os = outputStream.getOrElse { val newStream = CodecStreams.createOutputStream(context, new Path(path)) outputStream = Some(newStream) newStream @@ -167,6 +167,6 @@ class TextOutputWriter( } override def close(): Unit = { -outputStream.map(_.close()) +outputStream.foreach(_.close()) } } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Do not write empty files by text datasources > > > Key: SPARK-26081 > URL: https://issues.apache.org/jira/browse/SPARK-26081 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Minor > Labels: release-notes > Fix For: 3.0.0 > > > Text based datasources like CSV, JSON and Text produces empty files for empty > partitions. This introduces additional overhead while opening and reading > such files back. In current implementation of OutputWriter, the output stream > are created eagerly even no records are written to the stream. So, creation > can be postponed up to the first write. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
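The follow-up PR above swaps `Option.map(_.close())` for `Option.foreach(_.close())`. A rough Python analog of the distinction (the `Stream` class is a stand-in, not Spark's writer types): mapping over an optional value for a side effect builds a throwaway result, while a plain presence check runs the effect and returns nothing.

```python
class Stream:
    """Stand-in for an output stream that tracks whether it was closed."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

def close_via_map(maybe_stream):
    # Discouraged style (analog of Option.map for Unit): the side effect
    # runs, but a useless [None] result is materialized alongside it.
    return [s.close() for s in ([maybe_stream] if maybe_stream is not None else [])]

def close_via_foreach(maybe_stream):
    # Preferred style (analog of Option.foreach): run the side effect
    # only when the value is present, return nothing.
    if maybe_stream is not None:
        maybe_stream.close()
```

In Scala the behavioral difference is small (`map` allocates an `Option[Unit]`), but `foreach` states the intent — effect, not transformation — which is why the follow-up was accepted.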
[jira] [Commented] (SPARK-26394) Annotation error for Utils.timeStringAsMs
[ https://issues.apache.org/jira/browse/SPARK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724040#comment-16724040 ] ASF GitHub Bot commented on SPARK-26394: stczwd opened a new pull request #23346: [SPARK-26394][core] Fix annotation error for Utils.timeStringAsMs URL: https://github.com/apache/spark/pull/23346 ## What changes were proposed in this pull request? Change microseconds to milliseconds in annotation of Utils.timeStringAsMs. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Annotation error for Utils.timeStringAsMs > - > > Key: SPARK-26394 > URL: https://issues.apache.org/jira/browse/SPARK-26394 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Jackey Lee >Priority: Minor > > Utils.timeStringAsMs() is parsing time to milliseconds, but in annotation, it > says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds > for internal use." > Thus, microseconds should be changed to milliseconds. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26394) Annotation error for Utils.timeStringAsMs
[ https://issues.apache.org/jira/browse/SPARK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26394: Assignee: (was: Apache Spark) > Annotation error for Utils.timeStringAsMs > - > > Key: SPARK-26394 > URL: https://issues.apache.org/jira/browse/SPARK-26394 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Jackey Lee >Priority: Minor > > Utils.timeStringAsMs() is parsing time to milliseconds, but in annotation, it > says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds > for internal use." > Thus, microseconds should be changed to milliseconds. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26394) Annotation error for Utils.timeStringAsMs
[ https://issues.apache.org/jira/browse/SPARK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26394: Assignee: Apache Spark > Annotation error for Utils.timeStringAsMs > - > > Key: SPARK-26394 > URL: https://issues.apache.org/jira/browse/SPARK-26394 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Jackey Lee >Assignee: Apache Spark >Priority: Minor > > Utils.timeStringAsMs() is parsing time to milliseconds, but in annotation, it > says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds > for internal use." > Thus, microseconds should be changed to milliseconds. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-24680) spark.executorEnv.JAVA_HOME does not take effect in Standalone mode
[ https://issues.apache.org/jira/browse/SPARK-24680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-24680. --- Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 21663 [https://github.com/apache/spark/pull/21663] > spark.executorEnv.JAVA_HOME does not take effect in Standalone mode > --- > > Key: SPARK-24680 > URL: https://issues.apache.org/jira/browse/SPARK-24680 > Project: Spark > Issue Type: Bug > Components: Deploy >Affects Versions: 2.1.1, 2.2.1, 2.3.1 >Reporter: StanZhai >Assignee: StanZhai >Priority: Minor > Fix For: 3.0.0 > > > spark.executorEnv.JAVA_HOME does not take effect when a Worker starting an > Executor process in Standalone mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-24680) spark.executorEnv.JAVA_HOME does not take effect in Standalone mode
[ https://issues.apache.org/jira/browse/SPARK-24680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reassigned SPARK-24680: - Assignee: StanZhai > spark.executorEnv.JAVA_HOME does not take effect in Standalone mode > --- > > Key: SPARK-24680 > URL: https://issues.apache.org/jira/browse/SPARK-24680 > Project: Spark > Issue Type: Bug > Components: Deploy >Affects Versions: 2.1.1, 2.2.1, 2.3.1 >Reporter: StanZhai >Assignee: StanZhai >Priority: Minor > Fix For: 3.0.0 > > > spark.executorEnv.JAVA_HOME does not take effect when a Worker starting an > Executor process in Standalone mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26377) java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/SPARK-26377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-26377: -- Priority: Major (was: Critical) > java.lang.IllegalStateException: No current assignment for partition > > > Key: SPARK-26377 > URL: https://issues.apache.org/jira/browse/SPARK-26377 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.2.1 >Reporter: pavan >Priority: Major > > Hi, > I am using Spark's Kafka direct stream (KafkaUtils.createDirectStream) with > SubscribePattern, with initial offsets for topics and a pattern. On running > the Spark job on the job server I am getting the following exception. The job > is terminated. > Kafka Params: > "bootstrap.servers" -> credentials.getBrokers, > "key.deserializer" -> classOf[StringDeserializer], > "value.deserializer" -> classOf[ByteArrayDeserializer], > "enable.auto.commit" -> (false: java.lang.Boolean) > "group.id" -> "abc" > API: > KafkaUtils.createDirectStream(streamingContext, PreferConsistent, > SubscribePattern[K, V](regexPattern, allKafkaParams, offsets), > perPartitionConfig) > > Error Log: > { "duration": "33.523 secs", "classPath": > "com.appiot.dataingestion.DataIngestionJob", "startTime": > "2018-12-15T18:28:08.207Z", "context": > "c~1d750906-1fa7-44f9-a258-04963ac53150~9dc097e3-bf0f-432c-9c27-68c41a4009cd", > "result": > { "message": "java.lang.IllegalStateException: No current assignment for > partition com-cibigdata2.v1.iot.raw_timeseries-0", "errorClass": > "java.lang.RuntimeException", "stack": "java.lang.RuntimeException: > java.lang.IllegalStateException: No current assignment for partition > com-cibigdata2.v1.iot.raw_timeseries-0\n\tat > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)\n\tat > > org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)\n\tat > > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)\n\tat > > 
org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:160)\n\tat > > org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:159)\n\tat > scala.collection.Iterator$class.foreach(Iterator.scala:893)\n\tat > scala.collection.AbstractIterator.foreach(Iterator.scala:1336)\n\tat > scala.collection.IterableLike$class.foreach(IterableLike.scala:72)\n\tat > scala.collection.AbstractIterable.foreach(Iterable.scala:54)\n\tat > org.apache.spark.streaming.kafka010.SubscribePattern.onStart(ConsumerStrategy.scala:159)\n\tat > > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)\n\tat > > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)\n\tat > > org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat > > org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat > > scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)\n\tat > > scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)\n\tat > > scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)\n\tat > > scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)\n\tat > > scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat > scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat > scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)\n\tat > scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)\n\tat > > scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)\n\tat > > scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)\n\tat > > 
scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)\n\tat > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat > > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\n\tat > ... run in separate thread using org.apache.spark.util.ThreadUtils ... > ()\n\tat > org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)\n\tat > > org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)\n\tat > > com.sap.appiot.dataingestion.DataIngestionJob$.runJob(DataIngestionJob.scala:182)\n\tat > > com.sap.a
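The stack trace above shows the failure originating in `KafkaConsumer.seek`, which raises `IllegalStateException` for any partition outside the consumer's current assignment. A hypothetical sketch of the distinction (plain strings and ints here, not the Kafka or Spark APIs):

```python
def seek_all(assignment, saved_offsets):
    # Mirrors the failing behavior: blindly seeking every partition we
    # have a saved offset for, whether or not it is currently assigned.
    for partition in saved_offsets:
        if partition not in assignment:
            raise RuntimeError(
                f"No current assignment for partition {partition}")
    return dict(saved_offsets)

def seek_assigned_only(assignment, saved_offsets):
    # Defensive variant: seek only partitions the consumer actually
    # owns; offsets for unassigned partitions are ignored.
    return {p: o for p, o in saved_offsets.items() if p in assignment}
```

This suggests why supplying initial offsets alongside a SubscribePattern is fragile: the pattern's eventual assignment may not cover every partition in the offsets map.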
[jira] [Updated] (SPARK-26381) Pickle Serialization Error Causing Crash
[ https://issues.apache.org/jira/browse/SPARK-26381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-26381: -- Priority: Major (was: Critical) What is failing to serialize here? > Pickle Serialization Error Causing Crash > > > Key: SPARK-26381 > URL: https://issues.apache.org/jira/browse/SPARK-26381 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.1, 2.4.0 > Environment: Tested on two environments: > * Spark 2.4.0 - single machine only > * Spark 2.3.1 - YARN installation with 5 nodes and files on HDFS > The error occurs in both environments. >Reporter: Ryan >Priority: Major > > There is a pickle serialization error when I try and use AllenNLP for doing > NER within a Spark worker - it is causing a crash. When running on just the > Spark driver or in a standalone program, everything works as expected. > > {code:java} > Caused by: org.apache.spark.api.python.PythonException: Traceback (most > recent call last): > File > "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py", > line 217, in main > func, profiler, deserializer, serializer = read_command(pickleSer, infile) > File > "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py", > line 61, in read_command > command = serializer.loads(command.value) > File > "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/serializers.py", > line 559, in loads > return pickle.loads(obj, encoding=encoding) > TypeError: __init__() missing 3 required positional arguments: > 'non_padded_namespaces', 'padding_token', and 'oov_token' > at > org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298) > > at > 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438) > at > org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421) > at > org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252) > > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at > org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) > > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) > at > scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) > at > org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) > at > org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28) > > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) > at > org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) > > at > org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) > at > org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) > > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) > > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > > ... 
1 more > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
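The `TypeError: __init__() missing 3 required positional arguments` above is a classic unpickling failure: `pickle.loads` calls the callable recorded by `__reduce__`, and if that callable's signature has drifted from what was recorded (for example, a library version mismatch between the driver and the workers), reconstruction fails. A hypothetical reproduction with an illustrative `Vocabulary` class (not AllenNLP's actual code):

```python
import pickle

class Vocabulary:
    def __init__(self, non_padded_namespaces, padding_token, oov_token):
        self.state = (non_padded_namespaces, padding_token, oov_token)

    def __reduce__(self):
        # Simulates a stale __reduce__ whose recorded argument list no
        # longer matches __init__'s required parameters.
        return (Vocabulary, ())

try:
    pickle.loads(pickle.dumps(Vocabulary((), "@@PADDING@@", "@@UNKNOWN@@")))
except TypeError as exc:
    error = str(exc)  # "... missing 3 required positional arguments ..."
```

Checking that the driver and every worker run identical versions of the library (and of Python) is usually the first thing to verify when this error appears only on executors.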
[jira] [Resolved] (SPARK-20712) [SPARK 2.1 REGRESSION][SQL] Spark can't read Hive table when column type has length greater than 4000 bytes
[ https://issues.apache.org/jira/browse/SPARK-20712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-20712. --- Resolution: Cannot Reproduce > [SPARK 2.1 REGRESSION][SQL] Spark can't read Hive table when column type has > length greater than 4000 bytes > --- > > Key: SPARK-20712 > URL: https://issues.apache.org/jira/browse/SPARK-20712 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.1, 2.1.2, 2.2.0, 2.3.0 >Reporter: Maciej Bryński >Priority: Critical > > Hi, > I have the following issue. > I'm trying to read a table from Hive where one of the columns is nested, so its > schema is longer than 4000 bytes. > Everything worked on Spark 2.0.2. On 2.1.1 I'm getting an exception: > {code} > >> spark.read.table("SOME_TABLE") > Traceback (most recent call last): > File "", line 1, in > File "/opt/spark-2.1.1/python/pyspark/sql/readwriter.py", line 259, in table > return self._df(self._jreader.table(tableName)) > File > "/opt/spark-2.1.1/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line > 1133, in __call__ > File "/opt/spark-2.1.1/python/pyspark/sql/utils.py", line 63, in deco > return f(*a, **kw) > File "/opt/spark-2.1.1/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", > line 319, in get_return_value > py4j.protocol.Py4JJavaError: An error occurred while calling o71.table. 
> : org.apache.spark.SparkException: Cannot recognize hive type string: > SOME_VERY_LONG_FIELD_TYPE > at > org.apache.spark.sql.hive.client.HiveClientImpl.org$apache$spark$sql$hive$client$HiveClientImpl$$fromHiveColumn(HiveClientImpl.scala:789) > at > org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:365) > at > org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11$$anonfun$7.apply(HiveClientImpl.scala:365) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:365) > at > org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1$$anonfun$apply$11.apply(HiveClientImpl.scala:361) > at scala.Option.map(Option.scala:146) > at > org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1.apply(HiveClientImpl.scala:361) > at > org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$getTableOption$1.apply(HiveClientImpl.scala:359) > at > org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:279) > at > org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:226) > at > org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:225) > at > 
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:268) > at > org.apache.spark.sql.hive.client.HiveClientImpl.getTableOption(HiveClientImpl.scala:359) > at > org.apache.spark.sql.hive.client.HiveClient$class.getTable(HiveClient.scala:74) > at > org.apache.spark.sql.hive.client.HiveClientImpl.getTable(HiveClientImpl.scala:78) > at > org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$org$apache$spark$sql$hive$HiveExternalCatalog$$getRawTable$1.apply(HiveExternalCatalog.scala:118) > at > org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$org$apache$spark$sql$hive$HiveExternalCatalog$$getRawTable$1.apply(HiveExternalCatalog.scala:118) > at > org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97) > at > org.apache.spark.sql.hive.HiveExternalCatalog.org$apache$spark$sql$hive$HiveExternalCatalog$$getRawTable(HiveExternalCatalog.scala:117) > at > org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$getTable$1.apply(HiveExternalCatalog.scala:628) > at > org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$getTable$1.apply(HiveExternalCatalog.scala:628) > at > org.apa
[jira] [Updated] (SPARK-17939) Spark-SQL Nullability: Optimizations vs. Enforcement Clarification
[ https://issues.apache.org/jira/browse/SPARK-17939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-17939: -- Priority: Major (was: Critical) Issue Type: Improvement (was: Bug) > Spark-SQL Nullability: Optimizations vs. Enforcement Clarification > -- > > Key: SPARK-17939 > URL: https://issues.apache.org/jira/browse/SPARK-17939 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.0.0 >Reporter: Aleksander Eskilson >Priority: Major > > The notion of Nullability of StructFields in DataFrames and Datasets > creates some confusion. As has been pointed out previously [1], Nullability > is a hint to the Catalyst optimizer, and is not meant to be a type-level > enforcement. Allowing null fields can also help the reader successfully parse > certain types of more loosely-typed data, like JSON and CSV, where null > values are common, rather than just failing. > There's already been some movement to clarify the meaning of Nullable in the > API, but also some requests for a (perhaps completely separate) type-level > implementation of Nullable that can act as an enforcement contract. > This bug is logged here to discuss and clarify this issue. > [1] - > [https://issues.apache.org/jira/browse/SPARK-11319|https://issues.apache.org/jira/browse/SPARK-11319?focusedCommentId=15014535&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15014535] > [2] - https://github.com/apache/spark/pull/11785 -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26336) left_anti join with Na Values
[ https://issues.apache.org/jira/browse/SPARK-26336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724052#comment-16724052 ] Marco Gaido commented on SPARK-26336: - That's correct, because NULLs do not match. The usual implementation of ANTIJOIN in other DBs (e.g. Postgres) is to do a left join and filter for the column on the right side being NULL. If you do so in your example, 1 row is returned. > left_anti join with Na Values > - > > Key: SPARK-26336 > URL: https://issues.apache.org/jira/browse/SPARK-26336 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.2.0 >Reporter: Carlos >Priority: Major > > When I join two dataframes whose data has NA values, the left_anti > join does not work correctly, because it does not detect records with NA > values. > Example: > {code:java} > from pyspark.sql import SparkSession > from pyspark.sql.functions import * > spark = SparkSession.builder.appName('test').enableHiveSupport().getOrCreate() > data = [(1,"Test"),(2,"Test"),(3,None)] > df1 = spark.createDataFrame(data,("id","columndata")) > df2 = spark.createDataFrame(data,("id","columndata")) > df_joined = df1.join(df2, df1.columns,'left_anti'){code} > df_joined has data even though the two dataframes are the same.
[jira] [Commented] (SPARK-25953) install jdk11 on jenkins workers
[ https://issues.apache.org/jira/browse/SPARK-25953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724048#comment-16724048 ] Sean Owen commented on SPARK-25953: --- Was this resolved? I think we can just install the latest JDK 11. > install jdk11 on jenkins workers > > > Key: SPARK-25953 > URL: https://issues.apache.org/jira/browse/SPARK-25953 > Project: Spark > Issue Type: Sub-task > Components: Build >Affects Versions: 3.0.0 >Reporter: shane knapp >Assignee: shane knapp >Priority: Critical > > once we pin down exact what we want installed on the jenkins workers, i will > add it to our ansible and deploy. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24680) spark.executorEnv.JAVA_HOME does not take effect in Standalone mode
[ https://issues.apache.org/jira/browse/SPARK-24680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724047#comment-16724047 ] ASF GitHub Bot commented on SPARK-24680: srowen closed pull request #21663: [SPARK-24680][Deploy]Support spark.executorEnv.JAVA_HOME in Standalone mode URL: https://github.com/apache/spark/pull/21663 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index ce24400f557cd..56edceb17bfb8 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -91,14 +91,18 @@ */ List buildJavaCommand(String extraClassPath) throws IOException { List cmd = new ArrayList<>(); -String envJavaHome; -if (javaHome != null) { - cmd.add(join(File.separator, javaHome, "bin", "java")); -} else if ((envJavaHome = System.getenv("JAVA_HOME")) != null) { -cmd.add(join(File.separator, envJavaHome, "bin", "java")); -} else { -cmd.add(join(File.separator, System.getProperty("java.home"), "bin", "java")); +String[] candidateJavaHomes = new String[] { + javaHome, + childEnv.get("JAVA_HOME"), + System.getenv("JAVA_HOME"), + System.getProperty("java.home") +}; +for (String javaHome : candidateJavaHomes) { + if (javaHome != null) { +cmd.add(join(File.separator, javaHome, "bin", "java")); +break; + } } // Load extra JAVA_OPTS from conf/java-opts, if it exists. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > spark.executorEnv.JAVA_HOME does not take effect in Standalone mode > --- > > Key: SPARK-24680 > URL: https://issues.apache.org/jira/browse/SPARK-24680 > Project: Spark > Issue Type: Bug > Components: Deploy >Affects Versions: 2.1.1, 2.2.1, 2.3.1 >Reporter: StanZhai >Assignee: StanZhai >Priority: Minor > Fix For: 3.0.0 > > > spark.executorEnv.JAVA_HOME does not take effect when a Worker starting an > Executor process in Standalone mode. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
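The patch above reduces to "first non-null candidate wins" over an ordered priority list. A minimal standalone sketch of that resolution order (hypothetical paths and function names, not Spark's actual launcher code):

```python
import os

def resolve_java_executable(explicit_java_home=None, child_env=None):
    """Pick the first non-None JAVA_HOME candidate, mirroring the priority
    order in the patched AbstractCommandBuilder.buildJavaCommand."""
    candidates = [
        explicit_java_home,                  # javaHome set on the builder itself
        (child_env or {}).get("JAVA_HOME"),  # spark.executorEnv.JAVA_HOME
        os.environ.get("JAVA_HOME"),         # the launcher's own environment
        "/usr/lib/jvm/default",              # stand-in for the java.home property
    ]
    for home in candidates:
        if home is not None:
            return os.path.join(home, "bin", "java")
    return None

# The executor-env setting wins as soon as no explicit javaHome is given:
print(resolve_java_executable(child_env={"JAVA_HOME": "/opt/jdk11"}))
```

An explicit builder-level `javaHome` still takes precedence over the child environment, which is exactly the ordering the diff encodes.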
[jira] [Commented] (SPARK-26336) left_anti join with Na Values
[ https://issues.apache.org/jira/browse/SPARK-26336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724081#comment-16724081 ] Carlos commented on SPARK-26336: [~mgaido] I think I chose a bad example. data1 = { 'id': 1, 'name': 'Carlos', 'surname': 'Sevilla', 'address': None, 'Country': 'ESP' } data2 = { 'id': 1, 'name': 'Carlos', 'surname': 'Sevilla', 'address': None, 'Country': 'ESP' } Those two variables contain the SAME data. If I try left_anti (inner doesn't work either), it must return no rows, because both dataframes have exactly the same data. > left_anti join with Na Values > - > > Key: SPARK-26336 > URL: https://issues.apache.org/jira/browse/SPARK-26336 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.2.0 >Reporter: Carlos >Priority: Major > > When I join two dataframes whose data has NA values, the left_anti > join does not work correctly, because it does not detect records with NA > values. > Example: > {code:java} > from pyspark.sql import SparkSession > from pyspark.sql.functions import * > spark = SparkSession.builder.appName('test').enableHiveSupport().getOrCreate() > data = [(1,"Test"),(2,"Test"),(3,None)] > df1 = spark.createDataFrame(data,("id","columndata")) > df2 = spark.createDataFrame(data,("id","columndata")) > df_joined = df1.join(df2, df1.columns,'left_anti'){code} > df_joined has data even though the two dataframes are the same.
[jira] [Commented] (SPARK-26336) left_anti join with Na Values
[ https://issues.apache.org/jira/browse/SPARK-26336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724098#comment-16724098 ] Marco Gaido commented on SPARK-26336: - [~csevilla] the point is always the same, i.e. the presence of {{NULL}} (Python's None is SQL's NULL). And {{NULL = NULL}} returns {{NULL}}, not {{true}}. This is how every DB works; you can try it in MySQL, Postgres, or whatever you prefer. > left_anti join with Na Values > - > > Key: SPARK-26336 > URL: https://issues.apache.org/jira/browse/SPARK-26336 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.2.0 >Reporter: Carlos >Priority: Major > > When I join two dataframes whose data has NA values, the left_anti > join does not work correctly, because it does not detect records with NA > values. > Example: > {code:java} > from pyspark.sql import SparkSession > from pyspark.sql.functions import * > spark = SparkSession.builder.appName('test').enableHiveSupport().getOrCreate() > data = [(1,"Test"),(2,"Test"),(3,None)] > df1 = spark.createDataFrame(data,("id","columndata")) > df2 = spark.createDataFrame(data,("id","columndata")) > df_joined = df1.join(df2, df1.columns,'left_anti'){code} > df_joined has data even though the two dataframes are the same.
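The NULL-matching semantics Marco describes can be reproduced without Spark. Below is a pure-Python sketch of SQL three-valued equality and a toy anti-join; the helper names are illustrative, not PySpark API:

```python
def sql_eq(a, b):
    """SQL equality: NULL (None) compared with anything yields NULL (None)."""
    if a is None or b is None:
        return None          # unknown, not True
    return a == b

def left_anti(left, right, key):
    """Keep left rows with no matching right row; a NULL key never matches."""
    out = []
    for lrow in left:
        matched = any(sql_eq(lrow[key], rrow[key]) is True for rrow in right)
        if not matched:
            out.append(lrow)
    return out

rows = [{"id": 1, "columndata": "Test"},
        {"id": 2, "columndata": "Test"},
        {"id": 3, "columndata": None}]

# Anti-joining a dataframe with itself on a nullable column: the None row
# survives even though the right side contains an identical row.
print(left_anti(rows, rows, "columndata"))  # → [{'id': 3, 'columndata': None}]
```

If NULL keys should match, PySpark offers a null-safe comparison (`Column.eqNullSafe`, the SQL `<=>` operator) that can be used in an explicit join condition instead of the column-name list.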
[jira] [Created] (SPARK-26395) Spark Thrift server memory leak
Konstantinos Andrikopoulos created SPARK-26395: -- Summary: Spark Thrift server memory leak Key: SPARK-26395 URL: https://issues.apache.org/jira/browse/SPARK-26395 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.3.2 Reporter: Konstantinos Andrikopoulos We are running Thrift Server in standalone mode and have observed that the driver heap is constantly increasing. After analysing the heap dump, the issue seems to be that the ElementTrackingStore constantly grows due to the addition of RDDOperationGraphWrapper objects that are never cleaned up. The ElementTrackingStore defines the addTrigger method, where you can set thresholds in order to perform cleanup, but in practice it is only used for the ExecutorSummaryWrapper, JobDataWrapper and StageDataWrapper classes, via the following spark properties: * spark.ui.retainedDeadExecutors * spark.ui.retainedJobs * spark.ui.retainedStages So the RDDOperationGraphWrapper, which is added in the onJobStart method of the AppStatusListener class [kvstore.write(uigraph) #line 291], is not cleaned up; it grows without bound, causing a memory leak
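The cleanup mechanism the report refers to can be sketched as a store that runs a caller-supplied action once a class's element count crosses a threshold. A toy Python model, loosely modeled on ElementTrackingStore.addTrigger (all names illustrative):

```python
class TrackingStore:
    """Toy element-tracking store with count-based cleanup triggers."""

    def __init__(self):
        self._data = {}      # class name -> list of stored elements
        self._triggers = {}  # class name -> (threshold, cleanup action)

    def add_trigger(self, klass, threshold, action):
        self._triggers[klass] = (threshold, action)

    def write(self, klass, element):
        items = self._data.setdefault(klass, [])
        items.append(element)
        trig = self._triggers.get(klass)
        if trig and len(items) > trig[0]:
            trig[1](items)   # e.g. drop the oldest entries

store = TrackingStore()
# Tracked class: bounded at the threshold by its trigger.
store.add_trigger("StageDataWrapper", 2, lambda items: items.pop(0))
for i in range(5):
    store.write("StageDataWrapper", i)
# Untracked class: grows without bound -- the leak described above.
for i in range(5):
    store.write("RDDOperationGraphWrapper", i)
print(len(store._data["StageDataWrapper"]),
      len(store._data["RDDOperationGraphWrapper"]))  # → 2 5
```

The leak then corresponds to a written class for which no trigger (and no retained-count property) was ever registered.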
[jira] [Created] (SPARK-26396) Kafka consumer cache overflow since 2.4.x
Kaspar Tint created SPARK-26396: --- Summary: Kafka consumer cache overflow since 2.4.x Key: SPARK-26396 URL: https://issues.apache.org/jira/browse/SPARK-26396 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.4.0 Environment: Spark 2.4 standalone client mode Reporter: Kaspar Tint We are experiencing an issue where the Kafka consumer cache seems to overflow constantly upon starting the application. This issue appeared after upgrading to Spark 2.4. We would get constant warnings like this: {code:java} 18/12/18 07:03:29 WARN KafkaDataConsumer: KafkaConsumer cache hitting max capacity of 180, removing consumer for CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-76) 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max capacity of 180, removing consumer for CacheKey(spark-kafka-source-6f66e0d2-beaf-4ff2-ade8-8996611de6ae--1081651087-executor,kafka-topic-30) 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max capacity of 180, removing consumer for CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-57) 18/12/18 07:03:32 WARN KafkaDataConsumer: KafkaConsumer cache hitting max capacity of 180, removing consumer for CacheKey(spark-kafka-source-f41d1f9e-1700-4994-9d26-2b9c0ee57881--215746753-executor,kafka-topic-43) {code} This application is running 4 different Spark Structured Streaming queries against the same Kafka topic, which has 90 partitions. We used to run it with just the default settings, so the cache size defaulted to 64 on Spark 2.3; now we have tried 180 and 360. With 360 there is much less noise about the overflow, but resource needs increase substantially.
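Assuming the structured-streaming consumer-cache knob involved here is `spark.sql.kafkaConsumer.cache.capacity` (default 64 in Spark 2.4, and the value the warning reports as "max capacity"), a worst-case sizing for this workload is queries × partitions:

```shell
# Hypothetical sizing for the reported workload:
# 4 concurrent queries x 90 partitions = 360 cached consumers worst case.
spark-submit \
  --conf spark.sql.kafkaConsumer.cache.capacity=360 \
  ...
```

This silences the eviction warnings at the cost of holding up to 360 open Kafka consumers per executor, which matches the increased resource need the reporter observed.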
[jira] [Resolved] (SPARK-17383) improvement LabelPropagation of graphx lib
[ https://issues.apache.org/jira/browse/SPARK-17383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-17383. --- Resolution: Won't Fix > improvement LabelPropagation of graphx lib > -- > > Key: SPARK-17383 > URL: https://issues.apache.org/jira/browse/SPARK-17383 > Project: Spark > Issue Type: Improvement > Components: GraphX >Affects Versions: 2.1.0 >Reporter: XiaoSen Lee >Priority: Major > > In the labelPropagation of the graphx lib, each node is initialized with a unique > label, and at every step each node adopts the label that most of its neighbors > currently have, but ignores the label it currently has. I think this is > unreasonable, because the label a node already has is also useful. When a node tends > toward a stable label, there is an association between two iterations, so a node is > affected not only by its neighbors but also by its current label. > So I changed the code to use both the labels of its neighbors and its own. > Through this iterative process, densely connected groups of nodes form a consensus on > a unique label to form communities. > But the communities from LabelPropagation are often discontinuous, because when > several labels are tied among the neighbors, e.g. node "0" has 6 neighbors > labeled {"1","1","2","2","3","3"}, it may randomly select a label. In order to get > stable community labels and prevent this randomness, I chose the max label of the node.
[jira] [Commented] (SPARK-17383) improvement LabelPropagation of graphx lib
[ https://issues.apache.org/jira/browse/SPARK-17383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724154#comment-16724154 ] ASF GitHub Bot commented on SPARK-17383: srowen closed pull request #14940: [SPARK-17383][GRAPHX] Improvement LabelPropagaton, and reduce label shake and disconnection of communities URL: https://github.com/apache/spark/pull/14940 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala index fc7547a2c7c27..31a9414ae47ca 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala @@ -58,7 +58,7 @@ object LabelPropagation { }.toMap } def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]): VertexId = { - if (message.isEmpty) attr else message.maxBy(_._2)._1 + (Map(attr -> 1L) ++ message).maxBy(m => (m._2, m._1))._1 } val initialMessage = Map[VertexId, Long]() Pregel(lpaGraph, initialMessage, maxIterations = maxSteps)( This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > improvement LabelPropagation of graphx lib > -- > > Key: SPARK-17383 > URL: https://issues.apache.org/jira/browse/SPARK-17383 > Project: Spark > Issue Type: Improvement > Components: GraphX >Affects Versions: 2.1.0 >Reporter: XiaoSen Lee >Priority: Major > > In the labelPropagation of the graphx lib, each node is initialized with a unique > label, and at every step each node adopts the label that most of its neighbors > currently have, but ignores the label it currently has. I think this is > unreasonable, because the label a node already has is also useful. When a node tends > toward a stable label, there is an association between two iterations, so a node is > affected not only by its neighbors but also by its current label. > So I changed the code to use both the labels of its neighbors and its own. > Through this iterative process, densely connected groups of nodes form a consensus on > a unique label to form communities. > But the communities from LabelPropagation are often discontinuous, because when > several labels are tied among the neighbors, e.g. node "0" has 6 neighbors > labeled {"1","1","2","2","3","3"}, it may randomly select a label. In order to get > stable community labels and prevent this randomness, I chose the max label of the node.
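The one-line change to `vertexProgram` merges the vertex's own label (with count 1) into the neighbor message and then selects deterministically by (count, label id). A Python sketch of that selection rule (illustrative, not the GraphX API):

```python
def vertex_program(attr, message):
    """New label for a vertex: neighbor label counts plus the vertex's own
    label at count 1; break count ties by the larger label id."""
    counts = {attr: 1}
    counts.update(message)  # neighbor counts override the self entry, as in
                            # Scala's Map(attr -> 1L) ++ message
    return max(counts.items(), key=lambda kv: (kv[1], kv[0]))[0]

# Six neighbors labeled {1,1,2,2,3,3}: the old code could pick any tied
# label at random; here the max tied label, 3, wins deterministically.
print(vertex_program(0, {1: 2, 2: 2, 3: 2}))  # → 3
# No messages: the vertex keeps its own label, like the old isEmpty branch.
print(vertex_program(7, {}))  # → 7
```

The (count, label) key is what removes the "label shake" the PR title mentions: ties no longer depend on hash-map iteration order.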
[jira] [Assigned] (SPARK-26384) CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled
[ https://issues.apache.org/jira/browse/SPARK-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-26384: Assignee: Maxim Gekk > CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled > - > > Key: SPARK-26384 > URL: https://issues.apache.org/jira/browse/SPARK-26384 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Major > Fix For: 3.0.0 > > > Starting from the commit > [https://github.com/apache/spark/commit/f982ca07e80074bdc1e3b742c5e21cf368e4ede2] > , add logging like in the comment > https://github.com/apache/spark/pull/23150#discussion_r242021998 and run: > {code:shell} > $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true > {code} > and in the shell: > {code:scala} > scala> spark.conf.get("spark.sql.legacy.timeParser.enabled") > res0: String = true > scala> > Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo") > scala> spark.read.option("inferSchema", "true").option("header", > "false").option("timestampFormat", "|MM|dd").csv("/tmp/foo").printSchema() > 18/12/17 12:11:47 ERROR TimestampFormatter: Iso8601TimestampFormatter is > being used > root > |-- _c0: timestamp (nullable = true) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26384) CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled
[ https://issues.apache.org/jira/browse/SPARK-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-26384. -- Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 23345 [https://github.com/apache/spark/pull/23345] > CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled > - > > Key: SPARK-26384 > URL: https://issues.apache.org/jira/browse/SPARK-26384 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Major > Fix For: 3.0.0 > > > Starting from the commit > [https://github.com/apache/spark/commit/f982ca07e80074bdc1e3b742c5e21cf368e4ede2] > , add logging like in the comment > https://github.com/apache/spark/pull/23150#discussion_r242021998 and run: > {code:shell} > $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true > {code} > and in the shell: > {code:scala} > scala> spark.conf.get("spark.sql.legacy.timeParser.enabled") > res0: String = true > scala> > Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo") > scala> spark.read.option("inferSchema", "true").option("header", > "false").option("timestampFormat", "|MM|dd").csv("/tmp/foo").printSchema() > 18/12/17 12:11:47 ERROR TimestampFormatter: Iso8601TimestampFormatter is > being used > root > |-- _c0: timestamp (nullable = true) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26384) CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled
[ https://issues.apache.org/jira/browse/SPARK-26384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724201#comment-16724201 ] ASF GitHub Bot commented on SPARK-26384: asfgit closed pull request #23345: [SPARK-26384][SQL] Propagate SQL configs for CSV schema inferring URL: https://github.com/apache/spark/pull/23345 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index b46dfb94c133e..375cec597166c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -35,6 +35,7 @@ import org.apache.spark.rdd.{BinaryFileRDD, RDD} import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVInferSchema, CSVOptions, UnivocityParser} +import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat import org.apache.spark.sql.types.StructType @@ -135,7 +136,9 @@ object TextInputCSVDataSource extends CSVDataSource { val parser = new CsvParser(parsedOptions.asParserSettings) linesWithoutHeader.map(parser.parseLine) } -new CSVInferSchema(parsedOptions).infer(tokenRDD, header) +SQLExecution.withSQLConfPropagated(csv.sparkSession) { + new CSVInferSchema(parsedOptions).infer(tokenRDD, header) +} case _ => // If the first line could not be read, just return the empty schema. 
StructType(Nil) @@ -208,7 +211,9 @@ object MultiLineCSVDataSource extends CSVDataSource { encoding = parsedOptions.charset) } val sampled = CSVUtils.sample(tokenRDD, parsedOptions) -new CSVInferSchema(parsedOptions).infer(sampled, header) +SQLExecution.withSQLConfPropagated(sparkSession) { + new CSVInferSchema(parsedOptions).infer(sampled, header) +} case None => // If the first row could not be read, just return the empty schema. StructType(Nil) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > CSV schema inferring does not respect spark.sql.legacy.timeParser.enabled > - > > Key: SPARK-26384 > URL: https://issues.apache.org/jira/browse/SPARK-26384 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 >Reporter: Maxim Gekk >Assignee: Maxim Gekk >Priority: Major > Fix For: 3.0.0 > > > Starting from the commit > [https://github.com/apache/spark/commit/f982ca07e80074bdc1e3b742c5e21cf368e4ede2] > , add logging like in the comment > https://github.com/apache/spark/pull/23150#discussion_r242021998 and run: > {code:shell} > $ ./bin/spark-shell --conf spark.sql.legacy.timeParser.enabled=true > {code} > and in the shell: > {code:scala} > scala> spark.conf.get("spark.sql.legacy.timeParser.enabled") > res0: String = true > scala> > Seq("2010|10|10").toDF.repartition(1).write.mode("overwrite").text("/tmp/foo") > scala> spark.read.option("inferSchema", "true").option("header", > "false").option("timestampFormat", "|MM|dd").csv("/tmp/foo").printSchema() > 18/12/17 12:11:47 ERROR TimestampFormatter: Iso8601TimestampFormatter is > being used > root > |-- _c0: timestamp (nullable = true) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: 
issues-h...@spark.apache.org
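The fix above wraps CSV schema inference in `SQLExecution.withSQLConfPropagated`, so session-local SQL confs travel with the inference work instead of falling back to defaults. A toy Python model of that bug/fix pattern (all names illustrative, no Spark involved):

```python
# A "driver" session conf, and "executor" tasks that only see the conf
# explicitly shipped with them.
DEFAULT_CONF = {"legacy.timeParser.enabled": "false"}

def run_task(fn, shipped_conf):
    """An executor evaluates fn with only the conf it was shipped."""
    return fn(shipped_conf)

def infer_schema_buggy(session_conf):
    # Bug: the task ignores the session conf and falls back to defaults,
    # so the legacy-parser flag set in the shell never reaches inference.
    return run_task(lambda conf: conf["legacy.timeParser.enabled"], DEFAULT_CONF)

def infer_schema_fixed(session_conf):
    # Fix: merge the session conf over the defaults and ship it with the
    # task, as withSQLConfPropagated does for SQL configs.
    merged = {**DEFAULT_CONF, **session_conf}
    return run_task(lambda conf: conf["legacy.timeParser.enabled"], merged)

session = {"legacy.timeParser.enabled": "true"}
print(infer_schema_buggy(session), infer_schema_fixed(session))  # → false true
```

This mirrors the symptom in the report: the shell showed the flag as `true`, yet the Iso8601 (non-legacy) formatter still ran during inference.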
[jira] [Commented] (SPARK-11430) DataFrame's except method does not work, returns 0
[ https://issues.apache.org/jira/browse/SPARK-11430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724203#comment-16724203 ] sathiyarajan commented on SPARK-11430: -- [~ramk256] [~srowen]: was the except issue also fixed in PySpark 2.4.0? I am getting some errors: {code} >>> df1.except(df2).show() File "", line 1 df1.except(df2).show() ^ SyntaxError: invalid syntax >>> df1.exceptAll(df2).show() {code} > DataFrame's except method does not work, returns 0 > -- > > Key: SPARK-11430 > URL: https://issues.apache.org/jira/browse/SPARK-11430 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.5.0 >Reporter: Ram Kandasamy >Priority: Major > > This may or may not be related to this bug here: > https://issues.apache.org/jira/browse/SPARK-11427 > But basically, the except method in dataframes should mirror the > functionality of the subtract method in RDDs, but it is not doing so. > Here is an example: > scala> val firstFile = > sqlContext.read.parquet("/Users/ramkandasamy/sparkData/2015-07-25/*").select("id").distinct > firstFile: org.apache.spark.sql.DataFrame = [id: string] > scala> val secondFile = > sqlContext.read.parquet("/Users/ramkandasamy/sparkData/2015-10-23/*").select("id").distinct > secondFile: org.apache.spark.sql.DataFrame = [id: string] > scala> firstFile.count > res1: Long = 1072046 > scala> secondFile.count > res2: Long = 3569941 > scala> firstFile.except(secondFile).count > res3: Long = 0 > scala> firstFile.rdd.subtract(secondFile.rdd).count > res4: Long = 1072046 > Can anyone help out here? Thanks!
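The SyntaxError in the comment above is a Python language issue rather than a Spark regression: `except` is a reserved keyword, so `df1.except(df2)` can never parse; PySpark exposes the operation as `subtract` (and `exceptAll` in 2.4+). A Spark-free sketch:

```python
import keyword

# `except` is reserved, so dotted attribute access on it cannot parse;
# `exceptAll` is an ordinary identifier and works fine.
print(keyword.iskeyword("except"))     # → True
print(keyword.iskeyword("exceptAll"))  # → False

# Plain-Python model of SQL EXCEPT (distinct) semantics, the behaviour
# DataFrame.subtract is meant to mirror:
a = [1, 1, 2, 3]
b = [2]
except_distinct = sorted(set(a) - set(b))
print(except_distinct)  # → [1, 3]
```

The Scala repro in the quoted issue is unaffected by this: `except` is a legal method name in Scala, so the zero-count result there was a genuine Spark question, separate from the PySpark syntax error.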
[jira] [Commented] (SPARK-26154) Stream-stream joins - left outer join gives inconsistent output
[ https://issues.apache.org/jira/browse/SPARK-26154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724238#comment-16724238 ] sandeep katta commented on SPARK-26154: --- CC [~c...@koeninger.org] [~tdas] . this looks like potential bug to me. can you please look into this issue > Stream-stream joins - left outer join gives inconsistent output > --- > > Key: SPARK-26154 > URL: https://issues.apache.org/jira/browse/SPARK-26154 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.2 > Environment: Spark version - Spark 2.3.2 > OS- Suse 11 >Reporter: Haripriya >Priority: Major > > Stream-stream joins using left outer join gives inconsistent output > The data processed once, is being processed again and gives null value. In > Batch 2, the input data "3" is processed. But again in batch 6, null value > is provided for same data > Steps > In spark-shell > {code:java} > scala> import org.apache.spark.sql.functions.{col, expr} > import org.apache.spark.sql.functions.{col, expr} > scala> import org.apache.spark.sql.streaming.Trigger > import org.apache.spark.sql.streaming.Trigger > scala> val lines_stream1 = spark.readStream. > | format("kafka"). > | option("kafka.bootstrap.servers", "ip:9092"). > | option("subscribe", "topic1"). > | option("includeTimestamp", true). > | load(). > | selectExpr("CAST (value AS String)","CAST(timestamp AS > TIMESTAMP)").as[(String,Timestamp)]. > | select(col("value") as("data"),col("timestamp") > as("recordTime")). > | select("data","recordTime"). > | withWatermark("recordTime", "5 seconds ") > lines_stream1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = > [data: string, recordTime: timestamp] > scala> val lines_stream2 = spark.readStream. > | format("kafka"). > | option("kafka.bootstrap.servers", "ip:9092"). > | option("subscribe", "topic2"). > | option("includeTimestamp", value = true). > | load(). 
> | selectExpr("CAST (value AS String)","CAST(timestamp AS > TIMESTAMP)").as[(String,Timestamp)]. > | select(col("value") as("data1"),col("timestamp") > as("recordTime1")). > | select("data1","recordTime1"). > | withWatermark("recordTime1", "10 seconds ") > lines_stream2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = > [data1: string, recordTime1: timestamp] > scala> val query = lines_stream1.join(lines_stream2, expr ( > | """ > | | data == data1 and > | | recordTime1 >= recordTime and > | | recordTime1 <= recordTime + interval 5 seconds > | """.stripMargin),"left"). > | writeStream. > | option("truncate","false"). > | outputMode("append"). > | format("console").option("checkpointLocation", > "/tmp/leftouter/"). > | trigger(Trigger.ProcessingTime ("5 seconds")). > | start() > query: org.apache.spark.sql.streaming.StreamingQuery = > org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1a48f55b > {code} > Step2 : Start producing data > kafka-console-producer.sh --broker-list ip:9092 --topic topic1 > >1 > >2 > >3 > >4 > >5 > >aa > >bb > >cc > kafka-console-producer.sh --broker-list ip:9092 --topic topic2 > >2 > >2 > >3 > >4 > >5 > >aa > >cc > >ee > >ee > > Output obtained: > {code:java} > Batch: 0 > --- > ++--+-+---+ > |data|recordTime|data1|recordTime1| > ++--+-+---+ > ++--+-+---+ > --- > Batch: 1 > --- > ++--+-+---+ > |data|recordTime|data1|recordTime1| > ++--+-+---+ > ++--+-+---+ > --- > Batch: 2 > --- > ++---+-+---+ > |data|recordTime |data1|recordTime1| > ++---+-+---+ > |3 |2018-11-22 20:09:35.053|3|2018-11-22 20:09:36.506| > |2 |2018-11-22 20:09:31.613|2|2018-11-22 20:09:33.116| > ++---+-+---+ > --- > Batch: 3 > --- > ++---+-+---+ > |data|recordTime |data1|recordTime1| > ++---+-+---+ > |4 |20
[jira] [Updated] (SPARK-26316) Because of the perf degradation in TPC-DS, we currently partial revert SPARK-21052:Add hash map metrics to join,
[ https://issues.apache.org/jira/browse/SPARK-26316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun updated SPARK-26316: -- Fix Version/s: 2.3.3 > Because of the perf degradation in TPC-DS, we currently partial revert > SPARK-21052:Add hash map metrics to join, > > > Key: SPARK-26316 > URL: https://issues.apache.org/jira/browse/SPARK-26316 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0 >Reporter: Ke Jia >Assignee: Ke Jia >Priority: Major > Fix For: 2.3.3, 2.4.1, 3.0.0 > > > The code of > [L486|https://github.com/apache/spark/blob/1d3dd58d21400b5652b75af7e7e53aad85a31528/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L486] > and > [L487|https://github.com/apache/spark/blob/1d3dd58d21400b5652b75af7e7e53aad85a31528/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L487] > in SPARK-21052 cause performance degradation in spark2.3. The result of > all queries in TPC-DS with 1TB is in [TPC-DS > result|https://docs.google.com/spreadsheets/d/18a5BdOlmm8euTaRodyeWum9yu92mbWWu6JbhGXtr7yE/edit#gid=0] -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26316) Because of the perf degradation in TPC-DS, we currently partially revert SPARK-21052: Add hash map metrics to join
[ https://issues.apache.org/jira/browse/SPARK-26316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724258#comment-16724258 ] ASF GitHub Bot commented on SPARK-26316:

dongjoon-hyun closed pull request #23319: [SPARK-26316][BRANCH-2.3] Revert hash join metrics in spark 21052 that causes performance degradation
URL: https://github.com/apache/spark/pull/23319

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

{code}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index 1918fcc5482db..13fa926d3366a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -47,8 +47,7 @@ case class BroadcastHashJoinExec(
   extends BinaryExecNode with HashJoin with CodegenSupport {

   override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probe"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

   override def requiredChildDistribution: Seq[Distribution] = {
     val mode = HashedRelationBroadcastMode(buildKeys)
@@ -62,13 +61,12 @@ case class BroadcastHashJoinExec(

   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
-    val avgHashProbe = longMetric("avgHashProbe")

     val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
     streamedPlan.execute().mapPartitions { streamedIter =>
       val hashed = broadcastRelation.value.asReadOnlyCopy()
       TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
-      join(streamedIter, hashed, numOutputRows, avgHashProbe)
+      join(streamedIter, hashed, numOutputRows)
     }
   }
@@ -110,23 +108,6 @@ case class BroadcastHashJoinExec(
     }
   }

-  /**
-   * Returns the codes used to add a task completion listener to update avg hash probe
-   * at the end of the task.
-   */
-  private def genTaskListener(avgHashProbe: String, relationTerm: String): String = {
-    val listenerClass = classOf[TaskCompletionListener].getName
-    val taskContextClass = classOf[TaskContext].getName
-    s"""
-       | $taskContextClass$$.MODULE$$.get().addTaskCompletionListener(new $listenerClass() {
-       |   @Override
-       |   public void onTaskCompletion($taskContextClass context) {
-       |     $avgHashProbe.set($relationTerm.getAverageProbesPerLookup());
-       |   }
-       | });
-    """.stripMargin
-  }
-
   /**
    * Returns a tuple of Broadcast of HashedRelation and the variable name for it.
    */
@@ -136,15 +117,11 @@ case class BroadcastHashJoinExec(
     val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
     val clsName = broadcastRelation.value.getClass.getName

-    // At the end of the task, we update the avg hash probe.
-    val avgHashProbe = metricTerm(ctx, "avgHashProbe")
-
     // Inline mutable state since not many join operations in a task
     val relationTerm = ctx.addMutableState(clsName, "relation", v =>
       s"""
          | $v = (($clsName) $broadcast.value()).asReadOnlyCopy();
          | incPeakExecutionMemory($v.estimatedSize());
-         | ${genTaskListener(avgHashProbe, v)}
       """.stripMargin, forceInline = true)
     (broadcastRelation, relationTerm)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 0396168d3f311..b197bf6c89981 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -194,8 +194,7 @@ trait HashJoin {
   protected def join(
       streamedIter: Iterator[InternalRow],
       hashed: HashedRelation,
-      numOutputRows: SQLMetric,
-      avgHashProbe: SQLMetric): Iterator[InternalRow] = {
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {

     val joinedIter = joinType match {
       case _: InnerLike =>
@@ -213,10 +212,6 @@ trait HashJoin {
         s"BroadcastHashJoin should not take $x as the JoinType")
     }

-    // At the end of the task, we update the avg hash probe.
-    TaskContext.get().addTaskCompletionListener(_ =>
-      avgHashProbe.set(hashed.getAverageProbesPerLookup))
-
     val resultProj = createResultProjection
     joinedIter.map { r =>
       numO
[jira] [Resolved] (SPARK-26382) prefix sorter should handle -0.0
[ https://issues.apache.org/jira/browse/SPARK-26382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjoon Hyun resolved SPARK-26382.
---
Resolution: Fixed
Fix Version/s: 2.4.1, 3.0.0

This is resolved via https://github.com/apache/spark/pull/23334

> prefix sorter should handle -0.0
>
> Key: SPARK-26382
> URL: https://issues.apache.org/jira/browse/SPARK-26382
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: Wenchen Fan
> Assignee: Wenchen Fan
> Priority: Major
> Fix For: 2.4.1, 3.0.0
[jira] [Updated] (SPARK-26394) Annotation error for Utils.timeStringAsMs
[ https://issues.apache.org/jira/browse/SPARK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen updated SPARK-26394:
--
Priority: Trivial (was: Minor)
Component/s: Documentation

This doesn't need a JIRA if the 'what' and 'how' are the same.

> Annotation error for Utils.timeStringAsMs
>
> Key: SPARK-26394
> URL: https://issues.apache.org/jira/browse/SPARK-26394
> Project: Spark
> Issue Type: Bug
> Components: Documentation, Spark Core
> Affects Versions: 2.4.0
> Reporter: Jackey Lee
> Priority: Trivial
>
> Utils.timeStringAsMs() parses a time string to milliseconds, but its annotation says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use."
> Thus, "microseconds" should be changed to "milliseconds".
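The doc fix above is one word, but the behavior it documents is easy to check. Below is a minimal, self-contained sketch of a timeStringAsMs-style parser; it is an illustration of the documented contract, not Spark's actual implementation, and the suffix table is an assumption for the example.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a timeStringAsMs-style parser: converts a time string
// with an optional suffix (us, ms, s, m, h, d) to MILLISECONDS. A bare number
// is assumed to already be in ms, matching the documented default.
public class TimeStrings {
    public static long timeStringAsMs(String str) {
        String s = str.trim().toLowerCase();
        // Order matters: check "us"/"ms" before the bare "s"/"m" suffixes.
        String[][] suffixes = {
            {"us", "MICROSECONDS"}, {"ms", "MILLISECONDS"}, {"s", "SECONDS"},
            {"m", "MINUTES"}, {"h", "HOURS"}, {"d", "DAYS"}
        };
        for (String[] suf : suffixes) {
            if (s.endsWith(suf[0])) {
                long num = Long.parseLong(s.substring(0, s.length() - suf[0].length()).trim());
                return TimeUnit.MILLISECONDS.convert(num, TimeUnit.valueOf(suf[1]));
            }
        }
        return Long.parseLong(s); // no suffix: already milliseconds
    }

    public static void main(String[] args) {
        System.out.println(timeStringAsMs("50s"));   // 50000
        System.out.println(timeStringAsMs("100ms")); // 100
        System.out.println(timeStringAsMs("250us")); // 0 (sub-millisecond truncates)
    }
}
```

Note that "250us" yields 0 ms: the method returns milliseconds, which is exactly why the "microseconds" wording in the annotation was wrong.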
[jira] [Assigned] (SPARK-26394) Annotation error for Utils.timeStringAsMs
[ https://issues.apache.org/jira/browse/SPARK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reassigned SPARK-26394:
-
Assignee: Jackey Lee

> Annotation error for Utils.timeStringAsMs
>
> Key: SPARK-26394
> URL: https://issues.apache.org/jira/browse/SPARK-26394
> Project: Spark
> Issue Type: Bug
> Components: Documentation, Spark Core
> Affects Versions: 2.4.0
> Reporter: Jackey Lee
> Assignee: Jackey Lee
> Priority: Trivial
> Fix For: 2.3.3, 2.4.1, 3.0.0
>
> Utils.timeStringAsMs() parses a time string to milliseconds, but its annotation says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use."
> Thus, "microseconds" should be changed to "milliseconds".
[jira] [Created] (SPARK-26397) Driver-side only metrics support
Yuanjian Li created SPARK-26397:
---
Summary: Driver-side only metrics support
Key: SPARK-26397
URL: https://issues.apache.org/jira/browse/SPARK-26397
Project: Spark
Issue Type: Sub-task
Components: SQL
Affects Versions: 2.4.0
Reporter: Yuanjian Li
Fix For: 3.0.0

As noted in the comment at [https://github.com/apache/spark/pull/23327#discussion_r242646521], during the work on SPARK-26222 and SPARK-26223 we need support for driver-side-only metrics, which would mark metadata-related metrics as driver-side only and not send them to the executor side. This needs some changes in SparkPlan and SparkPlanInfo; we should also check whether there is any existing misuse first.
[jira] [Commented] (SPARK-26382) prefix sorter should handle -0.0
[ https://issues.apache.org/jira/browse/SPARK-26382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724319#comment-16724319 ] ASF GitHub Bot commented on SPARK-26382:

asfgit closed pull request #23334: [SPARK-26382][CORE] prefix comparator should handle -0.0
URL: https://github.com/apache/spark/pull/23334

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

{code}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
index 0910db22af004..bef1bdadb27aa 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
@@ -69,6 +69,8 @@ public static long computePrefix(byte[] bytes) {
    * details see http://stereopsis.com/radix.html.
    */
   public static long computePrefix(double value) {
+    // normalize -0.0 to 0.0, as they should be equal
+    value = value == -0.0 ? 0.0 : value;
     // Java's doubleToLongBits already canonicalizes all NaN values to the smallest possible
     // positive NaN, so there's nothing special we need to do for NaNs.
     long bits = Double.doubleToLongBits(value);
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
index 73546ef1b7a60..38cb37c524594 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
@@ -125,6 +125,7 @@ class PrefixComparatorsSuite extends SparkFunSuite with PropertyChecks {
     val nan2Prefix = PrefixComparators.DoublePrefixComparator.computePrefix(nan2)
     assert(nan1Prefix === nan2Prefix)
     val doubleMaxPrefix = PrefixComparators.DoublePrefixComparator.computePrefix(Double.MaxValue)
+    // NaN is greater than the max double value.
     assert(PrefixComparators.DOUBLE.compare(nan1Prefix, doubleMaxPrefix) === 1)
   }

@@ -134,22 +135,34 @@ class PrefixComparatorsSuite extends SparkFunSuite with PropertyChecks {
     assert(java.lang.Double.doubleToRawLongBits(negativeNan) < 0)
     val prefix = PrefixComparators.DoublePrefixComparator.computePrefix(negativeNan)
     val doubleMaxPrefix = PrefixComparators.DoublePrefixComparator.computePrefix(Double.MaxValue)
+    // -NaN is greater than the max double value.
     assert(PrefixComparators.DOUBLE.compare(prefix, doubleMaxPrefix) === 1)
   }

   test("double prefix comparator handles other special values properly") {
-    val nullValue = 0L
+    // See `SortPrefix.nullValue` for how we deal with nulls for float/double type
+    val smallestNullPrefix = 0L
+    val largestNullPrefix = -1L
     val nan = PrefixComparators.DoublePrefixComparator.computePrefix(Double.NaN)
     val posInf = PrefixComparators.DoublePrefixComparator.computePrefix(Double.PositiveInfinity)
     val negInf = PrefixComparators.DoublePrefixComparator.computePrefix(Double.NegativeInfinity)
     val minValue = PrefixComparators.DoublePrefixComparator.computePrefix(Double.MinValue)
     val maxValue = PrefixComparators.DoublePrefixComparator.computePrefix(Double.MaxValue)
     val zero = PrefixComparators.DoublePrefixComparator.computePrefix(0.0)
+    val minusZero = PrefixComparators.DoublePrefixComparator.computePrefix(-0.0)
+
+    // null is greater than everything including NaN, when we need to treat it as the largest value.
+    assert(PrefixComparators.DOUBLE.compare(largestNullPrefix, nan) === 1)
+    // NaN is greater than the positive infinity.
     assert(PrefixComparators.DOUBLE.compare(nan, posInf) === 1)
     assert(PrefixComparators.DOUBLE.compare(posInf, maxValue) === 1)
     assert(PrefixComparators.DOUBLE.compare(maxValue, zero) === 1)
     assert(PrefixComparators.DOUBLE.compare(zero, minValue) === 1)
     assert(PrefixComparators.DOUBLE.compare(minValue, negInf) === 1)
-    assert(PrefixComparators.DOUBLE.compare(negInf, nullValue) === 1)
+    // null is smaller than everything including negative infinity, when we need to treat it as
+    // the smallest value.
+    assert(PrefixComparators.DOUBLE.compare(negInf, smallestNullPrefix) === 1)
+    // 0.0 should be equal to -0.0.
+    assert(PrefixComparators.DOUBLE.compare(zero, minusZero) === 0)
   }
 }
{code}

This is an automated message from the Apache Git Service. To respond to the message, pleas
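For readers unfamiliar with the bug fixed above: `Double.doubleToLongBits` gives `0.0` and `-0.0` different bit patterns, so without normalization the two values receive different sort prefixes even though they compare equal. The following is a hedged sketch of a radix-style sortable prefix for doubles with the one-line fix applied; it is modeled on, not copied from, the patched `computePrefix`, and the class name is made up for the example.

```java
public class MinusZeroPrefix {
    // Sketch of a sortable prefix for doubles (the radix trick described at
    // stereopsis.com/radix.html), with the SPARK-26382 fix: normalize -0.0 to
    // 0.0 before taking the bit pattern.
    public static long computePrefix(double value) {
        value = (value == -0.0d) ? 0.0d : value; // 0.0 == -0.0 is true in IEEE 754
        long bits = Double.doubleToLongBits(value);
        // Flip all bits for negatives and only the sign bit for positives, so an
        // unsigned comparison of the prefixes matches the double ordering.
        long mask = -(bits >>> 63) | 0x8000000000000000L;
        return bits ^ mask;
    }

    public static void main(String[] args) {
        // The raw bit patterns of 0.0 and -0.0 differ...
        System.out.println(Double.doubleToLongBits(0.0d) == Double.doubleToLongBits(-0.0d)); // false
        // ...but after normalization the prefixes are equal, so sorting treats them as one value.
        System.out.println(computePrefix(0.0d) == computePrefix(-0.0d)); // true
    }
}
```

Without the normalization line, `-0.0` (sign bit set) would flip all bits and sort below every positive value's prefix while still comparing unequal to `0.0`, which is exactly the inconsistency the PR removes.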
[jira] [Resolved] (SPARK-26394) Annotation error for Utils.timeStringAsMs
[ https://issues.apache.org/jira/browse/SPARK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-26394.
---
Resolution: Fixed
Fix Version/s: 2.3.3, 2.4.1, 3.0.0

Issue resolved by pull request 23346: [https://github.com/apache/spark/pull/23346]

> Annotation error for Utils.timeStringAsMs
>
> Key: SPARK-26394
> URL: https://issues.apache.org/jira/browse/SPARK-26394
> Project: Spark
> Issue Type: Bug
> Components: Documentation, Spark Core
> Affects Versions: 2.4.0
> Reporter: Jackey Lee
> Assignee: Jackey Lee
> Priority: Trivial
> Fix For: 2.3.3, 2.4.1, 3.0.0
>
> Utils.timeStringAsMs() parses a time string to milliseconds, but its annotation says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use."
> Thus, "microseconds" should be changed to "milliseconds".
[jira] [Commented] (SPARK-26394) Annotation error for Utils.timeStringAsMs
[ https://issues.apache.org/jira/browse/SPARK-26394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724325#comment-16724325 ] ASF GitHub Bot commented on SPARK-26394: srowen closed pull request #23346: [SPARK-26394][core] Fix annotation error for Utils.timeStringAsMs URL: https://github.com/apache/spark/pull/23346 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index c8b148be84536..8f86b472b9373 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1085,7 +1085,7 @@ private[spark] object Utils extends Logging { } /** - * Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If + * Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. If * no suffix is provided, the passed number is assumed to be in ms. */ def timeStringAsMs(str: String): Long = { This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
> Annotation error for Utils.timeStringAsMs
>
> Key: SPARK-26394
> URL: https://issues.apache.org/jira/browse/SPARK-26394
> Project: Spark
> Issue Type: Bug
> Components: Documentation, Spark Core
> Affects Versions: 2.4.0
> Reporter: Jackey Lee
> Assignee: Jackey Lee
> Priority: Trivial
> Fix For: 2.3.3, 2.4.1, 3.0.0
>
> Utils.timeStringAsMs() parses a time string to milliseconds, but its annotation says "Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use."
> Thus, "microseconds" should be changed to "milliseconds".
[jira] [Commented] (SPARK-25953) install jdk11 on jenkins workers
[ https://issues.apache.org/jira/browse/SPARK-25953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724361#comment-16724361 ] shane knapp commented on SPARK-25953:
-
no, not yet. i just got back from vacation and am planning on getting to this in the next couple of days (before i head out for xmas).

> install jdk11 on jenkins workers
>
> Key: SPARK-25953
> URL: https://issues.apache.org/jira/browse/SPARK-25953
> Project: Spark
> Issue Type: Sub-task
> Components: Build
> Affects Versions: 3.0.0
> Reporter: shane knapp
> Assignee: shane knapp
> Priority: Critical
>
> once we pin down exactly what we want installed on the jenkins workers, i will add it to our ansible and deploy.
[jira] [Created] (SPARK-26398) Support building GPU docker images
Rong Ou created SPARK-26398:
---
Summary: Support building GPU docker images
Key: SPARK-26398
URL: https://issues.apache.org/jira/browse/SPARK-26398
Project: Spark
Issue Type: Improvement
Components: Kubernetes
Affects Versions: 2.4.0
Reporter: Rong Ou

To run Spark on Kubernetes, a user first needs to build docker images using the `bin/docker-image-tool.sh` script. However, this script only supports building images for running on CPUs. As parts of Spark and related libraries (e.g. XGBoost) become GPU-accelerated, it is desirable to build base images that can take advantage of GPU acceleration.

This issue only addresses building docker images with CUDA support. Actually accelerating Spark on GPUs is out of scope, as is supporting other types of GPUs.

Today, anyone who wants to experiment with running Spark on Kubernetes with GPU support has to write a custom `Dockerfile`. By providing an "official" way to build GPU-enabled docker images, we can make it easier to get started. For now, probably not many people care about this, but it is a necessary first step towards GPU acceleration for Spark on Kubernetes.

The risks are minimal, as we only need to make minor changes to `bin/docker-image-tool.sh`. The PR is already done and will be attached. Success means anyone can easily build Spark docker images with GPU support.

Proposed API changes: add an optional `-g` flag to `bin/docker-image-tool.sh` for building GPU versions of the JVM/Python/R docker images. When the `-g` flag is omitted, existing behavior is preserved.

Design sketch: when the `-g` flag is specified, we append `-gpu` to the docker image names and switch to dockerfiles based on the official CUDA images. Since the CUDA images are based on Ubuntu while the Spark dockerfiles are based on Alpine, the steps for setting up additional packages differ, so there is a parallel set of `Dockerfile.gpu` files.
Alternative: if we are willing to forgo Alpine and switch to Ubuntu for the CPU-only images, the two sets of dockerfiles can be unified, and we can pass in a different base image depending on whether the `-g` flag is present.
[jira] [Commented] (SPARK-26398) Support building GPU docker images
[ https://issues.apache.org/jira/browse/SPARK-26398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724436#comment-16724436 ] Rong Ou commented on SPARK-26398: - https://github.com/apache/spark/pull/23347 > Support building GPU docker images > -- > > Key: SPARK-26398 > URL: https://issues.apache.org/jira/browse/SPARK-26398 > Project: Spark > Issue Type: Improvement > Components: Kubernetes >Affects Versions: 2.4.0 >Reporter: Rong Ou >Priority: Minor > > To run Spark on Kubernetes, a user first needs to build docker images using > the `bin/docker-image-tool.sh` script. However, this script only supports > building images for running on CPUs. As parts of Spark and related libraries > (e.g. XGBoost) get accelerated on GPUs, it's desirable to build base images > that can take advantage of GPU acceleration. > This issue only addresses building docker images with CUDA support. Actually > accelerating Spark on GPUs is outside the scope, as is supporting other types > of GPUs. > Today if anyone wants to experiment with running Spark on Kubernetes with GPU > support, they have to write their own custom `Dockerfile`. By providing an > "official" way to build GPU-enabled docker images, we can make it easier to > get started. > For now probably not that many people care about this, but it's a necessary > first step towards GPU acceleration for Spark on Kubernetes. > The risks are minimal as we only need to make minor changes to > `bin/docker-image-tool.sh`. The PR is already done and will be attached. > Success means anyone can easily build Spark docker images with GPU support. > Proposed API changes: add an optional `-g` flag to > `bin/docker-image-tool.sh` for building GPU versions of the JVM/Python/R > docker images. When the `-g` is omitted, existing behavior is preserved. > Design sketch: when the `-g` flag is specified, we append `-gpu` to the > docker image names, and switch to dockerfiles based on the official CUDA > images. 
> Since the CUDA images are based on Ubuntu while the Spark dockerfiles are based on Alpine, the steps for setting up additional packages differ, so there is a parallel set of `Dockerfile.gpu` files.
> Alternative: if we are willing to forgo Alpine and switch to Ubuntu for the CPU-only images, the two sets of dockerfiles can be unified, and we can pass in a different base image depending on whether the `-g` flag is present.
[jira] [Assigned] (SPARK-25869) Spark on YARN: the original diagnostics is missing when job failed maxAppAttempts times
[ https://issues.apache.org/jira/browse/SPARK-25869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin reassigned SPARK-25869: -- Assignee: Marcelo Vanzin > Spark on YARN: the original diagnostics is missing when job failed > maxAppAttempts times > --- > > Key: SPARK-25869 > URL: https://issues.apache.org/jira/browse/SPARK-25869 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.1.1 >Reporter: Yeliang Cang >Assignee: Marcelo Vanzin >Priority: Major > > When configure spark on yarn, I submit job using below command: > {code} > spark-submit --class org.apache.spark.examples.SparkPi --master yarn > --deploy-mode cluster --driver-memory 127m --driver-cores 1 > --executor-memory 2048m --executor-cores 1 --num-executors 10 --queue > root.mr --conf spark.testing.reservedMemory=1048576 --conf > spark.yarn.executor.memoryOverhead=50 --conf > spark.yarn.driver.memoryOverhead=50 > /opt/ZDH/parcels/lib/spark/examples/jars/spark-examples* 1 > {code} > Apparently, the driver memory is not enough, but this can not be seen in > spark client log: > {code} > 2018-10-29 19:28:34,658 INFO org.apache.spark.deploy.yarn.Client: Application > report for application_1540536615315_0013 (state: ACCEPTED) > 2018-10-29 19:28:35,660 INFO org.apache.spark.deploy.yarn.Client: Application > report for application_1540536615315_0013 (state: RUNNING) > 2018-10-29 19:28:35,660 INFO org.apache.spark.deploy.yarn.Client: > client token: N/A > diagnostics: N/A > ApplicationMaster host: 10.43.183.143 > ApplicationMaster RPC port: 0 > queue: root.mr > start time: 1540812501560 > final status: UNDEFINED > tracking URL: http://zdh141:8088/proxy/application_1540536615315_0013/ > user: mr > 2018-10-29 19:28:36,663 INFO org.apache.spark.deploy.yarn.Client: Application > report for application_1540536615315_0013 (state: FINISHED) > 2018-10-29 19:28:36,663 INFO org.apache.spark.deploy.yarn.Client: > client token: N/A > diagnostics: Shutdown hook called 
before final status was reported. > ApplicationMaster host: 10.43.183.143 > ApplicationMaster RPC port: 0 > queue: root.mr > start time: 1540812501560 > final status: FAILED > tracking URL: http://zdh141:8088/proxy/application_1540536615315_0013/ > user: mr > Exception in thread "main" org.apache.spark.SparkException: Application > application_1540536615315_0013 finished with failed status > at org.apache.spark.deploy.yarn.Client.run(Client.scala:1137) > at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1183) > at org.apache.spark.deploy.yarn.Client.main(Client.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775) > at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > 2018-10-29 19:28:36,694 INFO org.apache.spark.util.ShutdownHookManager: > Shutdown hook called > 2018-10-29 19:28:36,695 INFO org.apache.spark.util.ShutdownHookManager: > Deleting directory /tmp/spark-96077be5-0dfa-496d-a6a0-96e83393a8d9 > {code} > > > Solution: after apply the patch, spark client log can be shown as: > {code} > 2018-10-29 19:27:32,962 INFO org.apache.spark.deploy.yarn.Client: Application > report for application_1540536615315_0012 (state: RUNNING) > 2018-10-29 19:27:32,962 INFO org.apache.spark.deploy.yarn.Client: > client token: N/A > diagnostics: N/A > ApplicationMaster host: 10.43.183.143 > ApplicationMaster RPC port: 0 > queue: root.mr > start time: 1540812436656 > final status: UNDEFINED > 
tracking URL: http://zdh141:8088/proxy/application_1540536615315_0012/ > user: mr > 2018-10-29 19:27:33,964 INFO org.apache.spark.deploy.yarn.Client: Application > report for application_1540536615315_0012 (state: FAILED) > 2018-10-29 19:27:33,964 INFO org.apache.spark.deploy.yarn.Client: > client token: N/A > diagnostics: Application application_1540536615315_0012 failed 2 times due > to AM Container for appattempt_1540536615315_0012_02 exited with > exitCode: -104 > For more detailed output, check application tracking > page:http://zdh141:8088/cluster/app/application_1540536615315_0012Then, click > on links to logs of each attempt. > Diagnostics: virtual m
[jira] [Commented] (SPARK-25869) Spark on YARN: the original diagnostics is missing when job failed maxAppAttempts times
[ https://issues.apache.org/jira/browse/SPARK-25869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724453#comment-16724453 ] ASF GitHub Bot commented on SPARK-25869:

vanzin closed pull request #22876: [SPARK-25869] [YARN] the original diagnostics is missing when job failed ma…
URL: https://github.com/apache/spark/pull/22876

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

{code}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 8f94e3f731007..57f0a7f05b2e5 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -293,6 +293,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     }

     if (!unregistered) {
+      logInfo("Waiting for " + sparkConf.get("spark.yarn.report.interval", "1000").toInt + "ms to unregister am," +
+        " so the client can have the right diagnostics msg!")
+      Thread.sleep(sparkConf.get("spark.yarn.report.interval", "1000").toInt)
       // we only want to unregister if we don't want the RM to retry
       if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
         unregister(finalStatus, finalMsg)
{code}

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Spark on YARN: the original diagnostics is missing when job failed > maxAppAttempts times > --- > > Key: SPARK-25869 > URL: https://issues.apache.org/jira/browse/SPARK-25869 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.1.1 >Reporter: Yeliang Cang >Priority: Major > > When configure spark on yarn, I submit job using below command: > {code} > spark-submit --class org.apache.spark.examples.SparkPi --master yarn > --deploy-mode cluster --driver-memory 127m --driver-cores 1 > --executor-memory 2048m --executor-cores 1 --num-executors 10 --queue > root.mr --conf spark.testing.reservedMemory=1048576 --conf > spark.yarn.executor.memoryOverhead=50 --conf > spark.yarn.driver.memoryOverhead=50 > /opt/ZDH/parcels/lib/spark/examples/jars/spark-examples* 1 > {code} > Apparently, the driver memory is not enough, but this can not be seen in > spark client log: > {code} > 2018-10-29 19:28:34,658 INFO org.apache.spark.deploy.yarn.Client: Application > report for application_1540536615315_0013 (state: ACCEPTED) > 2018-10-29 19:28:35,660 INFO org.apache.spark.deploy.yarn.Client: Application > report for application_1540536615315_0013 (state: RUNNING) > 2018-10-29 19:28:35,660 INFO org.apache.spark.deploy.yarn.Client: > client token: N/A > diagnostics: N/A > ApplicationMaster host: 10.43.183.143 > ApplicationMaster RPC port: 0 > queue: root.mr > start time: 1540812501560 > final status: UNDEFINED > tracking URL: http://zdh141:8088/proxy/application_1540536615315_0013/ > user: mr > 2018-10-29 19:28:36,663 INFO org.apache.spark.deploy.yarn.Client: Application > report for application_1540536615315_0013 (state: FINISHED) > 2018-10-29 19:28:36,663 INFO org.apache.spark.deploy.yarn.Client: > client token: N/A > diagnostics: Shutdown hook called before final status was reported. 
> ApplicationMaster host: 10.43.183.143 > ApplicationMaster RPC port: 0 > queue: root.mr > start time: 1540812501560 > final status: FAILED > tracking URL: http://zdh141:8088/proxy/application_1540536615315_0013/ > user: mr > Exception in thread "main" org.apache.spark.SparkException: Application > application_1540536615315_0013 finished with failed status > at org.apache.spark.deploy.yarn.Client.run(Client.scala:1137) > at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1183) > at org.apache.spark.deploy.yarn.Client.main(Client.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apa
[jira] [Assigned] (SPARK-25869) Spark on YARN: the original diagnostics is missing when job failed maxAppAttempts times
[ https://issues.apache.org/jira/browse/SPARK-25869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin reassigned SPARK-25869: -- Assignee: (was: Marcelo Vanzin) > Spark on YARN: the original diagnostics is missing when job failed > maxAppAttempts times > --- > > Key: SPARK-25869 > URL: https://issues.apache.org/jira/browse/SPARK-25869 > Project: Spark > Issue Type: Bug > Components: YARN >Affects Versions: 2.1.1 >Reporter: Yeliang Cang >Priority: Major > > When configuring Spark on YARN, I submit a job using the command below: > {code} > spark-submit --class org.apache.spark.examples.SparkPi --master yarn > --deploy-mode cluster --driver-memory 127m --driver-cores 1 > --executor-memory 2048m --executor-cores 1 --num-executors 10 --queue > root.mr --conf spark.testing.reservedMemory=1048576 --conf > spark.yarn.executor.memoryOverhead=50 --conf > spark.yarn.driver.memoryOverhead=50 > /opt/ZDH/parcels/lib/spark/examples/jars/spark-examples* 1 > {code} > Apparently, the driver memory is not enough, but this cannot be seen in > the Spark client log: > {code} > 2018-10-29 19:28:34,658 INFO org.apache.spark.deploy.yarn.Client: Application > report for application_1540536615315_0013 (state: ACCEPTED) > 2018-10-29 19:28:35,660 INFO org.apache.spark.deploy.yarn.Client: Application > report for application_1540536615315_0013 (state: RUNNING) > 2018-10-29 19:28:35,660 INFO org.apache.spark.deploy.yarn.Client: > client token: N/A > diagnostics: N/A > ApplicationMaster host: 10.43.183.143 > ApplicationMaster RPC port: 0 > queue: root.mr > start time: 1540812501560 > final status: UNDEFINED > tracking URL: http://zdh141:8088/proxy/application_1540536615315_0013/ > user: mr > 2018-10-29 19:28:36,663 INFO org.apache.spark.deploy.yarn.Client: Application > report for application_1540536615315_0013 (state: FINISHED) > 2018-10-29 19:28:36,663 INFO org.apache.spark.deploy.yarn.Client: > client token: N/A > diagnostics: Shutdown hook called before final status was 
reported. > ApplicationMaster host: 10.43.183.143 > ApplicationMaster RPC port: 0 > queue: root.mr > start time: 1540812501560 > final status: FAILED > tracking URL: http://zdh141:8088/proxy/application_1540536615315_0013/ > user: mr > Exception in thread "main" org.apache.spark.SparkException: Application > application_1540536615315_0013 finished with failed status > at org.apache.spark.deploy.yarn.Client.run(Client.scala:1137) > at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1183) > at org.apache.spark.deploy.yarn.Client.main(Client.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775) > at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > 2018-10-29 19:28:36,694 INFO org.apache.spark.util.ShutdownHookManager: > Shutdown hook called > 2018-10-29 19:28:36,695 INFO org.apache.spark.util.ShutdownHookManager: > Deleting directory /tmp/spark-96077be5-0dfa-496d-a6a0-96e83393a8d9 > {code} > > > Solution: after applying the patch, the Spark client log shows: > {code} > 2018-10-29 19:27:32,962 INFO org.apache.spark.deploy.yarn.Client: Application > report for application_1540536615315_0012 (state: RUNNING) > 2018-10-29 19:27:32,962 INFO org.apache.spark.deploy.yarn.Client: > client token: N/A > diagnostics: N/A > ApplicationMaster host: 10.43.183.143 > ApplicationMaster RPC port: 0 > queue: root.mr > start time: 1540812436656 > final status: UNDEFINED > tracking URL: 
http://zdh141:8088/proxy/application_1540536615315_0012/ > user: mr > 2018-10-29 19:27:33,964 INFO org.apache.spark.deploy.yarn.Client: Application > report for application_1540536615315_0012 (state: FAILED) > 2018-10-29 19:27:33,964 INFO org.apache.spark.deploy.yarn.Client: > client token: N/A > diagnostics: Application application_1540536615315_0012 failed 2 times due > to AM Container for appattempt_1540536615315_0012_02 exited with > exitCode: -104 > For more detailed output, check application tracking > page:http://zdh141:8088/cluster/app/application_1540536615315_0012Then, click > on links to logs of each attempt. > Diagnostics: virtual memory used. Killing contain
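The behaviour described above boils down to the last ApplicationReport's placeholder diagnostics ("Shutdown hook called before final status was reported.") masking the informative diagnostics from the failed attempt. A minimal sketch of the kind of fix the reporter is after, keeping the first informative diagnostics string seen; the object and method names are illustrative, not Spark's actual `yarn.Client` code:

```scala
// Illustrative sketch only, not Spark's actual yarn.Client: remember the
// first informative diagnostics string seen across application reports, so a
// trailing "Shutdown hook called..." placeholder does not mask the real error.
object DiagnosticsTracker {
  private val uninformative = Set(
    "N/A",
    "Shutdown hook called before final status was reported.")

  // Return the first diagnostics message that actually explains the failure.
  def pickDiagnostics(reports: Seq[String]): Option[String] =
    reports.find(d => d != null && d.trim.nonEmpty && !uninformative.contains(d.trim))
}
```

With the reports from the log above, this would surface the "virtual memory used. Killing container." message instead of the shutdown-hook placeholder.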
[jira] [Commented] (SPARK-23431) Expose the new executor memory metrics at the stage level
[ https://issues.apache.org/jira/browse/SPARK-23431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724472#comment-16724472 ] Edwina Lu commented on SPARK-23431: --- Splitting off the new REST APIs into a new subtask. > Expose the new executor memory metrics at the stage level > - > > Key: SPARK-23431 > URL: https://issues.apache.org/jira/browse/SPARK-23431 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.2.1 >Reporter: Edwina Lu >Priority: Major > > Collect and show the new executor memory metrics for each stage, to provide > more information on how memory is used per stage. > Modify the AppStatusListener to track the peak values for JVM used memory, > execution memory, storage memory, and unified memory for each executor for > each stage. > This is a subtask for SPARK-23206. Please refer to the design doc for that > ticket for more details. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
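The per-stage tracking the ticket describes, running peaks of JVM used memory, execution memory, storage memory, and unified memory per executor, can be sketched as a toy model (illustrative only, not the real `AppStatusListener`):

```scala
import scala.collection.mutable

// Toy model of the tracking SPARK-23431 describes (not the real
// AppStatusListener): keep a running maximum of each executor memory metric
// for every (stageId, executorId) pair.
class PeakMetricTracker {
  private val peaks = mutable.Map.empty[(Int, String), mutable.Map[String, Long]]

  // Fold a new metrics sample into the stored peaks.
  def onExecutorMetrics(stageId: Int, executorId: String,
                        sample: Map[String, Long]): Unit = {
    val cur = peaks.getOrElseUpdate((stageId, executorId), mutable.Map.empty)
    sample.foreach { case (metric, value) =>
      cur(metric) = math.max(value, cur.getOrElse(metric, Long.MinValue))
    }
  }

  def peak(stageId: Int, executorId: String): Map[String, Long] =
    peaks.getOrElse((stageId, executorId), mutable.Map.empty).toMap
}
```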
[jira] [Updated] (SPARK-23431) Expose the new executor memory metrics at the stage level
[ https://issues.apache.org/jira/browse/SPARK-23431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edwina Lu updated SPARK-23431: -- Description: Collect and show the new executor memory metrics for each stage, to provide more information on how memory is used per stage. Modify the AppStatusListener to track the peak values for JVM used memory, execution memory, storage memory, and unified memory for each executor for each stage. This is a subtask for SPARK-23206. Please refer to the design doc for that ticket for more details. was: Collect and show the new executor memory metrics for each stage, to provide more information on how memory is used per stage. Modify the AppStatusListener to track the peak values for JVM used memory, execution memory, storage memory, and unified memory for each executor for each stage. Add the peak values for the metrics to the stages REST API. Also add a new stageSummary REST API, which will return executor summary metrics for a specified stage: {code:java} curl http://:18080/api/v1/applicationsexecutorSummary{code} This is a subtask for SPARK-23206. Please refer to the design doc for that ticket for more details. > Expose the new executor memory metrics at the stage level > - > > Key: SPARK-23431 > URL: https://issues.apache.org/jira/browse/SPARK-23431 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.2.1 >Reporter: Edwina Lu >Priority: Major > > Collect and show the new executor memory metrics for each stage, to provide > more information on how memory is used per stage. > Modify the AppStatusListener to track the peak values for JVM used memory, > execution memory, storage memory, and unified memory for each executor for > each stage. > This is a subtask for SPARK-23206. Please refer to the design doc for that > ticket for more details. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26399) Add new stage-level REST APIs and parameters
Edwina Lu created SPARK-26399: - Summary: Add new stage-level REST APIs and parameters Key: SPARK-26399 URL: https://issues.apache.org/jira/browse/SPARK-26399 Project: Spark Issue Type: Sub-task Components: Spark Core Affects Versions: 2.4.0 Reporter: Edwina Lu Add the peak values for the metrics to the stages REST API. Also add a new executorSummary REST API, which will return executor summary metrics for a specified stage: {code:java} curl http://<history server>:18080/api/v1/applications/<application id>/<stage id>/<stage attempt>/executorSummary {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-25815) Kerberos Support in Kubernetes resource manager (Client Mode)
[ https://issues.apache.org/jira/browse/SPARK-25815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin reassigned SPARK-25815: -- Assignee: Marcelo Vanzin > Kerberos Support in Kubernetes resource manager (Client Mode) > - > > Key: SPARK-25815 > URL: https://issues.apache.org/jira/browse/SPARK-25815 > Project: Spark > Issue Type: New Feature > Components: Kubernetes >Affects Versions: 3.0.0 >Reporter: Ilan Filonenko >Assignee: Marcelo Vanzin >Priority: Major > Fix For: 3.0.0 > > > Include Kerberos support for Spark on K8S jobs running in client-mode -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-25815) Kerberos Support in Kubernetes resource manager (Client Mode)
[ https://issues.apache.org/jira/browse/SPARK-25815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724486#comment-16724486 ] ASF GitHub Bot commented on SPARK-25815: asfgit closed pull request #22911: [SPARK-25815][k8s] Support kerberos in client mode, keytab-based token renewal. URL: https://github.com/apache/spark/pull/22911 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index d4055cb6c5853..763bd0a70a035 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy import java.io._ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException} -import java.net.URL +import java.net.{URI, URL} import java.security.PrivilegedExceptionAction import java.text.ParseException import java.util.UUID @@ -334,19 +334,20 @@ private[spark] class SparkSubmit extends Logging { val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf)) val targetDir = Utils.createTempDir() -// assure a keytab is available from any place in a JVM -if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || isKubernetesCluster) { - if (args.principal != null) { -if (args.keytab != null) { - require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist") - // Add keytab and principal configurations in sysProps to make them available - // for later use; e.g. in spark sql, the isolated class loader used to talk - // to HiveMetastore will use these settings. 
They will be set as Java system - // properties and then loaded by SparkConf - sparkConf.set(KEYTAB, args.keytab) - sparkConf.set(PRINCIPAL, args.principal) - UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) -} +// Kerberos is not supported in standalone mode, and keytab support is not yet available +// in Mesos cluster mode. +if (clusterManager != STANDALONE +&& !isMesosCluster +&& args.principal != null +&& args.keytab != null) { + // If client mode, make sure the keytab is just a local path. + if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) { +args.keytab = new URI(args.keytab).getPath() + } + + if (!Utils.isLocalUri(args.keytab)) { +require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist") +UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 126a6ab801369..f7e3ddecee093 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.security import java.io.File +import java.net.URI import java.security.PrivilegedExceptionAction import java.util.concurrent.{ScheduledExecutorService, TimeUnit} import java.util.concurrent.atomic.AtomicReference @@ -71,11 +72,13 @@ private[spark] class HadoopDelegationTokenManager( private val providerEnabledConfig = "spark.security.credentials.%s.enabled" private val principal = sparkConf.get(PRINCIPAL).orNull - private val keytab = sparkConf.get(KEYTAB).orNull + + // The keytab can be a local: URI for cluster mode, so translate it to a regular path. If it is + // needed later on, the code will check that it exists. 
+ private val keytab = sparkConf.get(KEYTAB).map { uri => new URI(uri).getPath() }.orNull require((principal == null) == (keytab == null), "Both principal and keytab must be defined, or neither.") - require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.") private val delegationTokenProviders = loadProviders() logDebug("Using the following builtin delegation token providers: " + @@ -264,6 +267,7 @@ private[spark] class HadoopDelegationTokenManager( private def doLogin(): UserGroupInformation = { logInfo(s"Attempting to login to KDC using principal: $principal") +require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.") val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyta
[jira] [Resolved] (SPARK-25815) Kerberos Support in Kubernetes resource manager (Client Mode)
[ https://issues.apache.org/jira/browse/SPARK-25815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin resolved SPARK-25815. Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 22911 [https://github.com/apache/spark/pull/22911] > Kerberos Support in Kubernetes resource manager (Client Mode) > - > > Key: SPARK-25815 > URL: https://issues.apache.org/jira/browse/SPARK-25815 > Project: Spark > Issue Type: New Feature > Components: Kubernetes >Affects Versions: 3.0.0 >Reporter: Ilan Filonenko >Assignee: Marcelo Vanzin >Priority: Major > Fix For: 3.0.0 > > > Include Kerberos support for Spark on K8S jobs running in client-mode -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26400) [k8s] Init container silently swallows errors when fetching jars from remote url
Stanis Shkel created SPARK-26400: Summary: [k8s] Init container silently swallows errors when fetching jars from remote url Key: SPARK-26400 URL: https://issues.apache.org/jira/browse/SPARK-26400 Project: Spark Issue Type: Bug Components: Kubernetes Affects Versions: 2.3.2 Reporter: Stanis Shkel I run the following command: {code:bash} spark-2.3.2-bin-hadoop2.7/bin/spark-submit --name client \ --master "k8s://cluster" \ --deploy-mode cluster \ --conf spark.executor.instances=2 \ --conf spark.executor.memory=5G \ --conf spark.driver.memory=8G \ --conf spark.kubernetes.container.image=rego.azurecr.io/spark:spark-2.3.2-hadoop2.7 \ --class au.com.random.DoesntMatter \ "https://fake-link.com/jars/my.jar"; {code} I expect the init container to fail to download the jar and the submission to fail at the init stage. Instead I get a driver failure with the following message. {code:bash} ++ id -u + myuid=0 ++ id -g + mygid=0 ++ getent passwd 0 + uidentry=root:x:0:0:root:/root:/bin/ash + '[' -z root:x:0:0:root:/root:/bin/ash ']' + SPARK_K8S_CMD=driver + '[' -z driver ']' + shift 1 + SPARK_CLASSPATH=':/opt/spark/jars/*' + env + grep SPARK_JAVA_OPT_ + sed 's/[^=]*=\(.*\)/\1/g' + sort -t_ -k4 -n + readarray -t SPARK_JAVA_OPTS + '[' -n /var/spark-data/spark-jars/my.jar:/var/spark-data/spark-jars/my.jar ']' + SPARK_CLASSPATH=':/opt/spark/jars/*:/var/spark-data/spark-jars/my.jar:/var/spark-data/spark-jars/my.jar' + '[' -n /var/spark-data/spark-files ']' + cp -R /var/spark-data/spark-files/. . 
+ case "$SPARK_K8S_CMD" in + CMD=(${JAVA_HOME}/bin/java "${SPARK_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS) + exec /sbin/tini -s -- /usr/lib/jvm/java-1.8-openjdk/bin/java -Dspark.master=k8s://kubernetes:443 -Dspark.app.id=spark-2f340a028a314e9cb0df8165d887bfb7 -Dspark.kubernetes.container.image=azure.azurecr.io/spark:spark-2.3.2-hadoop2.7 -Dspark.submit.deployMode=cluster -Dspark.driver.blockManager.port=7079 -Dspark.executor.memory=5G -Dspark.kubernetes.executor.podNamePrefix=client-f20f30e154a13624a728d6f56d45da3e -Dspark.jars=https://fake-link.com/jars/my.jar,https://fake-link.com/jars/my.jar -Dspark.driver.memory=8G -Dspark.driver.port=7078 -Dspark.kubernetes.driver.pod.name=client-f20f30e154a13624a728d6f56d45da3e-driver -Dspark.app.name=client -Dspark.kubernetes.initContainer.configMapKey=spark-init.properties -Dspark.executor.instances=2 -Dspark.driver.host=client-f20f30e154a13624a728d6f56d45da3e-driver-svc.default.svc -Dspark.kubernetes.initContainer.configMapName=client-f20f30e154a13624a728d6f56d45da3e-init-config -cp ':/opt/spark/jars/*:/var/spark-data/spark-jars/my.jar:/var/spark-data/spark-jars/my.jar' -Xms8G -Xmx8G -Dspark.driver.bindAddress=10.1.0.101 au.com.random.DoesntMatter Error: Could not find or load main class au.com.random.DoesntMatter {code} This happens because spark-init container failed to download the dependencies but misreports the status. 
Here is a log snippet from spark-init container {code:bash} ++ id -u + myuid=0 ++ id -g + mygid=0 ++ getent passwd 0 + uidentry=root:x:0:0:root:/root:/bin/ash + '[' -z root:x:0:0:root:/root:/bin/ash ']' + SPARK_K8S_CMD=init + '[' -z init ']' + shift 1 + SPARK_CLASSPATH=':/opt/spark/jars/*' + env + grep SPARK_JAVA_OPT_ + sed 's/[^=]*=\(.*\)/\1/g' + sort -t_ -k4 -n + readarray -t SPARK_JAVA_OPTS + '[' -n '' ']' + '[' -n '' ']' + case "$SPARK_K8S_CMD" in + CMD=("$SPARK_HOME/bin/spark-class" "org.apache.spark.deploy.k8s.SparkPodInitContainer" "$@") + exec /sbin/tini -s -- /opt/spark/bin/spark-class org.apache.spark.deploy.k8s.SparkPodInitContainer /etc/spark-init/spark-init.properties 2018-12-18 21:15:41 INFO SparkPodInitContainer:54 - Starting init-container to download Spark application dependencies. 2018-12-18 21:15:41 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2018-12-18 21:15:41 INFO SecurityManager:54 - Changing view acls to: root 2018-12-18 21:15:41 INFO SecurityManager:54 - Changing modify acls to: root 2018-12-18 21:15:41 INFO SecurityManager:54 - Changing view acls groups to: 2018-12-18 21:15:41 INFO SecurityManager:54 - Changing modify acls groups to: 2018-12-18 21:15:41 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set() 2018-12-18 21:15:41 INFO SparkPodInitContainer:54 - Downloading remote jars: Some(https://fake-link.com/jars/my.jar,https://fake-link.com/jars/my.jar) 2018-12-18 21:15:41 INFO SparkPodInitContainer:54 - Downloading remote files: None 2018-12-18 21:15:42 INFO SparkPodInitContainer:
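The reporter's expectation, that a failed download should fail the init stage, amounts to the init container exiting non-zero instead of logging the error and continuing. A hedged sketch of that behaviour (illustrative only, not the actual `SparkPodInitContainer` code):

```scala
import scala.util.{Failure, Success, Try}

// Illustrative sketch, not the actual SparkPodInitContainer: download every
// remote dependency, report each failure, and return a non-zero exit code if
// anything failed, so Kubernetes marks the pod's init stage as failed instead
// of letting the driver start with a missing jar on its classpath.
object InitContainerSketch {
  def downloadAll(uris: Seq[String], fetch: String => Unit): Int = {
    val failures = uris.flatMap { uri =>
      Try(fetch(uri)) match {
        case Success(_) => None
        case Failure(e) => Some(s"$uri: ${e.getMessage}")
      }
    }
    failures.foreach(f => System.err.println(s"Failed to fetch dependency: $f"))
    if (failures.isEmpty) 0 else 1 // non-zero exit fails the init stage
  }
}
```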
[jira] [Created] (SPARK-26401) [k8s] Init container drops necessary config options for pulling jars from azure storage
Stanis Shkel created SPARK-26401: Summary: [k8s] Init container drops necessary config options for pulling jars from azure storage Key: SPARK-26401 URL: https://issues.apache.org/jira/browse/SPARK-26401 Project: Spark Issue Type: Bug Components: Kubernetes Affects Versions: 2.3.2 Reporter: Stanis Shkel I am running a spark-submit command that pulls a jar from a remote private azure storage account. As far as I understand, the jar is supposed to be pulled within the driver's init container. However, the container doesn't inherit the "spark.hadoop.fs.azure.account.key.$(STORAGE_ACCT).blob.core.windows.net=$(STORAGE_SECRET)" parameter that I pass in when running spark-submit. Here is what I found so far. The spark-init container is called via the following command [https://github.com/apache/spark/blob/branch-2.3/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh#L83] which in the end turns into the following shell call {code:bash} exec /usr/lib/jvm/java-1.8-openjdk/bin/java -cp '/opt/spark/conf/:/opt/spark/jars/*' -Xmx1g org.apache.spark.deploy.k8s.SparkPodInitContainer /etc/spark-init/spark-init.properties {code} If I cat out the spark-init properties, the only parameters in there are spark.kubernetes.mountDependencies.jarsDownloadDir=/var/spark-data/spark-jars spark.kubernetes.initContainer.remoteJars=wasbs\://mycontai...@testaccount.blob.core.windows.net/jars/myjar.jar,wasbs\://mycontai...@testaccount.blob.core.windows.net/jars/myjar.jar spark.kubernetes.mountDependencies.filesDownloadDir=/var/spark-data/spark-files My guess is it's these params [https://github.com/apache/spark/blob/branch-2.3/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStep.scala#L49] However, the spark.hadoop.fs.azure.account.key setting is present neither in that file nor in the environment. 
This causes download of the jar fail, the exception is as follows {code:bash} Exception in thread "main" org.apache.hadoop.fs.azure.AzureException: org.apache.hadoop.fs.azure.AzureException: Container mycontainer in account testaccount.blob.core.windows.net not found, and we can't create it using anoynomous credentials. at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:938) at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:438) at org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1048) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373) at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1910) at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:700) at org.apache.spark.util.Utils$.fetchFile(Utils.scala:492) at org.apache.spark.deploy.k8s.FileFetcher.fetchFile(SparkPodInitContainer.scala:91) at org.apache.spark.deploy.k8s.SparkPodInitContainer$$anonfun$downloadFiles$1$$anonfun$apply$2.apply(SparkPodInitContainer.scala:81) at org.apache.spark.deploy.k8s.SparkPodInitContainer$$anonfun$downloadFiles$1$$anonfun$apply$2.apply(SparkPodInitContainer.scala:79) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) at org.apache.spark.deploy.k8s.SparkPodInitContainer$$anonfun$downloadFiles$1.apply(SparkPodInitContainer.scala:79) at org.apache.spark.deploy.k8s.SparkPodInitContainer$$anonfun$downloadFiles$1.apply(SparkPodInitContainer.scala:77) at scala.Option.foreach(Option.scala:257) at 
org.apache.spark.deploy.k8s.SparkPodInitContainer.downloadFiles(SparkPodInitContainer.scala:77) at org.apache.spark.deploy.k8s.SparkPodInitContainer.run(SparkPodInitContainer.scala:56) at org.apache.spark.deploy.k8s.SparkPodInitContainer$.main(SparkPodInitContainer.scala:113) at org.apache.spark.deploy.k8s.SparkPodInitContainer.main(SparkPodInitContainer.scala) Caused by: org.apache.hadoop.fs.azure.AzureException: Container qrefinery in account jr3e3d.blob.core.windows.net not found, and we can't create it using anoynomous credentials. at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.connectUsingAnonymousCredentials(AzureNativeFileSystemStore.java:730) at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:933) ... 22 more {code} I am certain that the parameter is being passed in to the driver correctly. Due to https://issues.apache.org/jira/browse/SPARK-26400 spa
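What the reporter observes is that only the `spark.kubernetes.*` download keys make it into `spark-init.properties`, so Hadoop filesystem credentials such as `spark.hadoop.fs.azure.account.key.*` never reach the init container. The natural direction for a fix can be sketched loosely; the `spark.hadoop.` prefix is real Spark convention, but the function and call site below are hypothetical:

```scala
// Hypothetical sketch of forwarding Hadoop filesystem configuration into the
// init container's properties file. Only the spark.hadoop.* prefix convention
// is taken from Spark; the object and method are illustrative.
object InitContainerConf {
  def withHadoopConf(initProps: Map[String, String],
                     sparkConf: Map[String, String]): Map[String, String] =
    // Copy every spark.hadoop.* key alongside the existing init properties so
    // the init container can authenticate to wasbs:// (and similar) schemes.
    initProps ++ sparkConf.filter { case (k, _) => k.startsWith("spark.hadoop.") }
}
```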
[jira] [Commented] (SPARK-25857) Document delegation token code in Spark
[ https://issues.apache.org/jira/browse/SPARK-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724570#comment-16724570 ] ASF GitHub Bot commented on SPARK-25857: vanzin opened a new pull request #23348: [SPARK-25857][core] Add developer documentation regarding delegation tokens. URL: https://github.com/apache/spark/pull/23348 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Document delegation token code in Spark > --- > > Key: SPARK-25857 > URL: https://issues.apache.org/jira/browse/SPARK-25857 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Marcelo Vanzin >Priority: Minor > > By this I mean not user documentation, but documenting the functionality > provided in the {{org.apache.spark.deploy.security}} and related packages, so > that other developers making changes there can refer to it. > It seems to be a source of confusion every time somebody needs to touch that > code, so it would be good to have a document explaining how it all works, > including how it's hooked up to different resource managers. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-25857) Document delegation token code in Spark
[ https://issues.apache.org/jira/browse/SPARK-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-25857: Assignee: (was: Apache Spark) > Document delegation token code in Spark > --- > > Key: SPARK-25857 > URL: https://issues.apache.org/jira/browse/SPARK-25857 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Marcelo Vanzin >Priority: Minor > > By this I mean not user documentation, but documenting the functionality > provided in the {{org.apache.spark.deploy.security}} and related packages, so > that other developers making changes there can refer to it. > It seems to be a source of confusion every time somebody needs to touch that > code, so it would be good to have a document explaining how it all works, > including how it's hooked up to different resource managers. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-25857) Document delegation token code in Spark
[ https://issues.apache.org/jira/browse/SPARK-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-25857: Assignee: Apache Spark > Document delegation token code in Spark > --- > > Key: SPARK-25857 > URL: https://issues.apache.org/jira/browse/SPARK-25857 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Marcelo Vanzin >Assignee: Apache Spark >Priority: Minor > > By this I mean not user documentation, but documenting the functionality > provided in the {{org.apache.spark.deploy.security}} and related packages, so > that other developers making changes there can refer to it. > It seems to be a source of confusion every time somebody needs to touch that > code, so it would be good to have a document explaining how it all works, > including how it's hooked up to different resource managers. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26402) GetStructField with different optional names are semantically equal
DB Tsai created SPARK-26402: --- Summary: GetStructField with different optional names are semantically equal Key: SPARK-26402 URL: https://issues.apache.org/jira/browse/SPARK-26402 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.4.0 Reporter: DB Tsai Assignee: DB Tsai -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26402) Canonicalization on GetStructField
[ https://issues.apache.org/jira/browse/SPARK-26402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] DB Tsai updated SPARK-26402: Description: GetStructField with different optional names should be semantically equal. We will use this as a building block to compare the nested fields used in the plans to be optimized by the catalyst optimizer. > Canonicalization on GetStructField > -- > > Key: SPARK-26402 > URL: https://issues.apache.org/jira/browse/SPARK-26402 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: DB Tsai >Assignee: DB Tsai >Priority: Major > > GetStructField with different optional names should be semantically equal. We > will use this as a building block to compare the nested fields used in the > plans to be optimized by the catalyst optimizer. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26402) Canonicalization on GetStructField
[ https://issues.apache.org/jira/browse/SPARK-26402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] DB Tsai updated SPARK-26402: Summary: Canonicalization on GetStructField (was: GetStructField with different optional names are semantically equal) > Canonicalization on GetStructField > -- > > Key: SPARK-26402 > URL: https://issues.apache.org/jira/browse/SPARK-26402 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: DB Tsai >Assignee: DB Tsai >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
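The idea behind the ticket can be illustrated with a toy model of the expression (not the real Catalyst classes): two `GetStructField`s that differ only in the optional result name should compare equal once canonicalization drops the name.

```scala
// Toy model of Catalyst's GetStructField (not the real class):
// canonicalization erases the cosmetic `name` field so that two extractions
// of the same ordinal from the same child compare equal.
case class GetStructField(child: String, ordinal: Int, name: Option[String]) {
  def canonicalized: GetStructField = copy(name = None)
}
```

This is the same trick Catalyst uses elsewhere: compare plans on their canonicalized forms so cosmetic differences do not block optimizations over nested fields.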
[jira] [Updated] (SPARK-26399) Add new stage-level REST APIs and parameters
[ https://issues.apache.org/jira/browse/SPARK-26399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Edwina Lu updated SPARK-26399: -- Description: Add the peak values for the metrics to the stages REST API. Also add a new executorSummary REST API, which will return executor summary metrics for a specified stage: {code:java} curl http://<history server>:18080/api/v1/applications/<application id>/<stage id>/<stage attempt>/executorSummary{code} Add parameters to the stages REST API to specify: * filtering for task status, and returning tasks that match (for example, FAILED tasks). * task metric quantiles, and adding the task summary if specified * executor metric quantiles, and adding the executor summary if specified was: Add the peak values for the metrics to the stages REST API. Also add a new executorSummary REST API, which will return executor summary metrics for a specified stage: {code:java} curl http://<history server>:18080/api/v1/applications/<application id>/<stage id>/<stage attempt>/executorSummary {code} > Add new stage-level REST APIs and parameters > > > Key: SPARK-26399 > URL: https://issues.apache.org/jira/browse/SPARK-26399 > Project: Spark > Issue Type: Sub-task > Components: Spark Core >Affects Versions: 2.4.0 >Reporter: Edwina Lu >Priority: Major > > Add the peak values for the metrics to the stages REST API. Also add a new > executorSummary REST API, which will return executor summary metrics for a > specified stage: > {code:java} > curl http://<history server>:18080/api/v1/applications/<application id>/<stage id>/<stage attempt>/executorSummary{code} > Add parameters to the stages REST API to specify: > * filtering for task status, and returning tasks that match (for example, > FAILED tasks). > * task metric quantiles, and adding the task summary if specified > * executor metric quantiles, and adding the executor summary if specified -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26397) Driver-side only metrics support
[ https://issues.apache.org/jira/browse/SPARK-26397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-26397: Environment: (was: As the comment in [https://github.com/apache/spark/pull/23327#discussion_r242646521,] during the work of SPARK-26222 and SPARK-26223, we need the support for driver-side only metrics, which will mark the metadata relative metrics as driver-side only and will not send them to executor-side. This issue needs some changes in SparkPlan and SparkPlanInfo, we should also check is there any misuse before.) > Driver-side only metrics support > > > Key: SPARK-26397 > URL: https://issues.apache.org/jira/browse/SPARK-26397 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Yuanjian Li >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26397) Driver-side only metrics support
[ https://issues.apache.org/jira/browse/SPARK-26397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuanjian Li updated SPARK-26397: Description: As the comment in [https://github.com/apache/spark/pull/23327#discussion_r242646521,] during the work of SPARK-26222 and SPARK-26223, we need the support for driver-side only metrics, which will mark the metadata-related metrics as driver-side only and will not send them to the executor side. This issue needs some changes in SparkPlan and SparkPlanInfo; we should also check whether there is any existing misuse. > Driver-side only metrics support > > > Key: SPARK-26397 > URL: https://issues.apache.org/jira/browse/SPARK-26397 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Yuanjian Li >Priority: Major > Fix For: 3.0.0 > > > As the comment in > [https://github.com/apache/spark/pull/23327#discussion_r242646521,] during > the work of SPARK-26222 and SPARK-26223, we need the support for driver-side > only metrics, which will mark the metadata-related metrics as driver-side > only and will not send them to the executor side. > This issue needs some changes in SparkPlan and SparkPlanInfo; we should also > check whether there is any existing misuse. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26403) DataFrame pivot using array column fails with "Unsupported literal type class"
Huon Wilson created SPARK-26403: --- Summary: DataFrame pivot using array column fails with "Unsupported literal type class" Key: SPARK-26403 URL: https://issues.apache.org/jira/browse/SPARK-26403 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0 Reporter: Huon Wilson Doing a pivot (using the {{pivot(pivotColumn: Column)}} overload) on a column containing arrays results in a runtime error: {code:none} scala> val df = Seq((1, Seq("a", "x"), 2), (1, Seq("b"), 3), (2, Seq("a", "x"), 10), (3, Seq(), 100)).toDF("x", "s", "y") df: org.apache.spark.sql.DataFrame = [x: int, s: array ... 1 more field] scala> df.show +---+--+---+ | x| s| y| +---+--+---+ | 1|[a, x]| 2| | 1| [b]| 3| | 2|[a, x]| 10| | 3|[]|100| +---+--+---+ scala> df.groupBy("x").pivot("s").agg(collect_list($"y")).show java.lang.RuntimeException: Unsupported literal type class scala.collection.mutable.WrappedArray$ofRef WrappedArray() at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:78) at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419) at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$pivot$1.apply(RelationalGroupedDataset.scala:419) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:419) at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:397) at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:317) ... 
49 elided {code} However, this doesn't seem to be a fundamental limitation with {{pivot}}, as it works fine using the {{pivot(pivotColumn: Column, values: Seq[Any])}} overload, as long as the arrays are mapped to the {{Array}} type: {code:none} scala> val rawValues = df.select("s").distinct.sort("s").collect rawValues: Array[org.apache.spark.sql.Row] = Array([WrappedArray()], [WrappedArray(a, x)], [WrappedArray(b)]) scala> val values = rawValues.map(_.getSeq[String](0).to[Array]) values: Array[Array[String]] = Array(Array(), Array(a, x), Array(b)) scala> df.groupBy("x").pivot("s", values).agg(collect_list($"y")).show +---+-+--+---+ | x| []|[a, x]|[b]| +---+-+--+---+ | 1| []| [2]|[3]| | 3|[100]|[]| []| | 2| []| [10]| []| +---+-+--+---+ {code} It would be nice if {{pivot}} was more resilient to Spark's own representation of array columns, and so the first version worked. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
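The workaround above amounts to normalizing each array value into a form the pivot machinery accepts as a column label (WrappedArray to Array). The same idea in plain Python, offered only as an analogy to the Spark behavior, not Spark code: list cells are unhashable and cannot serve as column keys until converted to tuples.

```python
# Rows mirror the DataFrame in the report: (x, s, y) with s an array column.
rows = [(1, ["a", "x"], 2), (1, ["b"], 3), (2, ["a", "x"], 10), (3, [], 100)]

# Lists are unhashable, so they cannot be column keys directly -- the
# Python analogue of Literal.apply rejecting WrappedArray. Normalize to
# tuples first, just as the workaround maps each Seq to an Array.
values = sorted({tuple(s) for _, s, _ in rows})

pivoted = {}
for x, s, y in rows:
    pivoted.setdefault(x, {v: [] for v in values})[tuple(s)].append(y)

print(values)   # [(), ('a', 'x'), ('b',)]
print(pivoted[1])
```

The distinct-sort-convert step matches the `rawValues.map(_.getSeq[String](0).to[Array])` line in the Scala workaround.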
[jira] [Updated] (SPARK-26288) add initRegisteredExecutorsDB in ExternalShuffleService
[ https://issues.apache.org/jira/browse/SPARK-26288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] weixiuli updated SPARK-26288: - Component/s: Spark Core > add initRegisteredExecutorsDB in ExternalShuffleService > --- > > Key: SPARK-26288 > URL: https://issues.apache.org/jira/browse/SPARK-26288 > Project: Spark > Issue Type: New Feature > Components: Kubernetes, Shuffle, Spark Core >Affects Versions: 2.4.0 >Reporter: weixiuli >Priority: Major > > As we all know that spark on Yarn uses DB to record RegisteredExecutors > information which can be reloaded and used again when the > ExternalShuffleService is restarted . > The RegisteredExecutors information can't be recorded both in the mode of > spark's standalone and spark on k8s , which will cause the > RegisteredExecutors information to be lost ,when the ExternalShuffleService > is restarted. > To solve the problem above, a method is proposed and is committed . -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26366) Except with transform regression
[ https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724744#comment-16724744 ] ASF GitHub Bot commented on SPARK-26366: asfgit closed pull request #23315: [SPARK-26366][SQL] ReplaceExceptWithFilter should consider NULL as False URL: https://github.com/apache/spark/pull/23315 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala index efd3944eba7f5..4996d24dfd298 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala @@ -36,7 +36,8 @@ import org.apache.spark.sql.catalyst.rules.Rule * Note: * Before flipping the filter condition of the right node, we should: * 1. Combine all it's [[Filter]]. - * 2. Apply InferFiltersFromConstraints rule (to take into account of NULL values in the condition). + * 2. Update the attribute references to the left node; + * 3. Add a Coalesce(condition, False) (to take into account of NULL values in the condition). 
*/ object ReplaceExceptWithFilter extends Rule[LogicalPlan] { @@ -47,23 +48,28 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] { plan.transform { case e @ Except(left, right, false) if isEligible(left, right) => -val newCondition = transformCondition(left, skipProject(right)) -newCondition.map { c => - Distinct(Filter(Not(c), left)) -}.getOrElse { +val filterCondition = combineFilters(skipProject(right)).asInstanceOf[Filter].condition +if (filterCondition.deterministic) { + transformCondition(left, filterCondition).map { c => +Distinct(Filter(Not(c), left)) + }.getOrElse { +e + } +} else { e } } } - private def transformCondition(left: LogicalPlan, right: LogicalPlan): Option[Expression] = { -val filterCondition = - InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition - -val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap - -if (filterCondition.references.forall(r => attributeNameMap.contains(r.name))) { - Some(filterCondition.transform { case a: AttributeReference => attributeNameMap(a.name) }) + private def transformCondition(plan: LogicalPlan, condition: Expression): Option[Expression] = { +val attributeNameMap: Map[String, Attribute] = plan.output.map(x => (x.name, x)).toMap +if (condition.references.forall(r => attributeNameMap.contains(r.name))) { + val rewrittenCondition = condition.transform { +case a: AttributeReference => attributeNameMap(a.name) + } + // We need to consider as False when the condition is NULL, otherwise we do not return those + // rows containing NULL which are instead filtered in the Except right plan + Some(Coalesce(Seq(rewrittenCondition, Literal.FalseLiteral))) } else { None } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala index 3b1b2d588ef67..c8e15c7da763e 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala @@ -20,11 +20,12 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, Not} +import org.apache.spark.sql.catalyst.expressions.{Alias, Coalesce, If, Literal, Not} import org.apache.spark.sql.catalyst.expressions.aggregate.First import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types.BooleanType class ReplaceOperatorSuite extends PlanTest { @@ -65,8 +66,7 @@ class ReplaceOperatorSuite extends PlanTest { val correctAnswer = Aggregate(table1.output, table1.output, -Filter(Not((attributeA.isNotNull && attributeB.isNotNull) && - (attributeA >= 2 && attributeB < 1)), +Fi
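The heart of the patch is the added Coalesce(condition, FalseLiteral): when the flipped filter condition evaluates to NULL, Not(NULL) is NULL and the row would be wrongly dropped from the Except result. A small Python model of SQL's three-valued logic (illustrative only, not Spark code; the data loosely follows the SPARK-26366 repro) shows the difference:

```python
# Model SQL three-valued logic with None standing in for NULL.
def sql_not(x):
    return None if x is None else not x

def sql_or(a, b):
    if a is True or b is True:
        return True
    if a is None or b is None:
        return None
    return False

def sql_in(value, options):
    # IN over a NULL value is NULL, not False.
    return None if value is None else value in options

def coalesce(x, default):
    return default if x is None else x

rows = [("john", "j@smith.com"), ("apache", "sp@apache.org"), ("foo", None)]

def cond(name, email):
    # The filter that built the right side of the Except.
    return sql_or(sql_in(name, ["john", "jane"]), sql_in(email, []))

# Rewrite before the fix: Filter(Not(cond)). For ("foo", None) the
# condition is NULL, Not(NULL) is NULL, and the row is wrongly dropped.
buggy = [r for r in rows if sql_not(cond(*r)) is True]

# Rewrite after the fix: Filter(Not(Coalesce(cond, False))) keeps it.
fixed = [r for r in rows if sql_not(coalesce(cond(*r), False)) is True]

print(buggy)  # [('apache', 'sp@apache.org')]
print(fixed)  # [('apache', 'sp@apache.org'), ('foo', None)]
```

`fixed` matches the Spark 2.2 output in the bug report, where both the "apache" and the NULL-email "foo" rows survive the Except.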
[jira] [Assigned] (SPARK-26366) Except with transform regression
[ https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li reassigned SPARK-26366: --- Assignee: Marco Gaido > Except with transform regression > > > Key: SPARK-26366 > URL: https://issues.apache.org/jira/browse/SPARK-26366 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.3.2 >Reporter: Dan Osipov >Assignee: Marco Gaido >Priority: Major > > There appears to be a regression between Spark 2.2 and 2.3. Below is the code > to reproduce it: > > {code:java} > import org.apache.spark.sql.functions.col > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > val inputDF = spark.sqlContext.createDataFrame( > spark.sparkContext.parallelize(Seq( > Row("0", "john", "smith", "j...@smith.com"), > Row("1", "jane", "doe", "j...@doe.com"), > Row("2", "apache", "spark", "sp...@apache.org"), > Row("3", "foo", "bar", null) > )), > StructType(List( > StructField("id", StringType, nullable=true), > StructField("first_name", StringType, nullable=true), > StructField("last_name", StringType, nullable=true), > StructField("email", StringType, nullable=true) > )) > ) > val exceptDF = inputDF.transform( toProcessDF => > toProcessDF.filter( > ( > col("first_name").isin(Seq("john", "jane"): _*) > and col("last_name").isin(Seq("smith", "doe"): _*) > ) > or col("email").isin(List(): _*) > ) > ) > inputDF.except(exceptDF).show() > {code} > Output with Spark 2.2: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > | 3| foo| bar| null| > +---+--+-++{noformat} > Output with Spark 2.3: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > +---+--+-++{noformat} > Note, changing the last line to > {code:java} > inputDF.except(exceptDF.cache()).show() > {code} > produces identical output for both Spark 2.3 and 2.2 > -- This message was sent by Atlassian 
JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26366) Except with transform regression
[ https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-26366. - Resolution: Fixed Fix Version/s: 3.0.0 2.4.1 > Except with transform regression > > > Key: SPARK-26366 > URL: https://issues.apache.org/jira/browse/SPARK-26366 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.3.2 >Reporter: Dan Osipov >Assignee: Marco Gaido >Priority: Major > Fix For: 2.4.1, 3.0.0 > > > There appears to be a regression between Spark 2.2 and 2.3. Below is the code > to reproduce it: > > {code:java} > import org.apache.spark.sql.functions.col > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > val inputDF = spark.sqlContext.createDataFrame( > spark.sparkContext.parallelize(Seq( > Row("0", "john", "smith", "j...@smith.com"), > Row("1", "jane", "doe", "j...@doe.com"), > Row("2", "apache", "spark", "sp...@apache.org"), > Row("3", "foo", "bar", null) > )), > StructType(List( > StructField("id", StringType, nullable=true), > StructField("first_name", StringType, nullable=true), > StructField("last_name", StringType, nullable=true), > StructField("email", StringType, nullable=true) > )) > ) > val exceptDF = inputDF.transform( toProcessDF => > toProcessDF.filter( > ( > col("first_name").isin(Seq("john", "jane"): _*) > and col("last_name").isin(Seq("smith", "doe"): _*) > ) > or col("email").isin(List(): _*) > ) > ) > inputDF.except(exceptDF).show() > {code} > Output with Spark 2.2: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > | 3| foo| bar| null| > +---+--+-++{noformat} > Output with Spark 2.3: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > +---+--+-++{noformat} > Note, changing the last line to > {code:java} > inputDF.except(exceptDF.cache()).show() > {code} > produces identical output for both Spark 2.3 and 
2.2 > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26377) java.lang.IllegalStateException: No current assignment for partition
[ https://issues.apache.org/jira/browse/SPARK-26377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-26377. -- Resolution: Invalid I'm going to leave this resolved. Let's iterate in Spark mailing lists before filing it as an issue. > java.lang.IllegalStateException: No current assignment for partition > > > Key: SPARK-26377 > URL: https://issues.apache.org/jira/browse/SPARK-26377 > Project: Spark > Issue Type: Bug > Components: DStreams >Affects Versions: 2.2.1 >Reporter: pavan >Priority: Major > > Hi, > I am using sparkkafkaDirectStream with subscriberPattern with initial > offsets for topics and a pattern. On running the SparkJob on the job server > i am getting the following exception.The job is terminated. > Kafka Params: > "bootstrap.servers" -> credentials.getBrokers, > "key.deserializer" -> classOf[StringDeserializer], > "value.deserializer" -> classOf[ByteArrayDeserializer], > "enable.auto.commit" -> (false: java.lang.Boolean) > "group.id" -> "abc" > API: > KafkaUtils.createDirectStream(streamingContext, PreferConsistent, > SubscribePattern[K, V](regexPattern, allKafkaParams, offsets), > perPartitionConfig) > > Error Log: > { "duration": "33.523 secs", "classPath": > "com.appiot.dataingestion.DataIngestionJob", "startTime": > "2018-12-15T18:28:08.207Z", "context": > "c~1d750906-1fa7-44f9-a258-04963ac53150~9dc097e3-bf0f-432c-9c27-68c41a4009cd", > "result": > { "message": "java.lang.IllegalStateException: No current assignment for > partition com-cibigdata2.v1.iot.raw_timeseries-0", "errorClass": > "java.lang.RuntimeException", "stack": "java.lang.RuntimeException: > java.lang.IllegalStateException: No current assignment for partition > com-cibigdata2.v1.iot.raw_timeseries-0\n\tat > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)\n\tat > > org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)\n\tat > > 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)\n\tat > > org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:160)\n\tat > > org.apache.spark.streaming.kafka010.SubscribePattern$$anonfun$onStart$4.apply(ConsumerStrategy.scala:159)\n\tat > scala.collection.Iterator$class.foreach(Iterator.scala:893)\n\tat > scala.collection.AbstractIterator.foreach(Iterator.scala:1336)\n\tat > scala.collection.IterableLike$class.foreach(IterableLike.scala:72)\n\tat > scala.collection.AbstractIterable.foreach(Iterable.scala:54)\n\tat > org.apache.spark.streaming.kafka010.SubscribePattern.onStart(ConsumerStrategy.scala:159)\n\tat > > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)\n\tat > > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)\n\tat > > org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat > > org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)\n\tat > > scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)\n\tat > > scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)\n\tat > > scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)\n\tat > > scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)\n\tat > > scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat > scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)\n\tat > scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)\n\tat > scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)\n\tat > > scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)\n\tat > > 
scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)\n\tat > > scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)\n\tat > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat > > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\n\tat > ... run in separate thread using org.apache.spark.util.ThreadUtils ... > ()\n\tat > org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)\n\tat > > org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)\n\tat > > com.s
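The root of the stack trace is that KafkaConsumer.seek was invoked for a partition that is not in the consumer's current assignment. A toy Python model of that invariant (not the real Kafka client API; class and method names here are made up for illustration) makes the failure mode concrete:

```python
class IllegalStateError(RuntimeError):
    pass

class MiniConsumer:
    """Toy model of the Kafka consumer invariant behind the stack trace:
    seek() is only legal for partitions in the current assignment."""
    def __init__(self):
        self.assignment = set()
        self.positions = {}

    def assign(self, partitions):
        self.assignment = set(partitions)

    def seek(self, partition, offset):
        if partition not in self.assignment:
            raise IllegalStateError(
                f"No current assignment for partition {partition}")
        self.positions[partition] = offset

c = MiniConsumer()
try:
    c.seek("raw_timeseries-0", 42)  # seeking before assignment fails
except IllegalStateError as e:
    print(e)

c.assign(["raw_timeseries-0"])
c.seek("raw_timeseries-0", 42)      # fine once the partition is assigned
print(c.positions)
```

In the reported setup the initial offsets passed to SubscribePattern included a partition the pattern subscription had not yet assigned, which triggers exactly this check.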
[jira] [Resolved] (SPARK-26381) Pickle Serialization Error Causing Crash
[ https://issues.apache.org/jira/browse/SPARK-26381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-26381. -- Resolution: Incomplete > Pickle Serialization Error Causing Crash > > > Key: SPARK-26381 > URL: https://issues.apache.org/jira/browse/SPARK-26381 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.1, 2.4.0 > Environment: Tested on two environments: > * Spark 2.4.0 - single machine only > * Spark 2.3.1 - YARN installation with 5 nodes and files on HDFS > The error occurs in both environments. >Reporter: Ryan >Priority: Major > > There is a pickle serialization error when I try and use AllenNLP for doing > NER within a Spark worker - it is causing a crash. When running on just the > Spark driver or in a standalone program, everything works as expected. > > {code:java} > Caused by: org.apache.spark.api.python.PythonException: Traceback (most > recent call last): > File > "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py", > line 217, in main > func, profiler, deserializer, serializer = read_command(pickleSer, infile) > File > "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py", > line 61, in read_command > command = serializer.loads(command.value) > File > "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/serializers.py", > line 559, in loads > return pickle.loads(obj, encoding=encoding) > TypeError: __init__() missing 3 required positional arguments: > 'non_padded_namespaces', 'padding_token', and 'oov_token' > at > org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298) > > at > 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438) > at > org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421) > at > org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252) > > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at > org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) > > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) > at > scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) > at > org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) > at > org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28) > > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) > at > org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) > > at > org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) > at > org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) > > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) > > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > > ... 
1 more > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
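The traceback pattern above, pickle.loads calling __init__ and finding required positional arguments missing, typically means the pickled payload reconstructs an object against a constructor signature it no longer matches. A minimal stand-in (the Vocabulary class here is hypothetical, not the real AllenNLP one) reproduces the same class of TypeError:

```python
import pickle

class Vocabulary:
    """Stand-in for a class whose pickle payload no longer matches its
    constructor signature."""
    def __init__(self, non_padded_namespaces, padding_token, oov_token):
        self.non_padded_namespaces = non_padded_namespaces
        self.padding_token = padding_token
        self.oov_token = oov_token

    def __reduce__(self):
        # Buggy on purpose: reconstruct by calling the class with no
        # arguments, mirroring a payload written against an older
        # __init__ that had defaults for these parameters.
        return (Vocabulary, ())

v = Vocabulary((), "@@PADDING@@", "@@UNKNOWN@@")
payload = pickle.dumps(v)
try:
    pickle.loads(payload)
except TypeError as e:
    # Same failure class as the worker-side traceback in the report.
    print("unpickling failed:", e)
```

On a Spark worker this surfaces inside serializers.py because the closure and its captured objects are unpickled there, far from where they were created.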
[jira] [Commented] (SPARK-26388) No support for "alter table .. replace columns" to drop columns
[ https://issues.apache.org/jira/browse/SPARK-26388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724751#comment-16724751 ] Hyukjin Kwon commented on SPARK-26388: -- Can you add expected input / output in the description? that should be easier to follow. > No support for "alter table .. replace columns" to drop columns > --- > > Key: SPARK-26388 > URL: https://issues.apache.org/jira/browse/SPARK-26388 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.1, 2.3.1, 2.3.2 >Reporter: nirav patel >Priority: Major > > Looks like hive {{replace columns}} is not working with spark 2.2.1 and 2.3.1 > > {{alterSchemaSql : alter table myschema.mytable replace columns (a int,b > int,d int) Exception in thread "main" > org.apache.spark.sql.catalyst.parser.ParseException: Operation not allowed: > alter table replace columns(line 2, pos 6) }} > {{ADD COLUMNS}} works which seemed to previously reported and fixed as well: > https://issues.apache.org/jira/browse/SPARK-18893 > > Replace columns should be supported as well. afaik, that's the only way to > delete hive columns. > > > It supposed to work according to this docs: > [https://docs.databricks.com/spark/latest/spark-sql/language-manual/alter-table-or-view.html#replace-columns] > [https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#supported-hive-features] > > but it's throwing error for me on 2 different versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26391) Spark Streaming Kafka with Offset Gaps
[ https://issues.apache.org/jira/browse/SPARK-26391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-26391. -- Resolution: Invalid Questions should go to mailing list. You could have a better answer from developers and users. > Spark Streaming Kafka with Offset Gaps > -- > > Key: SPARK-26391 > URL: https://issues.apache.org/jira/browse/SPARK-26391 > Project: Spark > Issue Type: Question > Components: Spark Core, Structured Streaming >Affects Versions: 2.4.0 >Reporter: Rishabh >Priority: Major > > I have an app that uses Kafka Streaming to pull data from `input` topic and > push to `output` topic with `processing.guarantee=exactly_once`. Due to > `exactly_once` gaps (transaction markers) are created in Kafka. Let's call > this app `kafka-streamer`. > Now I've another app that listens to this output topic (actually they are > multiple topics with a Pattern/Regex) and processes the data using > [https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html]. > Let's call this app `spark-streamer`. > Due to the gaps, the first thing that happens is spark streaming fails. To > fix this I enabled `spark.streaming.kafka.allowNonConsecutiveOffsets=true` in > the spark config before creating the StreamingContext. Now let's look at the > issues that were faced when I start `spark-streamer`: > # Even though there are new offsets to be polled/consumed, it requires > another message push to the topic partition to be able to start processing. > If I start the app (and there are messages in queue to be polled) and don't > push any topic, the code will timeout after default 120ms and throw an > exception. > # It doesn't fetch the last record. It fetches the record till second-last. > This means to poll/process the last record, another message has to be pushed. 
> This is a problem for us since `spark-streamer` is listening to multiple > topics (based on a pattern) and there might be a topic where throughput is > low but the data should still make it to Spark for processing. > # In general if no data/message is pushed then it'll die after 120ms default > timeout for polling. > Now in the limited amount of time I had, I tried going through the > spark-streaming-kafka code and was only able to find an answer to the third > problem which is this - > [https://github.com/apache/spark/blob/master/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala#L178] > My questions are: > # Why do we throw an exception in `compactedNext()` if no data is polled ? > # I wasn't able to figure out why the first and second issue happened, would > be great if somebody can point out a solution or reason behind the behaviour ? -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
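The third question concerns KafkaDataConsumer.compactedNext, which throws after the poll timeout instead of returning an empty batch. A toy sketch of that behavior (a deliberate simplification, not the actual Spark implementation; the 120 ms figure just mirrors the report):

```python
class NoRecordsError(TimeoutError):
    pass

class CompactedIterator:
    """Toy model of compactedNext(): drain a buffer of polled records
    and, when a poll yields nothing before the timeout, raise instead of
    returning an empty batch."""
    def __init__(self, records, timeout_ms=120):
        self.buffer = list(records)
        self.timeout_ms = timeout_ms

    def compacted_next(self):
        if not self.buffer:
            # This mirrors the throw the reporter points at: no data
            # within the poll timeout is treated as an error, not as
            # an empty result.
            raise NoRecordsError(
                f"Failed to get records after polling for "
                f"{self.timeout_ms} ms")
        return self.buffer.pop(0)

it = CompactedIterator(["record-1"])
print(it.compacted_next())   # record-1
```

Because an exhausted buffer raises rather than yielding nothing, a low-throughput topic can fail the batch even when the missing data is only a transaction marker gap, which is the behavior the reporter describes.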
[jira] [Commented] (SPARK-26381) Pickle Serialization Error Causing Crash
[ https://issues.apache.org/jira/browse/SPARK-26381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724750#comment-16724750 ] Hyukjin Kwon commented on SPARK-26381: -- [~ryan.clancy], please provide the codes to reproduce. > Pickle Serialization Error Causing Crash > > > Key: SPARK-26381 > URL: https://issues.apache.org/jira/browse/SPARK-26381 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.1, 2.4.0 > Environment: Tested on two environments: > * Spark 2.4.0 - single machine only > * Spark 2.3.1 - YARN installation with 5 nodes and files on HDFS > The error occurs in both environments. >Reporter: Ryan >Priority: Major > > There is a pickle serialization error when I try and use AllenNLP for doing > NER within a Spark worker - it is causing a crash. When running on just the > Spark driver or in a standalone program, everything works as expected. > > {code:java} > Caused by: org.apache.spark.api.python.PythonException: Traceback (most > recent call last): > File > "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py", > line 217, in main > func, profiler, deserializer, serializer = read_command(pickleSer, infile) > File > "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/worker.py", > line 61, in read_command > command = serializer.loads(command.value) > File > "/data/disk12/yarn/local/usercache/raclancy/appcache/application_1543437939000_1040/container_1543437939000_1040_01_02/pyspark.zip/pyspark/serializers.py", > line 559, in loads > return pickle.loads(obj, encoding=encoding) > TypeError: __init__() missing 3 required positional arguments: > 'non_padded_namespaces', 'padding_token', and 'oov_token' > at > org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298) > 
> at > org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438) > at > org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421) > at > org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252) > > at > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) > > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at > org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) > > at > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) > at > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) > at > scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) > at > org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) > at > scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) > at > org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28) > > at > scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) > at > org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) > > at > org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) > at > org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:939) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) > > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) > > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:109) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > > ... 
1 more > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
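A traceback of this shape usually points to a class whose pickle protocol does not round-trip all of its constructor arguments: serialization on the driver succeeds, but `pickle.loads` on the worker calls `__init__` with arguments missing. A minimal stand-in (hypothetical; `Vocab` here is not AllenNLP's actual class) reproduces the exact TypeError:

```python
import pickle

class Vocab:
    """Stand-in for a library class (hypothetical, not AllenNLP code)."""
    def __init__(self, counter, non_padded_namespaces, padding_token, oov_token):
        self.counter = counter
        self.non_padded_namespaces = non_padded_namespaces
        self.padding_token = padding_token
        self.oov_token = oov_token

    # The bug being illustrated: __reduce__ hands pickle only one of the
    # four required constructor arguments, so unpickling effectively calls
    # Vocab({}) and raises TypeError on the worker.
    def __reduce__(self):
        return (Vocab, (self.counter,))

v = Vocab({}, ["*tags"], "@@PADDING@@", "@@UNKNOWN@@")
try:
    pickle.loads(pickle.dumps(v))
except TypeError as err:
    error_message = str(err)
    print(error_message)  # e.g. __init__() missing 3 required positional arguments: ...
```

If the real class behaves like this, a fix on the user side is to avoid shipping such objects to workers (construct them inside the task) or to serialize only the constructor inputs.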
[jira] [Commented] (SPARK-21514) Hive has updated with new support for S3 and InsertIntoHiveTable.scala should update also
[ https://issues.apache.org/jira/browse/SPARK-21514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724763#comment-16724763 ] Noritaka Sekiyama commented on SPARK-21514: --- I'm working on fixing this. I'll update once it's done. Please let me know if anyone else is already working on this. > Hive has updated with new support for S3 and InsertIntoHiveTable.scala should > update also > - > > Key: SPARK-21514 > URL: https://issues.apache.org/jira/browse/SPARK-21514 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Javier Ros >Priority: Major > > Hive has been updated with new parameters that optimize its use of S3: you > can now avoid using S3 as the staging directory via the parameters > hive.blobstore.supported.schemes & hive.blobstore.optimizations.enabled. > The InsertIntoHiveTable.scala file should be updated with the same > improvement to match Hive's behavior.
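For reference, a sketch of how the Hive parameters mentioned above would be passed through a SparkSession (this relies on Spark forwarding `spark.hadoop.*` entries to the Hadoop/Hive configuration; whether Spark's own `InsertIntoHiveTable` path then honors them is exactly what this ticket is about, so treat this as illustration, not a working fix):

```python
from pyspark.sql import SparkSession

# Sketch only: forwards the Hive blobstore settings as Hadoop conf entries.
# Until this ticket is resolved, Spark's insert path may still stage
# intermediate data through S3 regardless of these settings.
spark = (
    SparkSession.builder
    .appName("hive-blobstore-sketch")
    .config("spark.hadoop.hive.blobstore.supported.schemes", "s3,s3a,s3n")
    .config("spark.hadoop.hive.blobstore.optimizations.enabled", "true")
    .enableHiveSupport()
    .getOrCreate()
)
```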
[jira] [Commented] (SPARK-26393) Different behaviors of date_add when calling it inside expr
[ https://issues.apache.org/jira/browse/SPARK-26393?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16724765#comment-16724765 ] Hyukjin Kwon commented on SPARK-26393: -- The PySpark API primarily mirrors the Scala side, and it looks like the Scala side doesn't have this. There are more cases like this if you take a close look, even on the Scala side. One problem is that the number of Scala function APIs is getting huge. We should rather focus on reduction than addition. Spark is trying to add only absolutely required APIs now and be conservative. For this case specifically, the workaround is super easy, as you described. Related, but bigger than this scope: there has been discussion about disallowing other types and only allowing {{Column}}s everywhere. Let's leave this resolved and fix them globally. > Different behaviors of date_add when calling it inside expr > --- > > Key: SPARK-26393 > URL: https://issues.apache.org/jira/browse/SPARK-26393 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.3.2 >Reporter: Ahmed Kamal >Priority: Minor > > When calling date_add from pyspark.sql.functions directly, without using expr, > like this: > {code:java} > df.withColumn("added", F.date_add(F.to_date(F.lit('1998-9-26')), > F.col('days'))).toPandas(){code} > it raises `TypeError: Column is not iterable`, > because it takes only a number, not a column. > But when I try to use it inside an expr, like this: > {code:java} > df.withColumn("added", F.expr("date_add(to_date('1998-9-26'), > days)")).toPandas(){code} > it works fine. > Shouldn't it behave the same way? > I think it's logical to accept a column here as well. > A Python notebook to demonstrate: > [https://gist.github.com/AhmedKamal20/fec10337e815baa44f115d307e3b07eb]
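For readers unfamiliar with the function, here is what the `expr` form computes, sketched in plain Python (a stand-in for the SQL semantics, not Spark code):

```python
from datetime import date, timedelta

def date_add(start, days):
    # Plain-Python mirror of Spark SQL's date_add(start_date, num_days):
    # the date `days` days after `start`.
    return start + timedelta(days=days)

# What expr("date_add(to_date('1998-9-26'), days)") computes for days=5:
print(date_add(date(1998, 9, 26), 5))  # 1998-10-01
```

The PySpark-level difference is only that `pyspark.sql.functions.date_add` type-checks its second argument as an int, while the SQL parser inside `expr` resolves `days` as a column reference.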
[jira] [Resolved] (SPARK-26393) Different behaviors of date_add when calling it inside expr
[ https://issues.apache.org/jira/browse/SPARK-26393?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-26393. -- Resolution: Won't Fix > Different behaviors of date_add when calling it inside expr > ---
[jira] [Created] (SPARK-26404) set spark.pyspark.python or PYSPARK_PYTHON doesn't work in k8s client-cluster mode.
Dongqing Liu created SPARK-26404: - Summary: set spark.pyspark.python or PYSPARK_PYTHON doesn't work in k8s client-cluster mode. Key: SPARK-26404 URL: https://issues.apache.org/jira/browse/SPARK-26404 Project: Spark Issue Type: Bug Components: Kubernetes Affects Versions: 2.4.0 Reporter: Dongqing Liu Neither conf.set("spark.executorEnv.PYSPARK_PYTHON", "/opt/pythonenvs/bin/python") nor conf.set("spark.pyspark.python", "/opt/pythonenvs/bin/python") works. It looks like the executor always picks up python from PATH.
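A sketch of the configuration being attempted, for context (the master URL and image name are placeholders, not values from the report; per this issue, executors on Kubernetes appear to ignore both settings and fall back to the `python` found on PATH):

```python
from pyspark.sql import SparkSession

# Sketch only: the two settings the reporter tried, set at session build
# time. On the affected versions, the executor containers reportedly do
# not pick up either one.
spark = (
    SparkSession.builder
    .master("k8s://https://<k8s-apiserver>:443")            # placeholder
    .config("spark.kubernetes.container.image", "<image>")  # placeholder
    .config("spark.pyspark.python", "/opt/pythonenvs/bin/python")
    .config("spark.executorEnv.PYSPARK_PYTHON", "/opt/pythonenvs/bin/python")
    .getOrCreate()
)
```

A common workaround (an assumption, not from this report) is to bake the desired interpreter into the container image so that `python` on PATH is already the right one.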